w3resource

Setting timeouts for asynchronous operations in Python using asyncio.wait_for()


7. Async Timeout with wait_for

Write a Python program that implements a timeout for an asynchronous operation using asyncio.wait_for().

Sample Solution:

Code:

import asyncio
import time

async def time_consuming_task(duration):
    print(f'Starting long operation for {duration} seconds...')
    await asyncio.sleep(duration)
    return f'Long operation completed in {duration} seconds'
async def main():
    timeout =3
    try:
        result = await asyncio.wait_for(time_consuming_task(8), timeout)
        print(result)
    except asyncio.TimeoutError:
        print(f'Timeout occurred after waiting for {timeout} seconds')
asyncio.run(main())

Output:

Starting long operation for 8 seconds...
Timeout occurred after waiting for 3 seconds

Explanation:

In the above exercise -

  • asyncio.wait_for() takes an awaitable (coroutine) and a timeout in seconds.
  • It waits for the coroutine to complete or raises asyncio.TimeoutError if timeout is exceeded.
  • The program await the wait_for() call to perform the wait.
  • The try/except block catches the potential TimeoutError.
  • If timeout occurs, we handle it accordingly in the except block.
  • If coroutine completes within timeout, result is returned as normal.

Flowchart:

Flowchart: Setting timeouts for asynchronous operations in Python using asyncio.wait_for().

For more Practice: Solve these Related Problems:

  • Write a Python program to implement an asynchronous function that simulates a long-running task and enforces a timeout using asyncio.wait_for.
  • Write a Python script to use asyncio.wait_for to set a timeout on a coroutine that fetches data, printing an error message if the operation times out.
  • Write a Python function that wraps a coroutine in asyncio.wait_for with a specified timeout and handles asyncio.TimeoutError by returning a default value.
  • Write a Python program to test an async operation with a timeout using asyncio.wait_for, then print whether the operation completed or timed out.

Go to:


Previous: Handling task cancellation with asyncio.CancelledError in Python.
Next: Simulating a producer-consumer scenario with asyncio queues in Python.

Python Code Editor :

What is the difficulty level of this exercise?

Test your Programming skills with w3resource's quiz.



Follow us on Facebook and Twitter for latest update.