Benchmarking your Serverless workers helps you identify bottlenecks and optimize your code for performance and cost. Performance is measured by two key metrics:
  • Delay time: The time spent waiting for a worker to become available. This includes the cold start time if a new worker needs to be spun up.
  • Execution time: The time the GPU takes to process the request once the worker has received the job.

Send a test request

To gather initial metrics, use curl to send a request to your endpoint. This initiates a job and returns a request ID that you can use to poll for its status.
curl -X POST https://api.runpod.ai/v2/YOUR_ENDPOINT_ID/run \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -d '{"input": {"prompt": "Hello, world!"}}'
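A successful call returns immediately with the job's ID and queue status. The exact fields can vary, but the response typically looks like this (values illustrative):
{
  "id": "1234567890",
  "status": "IN_QUEUE"
}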
Use the id from this response to poll the /status endpoint for the delay time and execution time:
curl -X GET https://api.runpod.ai/v2/YOUR_ENDPOINT_ID/status/REQUEST_ID \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_KEY"
This returns a JSON object; delayTime and executionTime are reported in milliseconds:
{
  "id": "1234567890",
  "status": "COMPLETED",
  "delayTime": 1000,
  "executionTime": 2000
}
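If you have jq installed, you can extract just the two timing fields from the status response (purely a convenience; the fields are the same as above):
curl -s https://api.runpod.ai/v2/YOUR_ENDPOINT_ID/status/REQUEST_ID \
  -H "Authorization: Bearer YOUR_API_KEY" | jq '{delayTime, executionTime}'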

Automate benchmarking

To get a statistically significant view of your worker's performance, automate the benchmarking process. The following Python script sends multiple requests sequentially, skips any that fail, and reports the minimum, maximum, and average times for both delay and execution.
benchmark.py
import requests
import time
import statistics

ENDPOINT_ID = "YOUR_ENDPOINT_ID"
API_KEY = "YOUR_API_KEY"
BASE_URL = f"https://api.runpod.ai/v2/{ENDPOINT_ID}"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}

def run_benchmark(num_requests=5):
    delay_times = []
    execution_times = []
    
    for i in range(num_requests):
        # Send request
        response = requests.post(
            f"{BASE_URL}/run",
            headers=HEADERS,
            json={"input": {"prompt": f"Test request {i+1}"}}
        )
        response.raise_for_status()  # Fail fast on HTTP errors (bad key, wrong endpoint)
        request_id = response.json()["id"]
        
        # Poll for completion
        while True:
            status_response = requests.get(
                f"{BASE_URL}/status/{request_id}",
                headers=HEADERS
            )
            status_data = status_response.json()
            
            if status_data["status"] == "COMPLETED":
                delay_times.append(status_data["delayTime"])
                execution_times.append(status_data["executionTime"])
                break
            elif status_data["status"] == "FAILED":
                print(f"Request {i+1} failed")
                break
            
            time.sleep(1)
    
    # Calculate statistics, skipping the report if nothing completed
    if not delay_times:
        print("No requests completed successfully.")
        return

    print(f"Delay Time - Min: {min(delay_times)}ms, Max: {max(delay_times)}ms, Avg: {statistics.mean(delay_times):.0f}ms")
    print(f"Execution Time - Min: {min(execution_times)}ms, Max: {max(execution_times)}ms, Avg: {statistics.mean(execution_times):.0f}ms")

if __name__ == "__main__":
    run_benchmark(num_requests=5)
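Averages can hide outliers, and the first request's delayTime often includes a cold start. As a rough sketch (the summarize helper below is hypothetical, not part of the script above), you could also report the median and an approximate 95th percentile using Python's statistics module; note that statistics.quantiles needs at least two samples:
def summarize(label, samples):
    # Median is robust to a single cold-start outlier.
    # quantiles(n=20) returns 19 cut points; index 18 approximates p95.
    median = statistics.median(samples)
    p95 = statistics.quantiles(samples, n=20)[18] if len(samples) >= 2 else samples[0]
    print(f"{label} - Median: {median:.0f}ms, p95: {p95:.0f}ms")

# Example: call inside run_benchmark() after the results are collected
summarize("Delay Time", delay_times)
summarize("Execution Time", execution_times)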