# Overview

> Write custom handler functions to process incoming requests to your queue-based endpoints.

export const LoadBalancingEndpointTooltip = () => {
  return <Tooltip headline="Load balancing endpoint" tip="A Serverless endpoint that routes requests directly to worker HTTP servers without queuing, ideal for real-time applications and streaming. Supports custom HTTP frameworks like FastAPI or Flask." cta="Learn more about load balancing endpoints" href="/serverless/load-balancing/overview">load balancing endpoint</Tooltip>;
};

export const QueueBasedEndpointsTooltip = () => {
  return <Tooltip headline="Queue-based endpoint" tip="A Serverless endpoint that processes requests sequentially through a managed queue, providing guaranteed execution and automatic retries. Uses handler functions and standard operations like /run and /runsync." cta="Learn more about queue-based endpoints" href="/serverless/endpoints/overview#queue-based-endpoints">queue-based endpoints</Tooltip>;
};

export const WorkersTooltip = () => {
  return <Tooltip headline="Worker" tip="A container that runs your application code and processes requests to your Serverless endpoint. Workers are automatically started and stopped by Runpod to handle traffic spikes and ensure optimal resource utilization." cta="Learn more about workers" href="/serverless/workers/overview">workers</Tooltip>;
};

export const RequestsTooltip = () => {
  return <Tooltip headline="Requests" tip="HTTP requests that you send to an endpoint, which can include parameters, payloads, and headers that define what the endpoint should process." cta="Learn more about requests" href="/serverless/endpoints/send-requests">requests</Tooltip>;
};

export const JobTooltip = () => {
  return <Tooltip headline="Job" tip="A unit of work submitted to a queue-based Serverless endpoint. Jobs progress through states like IN_QUEUE, RUNNING, and COMPLETED as they are processed by workers." cta="Learn more about job states" href="/serverless/endpoints/job-states">job</Tooltip>;
};

Handler functions form the core of your Runpod Serverless applications. They define how your <WorkersTooltip /> process <RequestsTooltip /> and return results. This section covers everything you need to know about creating effective handler functions.

<Warning>
  Handler functions are only required for <QueueBasedEndpointsTooltip />. If you're building a <LoadBalancingEndpointTooltip />, you can define your own custom API endpoints using any HTTP framework of your choice (like FastAPI or Flask).
</Warning>

## Understanding job input

Before writing a handler function, make sure you understand the structure of the input. When your endpoint receives a request, it sends a JSON object to your handler function in this general format:

```json theme={"theme":{"light":"github-light","dark":"github-dark"}}
{
    "id": "eaebd6e7-6a92-4bb8-a911-f996ac5ea99d",
    "input": { 
        "key": "value" 
    }
}
```

`id` is a unique identifier for the <JobTooltip /> randomly generated by Runpod, while `input` contains data sent by the client for your handler function to process.

To learn how to structure requests to your endpoint, see [Send API requests](/serverless/endpoints/send-requests).
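As a minimal sketch of how a handler might read these two fields, the function below pulls `id` and `input` out of the job dictionary. The `"key"` field is just the placeholder from the JSON above, and the `"default"` fallback is an assumption for illustration. The function is plain Python, so it can be called directly without the Runpod SDK.

```python
def handler(job):
    """Return the job ID together with one value read from the input."""
    job_id = job.get("id")            # Identifier assigned to the job
    job_input = job.get("input", {})  # Client-supplied payload

    # "key" is the placeholder field from the example JSON;
    # .get() avoids a KeyError when the client omits it
    value = job_input.get("key", "default")

    return {"job_id": job_id, "value": value}


sample_job = {
    "id": "eaebd6e7-6a92-4bb8-a911-f996ac5ea99d",
    "input": {"key": "value"},
}
result = handler(sample_job)
```

Reading optional fields with `.get()` and a default keeps the handler from failing on partially filled requests.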

## Basic handler implementation

Here's a simple handler function that processes an endpoint request:

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod

def handler(job):
    job_input = job["input"]  # Access the input from the request

    # Add your custom code here to process the input

    return "Your job results"

runpod.serverless.start({"handler": handler})  # Required
```

The handler extracts the input from the job request, processes it, and returns a result. The `runpod.serverless.start()` function launches your serverless application with the specified handler.

## Local testing

To test your handler locally, you can create a `test_input.json` file with the input data you want to test:

```json test_input.json theme={"theme":{"light":"github-light","dark":"github-dark"}}
{
    "input": {
        "prompt": "Hey there!"
    }
}
```

Then run your handler function using your local terminal:

```sh theme={"theme":{"light":"github-light","dark":"github-dark"}}
python handler.py
```

Instead of creating a `test_input.json` file, you can also provide test input directly on the command line:

```sh theme={"theme":{"light":"github-light","dark":"github-dark"}}
python handler.py --test_input '{"input": {"prompt": "Test prompt"}}'
```

For more information on local testing, including command-line flags and starting a local API server, see [Local testing](/serverless/development/local-testing).
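Because a handler is an ordinary Python function, you can also call it directly from a quick script or unit test. The sketch below assumes a hypothetical handler that upper-cases the prompt, and inlines the same payload as `test_input.json` so that it is self-contained. It does not exercise the Runpod SDK itself.

```python
import json


def handler(job):
    """Hypothetical handler that echoes the prompt in upper case."""
    prompt = job["input"].get("prompt", "")
    return {"echo": prompt.upper()}


# Same payload as test_input.json, inlined to keep the sketch self-contained
test_input = json.loads('{"input": {"prompt": "Hey there!"}}')

output = handler(test_input)
assert output == {"echo": "HEY THERE!"}
```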

## Handler types

You can create several types of handler functions depending on the needs of your application.

### Standard handlers

The simplest handler type, standard handlers process inputs synchronously and return the result when the job is complete.

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod
import time

def handler(job):
    job_input = job["input"]
    prompt = job_input.get("prompt")
    seconds = job_input.get("seconds", 0)
    
    # Simulate processing time
    time.sleep(seconds)
    
    return prompt

runpod.serverless.start({"handler": handler})
```

### Streaming handlers

Streaming handlers stream results incrementally as they become available. Use these when your application requires real-time updates, for example when streaming results from a language model.

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod

def streaming_handler(job):
    for count in range(3):
        result = f"This is the {count} generated output."
        yield result

runpod.serverless.start({
    "handler": streaming_handler,
    "return_aggregate_stream": True  # Optional, makes results available via /run
})
```

By default, outputs from streaming handlers are only available using the `/stream` operation. Set `return_aggregate_stream` to `True` to make outputs available from the `/run` and `/runsync` operations as well.

To learn more about aggregating streaming outputs, including best practices for batch processing and handling local testing, see [Aggregate streaming outputs](/serverless/development/aggregate-outputs).
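To build intuition for what aggregation means, the sketch below drains a generator handler into a single list in plain Python. This is only a local illustration of the idea. The exact response shape that the platform returns for aggregated streams is not shown here and should be checked against the linked page.

```python
def streaming_handler(job):
    """Yield three partial results, one at a time."""
    for count in range(3):
        yield f"This is the {count} generated output."


# Collect every yielded chunk into one list, which is roughly what
# "aggregating" a stream implies
aggregated = list(streaming_handler({"input": {}}))
```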

### Asynchronous handlers

Asynchronous handlers process operations concurrently for improved efficiency. Use these for tasks involving I/O operations, API calls, or processing large datasets.

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod
import asyncio

async def async_handler(job):
    for i in range(5):
        # Generate an asynchronous output token
        output = f"Generated async token output {i}"
        yield output
        
        # Simulate an asynchronous task
        await asyncio.sleep(1)
        
runpod.serverless.start({
    "handler": async_handler,
    "return_aggregate_stream": True
})
```

Async handlers allow your code to handle multiple tasks concurrently without waiting for each operation to complete. This approach offers excellent scalability for applications that deal with high-frequency requests, allowing your workers to remain responsive even under heavy load. Async handlers are also useful for streaming data scenarios and long-running tasks that produce incremental outputs.

<Tip>
  When implementing async handlers, use the `async` and `await` keywords consistently so that operations stay non-blocking, and consider using `yield` to produce outputs progressively.

  Always test your async code thoroughly to properly handle asynchronous exceptions and edge cases, as async error patterns can be more complex than in synchronous code.
</Tip>
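One way to check an async generator handler before deploying it is to drain it with `asyncio` in a local script. The sketch below is a plain-Python illustration, not part of the Runpod SDK. The `collect` helper is a hypothetical name, and `asyncio.sleep(0)` stands in for real I/O.

```python
import asyncio


async def async_handler(job):
    """Yield three outputs, giving up control between each one."""
    for i in range(3):
        yield f"Generated async token output {i}"
        await asyncio.sleep(0)  # Hand control back to the event loop


async def collect(job):
    """Hypothetical helper: drain the async generator into a list."""
    return [chunk async for chunk in async_handler(job)]


outputs = asyncio.run(collect({"input": {}}))
```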

### Concurrent handlers

Concurrent handlers process multiple requests simultaneously with a single worker. Use these for small, rapid operations that don't fully utilize the worker's GPU.

When increasing concurrency, it's crucial to monitor memory usage carefully and test thoroughly to determine the optimal concurrency levels for your specific workload. Implement proper error handling to prevent one failing request from affecting others, and continuously monitor and adjust concurrency parameters based on real-world performance.

Learn how to build a concurrent handler by [following this guide](/serverless/workers/concurrent-handler).
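The error-isolation advice above can be sketched locally with `asyncio.gather`. In this illustration, `process_request` and `run_batch` are hypothetical names, and the batch simulates several requests arriving at one worker. How Runpod actually schedules concurrent jobs is covered in the linked guide.

```python
import asyncio


async def process_request(job):
    """Hypothetical async handler that reverses the prompt."""
    try:
        prompt = job["input"]["prompt"]
        await asyncio.sleep(0)  # Stand-in for real asynchronous work
        return {"output": prompt[::-1]}
    except KeyError as exc:
        # Return the error so one bad request does not affect the others
        return {"error": f"Missing required input: {exc}"}


async def run_batch(jobs):
    """Process all jobs concurrently and return results in input order."""
    return await asyncio.gather(*(process_request(job) for job in jobs))


jobs = [
    {"input": {"prompt": "abc"}},
    {"input": {}},  # Invalid: no prompt
    {"input": {"prompt": "xyz"}},
]
results = asyncio.run(run_batch(jobs))
```

The invalid request yields an error dictionary while the two valid requests still complete.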

## Error handling

When an exception occurs in your handler function, the Runpod SDK automatically captures it, marks the [job status](/serverless/endpoints/job-states) as `FAILED`, and returns the exception details in the job results.

For custom error responses:

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod

def handler(job):
    job_input = job["input"]
    
    # Validate the presence of required inputs
    # (checking key membership so that a seed of 0 is still accepted)
    if "seed" not in job_input:
        return {
            "error": "Input is missing the 'seed' key. Please include a seed."
        }
    
    # Proceed if the input is valid
    return "Input validation successful."

runpod.serverless.start({"handler": handler})
```

Exercise caution when using `try/except` blocks to avoid unintentionally suppressing errors. Either return the error for a graceful failure or raise it to flag the job as `FAILED`.
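A minimal sketch of that pattern: catch only the exception types you expect and return them as an error, while letting everything else propagate. `run_model` is a hypothetical processing step, and treating `ValueError` as the "expected" error type is an assumption for illustration.

```python
def run_model(job_input):
    """Hypothetical processing step that rejects empty prompts."""
    if not job_input.get("prompt"):
        raise ValueError("prompt must not be empty")
    return job_input["prompt"].upper()


def handler(job):
    try:
        return {"output": run_model(job["input"])}
    except ValueError as exc:
        # Expected problem with the request: return it as a graceful error
        return {"error": str(exc)}
    # Any other exception is not caught here, so it propagates to the caller
```

Avoid a bare `except:` or `except Exception:` that returns a normal-looking result, since that hides real failures from the job status.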

## Advanced handler controls

Use these features to fine-tune your Serverless applications for specific use cases.

### Progress updates

Send progress updates during job execution to inform clients about the current state of processing:

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
import runpod

def handler(job):
    for update_number in range(1, 4):  # Report updates 1/3 through 3/3
        runpod.serverless.progress_update(job, f"Update {update_number}/3")
    
    return "done"

runpod.serverless.start({"handler": handler})
```

Progress updates will be available when the job status is polled.

### Worker refresh

For long-running or complex jobs, you may want to refresh the worker after completion to start with a clean state for the next job. Enabling worker refresh clears all logs and wipes the worker state after a job is completed.

For example:

```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
# Requires runpod python version 0.9.0+
import runpod

def handler(job):
    job_input = job["input"]  # Access the input from the request

    results = []
    
    # Compute results
    ...

    # Return the results and indicate the worker should be refreshed
    return {"refresh_worker": True, "job_results": results}


# Configure and start the Runpod serverless function
runpod.serverless.start(
    {
        "handler": handler,  # Required: Specify the sync handler
        "return_aggregate_stream": True,  # Optional: Aggregate results are accessible via /run operation
    }
)
```

Your handler must return a dictionary that contains the `refresh_worker` flag. This flag will be removed before the remaining job output is returned.

## Handler function best practices

A short list of best practices to keep in mind as you build your handler function:

1. **Initialize outside the handler**: Load models and other heavy resources outside your handler function to avoid repeated initialization.

   ```python handler.py theme={"theme":{"light":"github-light","dark":"github-dark"}}
   import runpod
   import torch
   from transformers import AutoModelForSequenceClassification, AutoTokenizer

   # Load model and tokenizer outside the handler
   model_name = "distilbert-base-uncased-finetuned-sst-2-english"
   tokenizer = AutoTokenizer.from_pretrained(model_name)
   model = AutoModelForSequenceClassification.from_pretrained(model_name)

   # Move model to GPU if available
   device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
   model.to(device)

   def handler(job):
       ...  # Use the preloaded model and tokenizer here

   runpod.serverless.start({"handler": handler})
   ```

2. **Input validation**: [Validate inputs](#error-handling) before processing to avoid errors during execution.

3. **Local testing**: [Test your handlers locally](/serverless/development/local-testing) before deployment.

## Payload limits

Be aware of payload size limits when designing your handler:

* `/run` operation: 10 MB
* `/runsync` operation: 20 MB

If your results exceed these limits, consider stashing them in cloud storage and returning links instead.
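A rough sketch of that fallback is shown below. It measures the JSON-serialized size of a result and swaps in a link when the result is too large. The `finalize` helper, the `upload` callable, and the `result_url` key are all hypothetical, and counting 1 MB as 1024 × 1024 bytes is an assumption.

```python
import json

# Limits from the list above; 1 MB = 1024 * 1024 bytes is an assumption
PAYLOAD_LIMITS = {"run": 10 * 1024 * 1024, "runsync": 20 * 1024 * 1024}


def payload_size(result):
    """Size in bytes of the result once serialized as UTF-8 JSON."""
    return len(json.dumps(result).encode("utf-8"))


def finalize(result, upload, operation="run"):
    """Return the result itself if it fits, otherwise a link to it.

    `upload` is a hypothetical callable that stores the result in cloud
    storage and returns a URL.
    """
    if payload_size(result) <= PAYLOAD_LIMITS[operation]:
        return result
    return {"result_url": upload(result)}


small = finalize({"text": "ok"}, upload=lambda r: "unused")
```

In a real handler, `upload` would wrap whichever storage client you use.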

## Next steps

Once you've created your handler function, you can:

* [Learn how to aggregate streaming outputs.](/serverless/development/aggregate-outputs)
* [Explore flags for local testing.](/serverless/development/local-testing)
* [Create a Dockerfile for your worker.](/serverless/workers/create-dockerfile)
* [Deploy your worker image to a Serverless endpoint.](/serverless/workers/deploy)
