Simulating a producer-consumer scenario with asyncio queues in Python
Python Asynchronous: Exercise-8 with Solution
Write a Python program that uses asyncio queues to simulate a producer-consumer scenario with multiple producers and a single consumer.
Sample Solution:
Code:
import asyncio
import random


async def producer(queue, id):
    for i in range(3):
        item = f"Item: {id}-{i}"
        await queue.put(item)
        print(f"Producer {id} produced-> {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))


async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumer consumed {item}")
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    producers = [asyncio.create_task(producer(queue, i)) for i in range(3)]
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)
    await queue.join()
    await queue.put(None)  # Signal the consumer to stop
    await consumer_task


# Run the event loop
asyncio.run(main())
Output:
Producer 0 produced-> Item: 0-0
Producer 1 produced-> Item: 1-0
Producer 2 produced-> Item: 2-0
Consumer consumed Item: 0-0
Consumer consumed Item: 1-0
Consumer consumed Item: 2-0
Producer 1 produced-> Item: 1-1
Consumer consumed Item: 1-1
Producer 0 produced-> Item: 0-1
Consumer consumed Item: 0-1
Producer 2 produced-> Item: 2-1
Consumer consumed Item: 2-1
Producer 1 produced-> Item: 1-2
Consumer consumed Item: 1-2
Producer 0 produced-> Item: 0-2
Consumer consumed Item: 0-2
Producer 2 produced-> Item: 2-2
Consumer consumed Item: 2-2
Explanation:
In the above exercise:
- The "producer()" coroutine simulates a producer that adds items to the queue. Each producer produces 3 items with an associated ID.
- The "consumer()" coroutine simulates a single consumer that consumes items from the queue. It runs in an infinite loop, consuming items until a None value is encountered, indicating it should stop.
- The "main()" coroutine creates the queue, then creates three producer tasks and one consumer task using asyncio.create_task(). It does not set up the event loop itself; asyncio.run() does that.
- The 'producers' produce items and add them to the queue asynchronously. The "consumer()" consumes items from the queue as they become available.
- After all 'producers' have finished producing, the program uses await queue.join() to block until queue.task_done() has been called once for every item put into the queue, that is, until the consumer has processed all of them.
- To signal the consumer to stop, the program sends 'None' to the queue, and then waits for the consumer to complete its task.
- The event loop is started using asyncio.run(main()), which creates a new event loop, runs "main()" until it completes, and then closes the loop.
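The relationship between queue.join() and queue.task_done() described above can be illustrated with a minimal sketch. The names worker(), demo_join(), and processed are hypothetical and not part of the exercise, and the short sleep only stands in for real work. The point is that join() returns only after task_done() has been called for every item that was put, so all three items are already recorded when the snapshot is taken.

```python
import asyncio


async def worker(queue, processed):
    # Hypothetical consumer: records each item, then marks it as done.
    while True:
        item = await queue.get()
        if item is None:  # sentinel value: stop the worker
            break
        await asyncio.sleep(0.01)  # stand-in for real work
        processed.append(item)
        queue.task_done()  # tells join() that this item is finished


async def demo_join():
    queue = asyncio.Queue()
    processed = []
    task = asyncio.create_task(worker(queue, processed))

    for n in range(3):
        queue.put_nowait(n)

    # Blocks until task_done() has been called three times.
    await queue.join()
    snapshot = list(processed)

    await queue.put(None)  # signal the worker to stop
    await task
    return snapshot


if __name__ == "__main__":
    print(asyncio.run(demo_join()))
```

Because the sentinel is put only after join() returns, the worker never sees None before it has handled every real item.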
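When we run this program, the producers add items to the queue and the consumer consumes them. Because each producer sleeps for a random interval between items, the interleaving of the output lines can differ from run to run.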
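Since the printed order depends on timing, one way to check a run is to collect the consumed items and compare them without regard to order. The following is a sketch under stated assumptions, not part of the original solution: collecting_consumer() and run_once() are hypothetical names, the producer takes an extra count parameter, and the sleep intervals are shortened so the check finishes quickly.

```python
import asyncio
import random


async def producer(queue, producer_id, count=3):
    # Same idea as the exercise's producer, with shorter (assumed) delays.
    for i in range(count):
        await queue.put(f"Item: {producer_id}-{i}")
        await asyncio.sleep(random.uniform(0.001, 0.005))


async def collecting_consumer(queue, consumed):
    # Hypothetical variant of consumer(): stores items instead of printing.
    while True:
        item = await queue.get()
        if item is None:
            break
        consumed.append(item)
        queue.task_done()


async def run_once():
    queue = asyncio.Queue()
    consumed = []
    producers = [asyncio.create_task(producer(queue, i)) for i in range(3)]
    consumer_task = asyncio.create_task(collecting_consumer(queue, consumed))
    await asyncio.gather(*producers)  # wait for all producers to finish
    await queue.join()                # wait for all items to be processed
    await queue.put(None)             # signal the consumer to stop
    await consumer_task
    return consumed


if __name__ == "__main__":
    items = asyncio.run(run_once())
    print(sorted(items))
```

Sorting the collected list removes the timing dependence: every run should yield the same nine items, even though the order in which they were consumed may vary.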