Write custom handler functions to process incoming requests to your queue-based endpoints.
Handler functions form the core of your Runpod Serverless applications. They define how your worker processes incoming requests and returns results. This section covers everything you need to know about creating effective handler functions.
Handler functions are only required for queue-based endpoints. If you’re building a load balancing endpoint, you can define your own custom API endpoints using any HTTP framework of your choice (like FastAPI or Flask).
Before writing a handler function, make sure you understand the structure of the input. When your endpoint receives a request, it sends a JSON object to your handler function in this general format:
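For illustration, the payload looks roughly like this (the values below are placeholders, and real requests may carry additional metadata fields):

{
    "id": "A_RANDOM_JOB_ID",
    "input": {
        "prompt": "Your input data here"
    }
}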
id is a unique identifier for the job, randomly generated by Runpod, while input contains the data sent by the client for your handler function to process. To learn how to structure requests to your endpoint, see Send API requests.
Here’s a simple handler function that processes an endpoint request:
handler.py
import runpod

def handler(job):
    job_input = job["input"]  # Access the input from the request

    # Add your custom code here to process the input

    return "Your job results"

runpod.serverless.start({"handler": handler})  # Required
The handler extracts the input from the job request, processes it, and returns a result. The runpod.serverless.start() function launches your serverless application with the specified handler.
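Before deploying, a quick way to sanity-check the handler logic is to call the function directly with a hand-built job dictionary that mimics what Runpod would send. This is a minimal sketch that bypasses the Runpod queue entirely; run it wherever the handler function above is in scope:

# Illustrative local smoke test, not part of the Runpod SDK
mock_job = {"id": "local-test", "input": {"prompt": "hello"}}

result = handler(mock_job)
print(result)  # -> "Your job results"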
Streaming handlers stream results incrementally as they become available. Use these when your application requires real-time updates, for example when streaming results from a language model.
handler.py
import runpod

def streaming_handler(job):
    for count in range(3):
        result = f"This is the {count} generated output."
        yield result

runpod.serverless.start({
    "handler": streaming_handler,
    "return_aggregate_stream": True  # Optional, makes results available via /run
})
By default, outputs from streaming handlers are only available using the /stream operation. Set return_aggregate_stream to True to make outputs available from the /run and /runsync operations as well.
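To make the difference concrete, here is a rough client-side sketch that submits a job via /run and then polls /stream for partial outputs. The endpoint ID, API key, URL pattern, and response shape shown here are assumptions; verify them against the endpoint operations reference for your endpoint:

import time
import requests

ENDPOINT_ID = "YOUR_ENDPOINT_ID"  # placeholder
API_KEY = "YOUR_API_KEY"          # placeholder
headers = {"Authorization": f"Bearer {API_KEY}"}

# Submit the job asynchronously via /run.
run = requests.post(
    f"https://api.runpod.ai/v2/{ENDPOINT_ID}/run",
    headers=headers,
    json={"input": {"prompt": "hello"}},
).json()

# Poll /stream to read outputs as the handler yields them.
stream_url = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/stream/{run['id']}"
while True:
    chunk = requests.get(stream_url, headers=headers).json()
    for item in chunk.get("stream", []):
        print(item.get("output"))
    if chunk.get("status") in ("COMPLETED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)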
Asynchronous handlers process operations concurrently for improved efficiency. Use these for tasks involving I/O operations, API calls, or processing large datasets.
handler.py
import runpod
import asyncio

async def async_handler(job):
    for i in range(5):
        # Generate an asynchronous output token
        output = f"Generated async token output {i}"
        yield output
        # Simulate an asynchronous task
        await asyncio.sleep(1)

runpod.serverless.start({
    "handler": async_handler,
    "return_aggregate_stream": True
})
Async handlers allow your code to handle multiple tasks concurrently without waiting for each operation to complete. This approach offers excellent scalability for applications that deal with high-frequency requests, allowing your workers to remain responsive even under heavy load. Async handlers are also useful for streaming data scenarios and long-running tasks that produce incremental outputs.
When implementing async handlers, use async and await consistently throughout your code to keep operations truly non-blocking and avoid performance bottlenecks, and consider using the yield statement to produce outputs progressively over time. Always test your async code thoroughly so it handles asynchronous exceptions and edge cases correctly, as async error patterns can be more complex than in synchronous code.
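To illustrate the non-blocking pattern described above, here is a sketch of an async handler that fans out several simulated I/O calls at once with asyncio.gather. The fetch_resource helper and its inputs are hypothetical placeholders:

import asyncio
import runpod

async def fetch_resource(item_id):
    # Stand-in for a real I/O call (HTTP request, database query, etc.)
    await asyncio.sleep(1)
    return f"resource {item_id}"

async def async_handler(job):
    item_ids = job["input"].get("ids", [0, 1, 2])
    # Launch all I/O calls together instead of awaiting them one by one.
    results = await asyncio.gather(*(fetch_resource(i) for i in item_ids))
    return {"results": results}

runpod.serverless.start({"handler": async_handler})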
Concurrent handlers process multiple requests simultaneously with a single worker. Use these for small, rapid operations that don’t fully utilize the worker’s GPU. When increasing concurrency, monitor memory usage carefully and test thoroughly to determine the optimal concurrency level for your specific workload. Implement proper error handling so that one failing request doesn’t affect others, and continuously adjust concurrency parameters based on real-world performance. Learn how to build a concurrent handler by following this guide.
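For orientation before following that guide, here is a rough sketch of the shape a concurrent handler can take, pairing an async handler with the SDK’s concurrency_modifier option to control how many requests a worker accepts at once. Treat the scaling logic as an assumption to confirm against the guide:

import asyncio
import runpod

async def concurrent_handler(job):
    # A small, I/O-bound task that doesn't saturate the worker's GPU on its own.
    await asyncio.sleep(0.1)
    return {"echo": job["input"]}

def concurrency_modifier(current_concurrency):
    # Return how many requests this worker should process at once.
    # Real logic would scale up or down based on observed load and memory.
    max_concurrency = 4
    return min(current_concurrency + 1, max_concurrency)

runpod.serverless.start({
    "handler": concurrent_handler,
    "concurrency_modifier": concurrency_modifier,
})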
When an exception occurs in your handler function, the Runpod SDK automatically captures it, marks the job status as FAILED, and returns the exception details in the job results. To return a custom error response instead:
handler.py
import runpod

def handler(job):
    job_input = job["input"]

    # Validate the presence of required inputs
    if not job_input.get("seed", False):
        return {
            "error": "Input is missing the 'seed' key. Please include a seed."
        }

    # Proceed if the input is valid
    return "Input validation successful."

runpod.serverless.start({"handler": handler})
Exercise caution when using try/except blocks to avoid unintentionally suppressing errors. Either return the error for a graceful failure or raise it to flag the job as FAILED.
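As a sketch of those two options, you can convert expected failures into a structured error response and let everything else propagate so the job is marked FAILED (the process() helper here is hypothetical):

import runpod

def handler(job):
    try:
        result = process(job["input"])  # process() is a hypothetical helper
    except ValueError as err:
        # Graceful failure: the client receives a clear error message.
        return {"error": f"Invalid input: {err}"}
    # Any other exception propagates, and Runpod marks the job as FAILED.
    return result

runpod.serverless.start({"handler": handler})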
For long-running or complex jobs, you may want to refresh the worker after completion to start with a clean state for the next job. Enabling worker refresh clears all logs and wipes the worker state after a job is completed. For example:
handler.py
# Requires runpod python version 0.9.0+
import runpod
import time

def handler(job):
    job_input = job["input"]  # Access the input from the request

    results = []
    # Compute results ...

    # Return the results and indicate the worker should be refreshed
    return {"refresh_worker": True, "job_results": results}

# Configure and start the Runpod serverless function
runpod.serverless.start(
    {
        "handler": handler,  # Required: Specify the sync handler
        "return_aggregate_stream": True,  # Optional: Aggregate results are accessible via /run operation
    }
)
Your handler must return a dictionary that contains the refresh_worker flag. This flag will be removed before the remaining job output is returned.