Asyncio queues provide a great way to pass data between asynchronous tasks in Python. They enable building scalable asynchronous I/O flows without some of the downsides of threads or processes. In this post, we'll dive into how to use asyncio queues effectively in your Python applications.
What are Asyncio Queues?
An asyncio queue is a first in, first out (FIFO) data structure designed for use in asynchronous programming with Python's asyncio module. Asyncio queues are not thread-safe; they are designed for passing data between coroutines and tasks running on the same event loop.
Some key properties of asyncio queues:
import asyncio
queue = asyncio.Queue()
Asyncio queues enable asynchronous producer/consumer patterns without many of the downsides of threads or subprocesses.
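Because an asyncio.Queue belongs to one event loop and is not safe to touch from other threads, code running in a thread has to hand items over through the loop. The following is a minimal sketch of that hand-off using loop.call_soon_threadsafe; the worker and main functions are hypothetical names chosen for illustration.

```python
import asyncio
import threading


def worker(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue) -> None:
    """Runs in a plain thread, so it must not call queue methods directly."""
    for i in range(3):
        # Ask the event loop to run put_nowait on its own thread.
        loop.call_soon_threadsafe(queue.put_nowait, i)


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()  # unbounded, so put_nowait cannot fail
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=worker, args=(loop, queue))
    thread.start()
    # Receive the three items on the event loop side.
    received = [await queue.get() for _ in range(3)]
    thread.join()
    return received
```

Calling asyncio.run(main()) returns the items in the order the thread scheduled them.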
Producer/Consumer With an Asyncio Queue
A simple async queue usage pattern is:
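import asyncio

async def producer(queue):
    for i in range(10):
        await queue.put(i)
        print(f"Added {i} to queue")

async def consumer(queue):
    # Runs until it is cancelled.
    while True:
        item = await queue.get()
        print(f"Got {item} off the queue")
        await asyncio.sleep(1)
        queue.task_done()  # mark the item as fully processed

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await producer_task
    await queue.join()      # wait until every item has been processed
    consumer_task.cancel()  # the consumer loops forever, so stop it explicitly

asyncio.run(main())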
This shows the basics - producers adding data, consumers processing the data.
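The same pattern extends to several consumers sharing one queue. The sketch below is one possible arrangement, not the only one: it uses queue.task_done() and queue.join() to find out when all work is finished, then cancels the consumers. The run_pipeline function and the doubling "work" are stand-ins invented for the example.

```python
import asyncio


async def producer(queue: asyncio.Queue, count: int) -> None:
    for i in range(count):
        await queue.put(i)


async def consumer(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        try:
            results.append(item * 2)  # stand-in for real work
        finally:
            # Tell the queue this item has been fully processed.
            queue.task_done()


async def run_pipeline(count: int = 10, workers: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    consumers = [
        asyncio.create_task(consumer(queue, results)) for _ in range(workers)
    ]
    await producer(queue, count)
    await queue.join()  # returns once every item has been marked done
    for task in consumers:
        task.cancel()  # consumers loop forever, so stop them explicitly
    # Collect the cancellations so no task is left pending.
    await asyncio.gather(*consumers, return_exceptions=True)
    return results
```

With several consumers the order of results is not guaranteed in general, so callers that care about order should sort or tag the items.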
Queue Size Limiting
By default asyncio queues can grow unbounded in size. This can lead to runaway memory usage.
We can set a maximum size with the maxsize argument:
queue = asyncio.Queue(maxsize=100)
Now if a producer calls await queue.put() when the queue already holds 100 items, the producer coroutine is suspended until a consumer frees a slot. The event loop keeps running other tasks in the meantime.
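To make the behavior of a full queue concrete, here is a small sketch with maxsize=2 instead of 100. It contrasts put_nowait(), which raises asyncio.QueueFull, with await put(), which waits for a free slot. The demo_backpressure name and the 0.05 second timeout are arbitrary choices for the example.

```python
import asyncio


async def demo_backpressure() -> dict:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    queue.put_nowait("a")
    queue.put_nowait("b")
    outcome = {"full": queue.full()}

    # put_nowait never waits; on a full queue it raises instead.
    try:
        queue.put_nowait("c")
    except asyncio.QueueFull:
        outcome["put_nowait_raised"] = True

    # await put() waits for a free slot. Nothing consumes here,
    # so the wait is cut short by the timeout.
    try:
        await asyncio.wait_for(queue.put("c"), timeout=0.05)
    except asyncio.TimeoutError:
        outcome["put_timed_out"] = True

    # Once an item is taken off, put() completes right away.
    queue.get_nowait()
    await queue.put("c")
    outcome["size_after"] = queue.qsize()
    return outcome
```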
Checking Queue Size
It can be useful for consumers to know the queue size to make processing decisions:
print(f"Queue size: {queue.qsize()}")
The consumer can adjust parallelism based on the current depth.
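As an illustration of a depth-based decision, the sketch below maps the current queue size to a worker count. The workers_for_depth policy and its parameters are hypothetical; a real application would tune them to its workload. Keep in mind that qsize() is only a snapshot and can change at the next await.

```python
import asyncio


def workers_for_depth(depth: int, per_worker: int = 10, max_workers: int = 8) -> int:
    """Hypothetical policy: one worker per `per_worker` queued items, capped."""
    needed = -(-depth // per_worker)  # ceiling division
    return max(1, min(max_workers, needed))


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(25):
        queue.put_nowait(i)
    depth = queue.qsize()  # snapshot of the current depth
    return depth, workers_for_depth(depth)
```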
Blocking Gets with Timeouts
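By default, queue.get() waits indefinitely until an item is available. To bound the wait, wrap the call in asyncio.wait_for():
while True:
    try:
        item = await asyncio.wait_for(queue.get(), timeout=1.0)
        print(f"Got {item}")
    except asyncio.TimeoutError:
        print("No item after 1s, doing other work")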
Now the consumer coroutine will keep processing events instead of blocking indefinitely.
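When the consumer should not wait at all, get_nowait() returns immediately and raises asyncio.QueueEmpty if nothing is queued. A minimal sketch of draining whatever is currently available; drain and demo are hypothetical helper names:

```python
import asyncio


def drain(queue: asyncio.Queue) -> list:
    """Take everything currently queued without waiting."""
    items = []
    while True:
        try:
            items.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            return items


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)
    first = drain(queue)   # everything queued so far
    second = drain(queue)  # nothing left, so an empty list
    return first, second
```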
Async Iteration
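asyncio.Queue does not implement the async iterator protocol, so writing async for item in queue raises a TypeError. A small async generator gives the same effect (iter_queue is a hypothetical helper, not part of asyncio):
async def iter_queue(queue):
    while True:
        yield await queue.get()

async for item in iter_queue(queue):
    print(f"Got {item}")
    await asyncio.sleep(0.5)
This yields items as they become available. The loop never ends on its own, so the consuming task has to be cancelled or otherwise told to stop.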
Closing a Queue
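asyncio.Queue has no close() method. On Python 3.13 and later, call queue.shutdown(): after that, put() raises asyncio.QueueShutDown, and get() raises it once the remaining items have been drained. On earlier versions, signal the end of the stream yourself, for example with a sentinel item or by cancelling the consumer tasks.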
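One version-independent way to tell a consumer that no more items are coming is a sentinel value. The sketch below assumes a single consumer; STOP is a hypothetical marker object, and with several consumers the producer would need to send one sentinel per consumer.

```python
import asyncio

STOP = object()  # hypothetical sentinel marking the end of the stream


async def producer(queue: asyncio.Queue, items: list) -> None:
    for item in items:
        await queue.put(item)
    await queue.put(STOP)  # signal that nothing else will arrive


async def consumer(queue: asyncio.Queue) -> list:
    seen = []
    while True:
        item = await queue.get()
        if item is STOP:
            break  # leave the loop instead of waiting forever
        seen.append(item)
    return seen


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    _, seen = await asyncio.gather(
        producer(queue, ["a", "b", "c"]),
        consumer(queue),
    )
    return seen
```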
Key Benefits Over Threads and Processes
Some key benefits of asyncio queues:
The main downside is that asyncio queues only work within a single event loop, which means a single thread in a single process. For interprocess communication, look at multiprocessing queues.
Practical Tips
Here are some key lessons learned from using asyncio queues in production:
Asyncio queues are a simple but powerful tool for async workflows in Python. By following some best practices they can be used to build highly scalable applications.