Building an Async Generator Handler for Weather Data Simulation
This tutorial will guide you through creating a serverless function using RunPod's Python SDK that simulates fetching weather data for multiple cities concurrently.
Use asynchronous functions to handle multiple concurrent operations efficiently, especially when dealing with tasks that involve waiting for external resources, such as network requests or I/O operations. Asynchronous programming allows your code to perform other tasks while waiting, rather than blocking the entire program. This is particularly useful in a serverless environment where you want to maximize resource utilization and minimize response times.
We'll use an async generator handler to stream results incrementally, demonstrating how to manage multiple concurrent operations efficiently in a serverless environment.
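Before diving in, here is a minimal sketch of why async matters here: three simulated I/O waits run concurrently under `asyncio.gather()`, so the total wall time is roughly one delay, not the sum of all three. (The `fake_fetch` helper and the 0.1-second delays are illustrative, not part of the tutorial's code.)

```python
import asyncio
import time

async def fake_fetch(delay):
    # Simulate a slow I/O operation (network call, disk read, etc.)
    await asyncio.sleep(delay)
    return delay

async def main():
    start = time.perf_counter()
    # Three 0.1 s waits overlap, so the total is ~0.1 s rather than ~0.3 s
    results = await asyncio.gather(fake_fetch(0.1), fake_fetch(0.1), fake_fetch(0.1))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, round(elapsed, 2))
```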
Setting up your Serverless Function
Let's break down the process of creating our weather data simulator into steps.
Import required libraries
First, import the necessary libraries:
```python
import runpod
import asyncio
import random
import json
import sys
```
Create the Weather Data Fetcher
Define an asynchronous function that simulates fetching weather data:
```python
async def fetch_weather_data(city, delay):
    await asyncio.sleep(delay)
    temperature = random.uniform(-10, 40)
    humidity = random.uniform(0, 100)
    return {
        "city": city,
        "temperature": round(temperature, 1),
        "humidity": round(humidity, 1)
    }
```
This function:
- Simulates a network delay using `asyncio.sleep()`
- Generates random temperature and humidity data
- Returns a dictionary with the weather data for a city
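You can try this coroutine on its own with `asyncio.run()` (the city name and the short 0.1-second delay below are just example values):

```python
import asyncio
import random

async def fetch_weather_data(city, delay):
    await asyncio.sleep(delay)
    temperature = random.uniform(-10, 40)
    humidity = random.uniform(0, 100)
    return {
        "city": city,
        "temperature": round(temperature, 1),
        "humidity": round(humidity, 1)
    }

# Run the coroutine once with a short example delay
data = asyncio.run(fetch_weather_data("Tokyo", 0.1))
print(data)
```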
Create the Async Generator Handler
Now, let's create the main handler function:
```python
async def async_generator_handler(job):
    job_input = job['input']
    cities = job_input.get('cities', ['New York', 'London', 'Tokyo', 'Sydney', 'Moscow'])
    update_interval = job_input.get('update_interval', 2)
    duration = job_input.get('duration', 10)

    print(f"Weather Data Stream | Starting job {job['id']}")
    print(f"Monitoring cities: {', '.join(cities)}")

    start_time = asyncio.get_event_loop().time()
    while asyncio.get_event_loop().time() - start_time < duration:
        tasks = [fetch_weather_data(city, random.uniform(0.5, 2)) for city in cities]
        for completed_task in asyncio.as_completed(tasks):
            weather_data = await completed_task
            yield {
                "timestamp": round(asyncio.get_event_loop().time() - start_time, 2),
                "data": weather_data
            }
        await asyncio.sleep(update_interval)

    yield {"status": "completed", "message": "Weather monitoring completed"}
```
This handler:
- Extracts parameters from the job input
- Logs the start of the job
- Creates tasks for fetching weather data for each city
- Uses `asyncio.as_completed()` to yield results as they become available
- Continues fetching data at specified intervals for the given duration
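The key idea behind `asyncio.as_completed()` is that it returns results in the order tasks finish, not the order they were created. This standalone sketch (with made-up task names and delays) makes that visible:

```python
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    # The slowest task is created first, but results arrive fastest-first
    tasks = [worker("slow", 0.3), worker("medium", 0.2), worker("fast", 0.1)]
    order = []
    for completed in asyncio.as_completed(tasks):
        order.append(await completed)
    return order

order = asyncio.run(main())
print(order)  # ['fast', 'medium', 'slow']
```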
Set up the Main Execution
Finally, set up the main execution block:
```python
async def run_test(job):
    async for item in async_generator_handler(job):
        print(json.dumps(item))

if __name__ == "__main__":
    if "--test_input" in sys.argv:
        ...  # Code for local testing (see full example below)
    else:
        runpod.serverless.start({
            "handler": async_generator_handler,
            "return_aggregate_stream": True
        })
```
This block allows for both local testing and deployment as a RunPod serverless function.
Complete code example
Here's the full code for our serverless weather data simulator:
```python
import runpod
import asyncio
import random
import json
import sys

async def fetch_weather_data(city, delay):
    await asyncio.sleep(delay)
    temperature = random.uniform(-10, 40)
    humidity = random.uniform(0, 100)
    return {
        "city": city,
        "temperature": round(temperature, 1),
        "humidity": round(humidity, 1)
    }

async def async_generator_handler(job):
    job_input = job['input']
    cities = job_input.get('cities', ['New York', 'London', 'Tokyo', 'Sydney', 'Moscow'])
    update_interval = job_input.get('update_interval', 2)
    duration = job_input.get('duration', 10)

    print(f"Weather Data Stream | Starting job {job['id']}")
    print(f"Monitoring cities: {', '.join(cities)}")

    start_time = asyncio.get_event_loop().time()
    while asyncio.get_event_loop().time() - start_time < duration:
        tasks = [fetch_weather_data(city, random.uniform(0.5, 2)) for city in cities]
        for completed_task in asyncio.as_completed(tasks):
            weather_data = await completed_task
            yield {
                "timestamp": round(asyncio.get_event_loop().time() - start_time, 2),
                "data": weather_data
            }
        await asyncio.sleep(update_interval)

    yield {"status": "completed", "message": "Weather monitoring completed"}

async def run_test(job):
    async for item in async_generator_handler(job):
        print(json.dumps(item))

if __name__ == "__main__":
    if "--test_input" in sys.argv:
        test_input_index = sys.argv.index("--test_input")
        if test_input_index + 1 < len(sys.argv):
            test_input_json = sys.argv[test_input_index + 1]
            try:
                job = json.loads(test_input_json)
                asyncio.run(run_test(job))
            except json.JSONDecodeError:
                print("Error: Invalid JSON in test_input")
        else:
            print("Error: --test_input requires a JSON string argument")
    else:
        runpod.serverless.start({
            "handler": async_generator_handler,
            "return_aggregate_stream": True
        })
```
Testing Your Serverless Function
To test your function locally, use this command:
```bash
python your_script.py --test_input '
{
    "input": {
        "cities": ["New York", "London", "Tokyo", "Paris", "Sydney"],
        "update_interval": 3,
        "duration": 15
    },
    "id": "local_test"
}'
```
Understanding the output
When you run the test, you'll see output similar to this:
```
Weather Data Stream | Starting job local_test
Monitoring cities: New York, London, Tokyo, Paris, Sydney
{"timestamp": 0.84, "data": {"city": "London", "temperature": 11.0, "humidity": 7.3}}
{"timestamp": 0.99, "data": {"city": "Paris", "temperature": -5.9, "humidity": 59.3}}
{"timestamp": 1.75, "data": {"city": "Tokyo", "temperature": 18.4, "humidity": 34.1}}
{"timestamp": 1.8, "data": {"city": "Sydney", "temperature": 26.8, "humidity": 91.0}}
{"timestamp": 1.99, "data": {"city": "New York", "temperature": 35.9, "humidity": 27.5}}
{"status": "completed", "message": "Weather monitoring completed"}
```
This output demonstrates:
- The concurrent processing of weather data for multiple cities
- Real-time updates with timestamps
- A completion message when the monitoring duration is reached
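Because each update is a standalone JSON object, a client can process the stream line by line. This hypothetical consumer sketch (the `stream` list below reuses two of the sample lines above) groups temperature readings by city and skips status messages:

```python
import json

# Example lines as a client might receive them (values are illustrative)
stream = [
    '{"timestamp": 0.84, "data": {"city": "London", "temperature": 11.0, "humidity": 7.3}}',
    '{"timestamp": 1.75, "data": {"city": "Tokyo", "temperature": 18.4, "humidity": 34.1}}',
    '{"status": "completed", "message": "Weather monitoring completed"}',
]

readings = {}
for line in stream:
    item = json.loads(line)
    if "data" in item:  # skip status/completion messages
        readings.setdefault(item["data"]["city"], []).append(item["data"]["temperature"])

print(readings)  # {'London': [11.0], 'Tokyo': [18.4]}
```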
Conclusion
You've now created a serverless function using RunPod's Python SDK that simulates concurrent weather data fetching for multiple cities. This example showcases how to handle multiple asynchronous operations and stream results incrementally in a serverless environment.
To further enhance this application, consider:
- Implementing real API calls to fetch actual weather data
- Adding error handling for network failures or API limits
- Exploring RunPod's documentation for advanced features like scaling for high-concurrency scenarios
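As one sketch of the error handling suggested above, a real fetch could be wrapped in a small retry loop with backoff. Everything here is illustrative: `FetchError`, the `flaky_fetch` stand-in that fails twice before succeeding, and the retry counts are all hypothetical, not part of any real weather API.

```python
import asyncio

class FetchError(Exception):
    """Hypothetical error raised by a real weather API call."""

attempts = 0

async def flaky_fetch(city):
    # Stand-in for a real API call that fails twice, then succeeds
    global attempts
    attempts += 1
    if attempts < 3:
        raise FetchError(f"temporary failure for {city}")
    return {"city": city, "temperature": 20.0}

async def fetch_with_retry(city, retries=3, backoff=0.05):
    for attempt in range(1, retries + 1):
        try:
            return await flaky_fetch(city)
        except FetchError:
            if attempt == retries:
                raise  # give up after the final attempt
            await asyncio.sleep(backoff * attempt)  # simple linear backoff

result = asyncio.run(fetch_with_retry("London"))
print(result)
```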
RunPod's serverless library provides a powerful foundation for building scalable, efficient applications that can process and stream data concurrently in real-time without the need to manage infrastructure.