In Flash, endpoints are the bridge between your local Python functions and Runpod’s cloud infrastructure. When you decorate a function with @Endpoint, you’re marking it to run remotely on Runpod instead of your local machine:
from runpod_flash import Endpoint, GpuType

@Endpoint(
    name="my-inference",
    gpu=GpuType.NVIDIA_GEFORCE_RTX_4090,
    dependencies=["torch"]
)
def run_model(data):
    import torch
    # This code runs on a Runpod GPU, not locally
    return {"result": "processed"}
When you call run_model(data), Flash provisions a GPU on Runpod (or reuses an existing one), sends your function code and input to the worker, executes it, and returns the result to your local environment. Each unique endpoint name creates one Serverless endpoint on Runpod with its own URL, scaling configuration, and hardware allocation. The endpoint manages workers that scale up and down based on demand.

Endpoint types

The Endpoint class supports four distinct patterns: queue-based endpoints, load-balanced endpoints, custom Docker images, and existing endpoints.

Queue-based endpoints

Use @Endpoint(...) as a decorator for batch processing and async workloads. Each function gets its own endpoint with dedicated workers.
from runpod_flash import Endpoint, GpuType

@Endpoint(
    name="image-processor",
    gpu=GpuType.NVIDIA_GEFORCE_RTX_4090,
    workers=(0, 5),
    dependencies=["torch", "pillow"]
)
async def process_image(image_data: dict) -> dict:
    import torch
    from PIL import Image
    # Process image on GPU
    return {"processed": True}
Queue-based endpoints are ideal for:
  • Batch processing jobs
  • Long-running computations
  • Workloads that don’t need immediate responses

Load-balanced endpoints

Use Endpoint(...) as an instance with route decorators for HTTP APIs. Multiple routes share the same workers.
from runpod_flash import Endpoint, GpuType

api = Endpoint(
    name="inference-api",
    gpu=GpuType.NVIDIA_GEFORCE_RTX_4090,
    workers=(1, 5)
)

@api.post("/predict")
async def predict(data: dict) -> dict:
    import torch
    # Run inference
    return {"prediction": "result"}

@api.get("/health")
async def health():
    return {"status": "ok"}
Load-balanced endpoints are ideal for:
  • REST APIs with multiple routes
  • Low-latency request/response patterns
  • Services requiring custom HTTP methods

Custom Docker images

Deploy pre-built Docker images (like vLLM or your own workers) and interact with them as a client:
from runpod_flash import Endpoint, GpuType

vllm = Endpoint(
    name="vllm-server",
    image="vllm/vllm-openai:latest",
    gpu=GpuType.NVIDIA_A100_80GB_PCIe
)

# Make HTTP calls to the deployed image
result = await vllm.post("/v1/completions", {"prompt": "Hello"})
models = await vllm.get("/v1/models")
See Custom Docker images for complete documentation, including available images and configuration options.

Existing endpoints

Connect to an already-deployed Runpod endpoint by ID:
from runpod_flash import Endpoint

ep = Endpoint(id="abc123")

# Queue-based calls
job = await ep.run({"prompt": "hello"})
await job.wait()
print(job.output)

# Or load-balanced calls
result = await ep.post("/v1/completions", {"prompt": "hello"})

GPU vs CPU

Specify gpu= for GPU endpoints or cpu= for CPU endpoints. They are mutually exclusive.

GPU endpoints

from runpod_flash import Endpoint, GpuType, GpuGroup

# Use a specific GPU type
@Endpoint(name="ml-inference", gpu=GpuType.NVIDIA_A100_80GB_PCIe)
async def infer(data: dict) -> dict: ...

# Use another specific GPU type
@Endpoint(name="rtx-worker", gpu=GpuType.NVIDIA_GEFORCE_RTX_4090)
async def render(data: dict) -> dict: ...

# Use multiple GPU types for better availability
@Endpoint(name="flexible", gpu=[GpuType.NVIDIA_GEFORCE_RTX_4090, GpuType.NVIDIA_RTX_A5000])
async def process(data: dict) -> dict: ...
If neither gpu= nor cpu= is specified, the endpoint defaults to gpu=GpuGroup.ANY.

CPU endpoints

from runpod_flash import Endpoint, CpuInstanceType

# Use string shorthand
@Endpoint(name="data-processor", cpu="cpu5c-4-8")
async def process(data: dict) -> dict: ...

# Or use the enum
@Endpoint(name="data-processor", cpu=CpuInstanceType.CPU5C_4_8)
async def process(data: dict) -> dict: ...
See GPU types and CPU types for available options.

Worker scaling

Control how many workers run for your endpoint with the workers parameter:
from runpod_flash import Endpoint, GpuGroup

# Just a max: scales from 0 to 5
@Endpoint(name="elastic", gpu=GpuGroup.ANY, workers=5)
async def elastic_task(data: dict) -> dict: ...

# Min and max tuple: always keep 2 warm, scale up to 10
@Endpoint(name="always-on", gpu=GpuGroup.ANY, workers=(2, 10))
async def always_on_task(data: dict) -> dict: ...

# Default is (0, 1) if not specified
@Endpoint(name="default", gpu=GpuGroup.ANY)
async def default_task(data: dict) -> dict: ...
Setting workers=(1, N) keeps at least one worker warm, avoiding cold starts.

Dependency management

Specify Python packages in the dependencies parameter. Flash installs these on the remote worker before executing your function.
@Endpoint(
    name="text-gen",
    gpu=GpuType.NVIDIA_A100_80GB_PCIe,
    dependencies=["transformers==4.36.0", "torch", "pillow"]
)
def generate_text(prompt):
    from transformers import pipeline
    import torch
    # Your code here

Version pinning

Use standard pip syntax for version constraints:
dependencies=["transformers==4.36.0", "torch>=2.0.0", "numpy<2.0"]

Import packages inside the function body

You must import packages inside the decorated function body, not at the top of your file. This ensures imports happen on the remote worker. Correct: imports inside the function.
@Endpoint(name="compute", gpu=GpuGroup.ANY, dependencies=["numpy"])
def compute(data):
    import numpy as np  # Import here
    return np.sum(data)
Incorrect: imports at top of file won’t work.
import numpy as np  # This import happens locally, not on the worker

@Endpoint(name="compute", gpu=GpuGroup.ANY, dependencies=["numpy"])
def compute(data):
    return np.sum(data)  # numpy not available on the remote worker

System dependencies

Use system_dependencies to install system-level packages (via apt):
@Endpoint(
    name="video-processor",
    gpu=GpuGroup.ANY,
    dependencies=["opencv-python"],
    system_dependencies=["libgl1-mesa-glx", "libglib2.0-0"]
)
async def process_video(video_data):
    import cv2
    # OpenCV processing
    return {"processed": True}

Parallel execution

Endpoint functions are async. Use Python’s asyncio to run multiple operations concurrently:
import asyncio

async def main():
    # Run three functions in parallel
    results = await asyncio.gather(
        process_item(item1),
        process_item(item2),
        process_item(item3)
    )
    return results
This is useful for:
  • Batch processing multiple inputs
  • Running different models on the same data
  • Parallelizing independent pipeline stages
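The same pattern scales to any number of inputs by unpacking a generator into asyncio.gather, and passing return_exceptions=True collects per-item errors instead of failing the whole batch. A minimal sketch of that control flow, using a local coroutine as a stand-in for a remote endpoint function:

```python
import asyncio

async def process_item(item: int) -> dict:
    # Local stand-in for a remote @Endpoint function
    if item < 0:
        raise ValueError(f"bad item: {item}")
    await asyncio.sleep(0)  # yield control, as a real remote call would
    return {"item": item, "processed": True}

async def main(items: list[int]) -> list:
    # Unpack a generator so the batch size can vary at runtime;
    # return_exceptions=True returns errors in place of results
    return await asyncio.gather(
        *(process_item(i) for i in items),
        return_exceptions=True,
    )

results = asyncio.run(main([1, 2, -1, 4]))
ok = [r for r in results if isinstance(r, dict)]
errors = [r for r in results if isinstance(r, Exception)]
```

Results come back in input order, so you can zip them with the original items to report which ones failed.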

Environment variables

Pass environment variables using the env parameter:
@Endpoint(
    name="api-worker",
    gpu=GpuGroup.ANY,
    env={
        "HF_TOKEN": "your_huggingface_token",
        "MODEL_ID": "gpt2"
    }
)
async def load_model():
    import os
    from transformers import AutoModel

    hf_token = os.getenv("HF_TOKEN")
    model_id = os.getenv("MODEL_ID")

    model = AutoModel.from_pretrained(model_id, token=hf_token)
    return {"model_loaded": model_id}
Environment variables are excluded from configuration hashing. Changing environment values won’t trigger endpoint recreation, making it easy to rotate API keys.

Persistent storage

Attach a network volume for persistent storage across workers. Flash uses the volume name to find an existing volume or create a new one:
from runpod_flash import Endpoint, GpuGroup, NetworkVolume

vol = NetworkVolume(name="model-cache")  # Finds existing or creates new

@Endpoint(
    name="model-server",
    gpu=GpuGroup.ANY,
    volume=vol
)
async def serve(data: dict) -> dict:
    # Access files at /runpod-volume/
    ...
See Flash storage for setup instructions.
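Inside a worker, a common use of the volume is checking for a cached artifact before downloading it again. A hedged sketch of that pattern, assuming the /runpod-volume mount point described above (cached_weights and download_model are illustrative names, not part of the SDK):

```python
from pathlib import Path

def cached_weights(model_id: str, base: str = "/runpod-volume") -> Path:
    """Return the on-volume directory for model_id, creating parents as needed."""
    path = Path(base) / "models" / model_id
    path.mkdir(parents=True, exist_ok=True)
    return path

# Inside an endpoint function you might write:
# weights = cached_weights("gpt2")
# if not (weights / "config.json").exists():
#     download_model("gpt2", weights)  # hypothetical download helper
```

Because the volume persists across workers, the first worker to populate the cache saves every later worker the download.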

Endpoint parameters

For a complete list of parameters available for the Endpoint class, see Endpoint parameters.

Working with jobs (client mode)

When using Endpoint(id=...) or Endpoint(image=...), you get an EndpointJob object for async operations:
ep = Endpoint(id="abc123")

# Submit a job
job = await ep.run({"prompt": "hello"})

# Check status
status = await job.status()  # "IN_PROGRESS", "COMPLETED", etc.

# Wait for completion
await job.wait(timeout=60)  # Optional timeout in seconds

# Access results
print(job.id)      # Job ID
print(job.output)  # Result payload
print(job.error)   # Error message if failed
print(job.done)    # True if completed/failed

# Cancel a job
await job.cancel()
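When job.wait() alone isn't enough, for example to log progress or apply custom backoff, you can poll status() in a loop until a terminal state. A sketch of that polling pattern, shown with a minimal local stub in place of a real EndpointJob (the terminal-state strings follow the statuses mentioned above and are assumptions beyond that):

```python
import asyncio

class StubJob:
    """Local stand-in mimicking the small EndpointJob surface used below."""
    def __init__(self, ticks: int):
        self._ticks = ticks
        self.output = None

    async def status(self) -> str:
        self._ticks -= 1
        if self._ticks <= 0:
            self.output = {"result": "done"}
            return "COMPLETED"
        return "IN_PROGRESS"

async def poll(job, interval: float = 0.01, timeout: float = 5.0) -> str:
    """Poll job.status() until a terminal state or until timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        state = await job.status()
        if state in ("COMPLETED", "FAILED", "CANCELLED"):
            return state
        if loop.time() >= deadline:
            raise TimeoutError(f"job still {state} after {timeout}s")
        await asyncio.sleep(interval)

state = asyncio.run(poll(StubJob(ticks=3)))
```

With a real job the loop body is where you would emit progress logs or adjust the sleep interval between checks.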

Next steps