The Cancellation API
See also
- Waiting for New Data Without Blocking the GUI
User guide page on the topic.
Tools for cooperative multitasking in synchronous code.
Cancellation is implemented as two classes with two-way communication
between them: TokenSource and Token:
>>> import threading, time
>>> from cernml.coi.cancellation import Token, TokenSource
...
>>> def loop(token: Token) -> None:
...     # Regularly check the token for a cancellation request.
...     while not token.cancellation_requested:
...         # Something that takes a long time:
...         time.sleep(0.01)
...
>>> source = TokenSource()
>>> thread = threading.Thread(target=loop, args=(source.token,))
>>> thread.start()
>>> source.cancel()
...
>>> # This line would deadlock if we had
>>> # not sent a cancellation request.
>>> thread.join()
Usually, it is more convenient to check the token’s state with
Token.raise_if_cancellation_requested():
>>> def loop(token: Token) -> None:
...     while True:
...         token.raise_if_cancellation_requested()
...         # Something that takes a long time:
...         time.sleep(0.01)
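One reason the raising variant is convenient is that the check can sit deep inside nested calls and still unwind the whole task. The following is a minimal sketch of that idea using only the standard library. `StandInToken`, its `request()` method, and the local `CancelledError` are hypothetical stand-ins so that the snippet runs without cernml.coi installed; they are not the library's classes.

```python
import threading
import time


class CancelledError(Exception):
    """Stand-in for cernml.coi.cancellation.CancelledError."""


class StandInToken:
    """Hypothetical stand-in that mimics only the Token members used here."""

    def __init__(self) -> None:
        self._flag = threading.Event()

    def raise_if_cancellation_requested(self) -> None:
        if self._flag.is_set():
            raise CancelledError()

    def request(self) -> None:
        # In the real API, the request is sent by TokenSource.cancel().
        self._flag.set()


def step(token: StandInToken, log: list) -> None:
    # The check lives in a helper, not in the loop itself.
    token.raise_if_cancellation_requested()
    time.sleep(0.01)
    log.append("step")


def loop(token: StandInToken, log: list) -> None:
    try:
        while True:
            step(token, log)
    except CancelledError:
        # Single place to clean up, however deep the check was.
        log.append("cancelled")


token = StandInToken()
log: list = []
thread = threading.Thread(target=loop, args=(token, log))
thread.start()
time.sleep(0.05)
token.request()
thread.join(timeout=5)
```

With the flag-polling variant, every helper would have to return a status value for the loop to inspect; here the exception carries that information up the call stack.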
Note that cernml.japc_utils.subscribe_stream() supports cancellation
tokens. This makes it easy to interrupt a thread that is
waiting a long time for accelerator data.
Normally, a cancellation request cannot be undone. This is deliberate, as the cancellation might have left the task object in an unclean state. However, the task may explicitly declare that it has cleaned itself up after a cancellation by completing it. Completing a cancellation allows the source to reset the token and reuse it for another cancellation.
Take this class. It reads values from some machine, but occasionally gets stuck in an infinite loop:
>>> class SporadicFailure:
...     def __init__(self, token):
...         self.token = token
...         self.next_value = 0
...
...     def get_next(self):
...         # We know that our class won't break if it gets
...         # interrupted. Hence, once we have handled a
...         # cancellation, we can mark it as completed.
...         try:
...             value = self.read_from_machine()
...             return value
...         except CancelledError:
...             self.token.complete_cancellation()
...             raise
...
...     def read_from_machine(self):
...         self.next_value += 1
...         # Deadlock sometimes:
...         if self.next_value == 2:
...             self.deadlock()
...         self.token.raise_if_cancellation_requested()
...         return self.next_value
...
...     def deadlock(self):
...         # This waits indefinitely. However, because we use a
...         # cancellation token, it can be interrupted.
...         with self.token.wait_handle:
...             while not self.token.cancellation_requested:
...                 self.token.wait_handle.wait()
We want to fetch a value from it and add it to a list. With a real machine, this could take a while:
>>> source = TokenSource()
>>> receiver = SporadicFailure(source.token)
>>> collected_data = []
>>> collected_data.append(receiver.get_next())
>>> collected_data
[1]
Fetching the next value will deadlock for some reason, so we will have to cancel it. In a real application, we would put fetching into a background thread and cancel when the user clicks a button:
>>> timer = threading.Timer(0.1, source.cancel)
>>> timer.start()
>>> collected_data.append(receiver.get_next())
Traceback (most recent call last):
...
coi.cancellation.CancelledError
Now we have a problem: if we tried to fetch the next value, the token would still be cancelled, so we would get another exception:
>>> collected_data.append(receiver.get_next())
Traceback (most recent call last):
...
coi.cancellation.CancelledError
Fortunately, the receiver is still in a usable state (and is telling us as much):
>>> source.can_reset_cancellation
True
>>> source.reset_cancellation()
And now we can collect data again:
>>> collected_data.append(receiver.get_next())
>>> collected_data
[1, 4]
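The request, complete, reset cycle walked through above can be summarized as a small state machine. This is a sketch of the documented rules only, not the library's implementation: `LifecycleModel` and `State` are hypothetical names, `CannotReset` is a local stand-in, and the behaviour of `cancel()` after a completed but not yet reset cancellation is an assumption (modelled here as a no-op).

```python
import enum


class State(enum.Enum):
    IDLE = "no cancellation requested"
    REQUESTED = "cancellation requested, not yet completed"
    COMPLETED = "cancellation completed, reset allowed"


class CannotReset(Exception):
    """Stand-in for cernml.coi.cancellation.CannotReset."""


class LifecycleModel:
    """Hypothetical model of the documented transitions; not library code."""

    def __init__(self) -> None:
        self.state = State.IDLE

    def cancel(self) -> None:
        # Cancelling an already cancelled token is a no-op.
        if self.state is State.IDLE:
            self.state = State.REQUESTED

    def complete_cancellation(self) -> None:
        if self.state is State.IDLE:
            raise RuntimeError("no cancellation request to be completed")
        # Completing more than once does nothing.
        self.state = State.COMPLETED

    @property
    def can_reset_cancellation(self) -> bool:
        return self.state is not State.REQUESTED

    def reset_cancellation(self) -> None:
        if self.state is State.REQUESTED:
            raise CannotReset()
        # Resetting without a prior request does nothing.
        self.state = State.IDLE


model = LifecycleModel()
model.cancel()
reset_refused = False
try:
    model.reset_cancellation()  # too early: nobody completed the request
except CannotReset:
    reset_refused = True
model.complete_cancellation()
model.reset_cancellation()
```

In the real API, the first and last two methods live on the source, while `complete_cancellation()` lives on the token; the model merges both halves only to keep the transitions in one place.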
- class cernml.coi.cancellation.TokenSource
Bases: object
Sending half of a cancellation channel.
This half is usually created by a host application. It contains a token that the application can send to a Problem upon instantiation.
Whenever a Problem enters a long-running calculation, it should periodically check the token for a cancellation request. If such a request has arrived, the problem has a chance to gracefully abort its calculation.
As a convenience feature, token sources are also context managers. They yield their token when entering a context and automatically cancel it when leaving the context:
>>> import threading, time
...
>>> def loop(token: Token) -> None:
...     """An infinite loop that can be cancelled."""
...     while not token.cancellation_requested:
...         time.sleep(0.01)
...
>>> # Create source + token and start the thread.
>>> with TokenSource() as token:
...     thread = threading.Thread(target=loop, args=(token,))
...     thread.start()
...     # Do something complex or just wait ...
...     time.sleep(0.01)
...
>>> # Leaving the `with` block cancels the token.
>>> thread.join() # No deadlock!
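A practical benefit of the context-manager form is that the worker is released even if the host code fails inside the block. The sketch below illustrates this with a hypothetical `StandInSource` whose token is a plain `threading.Event`. It assumes that the real class, like any ordinary context manager, also cancels when the block is left through an exception; the source does not state this explicitly.

```python
import threading
import time


class StandInSource:
    """Hypothetical stand-in; only models the context-manager behaviour."""

    def __init__(self) -> None:
        self.token = threading.Event()  # stands in for a Token

    def cancel(self) -> None:
        self.token.set()

    def __enter__(self) -> threading.Event:
        return self.token

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Runs on normal exit and when the body raises.
        # Returning None lets the exception propagate.
        self.cancel()


def loop(token: threading.Event) -> None:
    while not token.is_set():
        time.sleep(0.01)


thread = None
try:
    with StandInSource() as token:
        thread = threading.Thread(target=loop, args=(token,))
        thread.start()
        raise ValueError("host application failed")
except ValueError:
    pass

# The worker was cancelled on the way out, so this does not hang.
thread.join(timeout=5)
```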
- property token: Token
The token associated with this source.
Pass this token to a Problem to be able to communicate a cancellation to it.
- cancel() → None
Send a cancellation request through the token.
If there are any threads waiting for a cancellation request, they all get notified. Note that it is up to the receiver of the token to honor the request.
Example
>>> source = TokenSource()
>>> source.token.cancellation_requested
False
>>> source.cancel()
>>> source.token.cancellation_requested
True
Cancelling the same token twice is a no-op:
>>> source.cancel()
>>> source.token.cancellation_requested
True
- property can_reset_cancellation: bool
True if a previous cancellation can be reset.
Example
>>> source = TokenSource()
>>> source.can_reset_cancellation
True
>>> source.cancel()
>>> source.can_reset_cancellation
False
>>> source.token.complete_cancellation()
>>> source.can_reset_cancellation
True
- reset_cancellation() → None
Reset a cancellation request.
This can only be done if a previous cancellation request has been completed by the holder of the token. It restores the state as if there had never been a cancellation.
If no cancellation has been requested, this does nothing.
- Raises:
CannotReset – if a cancellation has been requested but not completed.
Example
>>> source = TokenSource()
>>> source.cancellation_requested
False
>>> source.cancel()
>>> source.cancellation_requested
True
>>> source.reset_cancellation()
Traceback (most recent call last):
...
coi.cancellation.CannotReset
>>> source.token.complete_cancellation()
>>> source.cancellation_requested
True
>>> source.reset_cancellation()
>>> source.cancellation_requested
False
Resetting twice is a no-op:
>>> source.reset_cancellation()
>>> source.cancellation_requested
False
- class cernml.coi.cancellation.Token(cancelled: bool = False)
Bases: object
Receiving half of a cancellation channel.
Usually, you create this object via a TokenSource. It creates a token that can receive cancellation requests and mark them as completed. Marking a request as completed allows the token source to send further cancellation requests.
- Parameters:
cancelled – If False (the default), create a token that cannot be cancelled. If True, create a token that is already cancelled.
Manually created tokens can never change their state:
>>> c = Token()
>>> c.can_be_cancelled, c.cancellation_requested
(False, False)
>>> c = Token(True)
>>> c.can_be_cancelled, c.cancellation_requested
(True, True)
- property wait_handle: Condition
A condition variable on which to wait for cancellation.
If you do not use Condition variables to synchronize multiple threads, you may safely ignore this attribute.
This lazily creates the condition variable. You may use it to wait for cancellation. To avoid deadlocks, you should check cancellation_requested while the condition variable is locked:
>>> import threading
>>> def loop(token: Token) -> None:
...     with token.wait_handle:
...         while not token.cancellation_requested:
...             token.wait_handle.wait()
...
>>> source = TokenSource()
>>> thread = threading.Thread(
...     target=loop,
...     args=(source.token,),
... )
...
>>> thread.start()
>>> source.cancel()
>>> # Doesn't deadlock, thread got notified by `cancel()`.
>>> thread.join()
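If the waiting thread should also give up after a while, the standard `Condition.wait_for()` method combines the locked check and the wait in one call and accepts a timeout. The sketch below assumes that `wait_handle` behaves like an ordinary `threading.Condition`, as its type suggests. `StandInToken`, the free function `cancel()`, and `wait_for_cancellation()` are hypothetical helpers so that the snippet runs on its own.

```python
import threading


class StandInToken:
    """Hypothetical stand-in exposing the two members used below."""

    def __init__(self) -> None:
        self.wait_handle = threading.Condition()
        self.cancellation_requested = False


def cancel(token: StandInToken) -> None:
    # Models the documented effect of TokenSource.cancel():
    # set the flag and notify every waiting thread.
    with token.wait_handle:
        token.cancellation_requested = True
        token.wait_handle.notify_all()


def wait_for_cancellation(token: StandInToken, timeout: float) -> bool:
    """Return True if cancelled within `timeout` seconds, else False."""
    with token.wait_handle:
        # wait_for() re-checks the predicate under the lock after each
        # wake-up and returns its last value.
        return token.wait_handle.wait_for(
            lambda: token.cancellation_requested, timeout=timeout
        )


token = StandInToken()

# Nobody cancels: the wait gives up after the timeout.
timed_out_result = wait_for_cancellation(token, timeout=0.05)

# A timer cancels shortly after we start waiting again.
timer = threading.Timer(0.05, cancel, args=(token,))
timer.start()
cancelled_result = wait_for_cancellation(token, timeout=5.0)
timer.join()
```

Because the predicate is evaluated while the lock is held, this follows the same rule as the explicit `while` loop above.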
- raise_if_cancellation_requested() → None
Raise an exception if a cancellation request has arrived.
- Raises:
CancelledError – If cancellation_requested is True. Note that it inherits from Exception, so it can be caught by an overly broad except clause.
- complete_cancellation() → None
Mark an ongoing cancellation as completed.
Once a cancellation has been completed, the token source is free to reset it and later send another one. Hence, you should only call this function at the very end. Otherwise, the source may send a second request while you’re still handling the first one.
Calling this method more than once does nothing.
- Raises:
RuntimeError – if no cancellation has been requested.
Examples
>>> # Does nothing: cancellation already completed.
>>> Token(True).complete_cancellation()
>>> # Raises an exception: no cancellation ongoing.
>>> Token(False).complete_cancellation()
Traceback (most recent call last):
...
RuntimeError: no cancellation request to be completed
- exception cernml.coi.cancellation.CancelledError
Bases: Exception
The current task has been requested to be cancelled.
Note that it inherits from Exception, so it can be caught by an overly broad except clause.
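The pitfall mentioned in the note can be sketched as follows. The local `CancelledError` is a stand-in with the same base class, and `read_value()`, `swallowing()`, and `propagating()` are hypothetical functions written only for this illustration.

```python
import math


class CancelledError(Exception):
    """Stand-in for cernml.coi.cancellation.CancelledError."""


def read_value(cancelled: bool) -> float:
    """Hypothetical task step that honours a cancellation request."""
    if cancelled:
        raise CancelledError()
    return 1.0


def swallowing(cancelled: bool) -> float:
    try:
        return read_value(cancelled)
    except Exception:
        # Too broad: this also hides the cancellation from the caller.
        return float("nan")


def propagating(cancelled: bool) -> float:
    try:
        return read_value(cancelled)
    except CancelledError:
        # Let the cancellation pass before the broad handler sees it.
        raise
    except Exception:
        return float("nan")


swallowed = swallowing(cancelled=True)

propagated = False
try:
    propagating(cancelled=True)
except CancelledError:
    propagated = True
```

In the first function the caller receives a placeholder value and never learns that the task was cancelled; in the second the request reaches whoever is responsible for handling it.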
- exception cernml.coi.cancellation.CannotReset
Bases: Exception
Cancellation cannot be reset as the task did not complete it.
There are two possible reasons for this:
1. The task might have simply forgotten to call Token.complete_cancellation().
2. The task has ended up in a poisoned state because of the cancellation. For example, two variables meant to be consistent with each other no longer are.
Because of #2, it is not safe to reset a cancellation that has not been completed.
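One way a host application might react to this exception is to discard the source, together with the task that held its token, and start over with fresh objects. The sketch below shows only that decision. `StandInSource` and `reuse_or_replace()` are hypothetical, `CannotReset` is a local stand-in, and setting `completed` directly models a call to Token.complete_cancellation().

```python
class CannotReset(Exception):
    """Stand-in for cernml.coi.cancellation.CannotReset."""


class StandInSource:
    """Hypothetical stand-in modelling only the reset rule."""

    def __init__(self) -> None:
        self.requested = False
        self.completed = False

    def cancel(self) -> None:
        self.requested = True

    def reset_cancellation(self) -> None:
        if self.requested and not self.completed:
            raise CannotReset()
        self.requested = False
        self.completed = False


def reuse_or_replace(source: StandInSource) -> StandInSource:
    """Return a source that is safe to use for the next task."""
    try:
        source.reset_cancellation()
        return source
    except CannotReset:
        # The task may be in a poisoned state, so do not reuse it:
        # hand out a fresh source for a freshly created task instead.
        return StandInSource()


# Case 1: the task never completed the cancellation.
stuck = StandInSource()
stuck.cancel()
fresh = reuse_or_replace(stuck)

# Case 2: the task completed the cancellation, so reuse is fine.
clean = StandInSource()
clean.cancel()
clean.completed = True  # models Token.complete_cancellation()
reused = reuse_or_replace(clean)
```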