Endpoints
This documentation provides detailed instructions on how to use the RunPod Python SDK to interact with various endpoints. You can perform synchronous and asynchronous operations, stream data, and check the health status of endpoints.
Prerequisites
Before using the RunPod Python SDK, ensure that you have:
- Installed the RunPod Python SDK.
- Configured your API key.
Set your Endpoint Id
Pass your Endpoint Id to the Endpoint class.
```python
import runpod
import os

runpod.api_key = os.getenv("RUNPOD_API_KEY")

endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")
```
This ensures that all subsequent calls are made against your Endpoint Id with a valid API key.
In most situations, you'll assign the Endpoint instance to a variable named endpoint.
This lets you use the following methods and instance variables of the Endpoint class:
Run the Endpoint
Run the Endpoint with either the asynchronous run or the synchronous run_sync method.
Choosing between asynchronous and synchronous execution hinges on your task's needs and application design.
- Asynchronous methods: Choose the asynchronous method for handling tasks efficiently, especially when immediate feedback isn't crucial. They allow your application to stay responsive by running time-consuming operations in the background, ideal for:
  - Non-blocking calls: Keep your application active while waiting on long processes.
  - Long-running operations: Avoid timeouts on tasks over 30 seconds, letting your app's workflow continue smoothly.
  - Job tracking: Get a Job Id to monitor task status, useful for complex or delayed-result operations.
- Synchronous methods: Choose the synchronous method when your application requires immediate results from operations. They're best for:
  - Immediate results: Necessary for operations where quick outcomes are essential to continue with your app's logic.
  - Short operations: Ideal for tasks under 30 seconds to prevent application delays.
  - Simplicity and control: Provide a straightforward execution process, with timeout settings for better operational control.
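Under the hood, the asynchronous style boils down to a status-polling loop with a deadline. The sketch below illustrates that pattern in plain Python; the poll_until_complete helper and make_fake_status_source stub are illustrative stand-ins, not part of the RunPod SDK, and the fake status source simulates what a real run request's status() call would return:

```python
import time


def make_fake_status_source(transitions):
    """Return a callable that yields each status in turn, then repeats the last.

    Stand-in for run_request.status(); a real call would query the RunPod API.
    """
    state = {"i": 0}

    def fetch_status():
        i = min(state["i"], len(transitions) - 1)
        state["i"] += 1
        return transitions[i]

    return fetch_status


def poll_until_complete(fetch_status, timeout=60, interval=0.01):
    """Poll a status source until a terminal state or until the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()
        if status in ("COMPLETED", "FAILED"):
            return status
        time.sleep(interval)  # Back off between checks
    raise TimeoutError("Job did not finish in time")


fetch = make_fake_status_source(["IN_QUEUE", "IN_PROGRESS", "COMPLETED"])
print(poll_until_complete(fetch))  # COMPLETED
```

The synchronous run_sync method effectively performs this loop for you and blocks until it returns, which is why it suits short jobs; the asynchronous run method hands you the job so you can decide when and how often to poll.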
Run synchronously
To execute an endpoint synchronously and wait for the result, use the run_sync method.
This method blocks execution until the endpoint run completes or times out.
```python
import runpod
import os

runpod.api_key = os.getenv("RUNPOD_API_KEY")

endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")

try:
    run_request = endpoint.run_sync(
        {
            "input": {
                "prompt": "Hello, world!",
            }
        },
        timeout=60,  # Timeout in seconds.
    )
    print(run_request)
except TimeoutError:
    print("Job timed out.")
```
Run asynchronously
Asynchronous execution allows for non-blocking operations, enabling your code to perform other tasks while waiting for an operation to complete. RunPod supports both standard asynchronous execution and advanced asynchronous programming with Python's asyncio framework.
Depending on your application's needs, you can choose the approach that best suits your scenario.
For non-blocking operations, use the run method.
This method allows you to start an endpoint run and then check its status or wait for its completion at a later time.
Asynchronous execution
This approach runs in a standard Python environment and doesn't require an asynchronous event loop.
- Python
- Output
```python
import runpod
import os

runpod.api_key = os.getenv("RUNPOD_API_KEY")

input_payload = {"input": {"prompt": "Hello, World!"}}

try:
    endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")
    run_request = endpoint.run(input_payload)

    # Initial check without blocking, useful for quick tasks
    status = run_request.status()
    print(f"Initial job status: {status}")

    if status != "COMPLETED":
        # Polling with timeout for long-running tasks
        output = run_request.output(timeout=60)
    else:
        output = run_request.output()
    print(f"Job output: {output}")
except Exception as e:
    print(f"An error occurred: {e}")
```
```
Initial job status: IN_QUEUE
Job output: {'input_tokens': 24, 'output_tokens': 16, 'text': ["Hello! How may I assist you today?\n"]}
```
Asynchronous execution with asyncio
Use Python's asyncio library for handling concurrent Endpoint calls efficiently.
This method embraces Python's asyncio framework for asynchronous programming, requiring functions to be defined with async and called with await.
This approach is inherently non-blocking and is built to handle concurrency efficiently.
- Python
- Output
```python
import asyncio
import aiohttp
import os

import runpod
from runpod import AsyncioEndpoint, AsyncioJob

# asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # For Windows users.

runpod.api_key = os.getenv("RUNPOD_API_KEY")


async def main():
    async with aiohttp.ClientSession() as session:
        input_payload = {"prompt": "Hello, World!"}
        endpoint = AsyncioEndpoint("YOUR_ENDPOINT_ID", session)
        job: AsyncioJob = await endpoint.run(input_payload)

        # Polling job status
        while True:
            status = await job.status()
            print(f"Current job status: {status}")
            if status == "COMPLETED":
                output = await job.output()
                print("Job output:", output)
                break  # Exit the loop once the job is completed.
            elif status in ["FAILED"]:
                print("Job failed or encountered an error.")
                break
            else:
                print("Job in queue or processing. Waiting 3 seconds...")
                await asyncio.sleep(3)  # Wait for 3 seconds before polling again


if __name__ == "__main__":
    asyncio.run(main())
```
```
Current job status: IN_QUEUE
Job in queue or processing. Waiting 3 seconds...
Current job status: COMPLETED
Job output: {'input_tokens': 24, 'output_tokens': 16, 'text': ['Hello! How may I assist you today?\n']}
```
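The payoff of the asyncio approach is concurrency: several jobs can be awaited at once with asyncio.gather, so the total wall time is roughly that of the slowest job rather than the sum of all of them. The sketch below illustrates this shape with a fake_job stub coroutine standing in for an AsyncioEndpoint job; a real version would await endpoint.run(...) and poll job.status() as shown above:

```python
import asyncio


async def fake_job(prompt, delay):
    """Stub for an endpoint job: sleeps to simulate queue/processing time."""
    await asyncio.sleep(delay)
    return f"echo: {prompt}"


async def main():
    # Submit several jobs concurrently; gather preserves argument order.
    results = await asyncio.gather(
        fake_job("first", 0.03),
        fake_job("second", 0.01),
        fake_job("third", 0.02),
    )
    return results


print(asyncio.run(main()))  # ['echo: first', 'echo: second', 'echo: third']
```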
Health check
Monitor the health of an endpoint by checking its status, including jobs completed, failed, in progress, in queue, and retried, as well as the status of workers.
- Python
- Output
```python
import runpod
import json
import os

runpod.api_key = os.getenv("RUNPOD_API_KEY")

endpoint = runpod.Endpoint("gwp4kx5yd3nur1")

endpoint_health = endpoint.health()
print(json.dumps(endpoint_health, indent=2))
```
```json
{
  "jobs": {
    "completed": 100,
    "failed": 0,
    "inProgress": 0,
    "inQueue": 0,
    "retried": 0
  },
  "workers": {
    "idle": 1,
    "initializing": 0,
    "ready": 1,
    "running": 0,
    "throttled": 0
  }
}
```
Streaming
To enable streaming, your handler must support the "return_aggregate_stream": True option on the start method of your Handler.
Once enabled, use the stream method to receive data as it becomes available.
- Endpoint
- Handler
```python
import runpod
import os

runpod.api_key = os.getenv("RUNPOD_API_KEY")

endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")

run_request = endpoint.run(
    {
        "input": {
            "prompt": "Hello, world!",
        }
    }
)

for output in run_request.stream():
    print(output)
```
```python
from time import sleep

import runpod


def handler(job):
    job_input = job["input"]["prompt"]

    for i in job_input:
        sleep(1)  # sleep for 1 second for effect
        yield i


runpod.serverless.start(
    {
        "handler": handler,
        "return_aggregate_stream": True,  # Ensures aggregated results are streamed back
    }
)
```
Status
Returns the status of the Job request.
Call the status() function on the run request to return the status of the Job.
- Python
- Output
```python
import runpod
import os

runpod.api_key = os.getenv("RUNPOD_API_KEY")

input_payload = {"input": {"prompt": "Hello, World!"}}

try:
    endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")
    run_request = endpoint.run(input_payload)

    # Initial check without blocking, useful for quick tasks
    status = run_request.status()
    print(f"Initial job status: {status}")

    if status != "COMPLETED":
        # Polling with timeout for long-running tasks
        output = run_request.output(timeout=60)
    else:
        output = run_request.output()
    print(f"Job output: {output}")
except Exception as e:
    print(f"An error occurred: {e}")
```
```
Initial job status: IN_QUEUE
Job output: Hello, World!
```
Cancel
You can cancel a Job request by using the cancel() function on the run request.
You might want to cancel a Job because it's stuck with a status of IN_QUEUE or IN_PROGRESS, or because you no longer need the result.
The following pattern cancels a Job in response to a human interaction, for example pressing Ctrl+C in the terminal.
Pressing Ctrl+C sends a SIGINT signal to the process, which Python raises as a KeyboardInterrupt exception; catching it lets the code cancel the running Job.
- Python
- Output
```python
import time
import os

import runpod

runpod.api_key = os.getenv("RUNPOD_API_KEY")

input_payload = {
    "messages": [{"role": "user", "content": "Hello, World"}],
    "max_tokens": 2048,
    "use_openai_format": True,
}

run_request = None

try:
    endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")
    run_request = endpoint.run(input_payload)

    while True:
        status = run_request.status()
        print(f"Current job status: {status}")
        if status == "COMPLETED":
            output = run_request.output()
            print("Job output:", output)
            generated_text = (
                output.get("choices", [{}])[0].get("message", {}).get("content")
            )
            print(generated_text)
            break
        elif status in ["FAILED", "ERROR"]:
            print("Job failed to complete successfully.")
            break
        else:
            time.sleep(10)
except KeyboardInterrupt:  # Raised when Ctrl+C is pressed
    print("KeyboardInterrupt detected. Canceling the job...")
    if run_request:  # Check if a job is active
        run_request.cancel()
        print("Job canceled.")
except Exception as e:
    print(f"An error occurred: {e}")
```
```
Current job status: IN_QUEUE
Current job status: IN_PROGRESS
KeyboardInterrupt detected. Canceling the job...
Job canceled.
```
Timeout
Use the cancel() function with the timeout argument to cancel the Job after a specified time.
In the previous cancel() example, the Job was canceled due to an external condition.
In this example, you cancel a running Job that has taken too long to complete.
- Python
- Output
```python
from time import sleep
import runpod
import os

runpod.api_key = os.getenv("RUNPOD_API_KEY")

input_payload = {"input": {"prompt": "Hello, World!"}}

endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")

# Submit the job request
run_request = endpoint.run(input_payload)

# Retrieve and print the initial job status
initial_status = run_request.status()
print(f"Initial job status: {initial_status}")

# Attempt to cancel the job after a specified timeout period (in seconds)
# Note: This demonstrates an immediate cancellation for demonstration purposes.
# Typically, you'd set the timeout based on expected job completion time.
run_request.cancel(timeout=3)

# Wait for the timeout period to ensure the cancellation takes effect
sleep(3)
print("Sleeping for 3 seconds to allow for job cancellation...")

# Check and print the job status after the sleep period
final_status = run_request.status()
print(f"Final job status: {final_status}")
```
```
Initial job status: IN_QUEUE
Sleeping for 3 seconds to allow for job cancellation...
Final job status: CANCELLED
```
Purge queue
You can purge all jobs from a queue by using the purge_queue() function.
You can provide the timeout parameter to specify how long to wait for the server to respond before purging the queue.
purge_queue() doesn't affect Jobs in progress.
```python
import runpod
import os

runpod.api_key = os.getenv("RUNPOD_API_KEY")

endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")

endpoint.purge_queue(timeout=3)
```