w3resource

Simulating a producer-consumer scenario with asyncio queues in Python


8. Async Producer-Consumer with Queue

Write a Python program that uses asyncio queues to simulate a producer-consumer scenario with multiple producers and a single consumer.

Sample Solution:

Code:

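import asyncio
import random


async def producer(queue, producer_id):
    # Each producer puts three items on the shared queue.
    for i in range(3):
        item = f"Item: {producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced-> {item}")
        # Pause for a random interval to simulate work between items.
        await asyncio.sleep(random.uniform(0.1, 0.5))


async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel received: mark it done and stop consuming.
            queue.task_done()
            break
        print(f"Consumer consumed {item}")
        # Mark the item as processed so that queue.join() can return.
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    producers = [asyncio.create_task(producer(queue, i)) for i in range(3)]
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)  # Wait for all producers to finish
    await queue.join()  # Wait until every produced item is processed
    await queue.put(None)  # Signal the consumer to stop
    await consumer_task


# Run the event loop
asyncio.run(main())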

Output:

Producer 0 produced-> Item: 0-0
Producer 1 produced-> Item: 1-0
Producer 2 produced-> Item: 2-0
Consumer consumed Item: 0-0
Consumer consumed Item: 1-0
Consumer consumed Item: 2-0
Producer 1 produced-> Item: 1-1
Consumer consumed Item: 1-1
Producer 0 produced-> Item: 0-1
Consumer consumed Item: 0-1
Producer 2 produced-> Item: 2-1
Consumer consumed Item: 2-1
Producer 1 produced-> Item: 1-2
Consumer consumed Item: 1-2
Producer 0 produced-> Item: 0-2
Consumer consumed Item: 0-2
Producer 2 produced-> Item: 2-2
Consumer consumed Item: 2-2

Explanation:

In the above exercise:

  • The "producer()" coroutine simulates a producer that adds items to the queue. Each producer produces 3 items with an associated ID.
  • The "consumer()" coroutine simulates a single consumer that consumes items from the queue. It runs in an infinite loop, consuming items until a None value is encountered, indicating it should stop.
  • The "main()" coroutine creates the queue, then starts three producer tasks and one consumer task using asyncio.create_task(). It does not create the event loop itself; asyncio.run() does that.
  • The 'producers' produce items and add them to the queue asynchronously. The "consumer()" consumes items from the queue as they become available.
  • After all 'producers' have finished producing, the program uses await queue.join() to wait until all items in the queue are processed.
  • To signal the consumer to stop, the program sends 'None' to the queue, and then waits for the consumer to complete its task.
  • The call asyncio.run(main()) creates the event loop, runs "main()" until it completes, and then closes the loop.
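
The role of task_done() in the steps above can be checked in isolation. The following is a minimal sketch, not part of the original solution: the demo() coroutine and the 0.1 second timeout are illustrative assumptions. It shows that queue.join() keeps waiting after an item has been retrieved with get() and only returns once task_done() has been called for that item.

```python
import asyncio


async def demo():
    queue = asyncio.Queue()
    await queue.put("job")
    await queue.get()  # Item retrieved, but not yet marked as done

    # join() should still block here, so the timeout is expected to fire.
    try:
        await asyncio.wait_for(queue.join(), timeout=0.1)
        joined_before = True
    except asyncio.TimeoutError:
        joined_before = False

    # After task_done(), the unfinished count is zero and join() returns.
    queue.task_done()
    await asyncio.wait_for(queue.join(), timeout=0.1)
    joined_after = True
    return joined_before, joined_after


result = asyncio.run(demo())
print(result)  # (False, True)
```

This is why the consumer calls queue.task_done() after every item: without it, the await queue.join() line in "main()" would never return.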

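When we run this program, the producers add items to the queue while the consumer removes and prints them. The exact interleaving of the output lines varies from run to run because of the random delays.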
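
Because the printed order changes between runs, it can help to check the result programmatically. The sketch below is a variation on the solution, not the original code: the run_once() function, the (producer_id, index) tuples, the consumed list, and the shorter delays are assumptions made for illustration. It collects what the consumer receives so that two properties can be verified: all nine items arrive, and each producer's items arrive in the order they were produced (asyncio.Queue is first-in, first-out).

```python
import asyncio
import random


async def producer(queue, producer_id, count=3):
    for i in range(count):
        # Items are (producer_id, index) tuples so they are easy to check.
        await queue.put((producer_id, i))
        await asyncio.sleep(random.uniform(0.01, 0.05))


async def consumer(queue, consumed):
    while True:
        item = await queue.get()
        try:
            if item is None:  # Sentinel: stop consuming
                break
            consumed.append(item)
        finally:
            # Runs for every item, including the sentinel.
            queue.task_done()


async def run_once():
    queue = asyncio.Queue()
    consumed = []
    producers = [asyncio.create_task(producer(queue, n)) for n in range(3)]
    consumer_task = asyncio.create_task(consumer(queue, consumed))
    await asyncio.gather(*producers)
    await queue.join()
    await queue.put(None)
    await consumer_task
    return consumed


consumed = asyncio.run(run_once())
print(len(consumed))  # 9
for pid in range(3):
    # Indexes from one producer always appear as [0, 1, 2].
    print(pid, [i for p, i in consumed if p == pid])
```

Only the interleaving between different producers is random; the relative order of items from a single producer is preserved.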

Flowchart:

Flowchart: Simulating a producer-consumer scenario with asyncio queues in Python.

For more Practice: Solve these Related Problems:

  • Write a Python program to simulate a producer-consumer scenario using asyncio.Queue, where multiple producers add items and one consumer processes them.
  • Write a Python script to create a producer coroutine that puts random numbers into an asyncio.Queue and a consumer coroutine that retrieves and prints these numbers.
  • Write a Python function to implement an async producer-consumer model using multiple producers and a single consumer, then display the order in which items are processed.
  • Write a Python program to use asyncio queues to manage a producer-consumer system, where producers add items at random intervals and the consumer prints each item along with a timestamp.
