Aggregating Outputs in RunPod Serverless Functions
This tutorial will guide you through using the return_aggregate_stream
feature in RunPod to simplify result handling in your serverless functions.
Using return_aggregate_stream
allows you to automatically collect and aggregate all yielded results from a generator handler into a single response.
This simplifies result handling, making it easier to manage and return a consolidated set of results from asynchronous tasks, such as concurrent sentiment analysis or object detection, without needing additional code to collect and format the results manually.
We'll create a multi-purpose analyzer that can perform sentiment analysis on text and object detection in images, demonstrating how to aggregate outputs efficiently.
Setting up your Serverless Function
Let's break down the process of creating our multi-purpose analyzer into steps.
Import required libraries
First, import the necessary libraries:
import runpod
import time
import random
Create Helper Functions
Define functions to simulate sentiment analysis and object detection:
def analyze_sentiment(text):
"""Simulate sentiment analysis of text."""
sentiments = ["Positive", "Neutral", "Negative"]
score = random.uniform(-1, 1)
sentiment = random.choice(sentiments)
return f"Sentiment: {sentiment}, Score: {score:.2f}"
def detect_objects(image_url):
"""Simulate object detection in an image."""
objects = ["person", "car", "dog", "cat", "tree", "building"]
detected = random.sample(objects, random.randint(1, 4))
confidences = [random.uniform(0.7, 0.99) for _ in detected]
return [f"{obj}: {conf:.2f}" for obj, conf in zip(detected, confidences)]
These functions:
- Simulate sentiment analysis, returning a random sentiment and score
- Simulate object detection, returning a list of detected objects with confidence scores
Create the main Handler Function
Now, let's create the main handler function that processes jobs and yields results:
def handler(job):
job_input = job['input']
task_type = job_input.get('task_type', 'sentiment')
items = job_input.get('items', [])
results = []
for item in items:
time.sleep(random.uniform(0.5, 2)) # Simulate processing time
if task_type == 'sentiment':
result = analyze_sentiment(item)
elif task_type == 'object_detection':
result = detect_objects(item)
else:
result = f"Unknown task type: {task_type}"
results.append(result)
yield result
return results
This handler:
- Determines the task type (sentiment analysis or object detection)
- Processes each item in the input
- Yields results incrementally
- Returns the complete list of results
Set up the Serverless Function starter
Create a function to start the serverless handler with proper configuration:
def start_handler():
def wrapper(job):
generator = handler(job)
if job.get('id') == 'local_test':
return list(generator)
return generator
runpod.serverless.start({
"handler": wrapper,
"return_aggregate_stream": True
})
if __name__ == "__main__":
start_handler()
This setup:
- Creates a wrapper to handle both local testing and RunPod environments
- Uses
return_aggregate_stream=True
to automatically aggregate yielded results
Complete code example
Here's the full code for our multi-purpose analyzer with output aggregation:
import runpod
import time
import random
def analyze_sentiment(text):
"""Simulate sentiment analysis of text."""
sentiments = ["Positive", "Neutral", "Negative"]
score = random.uniform(-1, 1)
sentiment = random.choice(sentiments)
return f"Sentiment: {sentiment}, Score: {score:.2f}"
def detect_objects(image_url):
"""Simulate object detection in an image."""
objects = ["person", "car", "dog", "cat", "tree", "building"]
detected = random.sample(objects, random.randint(1, 4))
confidences = [random.uniform(0.7, 0.99) for _ in detected]
return [f"{obj}: {conf:.2f}" for obj, conf in zip(detected, confidences)]
def handler(job):
job_input = job['input']
task_type = job_input.get('task_type', 'sentiment')
items = job_input.get('items', [])
results = []
for item in items:
time.sleep(random.uniform(0.5, 2)) # Simulate processing time
if task_type == 'sentiment':
result = analyze_sentiment(item)
elif task_type == 'object_detection':
result = detect_objects(item)
else:
result = f"Unknown task type: {task_type}"
results.append(result)
yield result
return results
def start_handler():
def wrapper(job):
generator = handler(job)
if job.get('id') == 'local_test':
return list(generator)
return generator
runpod.serverless.start({
"handler": wrapper,
"return_aggregate_stream": True
})
if __name__ == "__main__":
start_handler()
Testing your Serverless Function
To test your function locally, use these commands:
For sentiment analysis:
python your_script.py --test_input '
{
"input": {
"task_type": "sentiment",
"items": [
"I love this product!",
"The service was terrible.",
"It was okay, nothing special."
]
}
}'
For object detection:
python your_script.py --test_input '
{
"input": {
"task_type": "object_detection",
"items": [
"image1.jpg",
"image2.jpg",
"image3.jpg"
]
}
}'
Understanding the output
When you run the sentiment analysis test, you'll see output similar to this:
--- Starting Serverless Worker | Version 1.6.2 ---
INFO | test_input set, using test_input as job input.
DEBUG | Retrieved local job: {'input': {'task_type': 'sentiment', 'items': ['I love this product!', 'The service was terrible.', 'It was okay, nothing special.']}, 'id': 'local_test'}
INFO | local_test | Started.
DEBUG | local_test | Handler output: ['Sentiment: Positive, Score: 0.85', 'Sentiment: Negative, Score: -0.72', 'Sentiment: Neutral, Score: 0.12']
DEBUG | local_test | run_job return: {'output': ['Sentiment: Positive, Score: 0.85', 'Sentiment: Negative, Score: -0.72', 'Sentiment: Neutral, Score: 0.12']}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': ['Sentiment: Positive, Score: 0.85', 'Sentiment: Negative, Score: -0.72', 'Sentiment: Neutral, Score: 0.12']}
INFO | Local testing complete, exiting.
This output demonstrates:
- The serverless worker starting and processing the job
- The handler generating results for each input item
- The aggregation of results into a single list
Conclusion
You've now created a serverless function using RunPod's Python SDK that demonstrates efficient output aggregation for both local testing and production environments. This approach simplifies result handling and ensures consistent behavior across different execution contexts.
To further enhance this application, consider:
- Implementing real sentiment analysis and object detection models
- Adding error handling and logging for each processing step
- Exploring RunPod's advanced features for handling larger datasets or parallel processing
RunPod's serverless library, with features like return_aggregate_stream
, provides a powerful foundation for building scalable, efficient applications that can process and aggregate data seamlessly.