Waiting for New Data Without Blocking the GUI

A typical use case for COI problems is the optimization of parameters of various CERN accelerators. Doing so naturally requires communication with these machines. This communication may take a long time – especially when the data we’re interested in is cycle-bound (i.e. published at regular intervals of several seconds). Handling this in a clean fashion requires synchronization between our optimization logic and the subscription handler that receives data from the machine.

In addition, machines may exhibit sporadic transient failures. In this case, we want to discard the defective data and wait for the next sample to arrive. At the same time, if a failure turns out to be non-transient (i.e. it requires human intervention), we don’t want this logic to get stuck in an infinite loop. In other words, users of our COI problems must be able to cancel them.

Tricky problems indeed! While this package cannot claim to solve them in all possible cases, it provides a few tools that give reasonable behavior in the most common cases with only a few lines of code.

Synchronization

To solve the problem of synchronization, the Utilities for the Common Optimization Interfaces introduce the concept of parameter streams. Below is a trivial example of how to use them; please see the dedicated guide for more information.

from pyjapc import PyJapc
from cernml.japc_utils import subscribe_stream

japc = PyJapc("LHC.USER.ALL", noSet=True)
the_field = subscribe_stream(japc, "device/property#field")
# Blocks execution until the next value is there.
value, header = the_field.wait_for_next()
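
Because wait_for_next() simply blocks until the next value is published, more elaborate acquisition logic reduces to plain Python built around it. Below is a minimal sketch that continues the snippet above; it assumes the field publishes a numeric array and that averaging a few consecutive cycles is meaningful, and the helper name average_readings() is purely illustrative.

import numpy as np

def average_readings(stream, count=3):
    """Collect `count` consecutive cycles and return their element-wise mean."""
    samples = []
    while len(samples) < count:
        # Blocks until the next cycle-bound value is published.
        value, _header = stream.wait_for_next()
        samples.append(value)
    return np.mean(samples, axis=0)

# Reuses `the_field` from the snippet above.
averaged = average_readings(the_field)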

Cancellation

In order to cancel long-running data acquisition tasks, the COI have adopted the concept of cancellation tokens from C#. A cancellation token is a small object that is handed to your Problem subclass and on which you may check whether the user has requested cancellation of your operation. If so, you can shut down your operation cleanly – usually by raising an exception.

To use this feature, your problem must first declare that it supports it by setting the metadata entry "cern.cancellable" to True. When it does so, a host application will pass a Token to the constructor. The problem should check this token for a cancellation request whenever it enters a loop that may run for a long time.

This sounds complicated, but luckily, parameter streams already support cancellation tokens:

from cernml import coi
# Requires `pip install cernml-coi-utils`.
from cernml.japc_utils import subscribe_stream

class MyProblem(coi.SingleOptimizable):
    metadata = {
        "cern.japc": True,
        "cern.cancellable": True,
        ...,
    }

    def __init__(self, japc, cancellation_token):
        self.japc = japc
        self.token = cancellation_token
        # Pass in the token. The stream will hold onto it and monitor it
        # whenever you call `.wait_for_next()`.
        self.bpm_readings = subscribe_stream(
            japc, "...", token=cancellation_token
        )

    def get_initial_params(self):
        ...

    def compute_single_objective(self, params):
        self.japc.setParam("...", params)
        try:
            # This may block for a long time, depending on how fast
            # the data arrives and whether the data is valid.
            # However, if the user clicks Cancel, the token receives
            # this signal and `wait_for_next()` immediately unblocks
            # and raises an exception.
            while True:
                value, header = self.bpm_readings.wait_for_next()
                if self.is_data_good(value):
                    return self.compute_loss(value)
        except coi.cancellation.CancelledError:
            # Our environment has the nice property that even after
            # a cancellation, it will still work. Our caller could
            # call `compute_single_objective()` again and everything
            # would behave the same. We let the outside world know
            # that this is the case by marking the cancellation as
            # "completed".
            self.token.complete_cancellation()
            raise

If you have your own data acquisition logic, you can use the token yourself by regularly calling raise_if_cancellation_requested() on it:

from time import sleep

from cernml import coi

class MyProblem(coi.SingleOptimizable):

    def compute_single_objective(self, params):
        self.japc.setParam(...)
        while True:
            self.token.raise_if_cancellation_requested()
            sleep(0.5)  # Or any operation that takes a long time …
            value = ...
            if self.is_value_good(value):
                return value

    ...
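
If your own loop, like the parameter-stream example earlier, can safely be called again after a cancellation, it is good practice to mark the cancellation as completed before re-raising, just as the stream-based example does. Below is a minimal sketch under that assumption; it reuses the attributes from above (self.japc, self.token) and the illustrative helper self.is_value_good().

from time import sleep

from cernml import coi

class MyProblem(coi.SingleOptimizable):

    def compute_single_objective(self, params):
        self.japc.setParam(...)
        try:
            while True:
                # Raises `coi.cancellation.CancelledError` as soon as
                # the user requests cancellation.
                self.token.raise_if_cancellation_requested()
                sleep(0.5)  # Or any operation that takes a long time …
                value = ...
                if self.is_value_good(value):
                    return value
        except coi.cancellation.CancelledError:
            # Nothing has been left in an inconsistent state, so we mark
            # the cancellation as completed and let the exception propagate.
            self.token.complete_cancellation()
            raise

    ...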

If you are writing a host application (i.e. something that runs other people’s optimization problems), you will usually want to create a TokenSource and pass its token to the optimization problem if it is cancellable:

from threading import Thread
from cernml import coi
from cernml.coi import cancellation

class MyApp:
    def __init__(self):
        self.source = cancellation.TokenSource()

    def on_start(self):
        env_name = self.env_name
        agent = self.agent
        token = self.source.token
        self.worker = Thread(
            target=worker,
            args=(env_name, agent, token),
        )
        self.worker.start()

    def on_stop(self):
        self.source.cancel()
        self.worker.join()
        assert self.source.can_reset_cancellation
        self.source.reset_cancellation()

    ...

def worker(env_name, agent, token):
    kwargs = {}
    metadata = coi.spec(env_name).metadata
    if metadata.get("cern.cancellable", False):
        kwargs["cancellation_token"] = token
    env = coi.make(env_name, **kwargs)
    try:
        while True:
            # Also check the token ourselves, so that the `Problem`
            # only has to check it when it enters a loop.
            token.raise_if_cancellation_requested()
            obs = env.reset()
            done = False
            state = None
            while not done:
                # Ditto.
                token.raise_if_cancellation_requested()
                action, state = agent.predict(obs, state)
                obs, _reward, done, _info = env.step(action)
    except cancellation.CancelledError:
        # Because the env gets closed at the end of this thread, we
        # can *definitely* reuse the cancellation token source.
        token.complete_cancellation()
    finally:
        env.close()  # Never forget this!
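
The assert in on_stop() holds because the worker marks its cancellation as completed before the thread ends: cancel() and join() ensure the worker has shut down, and complete_cancellation() signals that the cancellation was honoured cleanly, so the token source can be reset and hand out a fresh token for the next run.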