Handling task cancellation with asyncio.CancelledError in Python
Python Asynchronous: Exercise-6 with Solution
Write a Python program to create a coroutine that simulates a time-consuming task and use asyncio.CancelledError to handle task cancellation.
Sample Solution:
Code:
import asyncio
import random
async def time_consuming_task():
print('Time-consuming task started...')
try:
for i in range(1, 6):
await asyncio.sleep(random.randint(1,5))
print(f'Step {i} completed')
except asyncio.CancelledError:
print('Time consuming task was cancelled')
raise
async def main():
task = asyncio.create_task(time_consuming_task())
await asyncio.sleep(random.randint(1,3))
task.cancel()
try:
await task
except asyncio.CancelledError:
print('Main coroutine caught task cancellation!')
asyncio.run(main())
Output:
Time-consuming task started... Time consuming task was cancelled Main coroutine caught task cancellation!
Explanation:
The above code creates an async coroutine `time_consuming_task` that simulates work by sleeping for random intervals. It uses a try/except block to catch asyncio.CancelledError if the task is cancelled.
The main() coroutine creates the task, waits a random amount of time, cancels it, and uses another try/except to catch the cancellation.
Flowchart:
Previous: Measuring total execution time of concurrent asynchronous tasks in Python.
Next: Setting timeouts for asynchronous operations in Python using asyncio.wait_for().
What is the difficulty level of this exercise?
Test your Programming skills with w3resource's quiz.
It will be nice if you may share this link in any developer community or anywhere else, from where other developers may find this content. Thanks.
https://www.w3resource.com/python-exercises/asynchronous/python-asynchronous-exercise-6.php
- Weekly Trends and Language Statistics
- Weekly Trends and Language Statistics