# Aggregate streaming outputs

> Automatically collect and aggregate yielded results from streaming handler functions.

export const ServerlessTooltip = () => {
  return <Tooltip headline="Serverless" tip="A cloud computing platform that allows you to deploy AI/ML applications without provisioning or managing servers." cta="Learn more about Serverless" href="/serverless/overview">Serverless</Tooltip>;
};

export const HandlerFunctionTooltip = () => {
  return <Tooltip headline="Handler function" tip="The core of a Runpod Serverless application. These functions define how a worker processes incoming requests and returns results." cta="Learn more about handler functions" href="/serverless/workers/handler-functions">handler function</Tooltip>;
};

When building a streaming <HandlerFunctionTooltip /> that yields results incrementally, you can use the `return_aggregate_stream` feature to automatically collect all yielded outputs into a single aggregated response. This simplifies result handling by eliminating the need to manually collect and format streaming results, making your handlers easier to implement and consume.

This guide shows you how to use output aggregation effectively in your <ServerlessTooltip /> applications.

## Understanding output aggregation

By default, results yielded by a streaming handler are only accessible via the `/stream` operation, which returns them as they become available. When you enable `return_aggregate_stream`, Runpod automatically:

1. Collects all yielded results as your handler produces them.
2. Aggregates them into a single list.
3. Makes the complete aggregated results available via `/run` and `/runsync` operations.

This allows clients to choose between streaming individual results as they arrive or waiting for the complete aggregated response.
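
Conceptually, the aggregation step behaves like draining the generator into a list. The sketch below is plain Python with no Runpod dependency. `aggregate_stream` is a hypothetical helper used only to illustrate the idea; it is not the SDK's internal implementation.

```python
def handler(job):
    """Streaming handler: yields one result per input item."""
    for item in job["input"].get("items", []):
        yield f"Processed: {item}"


def aggregate_stream(handler_fn, job):
    """Hypothetical helper: collect every yielded result into one list."""
    aggregated = []
    for partial in handler_fn(job):
        # In a real worker, each partial result would also be
        # made available to /stream clients at this point.
        aggregated.append(partial)
    return aggregated


job = {"id": "example", "input": {"items": ["a", "b"]}}
output = aggregate_stream(handler, job)
print(output)  # ['Processed: a', 'Processed: b']
```

The handler itself only yields; collecting the results is the caller's job, which is the role `return_aggregate_stream` plays in a deployed worker.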

## Basic aggregation example

Here's a simple handler that processes multiple items and yields results incrementally:

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod

def handler(job):
    job_input = job["input"]
    items = job_input.get("items", [])

    for item in items:
        # Process each item
        result = f"Processed: {item}"

        # Yield each result as soon as it is ready. With
        # return_aggregate_stream enabled, Runpod collects the yielded
        # values into a list, so the handler does not build one itself.
        yield result

runpod.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True
})
```

When a client calls this handler with multiple items, they can:

* Use `/stream` to receive each result as it's yielded.
* Use `/run` or `/runsync` to receive all results aggregated into a list.
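
On the client side, the choice between these operations comes down to which URL the request targets. The sketch below builds (but does not send) a request using only the standard library. The base URL, the `Bearer` authorization scheme, and the placeholder endpoint ID and API key are assumptions for illustration; confirm them against the API reference for your endpoint.

```python
import json
import urllib.request

# Assumed base URL; confirm against the API reference.
API_BASE = "https://api.runpod.ai/v2"


def build_request(endpoint_id, api_key, operation, items):
    """Build (but do not send) a POST request for /run or /runsync."""
    if operation not in ("run", "runsync"):
        raise ValueError("operation must be 'run' or 'runsync'")
    body = json.dumps({"input": {"items": items}}).encode("utf-8")
    return urllib.request.Request(
        url=f"{API_BASE}/{endpoint_id}/{operation}",
        data=body,
        headers={
            "Content-Type": "application/json",
            # Assumed auth scheme; placeholder key.
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


request = build_request("YOUR_ENDPOINT_ID", "YOUR_API_KEY", "runsync", ["Item 1", "Item 2"])
print(request.full_url)
```

Sending the request with `urllib.request.urlopen(request)` and decoding the JSON body would then give access to the aggregated list in the `output` field.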

## Processing multiple items

A common pattern is processing a batch of items and yielding results as each completes. This is useful for tasks like:

* Analyzing sentiment for multiple text samples.
* Generating images from multiple prompts.
* Running inference on multiple inputs.

Here's a practical example:

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod
import time

def analyze_items(items, task_type):
    """Process items based on task type."""
    for item in items:
        # Simulate processing time
        time.sleep(0.5)

        # Process based on type
        if task_type == "sentiment":
            result = {"text": item, "sentiment": "positive", "score": 0.92}
        elif task_type == "classify":
            result = {"text": item, "category": "technology", "confidence": 0.88}
        else:
            result = {"error": f"Unknown task type: {task_type}"}

        # Yield each result as it completes. Aggregation is handled by
        # return_aggregate_stream, so no separate list is needed here.
        yield result

def handler(job):
    job_input = job["input"]
    task_type = job_input.get("task_type", "sentiment")
    items = job_input.get("items", [])

    # Validate input
    if not items:
        return {"error": "No items provided"}

    # Process items and yield results
    return analyze_items(items, task_type)

runpod.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True
})
```

This handler processes each item sequentially and yields each result as soon as it is ready. With `return_aggregate_stream` enabled, Runpod collects the yielded results into the final list.

## Local testing with aggregation

When testing locally, generators behave differently than in production, so your handler needs to account for which environment it is running in:

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod

def handler(job):
    job_input = job["input"]
    items = job_input.get("items", [])

    for item in items:
        result = f"Processed: {item}"
        yield result

def start_handler():
    """Wrapper to handle local testing vs. production."""
    def wrapper(job):
        generator = handler(job)

        # In local testing, convert generator to list
        if job.get("id") == "local_test":
            return list(generator)

        # In production, return the generator
        return generator

    runpod.serverless.start({
        "handler": wrapper,
        "return_aggregate_stream": True
    })

if __name__ == "__main__":
    start_handler()
```

The wrapper function checks if the job ID is `local_test` (indicating local testing) and converts the generator to a list. In production, it returns the generator directly, allowing Runpod to handle the aggregation.
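
To see the two branches of the wrapper in isolation, the sketch below calls the same logic directly, without starting a worker. The non-local job ID is a made-up value for illustration.

```python
def handler(job):
    for item in job["input"].get("items", []):
        yield f"Processed: {item}"


def wrapper(job):
    generator = handler(job)
    # Local test jobs get a fully materialized list.
    if job.get("id") == "local_test":
        return list(generator)
    # Any other job gets the generator itself.
    return generator


local_job = {"id": "local_test", "input": {"items": ["Item 1", "Item 2"]}}
prod_job = {"id": "example-job-id", "input": {"items": ["Item 1", "Item 2"]}}  # made-up ID

local_output = wrapper(local_job)
prod_output = wrapper(prod_job)

print(type(local_output).__name__)  # list
print(type(prod_output).__name__)   # generator

# The generator produces the same values once it is consumed.
drained = list(prod_output)
print(drained == local_output)  # True
```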

## Testing locally

Create a test input file to verify your aggregation works correctly:

```json test_input.json theme={"theme":{"light":"github-light","dark":"github-dark"}}
{
  "input": {
    "task_type": "sentiment",
    "items": [
      "I love this product!",
      "The service was okay.",
      "Not great, could be better."
    ]
  }
}
```

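Run your handler. With `test_input.json` in the working directory you can run `python handler.py` with no arguments, or you can pass the input inline with the `--test_input` flag: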

```bash theme={"theme":{"light":"github-light","dark":"github-dark"}}
python handler.py --test_input '{"input": {"task_type": "sentiment", "items": ["Item 1", "Item 2", "Item 3"]}}'
```

You should see output similar to the following, with the aggregated list shown in the `Handler output` line:

```bash theme={"theme":{"light":"github-light","dark":"github-dark"}}
--- Starting Serverless Worker |  Version 1.6.2 ---
INFO   | Using test_input.json as job input.
DEBUG  | Retrieved local job: {'input': {'task_type': 'sentiment', 'items': ['Item 1', 'Item 2', 'Item 3']}, 'id': 'local_test'}
INFO   | local_test | Started.
DEBUG  | local_test | Handler output: ['Processed: Item 1', 'Processed: Item 2', 'Processed: Item 3']
INFO   | Job local_test completed successfully.
```

## Understanding the output format

When `return_aggregate_stream` is enabled, the final output structure includes all yielded results in a list:

**Without aggregation** (streaming only):

* Results arrive one at a time via `/stream`.
* No combined output available via `/run` or `/runsync`.

**With aggregation enabled:**

* Individual results still available via `/stream` as they're yielded.
* Complete aggregated list available via `/run` and `/runsync`:

```json theme={"theme":{"light":"github-light","dark":"github-dark"}}
{
  "output": [
    {"text": "Item 1", "sentiment": "positive", "score": 0.92},
    {"text": "Item 2", "sentiment": "neutral", "score": 0.54},
    {"text": "Item 3", "sentiment": "negative", "score": 0.78}
  ]
}
```
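
Because the aggregated output is a plain JSON list, a client can process it with ordinary list operations. The sketch below parses the response shown above and summarizes it. The summary logic (sentiment counts and mean score) is only an illustration of client-side handling, not part of the Runpod API.

```python
import json
from collections import Counter

# Response body copied from the example above.
response_body = """
{
  "output": [
    {"text": "Item 1", "sentiment": "positive", "score": 0.92},
    {"text": "Item 2", "sentiment": "neutral", "score": 0.54},
    {"text": "Item 3", "sentiment": "negative", "score": 0.78}
  ]
}
"""

results = json.loads(response_body)["output"]

# Count how many items fall under each sentiment label.
counts = Counter(r["sentiment"] for r in results)

# Mean score across all aggregated results.
average_score = sum(r["score"] for r in results) / len(results)

print(dict(counts))
print(round(average_score, 2))  # 0.75
```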

## When to use output aggregation

Use `return_aggregate_stream` for:

* **Batch processing**: You process multiple items and clients need the complete set of results.
* **Progress tracking**: Clients want to see incremental progress but also need the final aggregated results.
* **Flexible consumption**: Supporting both streaming and batch consumption patterns.
* **Simplified integration**: Clients don't want to implement streaming logic but still benefit from incremental processing.

Don't use it for:

* **Large result sets**: Aggregating thousands of results can create memory pressure and large response payloads.
* **True streaming only**: Results are only ever consumed as a stream (for example, real-time chat).
* **Single result**: The handler produces only one result, so there is nothing to aggregate.

## Best practices

1. **Memory management**: Aggregation keeps every yielded result until the job finishes, so be mindful of memory usage when a job yields a large number of results.

2. **Payload limits**: Remember the payload size limits:

   * `/run` operation: 10 MB
   * `/runsync` operation: 20 MB

   If aggregated results exceed these limits, consider using streaming only or storing results in cloud storage.

3. **Error handling**: Handle errors for individual items without breaking the entire batch:

   ```python theme={"theme":{"light":"github-light","dark":"github-dark"}}
   def handler(job):
       items = job["input"].get("items", [])

       for item in items:
           try:
               result = process_item(item)
               yield {"success": True, "result": result}
           except Exception as e:
               yield {"success": False, "error": str(e), "item": item}
   ```

4. **Consistent output structure**: Yield results in a consistent format to simplify client-side processing.
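
Practices 2 to 4 above can be combined in one sketch: every yielded result uses the same keys whether it succeeded or failed, and a helper estimates the serialized size of the aggregated list. `process_item` is a stand-in for real work, `aggregated_size_bytes` is a hypothetical helper, and treating the 10 MB `/run` limit as 10 * 1024 * 1024 bytes is an assumption.

```python
import json

RUN_LIMIT_BYTES = 10 * 1024 * 1024  # assumes "10 MB" means 10 MiB


def process_item(item):
    """Stand-in for real work; rejects empty input to show error handling."""
    if not item:
        raise ValueError("empty item")
    return item.upper()


def handler(job):
    for item in job["input"].get("items", []):
        try:
            result = process_item(item)
        except Exception as exc:
            # Same keys as the success case, so clients need one code path.
            yield {"success": False, "item": item, "result": None, "error": str(exc)}
        else:
            yield {"success": True, "item": item, "result": result, "error": None}


def aggregated_size_bytes(results):
    """Size of the aggregated list once serialized as UTF-8 JSON."""
    return len(json.dumps(results).encode("utf-8"))


results = list(handler({"input": {"items": ["ok", ""]}}))
size = aggregated_size_bytes(results)
print(results[1]["error"])      # empty item
print(size <= RUN_LIMIT_BYTES)  # True
```

Running a check like this against representative inputs during local testing gives an early signal that a batch is approaching the payload limit.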

## Combining with async handlers

You can also use aggregation with async handlers. The example below awaits each item in turn; the same yielding pattern applies when items are processed concurrently:

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod
import asyncio

async def async_handler(job):
    items = job["input"].get("items", [])

    for item in items:
        # Simulate async processing
        await asyncio.sleep(0.5)

        result = f"Async processed: {item}"
        yield result

runpod.serverless.start({
    "handler": async_handler,
    "return_aggregate_stream": True
})
```

This combines the benefits of async processing with automatic output aggregation.
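
If the items are independent, an async generator can start all of them at once and yield each result as it finishes. The sketch below uses `asyncio.as_completed` from the standard library and runs without a worker. `process_item`, `collect`, and the per-item `name`/`delay` input shape are hypothetical and exist only to make the ordering visible.

```python
import asyncio


async def process_item(name, delay):
    """Stand-in for real async work such as a network call."""
    await asyncio.sleep(delay)
    return f"Async processed: {name}"


async def async_handler(job):
    items = job["input"].get("items", [])
    # Start every task up front so they run concurrently.
    tasks = [
        asyncio.create_task(process_item(item["name"], item["delay"]))
        for item in items
    ]
    # Yield results in completion order, not input order.
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def collect(job):
    """Drain the async generator into a list, as aggregation would."""
    return [result async for result in async_handler(job)]


job = {
    "input": {
        "items": [
            {"name": "slow", "delay": 0.2},
            {"name": "fast", "delay": 0.05},
        ]
    }
}
results = asyncio.run(collect(job))
print(results)
```

Results arrive in completion order here, so include an identifier in each result if clients need to match outputs back to inputs.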

## Next steps

* Learn more about [streaming handlers](/serverless/workers/handler-functions#streaming-handlers).
* Explore [async handlers](/serverless/workers/handler-functions#asynchronous-handlers) for concurrent processing.
* Understand [error handling](/serverless/workers/handler-functions#error-handling) for robust batch processing.
* Review [payload limits](/serverless/workers/handler-functions#payload-limits) to avoid oversized responses.
