Skip to main content

Building an Async Generator Handler for Weather Data Simulation

This tutorial will guide you through creating a serverless function using RunPod's Python SDK that simulates fetching weather data for multiple cities concurrently.

Use asynchronous functions to handle multiple concurrent operations efficiently, especially when dealing with tasks that involve waiting for external resources, such as network requests or I/O operations. Asynchronous programming allows your code to perform other tasks while waiting, rather than blocking the entire program. This is particularly useful in a serverless environment where you want to maximize resource utilization and minimize response times.

We'll use an async generator handler to stream results incrementally, demonstrating how to manage multiple concurrent operations efficiently in a serverless environment.

Setting up your Serverless Function

Let's break down the process of creating our weather data simulator into steps.

SImport required libraries

First, import the necessary libraries:

import runpod
import asyncio
import random
import json
import sys

Create the Weather Data Fetcher

Define an asynchronous function that simulates fetching weather data:

async def fetch_weather_data(city, delay):
await asyncio.sleep(delay)
temperature = random.uniform(-10, 40)
humidity = random.uniform(0, 100)
return {
"city": city,
"temperature": round(temperature, 1),
"humidity": round(humidity, 1)
}

This function:

  1. Simulates a network delay using asyncio.sleep()
  2. Generates random temperature and humidity data
  3. Returns a dictionary with the weather data for a city

Create the Async Generator Handler

Now, let's create the main handler function:

async def async_generator_handler(job):
job_input = job['input']
cities = job_input.get('cities', ['New York', 'London', 'Tokyo', 'Sydney', 'Moscow'])
update_interval = job_input.get('update_interval', 2)
duration = job_input.get('duration', 10)

print(f"Weather Data Stream | Starting job {job['id']}")
print(f"Monitoring cities: {', '.join(cities)}")

start_time = asyncio.get_event_loop().time()

while asyncio.get_event_loop().time() - start_time < duration:
tasks = [fetch_weather_data(city, random.uniform(0.5, 2)) for city in cities]
for completed_task in asyncio.as_completed(tasks):
weather_data = await completed_task
yield {
"timestamp": round(asyncio.get_event_loop().time() - start_time, 2),
"data": weather_data
}

await asyncio.sleep(update_interval)

yield {"status": "completed", "message": "Weather monitoring completed"}

This handler:

  1. Extracts parameters from the job input
  2. Logs the start of the job
  3. Creates tasks for fetching weather data for each city
  4. Uses asyncio.as_completed() to yield results as they become available
  5. Continues fetching data at specified intervals for the given duration

Set up the Main Execution

Finally, Set up the main execution block:

async def run_test(job):
async for item in async_generator_handler(job):
print(json.dumps(item))

if __name__ == "__main__":
if "--test_input" in sys.argv:
# Code for local testing (see full example)
else:
runpod.serverless.start({
"handler": async_generator_handler,
"return_aggregate_stream": True
})

This block allows for both local testing and deployment as a RunPod serverless function.

Complete code example

Here's the full code for our serverless weather data simulator:

import runpod
import asyncio
import random
import json
import sys

async def fetch_weather_data(city, delay):
await asyncio.sleep(delay)
temperature = random.uniform(-10, 40)
humidity = random.uniform(0, 100)
return {
"city": city,
"temperature": round(temperature, 1),
"humidity": round(humidity, 1)
}

async def async_generator_handler(job):
job_input = job['input']
cities = job_input.get('cities', ['New York', 'London', 'Tokyo', 'Sydney', 'Moscow'])
update_interval = job_input.get('update_interval', 2)
duration = job_input.get('duration', 10)

print(f"Weather Data Stream | Starting job {job['id']}")
print(f"Monitoring cities: {', '.join(cities)}")

start_time = asyncio.get_event_loop().time()

while asyncio.get_event_loop().time() - start_time < duration:
tasks = [fetch_weather_data(city, random.uniform(0.5, 2)) for city in cities]
for completed_task in asyncio.as_completed(tasks):
weather_data = await completed_task
yield {
"timestamp": round(asyncio.get_event_loop().time() - start_time, 2),
"data": weather_data
}

await asyncio.sleep(update_interval)

yield {"status": "completed", "message": "Weather monitoring completed"}

async def run_test(job):
async for item in async_generator_handler(job):
print(json.dumps(item))

if __name__ == "__main__":
if "--test_input" in sys.argv:
test_input_index = sys.argv.index("--test_input")
if test_input_index + 1 < len(sys.argv):
test_input_json = sys.argv[test_input_index + 1]
try:
job = json.loads(test_input_json)
asyncio.run(run_test(job))
except json.JSONDecodeError:
print("Error: Invalid JSON in test_input")
else:
print("Error: --test_input requires a JSON string argument")
else:
runpod.serverless.start({
"handler": async_generator_handler,
"return_aggregate_stream": True
})

Testing Your Serverless Function

To test your function locally, use this command:

python your_script.py --test_input '
{
"input": {
"cities": ["New York", "London", "Tokyo", "Paris", "Sydney"],
"update_interval": 3,
"duration": 15
},
"id": "local_test"
}'

Understanding the output

When you run the test, you'll see output similar to this:

Weather Data Stream | Starting job local_test
Monitoring cities: New York, London, Tokyo, Paris, Sydney
{"timestamp": 0.84, "data": {"city": "London", "temperature": 11.0, "humidity": 7.3}}
{"timestamp": 0.99, "data": {"city": "Paris", "temperature": -5.9, "humidity": 59.3}}
{"timestamp": 1.75, "data": {"city": "Tokyo", "temperature": 18.4, "humidity": 34.1}}
{"timestamp": 1.8, "data": {"city": "Sydney", "temperature": 26.8, "humidity": 91.0}}
{"timestamp": 1.99, "data": {"city": "New York", "temperature": 35.9, "humidity": 27.5}}
{"status": "completed", "message": "Weather monitoring completed"}

This output demonstrates:

  1. The concurrent processing of weather data for multiple cities
  2. Real-time updates with timestamps
  3. A completion message when the monitoring duration is reached

Conclusion

You've now created a serverless function using RunPod's Python SDK that simulates concurrent weather data fetching for multiple cities. This example showcases how to handle multiple asynchronous operations and stream results incrementally in a serverless environment.

To further enhance this application, consider:

  • Implementing real API calls to fetch actual weather data
  • Adding error handling for network failures or API limits
  • Exploring RunPod's documentation for advanced features like scaling for high-concurrency scenarios

RunPod's serverless library provides a powerful foundation for building scalable, efficient applications that can process and stream data concurrently in real-time without the need to manage infrastructure.