The Cancellation API

See also

Waiting for New Data Without Blocking the GUI

User guide page on the topic.

Tools for cooperative multitasking in synchronous code.

Cancellation is implemented as two classes with two-way communication between them: TokenSource and Token:

>>> import threading, time
...
>>> def loop(token: Token) -> None:
...     # Regularly check the token for a cancellation request.
...     while not token.cancellation_requested:
...         # Something that takes a long time:
...         time.sleep(0.01)
...
>>> source = TokenSource()
>>> thread = threading.Thread(target=loop, args=(source.token,))
>>> thread.start()
>>> source.cancel()
...
>>> # This line would deadlock if we had
>>> # not sent a cancellation request.
>>> thread.join()

Usually, it is more convenient to check the token’s state with Token.raise_if_cancellation_requested():

>>> def loop(token: Token) -> None:
...     while True:
...         token.raise_if_cancellation_requested()
...         # Something that takes a long time:
...         time.sleep(0.01)

Note that cernml.japc_utils.subscribe_stream() supports cancellation tokens. This makes it easy to interrupt a thread that is waiting a long time for accelerator data.

Normally, a cancellation request cannot be undone. This is on purpose, as cancellation might have left the task object in an unclean state. However, the task may explicitly declare that it has cleaned itself up after a cancellation by completing it. Completing a cancellation allows the source to reset and reuse it for another cancellation.

Take this class. It reads values from some machine, but occasionally gets stuck in an infinite loop:

>>> class SporadicFailure:
...     def __init__(self, token):
...         self.token = token
...         self.next_value = 0
...
...     def get_next(self):
...         # We know that our class won't break if it gets
...         # interrupted. Hence, once we have handled a
...         # cancellation, we can mark it as completed.
...         try:
...             value = self.read_from_machine()
...             return value
...         except CancelledError:
...             self.token.complete_cancellation()
...             raise
...
...     def read_from_machine(self):
...         self.next_value += 1
...         # Deadlock sometimes:
...         if self.next_value == 2:
...             self.deadlock()
...         self.token.raise_if_cancellation_requested()
...         return self.next_value
...
...     def deadlock(self):
...         # This waits indefinitely. However, because we use a
...         # cancellation token, it can be interrupted.
...         with self.token.wait_handle:
...             while not self.token.cancellation_requested:
...                 self.token.wait_handle.wait()

We want to fetch a value from it and add it to a list. With a real machine, this could take a while:

>>> source = TokenSource()
>>> receiver = SporadicFailure(source.token)
>>> collected_data = []
>>> collected_data.append(receiver.get_next())
>>> collected_data
[1]

Fetching the next value will deadlock for some reason, so we will have to cancel it. In a real application, we would put fetching into a background thread and cancel when the user clicks a button:

>>> timer = threading.Timer(0.1, source.cancel)
>>> timer.start()
>>> collected_data.append(receiver.get_next())
Traceback (most recent call last):
...
coi.cancellation.CancelledError

However, we have a problem: If we now tried to fetch the next value, the token would still be cancelled, so we would get another exception:

>>> collected_data.append(receiver.get_next())
Traceback (most recent call last):
...
coi.cancellation.CancelledError

However, we are lucky, because the receiver still is in a usable state (and is telling us as much):

>>> source.can_reset_cancellation
True
>>> source.reset_cancellation()

And now we can collect data again:

>>> collected_data.append(receiver.get_next())
>>> collected_data
[1, 4]
class cernml.coi.cancellation.TokenSource

Bases: object

Sending half of a cancellation channel.

This half is usually created by a host application. It contains a token that the application can send to a Problem upon instantiation.

Whenever a Problem enters a long-running calculation, it should periodically check the token for a cancellation request. If such a request has arrived, the problem has a chance to gracefully abort its calculation.

As a convenience feature, token sources are also context managers. They yield their token when entering a context and automatically cancel it when leaving the context:

>>> import threading, time
...
>>> def loop(token: Token) -> None:
...     """An infinite loop that can be cancelled."""
...     while not token.cancellation_requested:
...         time.sleep(0.01)
...
>>> # Create source + token and start the thread.
>>> with TokenSource() as token:
...     thread = threading.Thread(target=loop, args=(token,))
...     thread.start()
...     # Do something complex or just wait ...
...     time.sleep(0.01)
...
>>> # Leaving the `with` block cancels the token.
>>> thread.join()  # No deadlock!
property token: Token

The token associated with this source.

Pass this token to a Problem to be able to communicate a cancellation to it.

property cancellation_requested: bool

True if cancel() has been called.

cancel() None

Send a cancellation request through the token.

If there are any threads waiting for a cancellation request, they all get notified. Note that it is up the receiver of the token to honor the request.

Example

>>> source = TokenSource()
>>> source.token.cancellation_requested
False
>>> source.cancel()
>>> source.token.cancellation_requested
True

Cancelling the same token twice is a no-op:

>>> source.cancel()
>>> source.token.cancellation_requested
True
property can_reset_cancellation: bool

True if a previous cancellation can be reset.

Example

>>> source = TokenSource()
>>> source.can_reset_cancellation
True
>>> source.cancel()
>>> source.can_reset_cancellation
False
>>> source.token.complete_cancellation()
>>> source.can_reset_cancellation
True
reset_cancellation() None

Reset a cancellation request.

This can only be done if a previous cancellation request has been completed by the holder of the token. It resets the state back to as if there never was a cancellation.

If no cancellation has been requested, this does nothing.

Raises:

CannotReset – if a cancellation has been requested but not completed.

Example

>>> source = TokenSource()
>>> source.cancellation_requested
False
>>> source.cancel()
>>> source.cancellation_requested
True
>>> source.reset_cancellation()
Traceback (most recent call last):
...
coi.cancellation.CannotReset
>>> source.token.complete_cancellation()
>>> source.cancellation_requested
True
>>> source.reset_cancellation()
>>> source.cancellation_requested
False

Resetting twice is a no-op:

>>> source.reset_cancellation()
>>> source.cancellation_requested
False
class cernml.coi.cancellation.Token(cancelled: bool = False)

Bases: object

Receiving half of a cancellation channel.

Usually, you create this object via a TokenSource. It creates a token that can receive cancellation requests and mark them as completed. Marking a request as completed allows the token source to send further cancellation requests.

Parameters:

cancelled – If False (the default), create a token that cannot be cancelled. If True, create a token that is already cancelled.

Manually created tokens can never change their state:

>>> c = Token()
>>> c.can_be_cancelled, c.cancellation_requested
(False, False)
>>> c = Token(True)
>>> c.can_be_cancelled, c.cancellation_requested
(True, True)
property wait_handle: Condition

A condition variable on which to wait for cancellation.

If you do not use Condition variables to synchronize multiple threads, you may safely ignore this attribute.

This lazily creates the condition variable. You may use it to wait for cancellation. To avoid deadlocks, you should check cancellation_requested while the condition variable is locked:

>>> import threading
>>> def loop(token: Token) -> None:
...     with token.wait_handle:
...         while not token.cancellation_requested:
...             token.wait_handle.wait()
...
>>> source = TokenSource()
>>> thread = threading.Thread(
...     target=loop,
...     args=(source.token,),
... )
...
>>> thread.start()
>>> source.cancel()
>>> # Doesn't deadlock, thread got notified by `cancel()`.
>>> thread.join()
property can_be_cancelled: bool

True if a cancellation request can arrive or has arrived.

property cancellation_requested: bool

True if a cancellation request has arrived.

raise_if_cancellation_requested() None

Raise an exception if a cancellation request has arrived.

Raises:

CancelledError – If cancellation_requested is True. Note that it inherits from Exception, so it can be caught by an overly broad except clause.

complete_cancellation() None

Mark an ongoing cancellation as completed.

Once a cancellation has been completed, the token source is free to reset it and later send another one. Hence, you should only call this function at the very end. Otherwise, the source may send a second request while you’re still handling the first one.

Calling this method more than once does nothing.

Raises:

RuntimeError – if no cancellation has been requested.

Examples

>>> # Does nothing: cancellation already completed.
>>> Token(True).complete_cancellation()
>>> # Raises an exception: no cancellation ongoing.
>>> Token(False).complete_cancellation()
Traceback (most recent call last):
...
RuntimeError: no cancellation request to be completed
exception cernml.coi.cancellation.CancelledError

Bases: Exception

The current task has been requested to be cancelled.

Note that it inherits from Exception, so it can be caught by an overly broad except clause.

exception cernml.coi.cancellation.CannotReset

Bases: Exception

Cancellation cannot be reset as the task did not complete it.

There are two possible reasons for this:

  1. The task might have simply forgotten to call Token.complete_cancellation().

  2. The task has ended up in a poisoned state because of the cancellation. For example, two variables meant to be consistent with each other no longer are.

Because of #2, it is not safe to reset a cancellation that has not been completed.