Build a concurrent handler
Learn how to implement concurrent handlers to process multiple requests simultaneously with a single worker.
What you’ll learn
In this guide you will learn how to:
- Create an asynchronous handler function.
- Create a concurrency modifier to dynamically adjust concurrency levels.
- Optimize worker resources based on request patterns.
- Test your concurrent handler locally.
Requirements
- You’ve created a RunPod account.
- You’ve installed the RunPod SDK (`pip install runpod`).
- You know how to build a basic handler function.
Step 1: Set up your environment
First, set up a virtual environment and install the necessary packages:
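A typical setup on macOS or Linux looks like this; on Windows, use `venv\Scripts\activate` to activate the environment instead:

```sh
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install the RunPod SDK
pip install runpod
```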
Step 2: Create a concurrent handler file
Create a file named `concurrent_handler.py` and add the following code:
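Here's a minimal sketch of the handler; the `prompt` input field, the one-second `asyncio.sleep()`, and the simulated request-rate range are illustrative choices you can adapt:

```python
import asyncio
import random

import runpod

# Simulated request rate, refreshed by update_request_rate().
request_rate = 0


async def process_request(job):
    """Asynchronous handler: awaits I/O without blocking other requests."""
    input_data = job["input"]
    # Simulate a non-blocking I/O operation, e.g. a model or API call.
    await asyncio.sleep(1)
    return {"output": f"Processed: {input_data.get('prompt', '')}"}


def update_request_rate():
    """Simulate monitoring the incoming request rate."""
    global request_rate
    request_rate = random.randint(20, 100)


def adjust_concurrency(current_concurrency):
    """Placeholder concurrency modifier; replaced in Step 3."""
    update_request_rate()
    return current_concurrency


runpod.serverless.start({
    "handler": process_request,
    "concurrency_modifier": adjust_concurrency,
})
```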
The `process_request` function uses the `async` keyword, enabling it to use non-blocking I/O operations with `await`. This allows the function to pause during I/O operations (simulated with `asyncio.sleep()`) and handle other requests while waiting.
The `update_request_rate` function simulates monitoring request patterns for adaptive scaling. This example uses a simple random number generator to simulate changing request patterns. In a production environment, you would:
- Track actual request counts and response times.
- Monitor system resource usage, such as CPU and memory.
- Adjust concurrency based on real performance metrics.
Step 3: Implement dynamic concurrency adjustment
Let’s enhance our handler with dynamic concurrency adjustment. This will allow your worker to handle more requests during high traffic periods and conserve resources during low traffic periods.
Replace the placeholder adjust_concurrency
function with this improved version:
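This sketch implements the scaling rules described in the breakdown below; the variable names match the control parameters discussed there:

```python
def adjust_concurrency(current_concurrency):
    """Dynamically adjust concurrency based on the observed request rate."""
    global request_rate
    update_request_rate()  # Refresh the (simulated) request rate.

    max_concurrency = 10  # Upper limit to prevent resource exhaustion.
    min_concurrency = 1   # Always allow at least one request at a time.
    high_request_rate_threshold = 50  # When to consider traffic "high".

    # Scale up when traffic is high and we haven't hit the ceiling.
    if request_rate > high_request_rate_threshold and current_concurrency < max_concurrency:
        return current_concurrency + 1
    # Scale down when traffic is low and we're above the floor.
    elif request_rate <= high_request_rate_threshold and current_concurrency > min_concurrency:
        return current_concurrency - 1
    # Otherwise, keep the current level.
    return current_concurrency
```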
Let’s break down how this function works:
- Control parameters:
  - `max_concurrency = 10`: Sets an upper limit on concurrency to prevent resource exhaustion.
  - `min_concurrency = 1`: Ensures at least one request can be processed at a time.
  - `high_request_rate_threshold = 50`: Defines when to consider traffic “high”.

  You can adjust these parameters based on your specific workload.
- Scaling up logic: This increases concurrency by 1 when:
  - The request rate exceeds our threshold (50 requests).
  - We haven’t reached our maximum concurrency limit.
- Scaling down logic: This decreases concurrency by 1 when:
  - The request rate is at or below our threshold.
  - We’re above our minimum concurrency level.
- Default behavior: If neither condition is met, the current concurrency level is maintained.
With these enhancements, your concurrent handler will now dynamically adjust its concurrency level based on the observed request rate, optimizing resource usage and responsiveness.
Step 4: Create a test input file
Now you’re ready to test your handler locally. Create a file named `test_input.json` with the following content:
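Any JSON that matches your handler’s expected input will work; the `prompt` value here is just an example:

```json
{
    "input": {
        "prompt": "Hello, world!"
    }
}
```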
Step 5: Test your handler locally
Run your handler to verify that it works correctly:
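When run locally, the RunPod SDK automatically uses `test_input.json` from the same directory as the job input:

```sh
python concurrent_handler.py
```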
You should see log output showing the worker starting, processing the request from `test_input.json`, and returning the result.
(Optional) Step 6: Implement real metrics collection
In a production environment, you should replace the `update_request_rate` function with real metrics collection. Here is an example of how you could build this functionality:
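This sketch tracks a sliding 60-second window of request timestamps. The `record_request()` helper is hypothetical; you would call it from your handler for every incoming job:

```python
import time
from collections import deque

# Timestamps of recent requests. record_request() is a hypothetical
# helper; call it from your handler on each incoming job.
request_timestamps = deque(maxlen=1000)


def record_request():
    """Record the arrival time of an incoming request."""
    request_timestamps.append(time.time())


def update_request_rate():
    """Compute requests per minute over a sliding 60-second window."""
    global request_rate
    now = time.time()
    # Discard timestamps older than the window.
    while request_timestamps and now - request_timestamps[0] > 60:
        request_timestamps.popleft()
    request_rate = len(request_timestamps)
```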
Next steps
Now that you’ve created a concurrent handler, you’re ready to deploy it to a RunPod Serverless endpoint and tune its concurrency settings for your workload.