Handling task cancellation with asyncio.CancelledError in Python

Python Asynchronous: Exercise-6 with Solution

Write a Python program to create a coroutine that simulates a time-consuming task and use asyncio.CancelledError to handle task cancellation.

Sample Solution:


import asyncio
import random

async def time_consuming_task():
    print('Time-consuming task started...')
    try:
        for i in range(1, 6):
            # Simulate one step of slow work.
            await asyncio.sleep(random.randint(1, 5))
            print(f'Step {i} completed')
    except asyncio.CancelledError:
        print('Time consuming task was cancelled')
        raise  # Re-raise so the awaiting coroutine also sees the cancellation.

async def main():
    task = asyncio.create_task(time_consuming_task())
    # Let the task run for a while, then request cancellation.
    await asyncio.sleep(random.randint(1, 3))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print('Main coroutine caught task cancellation!')

asyncio.run(main())


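Sample Output:

Time-consuming task started...
Time consuming task was cancelled
Main coroutine caught task cancellation!

Because the delays are random, one or more 'Step N completed' lines may appear before the cancellation message on some runs.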


The code above defines a coroutine, `time_consuming_task`, that simulates work by sleeping for a random interval on each of five steps. It wraps the loop in a try/except block to catch asyncio.CancelledError, which asyncio raises inside the coroutine at the pending await when the task is cancelled.
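A minimal sketch of the catch, clean up, and re-raise pattern inside a coroutine is shown here with a fixed delay so the result is reproducible. The names `worker`, `demo`, and `log` are hypothetical and are not part of the exercise.

```python
import asyncio

async def worker(log):
    """Hypothetical worker that records what happened to it in `log`."""
    try:
        await asyncio.sleep(10)   # stands in for long-running work
        log.append("finished")
    except asyncio.CancelledError:
        log.append("cleanup")     # release resources here
        raise                     # let the cancellation propagate

async def demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)        # yield once so the worker starts running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("caller saw cancellation")
    return log, task.cancelled()

log, was_cancelled = asyncio.run(demo())
print(log, was_cancelled)
```

Because the worker re-raises, the task ends in the cancelled state and the caller's `await task` raises asyncio.CancelledError as well.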

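The main() coroutine creates the task with asyncio.create_task(), waits a random 1 to 3 seconds, calls task.cancel(), and then awaits the task inside its own try/except. That await raises asyncio.CancelledError in main() provided the task lets the exception propagate rather than suppressing it.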
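For contrast, the sketch below shows what the caller observes when a coroutine catches asyncio.CancelledError and does not re-raise it. The names `swallowing_worker` and `demo` are hypothetical, chosen only for illustration.

```python
import asyncio

async def swallowing_worker():
    """Hypothetical worker that catches CancelledError without re-raising."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        return "stopped early"    # cancellation is suppressed here

async def demo():
    task = asyncio.create_task(swallowing_worker())
    await asyncio.sleep(0)        # let the worker reach its first await
    task.cancel()
    try:
        result = await task       # returns normally: nothing propagates
        caught = False
    except asyncio.CancelledError:
        result = None
        caught = True
    return result, caught, task.cancelled()

result, caught, cancelled = asyncio.run(demo())
print(result, caught, cancelled)
```

In this variant the awaiting coroutine never enters its except branch and task.cancelled() reports False, which is why re-raising after cleanup is generally the safer choice.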


Flowchart: Handling task cancellation with asyncio.CancelledError in Python.
