# Build a load balancing vLLM endpoint

> Learn how to deploy a custom vLLM server to a load balancing Serverless endpoint.

This tutorial shows how to build a vLLM application using FastAPI and deploy it as a load balancing Serverless endpoint on Runpod.

<Tip>
  To get a basic understanding of how to build a load balancing worker (or for more general use cases), see [Build a load balancing worker](/serverless/load-balancing/build-a-worker).
</Tip>

## Requirements

Before you begin, you'll need:

* A Runpod account.
* Basic familiarity with Python, REST APIs, and vLLM.
* Docker installed on your local machine.

## Step 1: Create your project files

<Tip>
  You can download a preconfigured repository containing the completed code for this tutorial [on GitHub](https://github.com/runpod-workers/vllm-loadbalancer-ep/).
</Tip>

Start by creating a new directory for your project:

```bash theme={"theme":{"light":"github-light","dark":"github-dark"}}
mkdir vllm_worker
cd vllm_worker
```

Then, create the following files and directories:

```bash theme={"theme":{"light":"github-light","dark":"github-dark"}}
touch Dockerfile
touch requirements.txt
mkdir src
touch src/handler.py
touch src/models.py
touch src/utils.py
```

Your project structure should now look like this:

<Tree>
  <Tree.Folder name="vllm_worker" defaultOpen>
    <Tree.File name="Dockerfile" />

    <Tree.File name="requirements.txt" />

    <Tree.Folder name="src" defaultOpen>
      <Tree.File name="handler.py" />

      <Tree.File name="models.py" />

      <Tree.File name="utils.py" />
    </Tree.Folder>
  </Tree.Folder>
</Tree>

## Step 2: Define data models

We'll start by creating the data models that define the structure of your API. These models specify what data your endpoints expect to receive and what they'll return.

Add the following code to `src/models.py`:

```python theme={"theme":{"light":"github-light","dark":"github-dark"}}
from typing import Optional, List, Union, Literal
from pydantic import BaseModel, Field

class ChatMessage(BaseModel):
    role: Literal["system", "user", "assistant"]
    content: str

class GenerationRequest(BaseModel):
    prompt: str
    max_tokens: int = Field(default=512, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    top_p: float = Field(default=0.9, ge=0.0, le=1.0)
    top_k: int = Field(default=-1, ge=-1)
    frequency_penalty: float = Field(default=0.0, ge=-2.0, le=2.0)
    presence_penalty: float = Field(default=0.0, ge=-2.0, le=2.0)
    stop: Optional[Union[str, List[str]]] = None
    stream: bool = Field(default=False)

class GenerationResponse(BaseModel):
    text: str
    finish_reason: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage]
    max_tokens: int = Field(default=512, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    top_p: float = Field(default=0.9, ge=0.0, le=1.0)
    stop: Optional[Union[str, List[str]]] = None
    stream: bool = Field(default=False)

class ErrorResponse(BaseModel):
    error: str
    detail: str
    request_id: Optional[str] = None
```

The `GenerationRequest` and `ChatCompletionRequest` models specify what data clients need to send, while `GenerationResponse` and `ErrorResponse` define what they'll receive back.

Each data model includes validation rules using Pydantic's `Field` function to ensure parameters stay within acceptable ranges.

## Step 3: Create utility functions

Next, we'll create a few helper functions to support the main application. These utilities handle common tasks like formatting chat prompts and creating standardized error responses.

Add the following code to `src/utils.py`:

```python theme={"theme":{"light":"github-light","dark":"github-dark"}}
from typing import List
from transformers import AutoTokenizer
from .models import ChatMessage, ErrorResponse


def get_tokenizer(model_name: str):
    """Get tokenizer for the given model"""
    return AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)


def format_chat_prompt(messages: List[ChatMessage], model_name: str) -> str:
    """Format messages using the model's chat template"""
    tokenizer = get_tokenizer(model_name)

    # Use the model's built-in chat template if it defines one
    if getattr(tokenizer, "chat_template", None):
        message_dicts = [{"role": msg.role, "content": msg.content} for msg in messages]
        return tokenizer.apply_chat_template(
            message_dicts,
            tokenize=False,
            add_generation_prompt=True
        )

    # Fallback to common format
    formatted_prompt = ""
    for message in messages:
        if message.role == "system":
            formatted_prompt += f"System: {message.content}\n\n"
        elif message.role == "user":
            formatted_prompt += f"Human: {message.content}\n\n"
        elif message.role == "assistant":
            formatted_prompt += f"Assistant: {message.content}\n\n"

    formatted_prompt += "Assistant: "
    return formatted_prompt


def create_error_response(error: str, detail: str, request_id: str = None) -> ErrorResponse:
    return ErrorResponse(error=error, detail=detail, request_id=request_id)
```

The `format_chat_prompt` function converts chat-style conversations into the text format expected by language models. It first tries to use the model's built-in chat template, then falls back to a generic format if that's not available.

The `create_error_response` function provides a consistent way to generate error messages throughout your application.
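
To make the fallback format concrete, here is a minimal standalone sketch of the same role-labeling logic. It uses plain dictionaries instead of `ChatMessage` objects, and the function name `fallback_chat_prompt` is hypothetical (it is not part of the tutorial code):

```python
from typing import Dict, List


def fallback_chat_prompt(messages: List[Dict[str, str]]) -> str:
    """Mirror the generic fallback format used when no chat template exists."""
    labels = {"system": "System", "user": "Human", "assistant": "Assistant"}
    parts = []
    for message in messages:
        label = labels.get(message["role"])
        if label is not None:  # roles without a label are skipped
            parts.append(f"{label}: {message['content']}\n\n")
    # The trailing label cues the model to answer as the assistant
    return "".join(parts) + "Assistant: "


prompt = fallback_chat_prompt([
    {"role": "system", "content": "You are concise."},
    {"role": "user", "content": "Hello!"},
])
print(repr(prompt))
# → 'System: You are concise.\n\nHuman: Hello!\n\nAssistant: '
```

Models that ship their own chat template usually expect different delimiters, which is why the helper only uses this generic format as a last resort.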

## Step 4: Build the main FastAPI application

Now we'll build the main application file, `src/handler.py`. This file acts as the orchestrator, bringing together the models and utilities we just created. It uses FastAPI to create the server, defines the API endpoints, and manages the vLLM engine's lifecycle.

Add the following code to `src/handler.py`:

```python theme={"theme":{"light":"github-light","dark":"github-dark"}}
from fastapi import FastAPI, HTTPException, status
from fastapi.responses import StreamingResponse, JSONResponse
from contextlib import asynccontextmanager
from typing import Optional, AsyncGenerator
import json
import logging
import os
import uvicorn
from vllm import AsyncLLMEngine
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.sampling_params import SamplingParams
from vllm.utils import random_uuid
from .utils import format_chat_prompt, create_error_response
from .models import GenerationRequest, GenerationResponse, ChatCompletionRequest

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
    ]
)
logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(_: FastAPI):
    """Initialize the vLLM engine on startup and cleanup on shutdown"""
    # Startup
    await create_engine()
    yield
    # Shutdown cleanup
    global engine, engine_ready
    if engine:
        logger.info("Shutting down vLLM engine...")
        # vLLM AsyncLLMEngine doesn't have an explicit shutdown method,
        # but we can clean up our references
        engine = None
        engine_ready = False
        logger.info("vLLM engine shutdown complete")


app = FastAPI(title="vLLM Load Balancing Server", version="1.0.0", lifespan=lifespan)


# Global variables
engine: Optional[AsyncLLMEngine] = None
engine_ready = False


async def create_engine():
    """Initialize the vLLM engine"""
    global engine, engine_ready
    
    try:
        # Get model name from environment variable
        model_name = os.getenv("MODEL_NAME", "microsoft/DialoGPT-medium")
        
        # Configure engine arguments
        engine_args = AsyncEngineArgs(
            model=model_name,
            tensor_parallel_size=int(os.getenv("TENSOR_PARALLEL_SIZE", "1")),
            dtype=os.getenv("DTYPE", "auto"),
            trust_remote_code=os.getenv("TRUST_REMOTE_CODE", "true").lower() == "true",
            max_model_len=int(os.getenv("MAX_MODEL_LEN")) if os.getenv("MAX_MODEL_LEN") else None,
            gpu_memory_utilization=float(os.getenv("GPU_MEMORY_UTILIZATION", "0.9")),
            enforce_eager=os.getenv("ENFORCE_EAGER", "false").lower() == "true",
        )
        
        # Create the engine
        engine = AsyncLLMEngine.from_engine_args(engine_args)
        engine_ready = True
        logger.info(f"vLLM engine initialized successfully with model: {model_name}")
        
    except Exception as e:
        logger.error(f"Failed to initialize vLLM engine: {str(e)}")
        engine_ready = False
        raise


@app.get("/ping")
async def health_check():
    """Health check endpoint required by Runpod load balancer"""
    if not engine_ready:
        logger.debug("Health check: Engine initializing")
        # Return 204 (No Content) while the engine is still initializing
        return JSONResponse(
            content={"status": "initializing"},
            status_code=status.HTTP_204_NO_CONTENT
        )
    
    logger.debug("Health check: Engine healthy")
    # Return 200 when healthy
    return {"status": "healthy"}

@app.get("/")
async def root():
    """Root endpoint with basic info"""
    return {
        "message": "vLLM Load Balancing Server",
        "status": "ready" if engine_ready else "initializing",
        "endpoints": {
            "health": "/ping",
            "generate": "/v1/completions",
            "chat": "/v1/chat/completions"
        }
    }

@app.post("/v1/completions", response_model=GenerationResponse)
async def generate_completion(request: GenerationRequest):
    """Generate text completion"""
    logger.info(f"Received completion request: max_tokens={request.max_tokens}, temperature={request.temperature}, stream={request.stream}")
    
    if not engine_ready or engine is None:
        logger.warning("Completion request rejected: Engine not ready")
        error_response = create_error_response("ServiceUnavailable", "Engine not ready")
        raise HTTPException(status_code=503, detail=error_response.model_dump())
    
    try:
        # Create sampling parameters
        sampling_params = SamplingParams(
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            top_p=request.top_p,
            top_k=request.top_k,
            frequency_penalty=request.frequency_penalty,
            presence_penalty=request.presence_penalty,
            stop=request.stop,
        )
        
        # Generate request ID
        request_id = random_uuid()
        
        if request.stream:
            return StreamingResponse(
                stream_completion(request.prompt, sampling_params, request_id),
                media_type="text/event-stream",
            )
        else:
            # Non-streaming generation
            results = engine.generate(request.prompt, sampling_params, request_id)
            final_output = None
            async for output in results:
                final_output = output
            
            if final_output is None:
                # Reuse the request ID that was passed to the engine
                error_response = create_error_response("GenerationError", "No output generated", request_id)
                raise HTTPException(status_code=500, detail=error_response.model_dump())
            
            generated_text = final_output.outputs[0].text
            finish_reason = final_output.outputs[0].finish_reason
            
            # Calculate token counts using actual token IDs when available
            if hasattr(final_output, 'prompt_token_ids') and final_output.prompt_token_ids is not None:
                prompt_tokens = len(final_output.prompt_token_ids)
            else:
                # Fallback to approximate word count
                prompt_tokens = len(request.prompt.split())
            
            completion_tokens = len(final_output.outputs[0].token_ids)
            
            logger.info(f"Completion generated: {completion_tokens} tokens, finish_reason={finish_reason}")
            return GenerationResponse(
                text=generated_text,
                finish_reason=finish_reason,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                total_tokens=prompt_tokens + completion_tokens
            )
            
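    except HTTPException:
        # Re-raise deliberate HTTP errors unchanged instead of wrapping them as 500s
        raise
    except Exception as e: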
        request_id = random_uuid()
        logger.error(f"Generation failed (request_id={request_id}): {str(e)}", exc_info=True)
        error_response = create_error_response("GenerationError", f"Generation failed: {str(e)}", request_id)
        raise HTTPException(status_code=500, detail=error_response.model_dump())

async def stream_completion(prompt: str, sampling_params: SamplingParams, request_id: str) -> AsyncGenerator[str, None]:
    """Stream completion generator"""
    try:
        results = engine.generate(prompt, sampling_params, request_id)
        async for output in results:
            for output_item in output.outputs:
                yield f"data: {json.dumps({'text': output_item.text, 'finish_reason': output_item.finish_reason})}\n\n"
        
        yield "data: [DONE]\n\n"
        
    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """OpenAI-compatible chat completions endpoint"""
    logger.info(f"Received chat completion request: {len(request.messages)} messages, max_tokens={request.max_tokens}, temperature={request.temperature}")
    
    if not engine_ready or engine is None:
        logger.warning("Chat completion request rejected: Engine not ready")
        error_response = create_error_response("ServiceUnavailable", "Engine not ready")
        raise HTTPException(status_code=503, detail=error_response.model_dump())
    
    try:
        # Extract messages and convert to prompt
        messages = request.messages
        if not messages:
            error_response = create_error_response("ValidationError", "No messages provided")
            raise HTTPException(status_code=400, detail=error_response.model_dump())
        
        # Use proper chat template formatting
        model_name = os.getenv("MODEL_NAME", "microsoft/DialoGPT-medium")
        prompt = format_chat_prompt(messages, model_name)
        
        # Create sampling parameters from request
        sampling_params = SamplingParams(
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            top_p=request.top_p,
            stop=request.stop,
        )
        
        # Generate
        request_id = random_uuid()
        results = engine.generate(prompt, sampling_params, request_id)
        final_output = None
        async for output in results:
            final_output = output
        
        if final_output is None:
            error_response = create_error_response("GenerationError", "No output generated", request_id)
            raise HTTPException(status_code=500, detail=error_response.model_dump())
        
        generated_text = final_output.outputs[0].text
        completion_tokens = len(final_output.outputs[0].token_ids)
        logger.info(f"Chat completion generated: {completion_tokens} tokens, finish_reason={final_output.outputs[0].finish_reason}")
        
        # Return OpenAI-compatible response
        return {
            "id": request_id,
            "object": "chat.completion",
            "model": os.getenv("MODEL_NAME", "unknown"),
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": generated_text
                },
                "finish_reason": final_output.outputs[0].finish_reason
            }],
            "usage": {
                "prompt_tokens": len(final_output.prompt_token_ids) if hasattr(final_output, 'prompt_token_ids') and final_output.prompt_token_ids is not None else len(prompt.split()),
                "completion_tokens": len(final_output.outputs[0].token_ids),
                "total_tokens": (len(final_output.prompt_token_ids) if hasattr(final_output, 'prompt_token_ids') and final_output.prompt_token_ids is not None else len(prompt.split())) + len(final_output.outputs[0].token_ids)
            }
        }
        
    except HTTPException:
        # Re-raise deliberate HTTP errors (such as the 400 above) unchanged
        raise
    except Exception as e:
        request_id = random_uuid()
        logger.error(f"Chat completion failed (request_id={request_id}): {str(e)}", exc_info=True)
        error_response = create_error_response("ChatCompletionError", f"Chat completion failed: {str(e)}", request_id)
        raise HTTPException(status_code=500, detail=error_response.model_dump())

if __name__ == "__main__":
    # Get ports from environment variables
    port = int(os.getenv("PORT", 80))
    logger.info(f"Starting vLLM server on port {port}")
    
    # If health port is different, you'd need to run a separate health server
    # For simplicity, we're using the same port here
    
    uvicorn.run(
        app, 
        host="0.0.0.0", 
        port=port,
        log_level="info"
    )
```

This file creates a FastAPI server that manages the vLLM engine and exposes three main API endpoints, plus an informational root route at `/`:

* A health check at `/ping` that tells the load balancer when your worker is ready.
* A text completion endpoint at `/v1/completions`.
* An OpenAI-compatible chat endpoint at `/v1/chat/completions`.

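The `/v1/completions` endpoint supports both streaming (server-sent events) and non-streaming responses, while `/v1/chat/completions` always returns a complete response. The application also manages the vLLM engine's lifecycle, logs each request, and returns structured error responses.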
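
When `stream` is `true`, `stream_completion` emits server-sent events: one `data: {...}` line per update, followed by `data: [DONE]`. The sketch below shows one way a client might decode those lines. The helper name `iter_sse_events` and the sample text are hypothetical, and the sample assumes each event carries the text generated so far, so verify that against your own endpoint's output:

```python
import json
from typing import Dict, Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[Dict]:
    """Yield decoded JSON payloads from 'data: ...' lines until '[DONE]'."""
    for raw_line in lines:
        line = raw_line.strip()
        if not line.startswith("data: "):
            continue  # skip the blank lines that separate events
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return
        yield json.loads(payload)


# Sample lines shaped like the output of stream_completion (made-up text)
sample_stream = [
    'data: {"text": "Once", "finish_reason": null}',
    "",
    'data: {"text": "Once upon", "finish_reason": "length"}',
    "",
    "data: [DONE]",
    "",
]

events = list(iter_sse_events(sample_stream))
print(events[-1]["text"])
```

In a real client, you would pass the response's decoded lines to this function instead of a hard-coded list.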

## Step 5: Set up dependencies and build steps

With the application code complete, we still need to define its dependencies and create a Dockerfile to package it into a container image.

1. Add the following dependencies to `requirements.txt`:

   ```
   ray
   pandas
   pyarrow
   runpod~=1.7.0
   huggingface-hub
   packaging
   typing-extensions==4.7.1
   pydantic
   pydantic-settings
   hf-transfer
   transformers<4.54.0
   ```

2. Add the following build steps to your `Dockerfile`:

   ```dockerfile theme={"theme":{"light":"github-light","dark":"github-dark"}}
   FROM nvidia/cuda:12.1.0-base-ubuntu22.04 

   RUN apt-get update -y \
       && apt-get install -y python3-pip

   RUN ldconfig /usr/local/cuda-12.1/compat/

   # Install Python dependencies
   COPY requirements.txt /requirements.txt
   RUN --mount=type=cache,target=/root/.cache/pip \
       python3 -m pip install --upgrade pip && \
       python3 -m pip install --upgrade -r /requirements.txt

   # Pin the vLLM version for stability
   # FlashInfer provides optimized attention for better performance
   ARG VLLM_VERSION=0.9.1
   ARG CUDA_VERSION=cu121
   ARG TORCH_VERSION=torch2.3

   RUN python3 -m pip install vllm==${VLLM_VERSION} && \
       python3 -m pip install flashinfer -i https://flashinfer.ai/whl/${CUDA_VERSION}/${TORCH_VERSION}

   ENV PYTHONPATH="/:/vllm-workspace"

   COPY src /src

   WORKDIR /src

   # Run the handler as a module so the relative imports in src/ resolve
   CMD ["python3", "-m", "src.handler"]
   ```

## Step 6: Build and push your Docker image

Build and push your Docker image to a container registry:

```bash theme={"theme":{"light":"github-light","dark":"github-dark"}}
# Build the image
docker build --platform linux/amd64 -t YOUR_DOCKER_USERNAME/vllm-loadbalancer:v1.0 . 

# Push to Docker Hub
docker push YOUR_DOCKER_USERNAME/vllm-loadbalancer:v1.0
```

## Step 7: Deploy to Runpod

Now, let's deploy our application to a Serverless endpoint:

1. Go to the [Serverless page](https://www.runpod.io/console/serverless) in the Runpod console.
2. Click **New Endpoint**.
3. Click **Import from Docker Registry**.
4. In the **Container Image** field, enter your Docker image URL:
   ```
   YOUR_DOCKER_USERNAME/vllm-loadbalancer:v1.0
   ```
   Then click **Next**.
5. Give your endpoint a name.
6. Under **Endpoint Type**, select **Load Balancer**.
7. Under **GPU Configuration**, select at least one GPU type (16 GB or 24 GB GPUs are fine for this example).
8. Leave all other settings at their defaults.
9. Click **Create Endpoint**.
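
With default settings, the handler from Step 4 falls back to built-in defaults for every environment variable it reads. The sketch below mirrors that parsing without importing vLLM, so you can check how a given set of variables would be interpreted before deploying. The helper `read_engine_settings` and the model name `example-org/example-model` are hypothetical:

```python
import os
from typing import Any, Dict, Mapping, Optional


def read_engine_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Parse the environment variables that create_engine() reads in Step 4."""
    env = os.environ if env is None else env
    max_model_len = env.get("MAX_MODEL_LEN")
    return {
        "model": env.get("MODEL_NAME", "microsoft/DialoGPT-medium"),
        "tensor_parallel_size": int(env.get("TENSOR_PARALLEL_SIZE", "1")),
        "dtype": env.get("DTYPE", "auto"),
        "trust_remote_code": env.get("TRUST_REMOTE_CODE", "true").lower() == "true",
        "max_model_len": int(max_model_len) if max_model_len else None,
        "gpu_memory_utilization": float(env.get("GPU_MEMORY_UTILIZATION", "0.9")),
        "enforce_eager": env.get("ENFORCE_EAGER", "false").lower() == "true",
    }


defaults = read_engine_settings({})
custom = read_engine_settings({
    "MODEL_NAME": "example-org/example-model",  # hypothetical model name
    "MAX_MODEL_LEN": "4096",
})
print(defaults["model"], custom["max_model_len"])
```

Boolean variables only count as enabled when set to `true` (case-insensitive); any other value is treated as `false`.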

## Step 8: Test your endpoints

<Tip>
  You can find a Python script to test your vLLM load balancer locally [on GitHub](https://github.com/runpod-workers/vllm-loadbalancer-ep/blob/main/example.py).
</Tip>

Once your endpoint has finished deploying, you can access your vLLM APIs at:

```
https://ENDPOINT_ID.api.runpod.ai/PATH
```

For example, the vLLM application we defined in Step 4 exposes these endpoints:

* Health check: `https://ENDPOINT_ID.api.runpod.ai/ping`
* Generate text: `https://ENDPOINT_ID.api.runpod.ai/v1/completions`
* Chat completions: `https://ENDPOINT_ID.api.runpod.ai/v1/chat/completions`

Use the curl commands below to make test requests to your vLLM load balancer, replacing `ENDPOINT_ID` and `RUNPOD_API_KEY` with your actual values.

To run a health check:

```bash ping theme={"theme":{"light":"github-light","dark":"github-dark"}}
curl -X GET "https://ENDPOINT_ID.api.runpod.ai/ping" \
     -H 'Authorization: Bearer RUNPOD_API_KEY' \
     -H 'Content-Type: application/json'
```

For text completions:

```bash theme={"theme":{"light":"github-light","dark":"github-dark"}}
curl -X POST "https://ENDPOINT_ID.api.runpod.ai/v1/completions" \
     -H 'Authorization: Bearer RUNPOD_API_KEY' \
     -H 'Content-Type: application/json' \
     -d '{"prompt": "Once upon a time", "max_tokens": 50, "temperature": 0.8}'
```

For chat completions:

```bash theme={"theme":{"light":"github-light","dark":"github-dark"}}
curl -X POST "https://ENDPOINT_ID.api.runpod.ai/v1/chat/completions" \
     -H 'Authorization: Bearer RUNPOD_API_KEY' \
     -H 'Content-Type: application/json' \
     -d '{
       "messages": [
         {"role": "user", "content": "Tell me a short story"}
       ],
       "max_tokens": 100,
       "temperature": 0.8
     }'
```
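
If you'd rather call the endpoint from Python, the following sketch builds the same chat completions request using only the standard library. The helper `build_chat_request` is hypothetical, and `ENDPOINT_ID` and `RUNPOD_API_KEY` are the same placeholders used in the curl commands:

```python
import json
import urllib.request

ENDPOINT_ID = "ENDPOINT_ID"        # placeholder, replace with your endpoint ID
RUNPOD_API_KEY = "RUNPOD_API_KEY"  # placeholder, replace with your API key


def build_chat_request(endpoint_id: str, api_key: str, user_message: str) -> urllib.request.Request:
    """Build the same POST request as the chat completions curl command."""
    body = {
        "messages": [{"role": "user", "content": user_message}],
        "max_tokens": 100,
        "temperature": 0.8,
    }
    return urllib.request.Request(
        url=f"https://{endpoint_id}.api.runpod.ai/v1/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_chat_request(ENDPOINT_ID, RUNPOD_API_KEY, "Tell me a short story")

# Uncomment to send the request once the placeholders hold real values:
# with urllib.request.urlopen(request, timeout=120) as response:
#     reply = json.load(response)
#     print(reply["choices"][0]["message"]["content"])
```

The commented-out lines read the reply from `choices[0].message.content`, matching the response shape returned by the `/v1/chat/completions` handler in Step 4.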

After sending a request, your workers will take some time to initialize. You can track their progress by checking the logs in the **Workers** tab of your endpoint page.

<Tip>
  If you see `{"error":"no workers available"}` after sending a request, your workers did not initialize in time to process it. Running the request again usually resolves the issue.

  For production applications, implement a health check with retries before sending requests. See [Handling cold start errors](/serverless/load-balancing/overview#handling-cold-start-errors) for a complete code example.
</Tip>
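
As a rough illustration of the health-check-with-retries idea (a minimal sketch, not the code from the linked page), the helper below polls `/ping` until it returns `200`. The function names, attempt count, and delay are assumptions chosen for the example:

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def ping_status(url: str, api_key: str) -> int:
    """Return the HTTP status code of a GET request to the /ping URL."""
    request = urllib.request.Request(url, headers={"Authorization": f"Bearer {api_key}"})
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            return response.status
    except urllib.error.HTTPError as error:
        return error.code
    except urllib.error.URLError:
        return 0  # network-level failure, treated as "not ready"


def wait_until_ready(check: Callable[[], int], attempts: int = 10, delay_seconds: float = 5.0) -> bool:
    """Poll until the health check returns 200 or the attempts run out."""
    for attempt in range(attempts):
        if check() == 200:
            return True
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False


# Demo with a stand-in check that reports "initializing" twice, then "healthy"
statuses = iter([204, 204, 200])
ready = wait_until_ready(lambda: next(statuses), attempts=5, delay_seconds=0)
print(ready)
```

Against a real endpoint, you would call `wait_until_ready(lambda: ping_status("https://ENDPOINT_ID.api.runpod.ai/ping", "RUNPOD_API_KEY"))` and only send generation requests once it returns `True`.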

<Check>
  Congratulations! You've created a load balancing vLLM endpoint and used it to serve a large language model.
</Check>

## Next steps

Now that you've deployed a load balancing vLLM endpoint, you can try:

* Experimenting with different models and frameworks.
* Adding authentication to your API.
* Exploring advanced FastAPI features like background tasks and WebSockets.
* Optimizing your application for performance and reliability.
