Load balancing endpoints are currently in beta. We’re actively addressing issues and working to improve the user experience. Join our Discord if you’d like to provide feedback.
This tutorial shows how to build a vLLM application using FastAPI and deploy it as a load balancing Serverless endpoint on Runpod.

What you’ll learn

For a broader introduction to building load balancing workers (or for more general use cases), see Build a load balancing worker.
In this tutorial you’ll learn how to:
  • Create a FastAPI application to serve your vLLM endpoints.
  • Implement proper health checks for your vLLM workers.
  • Deploy your vLLM application as a load balancing Serverless endpoint.
  • Test and interact with your vLLM APIs.

Requirements

Before you begin you’ll need:
  • A Runpod account.
  • Basic familiarity with Python, REST APIs, and vLLM.
  • Docker installed on your local machine.

Step 1: Create your project files

You can download a preconfigured repository containing the completed code for this tutorial on GitHub.
Start by creating a new directory for your project:
mkdir vllm_worker
cd vllm_worker
Then, create the following files and directories:
touch Dockerfile
touch requirements.txt
mkdir src
touch src/handler.py
touch src/models.py
touch src/utils.py
Your project structure should now look like this:
vllm_worker/
├── Dockerfile
├── requirements.txt
└── src/
    ├── handler.py
    ├── models.py
    └── utils.py

Step 2: Define data models

We’ll start by creating the data models that define the structure of your API. These models specify what data your endpoints expect to receive and what they’ll return. Add the following code to src/models.py:
from typing import Optional, List, Union, Literal
from pydantic import BaseModel, Field

class ChatMessage(BaseModel):
    role: Literal["system", "user", "assistant"]
    content: str

class GenerationRequest(BaseModel):
    prompt: str
    max_tokens: int = Field(default=512, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    top_p: float = Field(default=0.9, ge=0.0, le=1.0)
    top_k: int = Field(default=-1, ge=-1)
    frequency_penalty: float = Field(default=0.0, ge=-2.0, le=2.0)
    presence_penalty: float = Field(default=0.0, ge=-2.0, le=2.0)
    stop: Optional[Union[str, List[str]]] = None
    stream: bool = Field(default=False)

class GenerationResponse(BaseModel):
    text: str
    finish_reason: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage]
    max_tokens: int = Field(default=512, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    top_p: float = Field(default=0.9, ge=0.0, le=1.0)
    stop: Optional[Union[str, List[str]]] = None
    stream: bool = Field(default=False)

class ErrorResponse(BaseModel):
    error: str
    detail: str
    request_id: Optional[str] = None
The GenerationRequest and ChatCompletionRequest models specify what data clients need to send, while GenerationResponse and ErrorResponse define what they’ll receive back. Each data model includes validation rules using Pydantic’s Field function to ensure parameters stay within acceptable ranges.
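To see this validation in action, here's a quick standalone sketch (run it from the src directory; the field values are purely illustrative):
from pydantic import ValidationError
from models import GenerationRequest

# Unspecified fields fall back to their defaults.
request = GenerationRequest(prompt="Once upon a time", max_tokens=64)
print(request.temperature)  # 0.7

# Out-of-range values are rejected before they ever reach the vLLM engine.
try:
    GenerationRequest(prompt="Once upon a time", temperature=3.5)
except ValidationError as error:
    print(error.errors()[0]["msg"])  # e.g. "Input should be less than or equal to 2"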

Step 3: Create utility functions

Next, we’ll create a few helper functions to support the main application. These utilities handle common tasks like formatting chat prompts and creating standardized error responses. Add the following code to src/utils.py:
from typing import List, Optional
from transformers import AutoTokenizer
from models import ChatMessage, ErrorResponse


def get_tokenizer(model_name: str):
    """Get tokenizer for the given model"""
    return AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)


def format_chat_prompt(messages: List[ChatMessage], model_name: str) -> str:
    """Format messages using the model's chat template"""
    tokenizer = get_tokenizer(model_name)

    # Use the model's built-in chat template if the tokenizer defines one
    if getattr(tokenizer, "chat_template", None):
        message_dicts = [{"role": msg.role, "content": msg.content} for msg in messages]
        return tokenizer.apply_chat_template(
            message_dicts,
            tokenize=False,
            add_generation_prompt=True
        )

    # Fallback to common format
    formatted_prompt = ""
    for message in messages:
        if message.role == "system":
            formatted_prompt += f"System: {message.content}\n\n"
        elif message.role == "user":
            formatted_prompt += f"Human: {message.content}\n\n"
        elif message.role == "assistant":
            formatted_prompt += f"Assistant: {message.content}\n\n"

    formatted_prompt += "Assistant: "
    return formatted_prompt


def create_error_response(error: str, detail: str, request_id: Optional[str] = None) -> ErrorResponse:
    return ErrorResponse(error=error, detail=detail, request_id=request_id)
The format_chat_prompt function converts chat-style conversations into the text format expected by language models. It first tries to use the model’s built-in chat template, then falls back to a generic format if that’s not available. The create_error_response function provides a consistent way to generate error messages throughout your application.
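As a quick sanity check, you can call the helper directly (a standalone sketch run from the src directory; it downloads the tokenizer for whichever model you pass, and a model without a chat template falls back to the generic format):
from models import ChatMessage
from utils import format_chat_prompt

messages = [
    ChatMessage(role="system", content="You are a helpful assistant."),
    ChatMessage(role="user", content="What is vLLM?"),
]

# DialoGPT ships no chat template, so this prints the System/Human/Assistant fallback.
print(format_chat_prompt(messages, "microsoft/DialoGPT-medium"))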

Step 4: Build the main FastAPI application

Now we’ll build the main application file, src/handler.py. This file acts as the orchestrator, bringing together the models and utilities we just created. It uses FastAPI to create the server, defines the API endpoints, and manages the vLLM engine’s lifecycle. Add the following code to src/handler.py:
from fastapi import FastAPI, HTTPException, status
from fastapi.responses import StreamingResponse, JSONResponse
from contextlib import asynccontextmanager
from typing import Optional, AsyncGenerator
import json
import logging
import os
import uvicorn
from vllm import AsyncLLMEngine
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.sampling_params import SamplingParams
from vllm.utils import random_uuid
from utils import format_chat_prompt, create_error_response
from models import GenerationRequest, GenerationResponse, ChatCompletionRequest

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
    ]
)
logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(_: FastAPI):
    """Initialize the vLLM engine on startup and cleanup on shutdown"""
    # Startup
    await create_engine()
    yield
    # Shutdown cleanup
    global engine, engine_ready
    if engine:
        logger.info("Shutting down vLLM engine...")
        # vLLM AsyncLLMEngine doesn't have an explicit shutdown method,
        # but we can clean up our references
        engine = None
        engine_ready = False
        logger.info("vLLM engine shutdown complete")


app = FastAPI(title="vLLM Load Balancing Server", version="1.0.0", lifespan=lifespan)


# Global variables
engine: Optional[AsyncLLMEngine] = None
engine_ready = False


async def create_engine():
    """Initialize the vLLM engine"""
    global engine, engine_ready
    
    try:
        # Get model name from environment variable
        model_name = os.getenv("MODEL_NAME", "microsoft/DialoGPT-medium")
        
        # Configure engine arguments
        engine_args = AsyncEngineArgs(
            model=model_name,
            tensor_parallel_size=int(os.getenv("TENSOR_PARALLEL_SIZE", "1")),
            dtype=os.getenv("DTYPE", "auto"),
            trust_remote_code=os.getenv("TRUST_REMOTE_CODE", "true").lower() == "true",
            max_model_len=int(os.getenv("MAX_MODEL_LEN")) if os.getenv("MAX_MODEL_LEN") else None,
            gpu_memory_utilization=float(os.getenv("GPU_MEMORY_UTILIZATION", "0.9")),
            enforce_eager=os.getenv("ENFORCE_EAGER", "false").lower() == "true",
        )
        
        # Create the engine
        engine = AsyncLLMEngine.from_engine_args(engine_args)
        engine_ready = True
        logger.info(f"vLLM engine initialized successfully with model: {model_name}")
        
    except Exception as e:
        logger.error(f"Failed to initialize vLLM engine: {str(e)}")
        engine_ready = False
        raise


@app.get("/ping")
async def health_check():
    """Health check endpoint required by Runpod load balancer"""
    if not engine_ready:
        logger.debug("Health check: Engine initializing")
        # Return 503 when initializing
        return JSONResponse(
            content={"status": "initializing"},
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE
        )
    
    logger.debug("Health check: Engine healthy")
    # Return 200 when healthy
    return {"status": "healthy"}

@app.get("/")
async def root():
    """Root endpoint with basic info"""
    return {
        "message": "vLLM Load Balancing Server",
        "status": "ready" if engine_ready else "initializing",
        "endpoints": {
            "health": "/ping",
            "generate": "/v1/completions",
            "chat": "/v1/chat/completions"
        }
    }

@app.post("/v1/completions", response_model=GenerationResponse)
async def generate_completion(request: GenerationRequest):
    """Generate text completion"""
    logger.info(f"Received completion request: max_tokens={request.max_tokens}, temperature={request.temperature}, stream={request.stream}")
    
    if not engine_ready or engine is None:
        logger.warning("Completion request rejected: Engine not ready")
        error_response = create_error_response("ServiceUnavailable", "Engine not ready")
        raise HTTPException(status_code=503, detail=error_response.model_dump())
    
    try:
        # Create sampling parameters
        sampling_params = SamplingParams(
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            top_p=request.top_p,
            top_k=request.top_k,
            frequency_penalty=request.frequency_penalty,
            presence_penalty=request.presence_penalty,
            stop=request.stop,
        )
        
        # Generate request ID
        request_id = random_uuid()
        
        if request.stream:
            return StreamingResponse(
                stream_completion(request.prompt, sampling_params, request_id),
                media_type="text/event-stream",
            )
        else:
            # Non-streaming generation
            results = engine.generate(request.prompt, sampling_params, request_id)
            final_output = None
            async for output in results:
                final_output = output
            
            if final_output is None:
                request_id = random_uuid()
                error_response = create_error_response("GenerationError", "No output generated", request_id)
                raise HTTPException(status_code=500, detail=error_response.model_dump())
            
            generated_text = final_output.outputs[0].text
            finish_reason = final_output.outputs[0].finish_reason
            
            # Calculate token counts using actual token IDs when available
            if hasattr(final_output, 'prompt_token_ids') and final_output.prompt_token_ids is not None:
                prompt_tokens = len(final_output.prompt_token_ids)
            else:
                # Fallback to approximate word count
                prompt_tokens = len(request.prompt.split())
            
            completion_tokens = len(final_output.outputs[0].token_ids)
            
            logger.info(f"Completion generated: {completion_tokens} tokens, finish_reason={finish_reason}")
            return GenerationResponse(
                text=generated_text,
                finish_reason=finish_reason,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                total_tokens=prompt_tokens + completion_tokens
            )
            
    except HTTPException:
        # Re-raise intentional HTTP errors without wrapping them in a generic 500
        raise
    except Exception as e:
        request_id = random_uuid()
        logger.error(f"Generation failed (request_id={request_id}): {str(e)}", exc_info=True)
        error_response = create_error_response("GenerationError", f"Generation failed: {str(e)}", request_id)
        raise HTTPException(status_code=500, detail=error_response.model_dump())

async def stream_completion(prompt: str, sampling_params: SamplingParams, request_id: str) -> AsyncGenerator[str, None]:
    """Stream completion generator"""
    try:
        results = engine.generate(prompt, sampling_params, request_id)
        async for output in results:
            for output_item in output.outputs:
                yield f"data: {json.dumps({'text': output_item.text, 'finish_reason': output_item.finish_reason})}\n\n"
        
        yield "data: [DONE]\n\n"
        
    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """OpenAI-compatible chat completions endpoint"""
    logger.info(f"Received chat completion request: {len(request.messages)} messages, max_tokens={request.max_tokens}, temperature={request.temperature}")
    
    if not engine_ready or engine is None:
        logger.warning("Chat completion request rejected: Engine not ready")
        error_response = create_error_response("ServiceUnavailable", "Engine not ready")
        raise HTTPException(status_code=503, detail=error_response.model_dump())
    
    try:
        # Extract messages and convert to prompt
        messages = request.messages
        if not messages:
            error_response = create_error_response("ValidationError", "No messages provided")
            raise HTTPException(status_code=400, detail=error_response.model_dump())
        
        # Use proper chat template formatting
        model_name = os.getenv("MODEL_NAME", "microsoft/DialoGPT-medium")
        prompt = format_chat_prompt(messages, model_name)
        
        # Create sampling parameters from request
        sampling_params = SamplingParams(
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            top_p=request.top_p,
            stop=request.stop,
        )
        
        # Generate
        request_id = random_uuid()
        results = engine.generate(prompt, sampling_params, request_id)
        final_output = None
        async for output in results:
            final_output = output
        
        if final_output is None:
            error_response = create_error_response("GenerationError", "No output generated", request_id)
            raise HTTPException(status_code=500, detail=error_response.model_dump())
        
        generated_text = final_output.outputs[0].text
        completion_tokens = len(final_output.outputs[0].token_ids)
        logger.info(f"Chat completion generated: {completion_tokens} tokens, finish_reason={final_output.outputs[0].finish_reason}")
        
        # Return OpenAI-compatible response
        return {
            "id": request_id,
            "object": "chat.completion",
            "model": os.getenv("MODEL_NAME", "unknown"),
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": generated_text
                },
                "finish_reason": final_output.outputs[0].finish_reason
            }],
            "usage": {
                "prompt_tokens": len(final_output.prompt_token_ids) if hasattr(final_output, 'prompt_token_ids') and final_output.prompt_token_ids is not None else len(prompt.split()),
                "completion_tokens": len(final_output.outputs[0].token_ids),
                "total_tokens": (len(final_output.prompt_token_ids) if hasattr(final_output, 'prompt_token_ids') and final_output.prompt_token_ids is not None else len(prompt.split())) + len(final_output.outputs[0].token_ids)
            }
        }
        
    except HTTPException:
        # Re-raise intentional HTTP errors (like the 400 above) without wrapping them in a generic 500
        raise
    except Exception as e:
        request_id = random_uuid()
        logger.error(f"Chat completion failed (request_id={request_id}): {str(e)}", exc_info=True)
        error_response = create_error_response("ChatCompletionError", f"Chat completion failed: {str(e)}", request_id)
        raise HTTPException(status_code=500, detail=error_response.model_dump())

if __name__ == "__main__":
    # Get ports from environment variables
    port = int(os.getenv("PORT", 8000))
    logger.info(f"Starting vLLM server on port {port}")
    
    # If health port is different, you'd need to run a separate health server
    # For simplicity, we're using the same port here
    
    uvicorn.run(
        app, 
        host="0.0.0.0", 
        port=port,
        log_level="info"
    )
This file creates a FastAPI server that manages the vLLM engine and exposes three API endpoints:
  • A health check at /ping that tells the load balancer when your worker is ready.
  • A text completion endpoint at /v1/completions.
  • An OpenAI-compatible chat endpoint at /v1/chat/completions.
The application handles both streaming and non-streaming responses, manages the language model lifecycle, and includes comprehensive error handling and logging.
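For example, if you install the dependencies on a GPU machine and start the server with python3 handler.py from the src directory, a client can consume the streaming completions endpoint like this (a minimal sketch using the requests library; the URL assumes the default PORT of 8000):
import json
import requests

payload = {"prompt": "Once upon a time", "max_tokens": 50, "stream": True}
latest = ""

with requests.post("http://localhost:8000/v1/completions", json=payload, stream=True) as response:
    for line in response.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        # Each event from stream_completion carries the full text generated so far.
        latest = json.loads(data).get("text", latest)

print(latest)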

Step 5: Set up dependencies and build steps

With the application code complete, we still need to define its dependencies and create a Dockerfile to package it into a container image.
  1. Add the following dependencies to requirements.txt:
    ray
    pandas
    pyarrow
    runpod~=1.7.0
    huggingface-hub
    packaging
    typing-extensions==4.7.1
    pydantic
    pydantic-settings
    hf-transfer
    transformers<4.54.0
    
  2. Add the following build steps to your Dockerfile:
    FROM nvidia/cuda:12.1.0-base-ubuntu22.04 
    
    RUN apt-get update -y \
        && apt-get install -y python3-pip
    
    RUN ldconfig /usr/local/cuda-12.1/compat/
    
    # Copy and install Python dependencies
    COPY requirements.txt /requirements.txt
    RUN --mount=type=cache,target=/root/.cache/pip \
        python3 -m pip install --upgrade pip && \
        python3 -m pip install --upgrade -r /requirements.txt
    
    # Pin the vLLM version for stability and reproducible builds
    # FlashInfer provides optimized attention kernels for better performance
    ARG VLLM_VERSION=0.9.1
    ARG CUDA_VERSION=cu121
    ARG TORCH_VERSION=torch2.3
    
    RUN python3 -m pip install vllm==${VLLM_VERSION} && \
        python3 -m pip install flashinfer -i https://flashinfer.ai/whl/${CUDA_VERSION}/${TORCH_VERSION}
    
    ENV PYTHONPATH="/:/vllm-workspace"
    
    COPY src /src
    
    WORKDIR /src
    
    CMD ["python3", "handler.py"]
    

Step 6: Build and push your Docker image

Build and push your Docker image to a container registry:
# Build the image
docker build --platform linux/amd64 -t YOUR_DOCKER_USERNAME/vllm-loadbalancer:v1.0 . 

# Push to Docker Hub
docker push YOUR_DOCKER_USERNAME/vllm-loadbalancer:v1.0

Step 7: Deploy to Runpod

Now, let’s deploy our application to a Serverless endpoint:
  1. Go to the Serverless page in the Runpod console.
  2. Click New Endpoint.
  3. Under Custom Source, select Docker Image, then click Next.
  4. In the Container Image field, enter your Docker image URL:
    YOUR_DOCKER_USERNAME/vllm-loadbalancer:v1.0
    
    Then click Next.
  5. Give your endpoint a name.
  6. Under Endpoint Type, select Load Balancer.
  7. Under Worker Configuration, select at least one GPU type (16 GB or 24 GB are fine for this example).
  8. Leave all other settings at their defaults.
  9. Click Create Endpoint.

Step 8: Test your endpoints

You can find a Python script to test your vLLM load balancer locally on GitHub.
Once your endpoint has finished deploying, you can access your vLLM APIs at:
https://ENDPOINT_ID.api.runpod.ai/PATH
For example, the vLLM application we defined in step 4 exposes these endpoints:
  • Health check: https://ENDPOINT_ID.api.runpod.ai/ping
  • Generate text: https://ENDPOINT_ID.api.runpod.ai/v1/completions
  • Chat completions: https://ENDPOINT_ID.api.runpod.ai/v1/chat/completions
Use the curl commands below to make test requests to your vLLM load balancer, replacing ENDPOINT_ID and RUNPOD_API_KEY with your actual values. To run a health check:
curl -X GET "https://ENDPOINT_ID.api.runpod.ai/ping" \
     -H 'Authorization: Bearer RUNPOD_API_KEY' \
     -H 'Content-Type: application/json'
For text completions:
curl -X POST "https://ENDPOINT_ID.api.runpod.ai/v1/completions" \
     -H 'Authorization: Bearer RUNPOD_API_KEY' \
     -H 'Content-Type: application/json' \
     -d '{"prompt": "Once upon a time", "max_tokens": 50, "temperature": 0.8}'
For chat completions:
curl -X POST "https://ENDPOINT_ID.api.runpod.ai/v1/chat/completions" \
     -H 'Authorization: Bearer RUNPOD_API_KEY' \
     -H 'Content-Type: application/json' \
     -d '{
       "messages": [
         {"role": "user", "content": "Tell me a short story"}
       ],
       "max_tokens": 100,
       "temperature": 0.8
     }'
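You can make the same chat request from Python. Here's a minimal sketch using the requests library, with the same ENDPOINT_ID and RUNPOD_API_KEY placeholders as the curl commands above:
import requests

url = "https://ENDPOINT_ID.api.runpod.ai/v1/chat/completions"
headers = {"Authorization": "Bearer RUNPOD_API_KEY"}
payload = {
    "messages": [{"role": "user", "content": "Tell me a short story"}],
    "max_tokens": 100,
    "temperature": 0.8,
}

# Cold starts can be slow, so allow a generous timeout.
response = requests.post(url, json=payload, headers=headers, timeout=300)
response.raise_for_status()
print(response.json()["choices"][0]["message"]["content"])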
After sending a request, your workers will take some time to initialize. You can track their progress by checking the logs in the Workers tab of your endpoint page.
If you see the following error:
{"error":"no workers available"}%
This means your workers did not initialize in time to process the request. If you try running the request again, this will usually resolve the issue.
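If you'd rather not retry by hand, a client-side retry loop works too. This is only a sketch: the "no workers available" string check and the timing values are assumptions based on the error above, not part of the Runpod API contract:
import time
import requests

def post_with_retry(url, headers, payload, attempts=5, delay=10):
    """Retry a request while workers are still cold-starting."""
    response = None
    for _ in range(attempts):
        response = requests.post(url, json=payload, headers=headers, timeout=300)
        # Assumed check: back off whenever the endpoint reports no available workers.
        if response.ok and "no workers available" not in response.text:
            return response
        time.sleep(delay)
    return response
Call it with the same url, headers, and payload used in the Python example above.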
Congrats! You’ve created a load balancing vLLM endpoint and used it to serve a large language model.

Next steps

Now that you’ve deployed a load balancing vLLM endpoint, you can try:
  • Experimenting with different models and frameworks.
  • Adding authentication to your API.
  • Exploring advanced FastAPI features like background tasks and WebSockets.
  • Optimizing your application for performance and reliability.