As developers, we strive to write robust applications that can handle errors gracefully. This becomes especially important in asynchronous Python applications built with Asyncio. In this article, we'll explore some practical techniques for adding retry logic to Asyncio apps using Python.
The Problem: Transient Errors in Distributed Systems
Modern applications often rely on external services like databases, APIs, message queues, etc. These distributed systems can fail unexpectedly - networks flap, servers restart, load balancers route improperly. Many such failures are transient - the same operation succeeds if simply retried.
Our Asyncio apps need to handle these transient errors to maintain availability. Simply failing on the first error is fragile. Equally problematic is retrying blindly without limit. This can overload failing services and cause cascading failures.
We need a resilient retry strategy - one that identifies transient errors, backs off exponentially between retries, and gives up after a reasonable number of attempts. Let's see how to implement this in Python.
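To make the schedule concrete, here is what a base-2 exponential backoff (the 2 ** retry formula used throughout this article) produces for five attempts:

```python
# Exponential backoff: each retry waits twice as long as the previous one.
delays = [2 ** retry for retry in range(5)]
print(delays)  # [1, 2, 4, 8, 16] - about 31 seconds of waiting in total
```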
Asyncio Background Tasks and Transients
Consider an Asyncio application that fetches data from a web API and stores it in a database. We implement this as a background task:
async def fetch_and_store_data():
    data = await fetch_from_api()
    await store_in_database(data)
If either the API call or the database write raises a transient error, this task fails on the first attempt. Let's wrap it in retry logic with exponential backoff:
import asyncio

async def fetch_with_retries():
    for retry in range(5):
        try:
            data = await fetch_from_api()
            await store_in_database(data)
            break
        except TransientError:
            if retry == 4:
                raise
            wait_time = 2 ** retry
            print(f"Transient error, retrying in {wait_time} seconds")
            await asyncio.sleep(wait_time)

async def main():
    task = asyncio.create_task(fetch_with_retries())
    # keep a reference and await the task so it isn't cancelled
    # when the event loop shuts down
    await task

asyncio.run(main())
This wraps our fetch logic in a looping task. On transient failures, it waits exponentially longer before retrying, up to 5 times. After the final failed attempt, the exception bubbles up. The surrounding Asyncio app stays responsive, since the work runs in a background task and every wait is an awaited asyncio.sleep that yields control back to the event loop.
This pattern works well for background data fetching, processing queues, polling APIs etc. The retries happen asynchronously without blocking the event loop. Next, let's explore another approach for synchronous failures.
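To see that the event loop really does stay free during those waits, here is a minimal, self-contained illustration; retrying_job and other_work are hypothetical stand-ins for real work:

```python
import asyncio

async def retrying_job():
    # stand-in for a background task sleeping between retry attempts
    await asyncio.sleep(0.05)
    return "done"

async def other_work():
    # stand-in for the rest of the application
    return "still responsive"

async def main():
    task = asyncio.create_task(retrying_job())
    # other coroutines keep running while the background task sleeps
    print(await other_work())
    print(await task)

asyncio.run(main())
```

The "still responsive" line prints before the background task finishes, showing that its sleep never blocked the loop.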
Retrying Synchronous Work with Exponential Backoff
Sometimes, part of our synchronous logic in an Asyncio app needs retries - for example, making blocking HTTP requests with the requests library:
import asyncio
import requests

async def fetch_sync_with_retries():
    url = "https://example.com/data"  # placeholder endpoint
    for retry in range(5):
        try:
            data = await asyncio.to_thread(requests.get, url)
            process_data(data)
            break
        except TransientError:
            if retry == 4:
                raise
            wait_time = 2 ** retry
            print(f"Transient error, retrying sync call in {wait_time} seconds")
            await asyncio.sleep(wait_time)
This runs the blocking requests.get call in a worker thread via asyncio.to_thread, so the event loop keeps running while the request is in flight. The retry loop itself is identical to the fully asynchronous version.
For IO-bound sync code like this, asyncio.to_thread (added in Python 3.9) is usually all you need. For CPU-bound work, prefer a ProcessPoolExecutor with loop.run_in_executor so the GIL doesn't serialize the heavy computation.
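If you need more control than asyncio.to_thread offers - for example, to bound the number of worker threads - the lower-level loop.run_in_executor API accepts an explicit executor. A minimal sketch, with blocking_io standing in for a call like requests.get:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    # stand-in for a blocking call such as requests.get(url)
    return "response"

async def main():
    loop = asyncio.get_running_loop()
    # cap the blocking work at 4 worker threads
    with ThreadPoolExecutor(max_workers=4) as pool:
        return await loop.run_in_executor(pool, blocking_io)

print(asyncio.run(main()))
```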
Handling Retry Exceptions Granularly
In complex apps with multiple external services, we often want fine-grained control over retries. For example: retrying connection errors more aggressively than server errors, allowing extra attempts for a known-flaky service, or failing fast on errors that will never succeed, such as authentication failures.
We can encapsulate this logic in a reusable decorator:
import asyncio
from functools import wraps

# HTTP_500 is assumed to be an app-defined exception for server errors
RETRY_ERRORS = (ConnectionError, HTTP_500)

def retry(num_retries=5, on_errors=RETRY_ERRORS):
    def wrap(func):
        @wraps(func)
        async def wrapped(*args, **kwargs):
            for retry in range(num_retries):
                try:
                    return await func(*args, **kwargs)
                except on_errors:
                    if retry == num_retries - 1:
                        raise
                    wait = calc_backoff(retry)  # e.g. 2 ** retry
                    print(f"Retrying {func.__name__} in {wait} seconds")
                    await asyncio.sleep(wait)
        return wrapped
    return wrap

@retry(on_errors=ConnectionError, num_retries=10)
async def connect_to_service():
    # implementation
    ...
Now individual functions can be decorated with custom retry behavior.
The key ideas are: identify which errors are transient and worth retrying, back off exponentially between attempts, cap the total number of attempts, and perform every wait with an awaited asyncio.sleep so the event loop stays responsive.
Together this provides a resilient retry capability for Asyncio apps.
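One refinement worth knowing: adding random jitter to the backoff prevents many clients from retrying in lockstep after a shared outage. A sketch of the "full jitter" variant (backoff_with_jitter is a hypothetical helper, not part of the standard library):

```python
import random

def backoff_with_jitter(attempt, base=1.0, cap=60.0):
    # "Full jitter": sleep a random duration between 0 and the capped
    # exponential delay, so concurrent clients spread out their retries.
    return random.uniform(0, min(cap, base * 2 ** attempt))
```

Swapping this in for the plain 2 ** retry calculation in the examples above keeps the same average growth while decorrelating clients.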
There are more advanced approaches like tenacity for complex retry scenarios. But the patterns above serve most basic Asyncio retry needs.