Aggregating Outputs in RunPod Serverless Functions

This tutorial guides you through RunPod's return_aggregate_stream feature, which simplifies result handling in serverless functions. When enabled, it automatically collects every result yielded by a generator handler and aggregates them into a single response. That makes it easy to return a consolidated set of results from asynchronous tasks, such as concurrent sentiment analysis or object detection, without writing extra code to collect and format them manually.
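
The idea in miniature (a toy handler for illustration, not the analyzer built in this tutorial):

import runpod

def stream_handler(job):
    """Toy generator handler: yields one result per input item."""
    for item in job['input']['items']:
        yield item.upper()

# With return_aggregate_stream enabled, each yield is streamed as a
# partial result AND collected into a single list for the final response.
runpod.serverless.start({
    "handler": stream_handler,
    "return_aggregate_stream": True
})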

We'll create a multi-purpose analyzer that can perform sentiment analysis on text and object detection in images, demonstrating how to aggregate outputs efficiently.

Setting up your Serverless Function

Let's break down the process of creating our multi-purpose analyzer into steps.

Import required libraries

First, import the necessary libraries:

import runpod
import time
import random

Create Helper Functions

Define functions to simulate sentiment analysis and object detection:

def analyze_sentiment(text):
    """Simulate sentiment analysis of text."""
    sentiments = ["Positive", "Neutral", "Negative"]
    score = random.uniform(-1, 1)
    sentiment = random.choice(sentiments)
    return f"Sentiment: {sentiment}, Score: {score:.2f}"

def detect_objects(image_url):
    """Simulate object detection in an image."""
    objects = ["person", "car", "dog", "cat", "tree", "building"]
    detected = random.sample(objects, random.randint(1, 4))
    confidences = [random.uniform(0.7, 0.99) for _ in detected]
    return [f"{obj}: {conf:.2f}" for obj, conf in zip(detected, confidences)]

These functions:

  1. Simulate sentiment analysis, returning a random sentiment label and score
  2. Simulate object detection, returning a list of detected objects with confidence scores (both are exercised in the quick check below)
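
To sanity-check these helpers before wiring them into a handler, you can call them directly; seeding random just makes the simulated output repeatable (a local sketch, assuming the two functions above are in scope):

import random

random.seed(0)  # reproducible fake results

print(analyze_sentiment("I love this product!"))
# e.g. "Sentiment: Negative, Score: 0.69" (label and score are independent draws here)
print(detect_objects("image1.jpg"))
# e.g. ['person: 0.78', 'tree: 0.92']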

Create the main Handler Function

Now, let's create the main handler function that processes jobs and yields results:

def handler(job):
    job_input = job['input']
    task_type = job_input.get('task_type', 'sentiment')
    items = job_input.get('items', [])

    results = []
    for item in items:
        time.sleep(random.uniform(0.5, 2))  # Simulate processing time

        if task_type == 'sentiment':
            result = analyze_sentiment(item)
        elif task_type == 'object_detection':
            result = detect_objects(item)
        else:
            result = f"Unknown task type: {task_type}"

        results.append(result)
        yield result  # streamed incrementally; aggregated by return_aggregate_stream

    return results  # a generator's return value lands on StopIteration.value

This handler:

  1. Determines the task type (sentiment analysis or object detection)
  2. Processes each item in the input
  3. Yields results incrementally
  4. Returns the complete list when the generator finishes; with return_aggregate_stream enabled, it is the yielded values that form the aggregated response (you can observe both by driving the generator manually, as sketched below)
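
Because the handler is a generator, you can drive it by hand to see both the incremental yields and the final return value, without starting a worker (a local sketch using a hand-built job dict):

job = {
    "id": "local_test",
    "input": {"task_type": "sentiment", "items": ["Great!", "Awful."]}
}

gen = handler(job)
try:
    while True:
        print("yielded:", next(gen))   # incremental result
except StopIteration as stop:
    print("returned:", stop.value)     # the full list from `return results`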

Set up the Serverless Function starter

Create a function to start the serverless handler with proper configuration:

def start_handler():
    def wrapper(job):
        generator = handler(job)
        if job.get('id') == 'local_test':
            return list(generator)
        return generator

    runpod.serverless.start({
        "handler": wrapper,
        "return_aggregate_stream": True
    })

if __name__ == "__main__":
    start_handler()

This setup:

  1. Creates a wrapper to handle both local testing and RunPod environments
  2. Uses return_aggregate_stream=True to automatically aggregate yielded results (a minimal variant without the wrapper is sketched below)
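
The wrapper exists only so local test runs get back a plain list. If you always run inside the RunPod worker, the SDK accepts the generator handler directly (a minimal sketch under that assumption):

runpod.serverless.start({
    "handler": handler,               # the generator handler from above
    "return_aggregate_stream": True   # aggregate yielded results into one response
})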

Complete code example

Here's the full code for our multi-purpose analyzer with output aggregation:

import runpod
import time
import random

def analyze_sentiment(text):
    """Simulate sentiment analysis of text."""
    sentiments = ["Positive", "Neutral", "Negative"]
    score = random.uniform(-1, 1)
    sentiment = random.choice(sentiments)
    return f"Sentiment: {sentiment}, Score: {score:.2f}"

def detect_objects(image_url):
    """Simulate object detection in an image."""
    objects = ["person", "car", "dog", "cat", "tree", "building"]
    detected = random.sample(objects, random.randint(1, 4))
    confidences = [random.uniform(0.7, 0.99) for _ in detected]
    return [f"{obj}: {conf:.2f}" for obj, conf in zip(detected, confidences)]

def handler(job):
    job_input = job['input']
    task_type = job_input.get('task_type', 'sentiment')
    items = job_input.get('items', [])

    results = []
    for item in items:
        time.sleep(random.uniform(0.5, 2))  # Simulate processing time

        if task_type == 'sentiment':
            result = analyze_sentiment(item)
        elif task_type == 'object_detection':
            result = detect_objects(item)
        else:
            result = f"Unknown task type: {task_type}"

        results.append(result)
        yield result  # streamed incrementally; aggregated by return_aggregate_stream

    return results  # a generator's return value lands on StopIteration.value

def start_handler():
    def wrapper(job):
        generator = handler(job)
        if job.get('id') == 'local_test':
            return list(generator)
        return generator

    runpod.serverless.start({
        "handler": wrapper,
        "return_aggregate_stream": True
    })

if __name__ == "__main__":
    start_handler()

Testing your Serverless Function

To test your function locally, use these commands:

For sentiment analysis:

python your_script.py --test_input '
{
    "input": {
        "task_type": "sentiment",
        "items": [
            "I love this product!",
            "The service was terrible.",
            "It was okay, nothing special."
        ]
    }
}'

For object detection:

python your_script.py --test_input '
{
    "input": {
        "task_type": "object_detection",
        "items": [
            "image1.jpg",
            "image2.jpg",
            "image3.jpg"
        ]
    }
}'
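
As an alternative to inline JSON, the RunPod SDK can read its test input from a test_input.json file placed next to your script (worth verifying against your installed SDK version); for example:

{
    "input": {
        "task_type": "object_detection",
        "items": ["image1.jpg", "image2.jpg"]
    }
}

With that file in place, a plain python your_script.py runs the local test.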

Understanding the output

When you run the sentiment analysis test, you'll see output similar to this:

--- Starting Serverless Worker |  Version 1.6.2 ---
INFO | test_input set, using test_input as job input.
DEBUG | Retrieved local job: {'input': {'task_type': 'sentiment', 'items': ['I love this product!', 'The service was terrible.', 'It was okay, nothing special.']}, 'id': 'local_test'}
INFO | local_test | Started.
DEBUG | local_test | Handler output: ['Sentiment: Positive, Score: 0.85', 'Sentiment: Negative, Score: -0.72', 'Sentiment: Neutral, Score: 0.12']
DEBUG | local_test | run_job return: {'output': ['Sentiment: Positive, Score: 0.85', 'Sentiment: Negative, Score: -0.72', 'Sentiment: Neutral, Score: 0.12']}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': ['Sentiment: Positive, Score: 0.85', 'Sentiment: Negative, Score: -0.72', 'Sentiment: Neutral, Score: 0.12']}
INFO | Local testing complete, exiting.

This output demonstrates:

  1. The serverless worker starting and processing the job
  2. The handler generating results for each input item
  3. The aggregation of results into a single list
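
Once deployed to a serverless endpoint, the same aggregated list comes back over the API. A sketch using the RunPod Python client, where ENDPOINT_ID and the API key are placeholders for your own values:

import runpod

runpod.api_key = "YOUR_RUNPOD_API_KEY"     # placeholder
endpoint = runpod.Endpoint("ENDPOINT_ID")  # placeholder endpoint ID

# run_sync blocks until the job completes and returns its output
result = endpoint.run_sync({
    "input": {
        "task_type": "sentiment",
        "items": ["I love this product!", "The service was terrible."]
    }
}, timeout=60)

print(result)  # the aggregated list of per-item results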

Conclusion

You've now created a serverless function using RunPod's Python SDK that demonstrates efficient output aggregation for both local testing and production environments. This approach simplifies result handling and ensures consistent behavior across different execution contexts.

To further enhance this application, consider:

  • Implementing real sentiment analysis and object detection models
  • Adding error handling and logging for each processing step (a sketch follows below)
  • Exploring RunPod's advanced features for handling larger datasets or parallel processing
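
For the error-handling suggestion, one approach is to wrap each item in try/except so a single bad input doesn't abort the whole job (a sketch; the error dictionary format is illustrative, not a RunPod convention):

def handler(job):
    job_input = job['input']
    task_type = job_input.get('task_type', 'sentiment')

    for item in job_input.get('items', []):
        try:
            if task_type == 'sentiment':
                yield analyze_sentiment(item)
            elif task_type == 'object_detection':
                yield detect_objects(item)
            else:
                yield {"error": f"Unknown task type: {task_type}"}
        except Exception as exc:  # keep processing the remaining items
            yield {"error": str(exc), "item": item}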

RunPod's serverless library, with features like return_aggregate_stream, provides a powerful foundation for building scalable, efficient applications that can process and aggregate data seamlessly.