The Core API

This page describes the various pieces of the common optimization interfaces. You are invited to skip to a section that interests you, or to read the page top to bottom, at your leisure.

Keep in mind that while these interfaces are the most important ones, there are also others that provide important features. See, for example, Making an Optimization Configurable via GUI and Custom Per-Problem Optimizers.

The Interface Hierarchy

[Figure: inheritance diagram. Problem sits at the root and declares render(), close(), and the metadata, render_mode and unwrapped attributes. cernml.coi.SingleOptimizable (get_initial_params(), compute_single_objective(), optimization_space) and gymnasium.Env (reset(), step(), action_space, observation_space) both derive from Problem; cernml.coi.OptEnv derives from both.]

Fig. 1: Inheritance diagram of the core interfaces

The interfaces are designed in a modular fashion: depending on the algorithms that an optimization problem supports, it implements SingleOptimizable (for classical single-objective optimization), Env (for reinforcement learning), or both. The Problem interface captures the greatest common denominator: the functionality that all interfaces have in common.

As a convenience, this package also provides the OptEnv interface. It is simply an intersection of SingleOptimizable and Env. This means that implementing it is the same as implementing both of its bases. At the same time, every class that implements both base interfaces also implements OptEnv. A demonstration:

>>> import gymnasium
>>> from cernml import coi
>>> class Indirect(coi.SingleOptimizable, gymnasium.Env):
...     optimization_space = ...
...     observation_space = ...
...     action_space = ...
...
>>> issubclass(Indirect, coi.OptEnv)
True
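
The reverse direction works as well: a class that inherits from OptEnv directly is automatically a subclass of both base interfaces:

>>> class Direct(coi.OptEnv):
...     optimization_space = ...
...     observation_space = ...
...     action_space = ...
...
>>> issubclass(Direct, coi.SingleOptimizable)
True
>>> issubclass(Direct, gymnasium.Env)
True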

Minimal Implementations

This section shows the absolute bare minimum required to write an optimization problem. The examples are intended to get your feet off the ground if you are new to this library; they are not interesting optimization problems. Anything non-trivial (e.g. communicating with an external machine) will require additional steps. See Implementing SingleOptimizable for a more comprehensive tutorial.

Single-Objective Optimization Problems

For a minimal working example, you should inherit from cernml.coi.SingleOptimizable to have it fill in as many defaults as possible. With it as a superclass, you only have to fill in three missing pieces:

  1. get_initial_params() to give the initial point of an optimization;

  2. compute_single_objective() as the objective function to be minimized [1];

  3. optimization_space to specify the problem’s domain, i.e. valid inputs to the objective function. See also Spaces for more information.

You also have to register your class so that the central function cernml.coi.make() can instantiate it. The page on Making Your Code Findable has more information.

This is a minimal, runnable example problem:

import numpy as np
from gymnasium.spaces import Box

from cernml import coi


class Quadratic(coi.SingleOptimizable):
    # This class doesn't do any rendering, but it's still useful to pass
    # this parameter on, in case you want to add rendering later.
    def __init__(self, render_mode=None):
        # The inherited initializer checks for us that `render_mode` is
        # valid, and saves it as `self.render_mode`.
        super().__init__(render_mode)

        # Here, we define our problem's domain. Since the space is
        # constant, we could've defined `optimization_space = Box(...)`
        # at class scope as well.
        self.optimization_space = Box(-1.0, 1.0, shape=(5,))

        # The goal to be found by the optimizer. Randomized on each call
        # to `get_initial_params()`.
        self.goal = np.zeros(5)

    # Defining the x_0 for our optimization problem. `seed` allows
    # fixing random-number generation (RNG), `options` is a free-form
    # dict that we can use for customization.
    def get_initial_params(self, *, seed=None, options=None):
        # The inherited function seeds an RNG `self.np_random` for us.
        super().get_initial_params(seed=seed)

        # We bind these attributes here to keep our code short.
        space = self.optimization_space
        rng = self.np_random

        # Randomize the goal we want to move to and the initial point.
        # We use `np_random` so that if the user passes `seed`, the
        # problem is completely deterministic.
        self.goal = rng.uniform(space.low, space.high, size=space.shape)
        return rng.uniform(space.low, space.high, size=space.shape)

    # Our objective function is simply the Euclidean distance between
    # the two points.
    def compute_single_objective(self, params):
        return np.linalg.norm(self.goal - params)


# Never forget to register your optimization problem!
coi.register("QuadraticSearch-v1", entry_point=Quadratic)
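
Once registered, the problem can be created via coi.make() and handed to an off-the-shelf optimizer. The following is a minimal sketch using scipy.optimize.minimize(), which (as noted under Running Your Optimization Problem below) can consume SingleOptimizable problems with only minor adjustments:

from scipy.optimize import minimize

from cernml import coi

# A sketch: instantiate the problem registered above and minimize its
# objective with SciPy's Nelder–Mead implementation.
with coi.make("QuadraticSearch-v1") as problem:
    space = problem.optimization_space
    x0 = problem.get_initial_params()
    result = minimize(
        problem.compute_single_objective,
        x0,
        method="Nelder-Mead",
        bounds=list(zip(space.low, space.high)),
    )
    print(result.x)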

Single-Objective Function Optimization Problems

See also

Optimizing Points on an LSA Function

User guide page on function optimization problems.

For a minimal working example, you should inherit from cernml.coi.FunctionOptimizable to have it fill in as many defaults as possible. With it as a superclass, you only have to fill in three missing pieces:

  1. get_initial_params() to give the initial point for each individual optimization;

  2. compute_function_objective() as the objective function to be minimized [1];

  3. get_optimization_space() to specify the domain, i.e. valid inputs to the objective function. See also Spaces for more information.

You also have to register your class so that the central function cernml.coi.make() can instantiate it. The page on Making Your Code Findable has more information.

This is a minimal, runnable example problem:

import numpy as np
from gymnasium.spaces import Box

from cernml import coi


class StraightLineSteering(coi.FunctionOptimizable):
    # This class doesn't do any rendering, but it's still useful to pass
    # this parameter on, in case you want to add rendering later.
    def __init__(self, render_mode=None):
        # The inherited initializer checks for us that `render_mode` is
        # valid, and saves it as `self.render_mode`.
        super().__init__(render_mode)

        # Our problem has a number of disturbances, initialized in
        # `get_initial_params()`. Each one deviates our trajectory
        # either to the left (negative values) or to the right (positive
        # values). Our goal is to keep the trajectory as close to zero
        # as possible.
        self.disturbances = {}

    # Our problem is particularly simple and has the same optimization
    # space everywhere.
    def get_optimization_space(self, cycle_time):
        return Box(-1, 1, shape=())

    # Defining the x_0 for our optimization problem. `seed` allows
    # fixing random-number generation (RNG), `options` is a free-form
    # dict that we can use for customization.
    def get_initial_params(self, cycle_time, *, seed=None, options=None):
        # The inherited function seeds an RNG `self.np_random` for us.
        super().get_initial_params(cycle_time, seed=seed)

        # Check that the given cycle time is allowed.
        if not 0.0 < cycle_time < 1500.0:
            raise ValueError(f"cycle time out of bounds: {cycle_time!r}")

        # Initialize the disturbances here. We want the RNG to have been
        # seeded already.
        if not self.disturbances:
            self.disturbances = {
                self.np_random.integers(0, 1500): self.np_random.normal()
                for _ in range(3)
            }

        return np.array(self.disturbances.get(int(cycle_time), 0.0))

    # Our objective function is the integrated deviation from the ideal
    # trajectory. Because each segment is a straight line, we simply
    # calculate according to the trapezoidal rule.
    def compute_function_objective(self, cycle_time, params):
        # Apply the given parameters.
        self.disturbances[int(cycle_time)] = float(params.item())
        # Calculate the loss function by iterating over all disturbances
        # in order. Each disturbance changes the slope of the trajectory,
        # so each position follows from the previous one.
        integral = 0.0
        prev_time = 0.0
        prev_pos = 0.0
        slope = 0.0
        for time, disturbance in sorted(self.disturbances.items()):
            pos = prev_pos + slope * (time - prev_time)
            integral += 0.5 * (pos + prev_pos) * (time - prev_time)
            slope += disturbance
            prev_time = time
            prev_pos = pos
        time = 1500
        pos = prev_pos + slope * (time - prev_time)
        integral += 0.5 * (pos + prev_pos) * (time - prev_time)
        # Cost function is the square because negative deviations are
        # just as bad as positive ones.
        return integral**2


# Never forget to register your optimization problem!
coi.register("StraightLineSteering-v1", entry_point=StraightLineSteering)
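
The pattern for FunctionOptimizable is the same as before, except that every call takes the cycle time of the skeleton point under optimization. A minimal sketch; the skeleton points below are made up for illustration, and a real host application would let the user choose them:

from functools import partial

from scipy.optimize import minimize

from cernml import coi

with coi.make("StraightLineSteering-v1") as problem:
    for cycle_time in (300.0, 700.0, 1100.0):
        space = problem.get_optimization_space(cycle_time)
        x0 = problem.get_initial_params(cycle_time)
        # Bind the cycle time so the optimizer sees a one-argument
        # objective function.
        result = minimize(
            partial(problem.compute_function_objective, cycle_time),
            x0,
            method="Nelder-Mead",
            bounds=[(float(space.low), float(space.high))],
        )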

Reinforcement Learning Environments

For a minimal working example, you inherit from gymnasium.Env and fill in the four missing pieces:

  1. reset() to initialize the environment for a new episode and receive an initial observation;

  2. step() to take an action in the current episode;

  3. observation_space to specify the domain of observations that are returned by reset() and step();

  4. action_space to specify the domain of actions that are accepted by step(). See also Spaces for more information.

You also have to register your class so that the central function cernml.coi.make() can instantiate it. The page on Making Your Code Findable has more information.

This is a minimal, runnable example problem:

import numpy as np
from gymnasium import Env
from gymnasium.spaces import Box

from cernml import coi


class Quadratic(Env):
    # This class doesn't do any rendering, but it's still useful to
    # accept this parameter, in case you want to add rendering later.
    def __init__(self, render_mode=None):
        # The `render_mode` attribute is defined by `Env`.
        self.render_mode = render_mode

        # Here, we define our problem's domain. The observations that we
        # receive are 2×5 arrays containing the goal and the current
        # position …
        self.observation_space = Box(-5.0, 5.0, shape=(2, 5))

        # … and the actions are 5D arrays containing the direction where
        # to walk on each step.
        self.action_space = Box(-1.0, 1.0, shape=(5,))

        # The environment state is the position where we are, and the
        # goal where we should go.
        self.position = np.zeros(5)
        self.goal = np.zeros(5)

    # Defining the initial state for each episode. `seed` allows fixing
    # random-number generation (RNG), `options` is a free-form dict that
    # we can use for customization.
    def reset(self, *, seed=None, options=None):
        # The inherited function seeds an RNG `self.np_random` for us.
        super().reset(seed=seed)

        # We bind these attributes here to keep our code short.
        rng = self.np_random
        space = self.observation_space

        # Randomize the goal we want to move to and the initial point.
        # We use `np_random` so that if the user passes `seed`, the
        # problem is completely deterministic. Each row of the
        # observation-space bounds limits one of the two 5D vectors.
        self.goal = rng.uniform(space.low[0], space.high[0])
        self.position = rng.uniform(space.low[1], space.high[1])

        # `Env` expects us to return `obs` (with the shape and limits
        # given by `observation_space`) and a free-form *info* dict,
        # which may contain metrics or debugging or logging info.
        obs = np.stack((self.goal, self.position))
        info = {}
        return obs, info

    # The state transition function. Accepts an action and returns
    # a 5-tuple of: observation, reward for this step, boolean flags
    # that indicate whether the episode is over, and an info dict like
    # in `reset()`.
    def step(self, action):
        # Update our internal state and ensure everything stays within
        # its limits.
        self.position += action
        self.position = np.clip(
            self.position,
            self.observation_space.low[1],
            self.observation_space.high[1],
        )

        # We use the negative distance from the goal as reward. (Higher
        # rewards are better, unlike with `SingleOptimizable`.) We end
        # the episode when sufficiently close to the goal.
        distance = np.linalg.norm(self.goal - self.position)
        obs = np.stack((self.goal, self.position))
        reward = -distance
        terminated = distance < 0.01
        truncated = False
        info = {}
        return obs, reward, terminated, truncated, info


# Never forget to register your optimization problem!
coi.register("QuadraticSearch-v2", entry_point=Quadratic)
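
Once registered, the environment can be given a quick smoke test with random actions. A minimal sketch; the max_episode_steps argument (also used in the execution loops below) ends the episode even if the goal is never reached:

from cernml import coi

# Run a single episode with random actions as a quick sanity check.
with coi.make("QuadraticSearch-v2", max_episode_steps=10) as env:
    obs, info = env.reset(seed=42)
    terminated = truncated = False
    while not (terminated or truncated):
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)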

Running Your Optimization Problem

See also

Control Flow of Optimization Problems

User guide page with detailed information on each kind of execution loop.

Optimization problems – no matter whether based on Env, SingleOptimizable or another interface – are expected to be run as plugins into a host application. While the Geoff project maintains a reference implementation of such a host application, institutes and users are encouraged to write their own host applications, tailored to their specific needs and re-using components of the broader Geoff project as necessary.

Typically, host applications end up implementing one kind or another of execution loop that runs an algorithm (e.g. a numerical optimizer or an RL policy) on a given problem. Minimal execution loops for the different kinds of problems (which might be useful for debugging) may look like this:

For SingleOptimizable problems:

from gymnasium.spaces import Box
from numpy import clip

from cernml import coi

problem = coi.make("MySingleOptimizableProblem-v0")
assert isinstance(problem, coi.SingleOptimizable)
with problem:
    # Fetch initial state. `get_optimizer()` stands in for the host's
    # choice of optimization algorithm.
    optimizer = get_optimizer()
    space = problem.optimization_space
    assert isinstance(space, Box)
    initial = params = problem.get_initial_params()
    best = (float("inf"), initial)

    while not optimizer.is_done():
        # Update optimum.
        loss = problem.compute_single_objective(params)
        best = min(best, (float(loss), params))

        # Fetch next set of parameters.
        params = optimizer.step(loss)
        params = clip(params, space.low, space.high)

    if optimizer.has_failed():
        # Restore initial state.
        problem.compute_single_objective(initial)
    else:
        # Restore best state.
        problem.compute_single_objective(best[1])

For FunctionOptimizable problems:

from gymnasium.spaces import Box
from numpy import clip

from cernml import coi

problem = coi.make("MyFunctionOptimizableProblem-v0")
assert isinstance(problem, coi.FunctionOptimizable)
with problem:
    # Select skeleton points. `request_skeleton_points()` and
    # `OptFailed` stand in for host-specific code.
    skeleton_points = problem.override_skeleton_points()
    if skeleton_points is None:
        skeleton_points = request_skeleton_points()

    # Keep track of which points we have modified and which not.
    restore_on_failure = []

    try:
        for time in skeleton_points:
            # Fetch initial state.
            optimizer = get_optimizer()
            space = problem.get_optimization_space(time)
            assert isinstance(space, Box)
            initial = params = problem.get_initial_params(time)
            best = (float("inf"), initial)
            restore_on_failure.append((time, initial))

            while not optimizer.is_done():
                # Update optimum.
                loss = problem.compute_function_objective(time, params)
                best = min(best, (float(loss), params))

                # Fetch next set of parameters.
                params = optimizer.step(loss)
                params = clip(params, space.low, space.high)

            if optimizer.has_failed():
                raise OptFailed(f"optimizer failed at t={time}")
            # Restore best state.
            problem.compute_function_objective(time, best[1])
    except BaseException:
        # If anything fails, restore initial state not only for the
        # current skeleton point, but all previous ones as well.
        while restore_on_failure:
            time, params = restore_on_failure.pop()
            problem.compute_function_objective(time, params)
        raise

For reinforcement-learning environments:

from gymnasium import Env
from gymnasium.spaces import Box
from numpy import clip

from cernml import coi

# `get_policy()` and `get_num_episodes()` stand in for host-specific code.
policy = get_policy()
num_episodes = get_num_episodes()

# Limit steps per episode to prevent infinite loops.
env = coi.make("MyEnv-v0", max_episode_steps=10)
assert isinstance(env, Env)
with env:
    ac_space = env.action_space
    assert isinstance(ac_space, Box)

    for _ in range(num_episodes):
        terminated = truncated = False
        obs, info = env.reset()
        while not (terminated or truncated):
            action = policy.predict(obs)
            action = clip(action, ac_space.low, ac_space.high)
            obs, reward, terminated, truncated, info = env.step(action)

While these examples are very bare-bones, various libraries already provide pre-packaged execution loops with a number of additional conveniences:

Stable Baselines 3

supports the Env API: RL environments can be passed directly to the various agent.learn() methods. In addition, the package provides a function evaluate_policy() that runs a given agent or policy on an environment and collects statistics.

cernml-rltools

provides a module cernml.rltools.envloop with an older and more general-purpose implementation of the environment interaction loop.

cernml-coi-optimizers

provides a uniform interface for solvers of SingleOptimizable problems. Its general-purpose solve() function is directly compatible with the COI.

In addition, many optimizers, such as scipy.optimize.minimize() and Py-BOBYQA, can consume SingleOptimizable problems with only minor adjustments.

Spaces

Optimization is always executed over a certain numeric domain, i.e. a space of allowed values. These domains are encapsulated by Gym’s concept of a Space. While Gym provides many different kinds of spaces (discrete, continuous, aggregate, …), the COI only supports Box at this time. This restriction may be lifted in the future, depending on user feedback.

The interfaces make use of spaces as follows:

SingleOptimizable.optimization_space

the domain of valid inputs to compute_single_objective();

Env.action_space

the domain of valid inputs to step();

Env.observation_space

the domain of valid observations returned by reset() and step().
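
For illustration, here is how a Box behaves in isolation. A small sketch using only the Gym space API:

import numpy as np
from gymnasium.spaces import Box

# A 3-dimensional box with all coordinates limited to [-1, +1].
space = Box(-1.0, 1.0, shape=(3,))

sample = space.sample()      # a random point inside the box
assert space.contains(sample)

# Points outside the bounds are rejected.
outside = np.array([2.0, 0.0, 0.0], dtype=np.float32)
assert not space.contains(outside)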

Naming Your Quantities

In many cases, your objective function and parameters directly correspond to machine parameters. For example, many optimization problems might only scale their parameters and otherwise send them unmodified to the machine via JAPC. Similarly, the objective function might only be a rescaled or inverted reading from a detector on the accelerator.

In such cases, it is useful to declare the meaning of your quantities. A host application may use this to annotate its graphs of the parameters and objective function. The SingleOptimizable class provides three attributes for this purpose:

from cernml import coi

class SomeProblem(coi.SingleOptimizable):

    objective_name = "RMS BPM Position (mm)"
    param_names = [
        "CORRECTOR.10",
        "CORRECTOR.20",
        "CORRECTOR.30",
        "CORRECTOR.40",
    ]
    constraint_names = [
        "BCT Intensity",
    ]

    def compute_single_objective(self, params):
        for name, value in zip(self.param_names, params):
            self._japc.setParam(f"logical.{name}/K", value)
        ...

Note that these three values need not be defined inside the class scope. You are free to define them inside your __init__() method or to change them at run time. This is useful because some optimization problems may let the user configure exactly which devices they talk to.
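
For example, a problem whose devices are configurable might assign param_names in its initializer. A sketch with made-up device names:

from cernml import coi


class ConfigurableProblem(coi.SingleOptimizable):
    objective_name = "RMS BPM Position (mm)"

    def __init__(self, render_mode=None, correctors=("CORRECTOR.10", "CORRECTOR.20")):
        super().__init__(render_mode)
        # The parameter names depend on the correctors chosen at run
        # time, so they cannot be class attributes.
        self.param_names = list(correctors)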

You are free not to define these attributes at all. In this case, the host application will see the inherited default values and assume no particular meaning of your quantities.

Metadata

Every optimization problem should have a class attribute called Problem.metadata, which is a dict with string keys. The dict should be defined at the class level and immutable [2]. It communicates fundamental properties of the class and how a host application can use it.

While the API reference contains the full definition of the Standard Metadata Keys, the following is an abridged version:

"render_modes"

the render modes that the optimization problem understands (see Rendering);

"cern.machine"

the accelerator that an optimization problem is associated with (see cernml.coi.Machine);

"cern.japc"

a boolean flag indicating whether the problem’s constructor expects an argument named japc of type PyJapc;

"cern.cancellable"

a boolean flag indicating whether the problem’s constructor expects a cancellation token (see Cancellation).
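
Put together, a class-level definition might look like the following sketch. MappingProxyType is one way to keep the dict read-only; the values shown (in particular Machine.SPS) are examples, not requirements:

from types import MappingProxyType

from cernml import coi


class MetadataExample(coi.SingleOptimizable):
    # Declared once at class scope and never mutated.
    metadata = MappingProxyType({
        "render_modes": ["human", "matplotlib_figures"],
        "cern.machine": coi.Machine.SPS,
        "cern.japc": False,
        "cern.cancellable": False,
    })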

Rendering

The metadata entry "render_modes" allows a problem to declare that its internal state can be visualized. It should be a list of strings, each naming a supported render mode. Host applications may pick one of these strings and pass it to the problem’s render() method. For this to work, render modes need to have well-defined semantics.

The following render modes are standardized by either Gym or this package:

"human"

The default mode, for interactive use. This should e.g. open a window and display the problem’s current state in it. Displaying the window should not block control flow.

"ansi"

Return a text-only representation of the problem. This may contain e.g. terminal control codes for color effects.

"rgb_array"

Return a Numpy array representing color image data.

"matplotlib_figures"

Return a list of Matplotlib Figure objects, suitable for embedding into a GUI application.

See the render() docs for a full spec of each render mode.
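
As an illustration, a render() implementation might dispatch on self.render_mode. This sketch only handles "matplotlib_figures" and defers everything else to the base class; the plotted history attribute is hypothetical:

from matplotlib.figure import Figure

from cernml import coi


class RenderingExample(coi.SingleOptimizable):
    metadata = {"render_modes": ["matplotlib_figures"]}

    def render(self):
        if self.render_mode == "matplotlib_figures":
            # Return a list of figures, as this render mode requires.
            figure = Figure()
            axes = figure.add_subplot()
            axes.plot(self.history)  # `history` is a hypothetical attribute
            return [figure]
        # Defer unsupported modes to the base class.
        return super().render()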

Closing

Some optimization problems have to acquire certain resources in order to perform their tasks. Examples include:

  • spawning processes,

  • starting threads,

  • subscribing to JAPC parameters.

While Python garbage-collects objects which are no longer accessible (including Problem instances), some of these resources require manual function calls in order to be properly cleaned up.

If this is the case for an optimization problem, it should override the close() method and perform all such clean-up actions in it. A host application is required to call close() when it has no more need for an optimization problem.

All classes that inherit from Problem are automatically context managers that can be used in with blocks. Whenever the with block is exited, close() gets called automatically.
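
For instance, a problem that spawns a helper process would terminate it in close(). A sketch; the helper command stands in for any resource that needs explicit clean-up:

import subprocess

from cernml import coi


class ClosingExample(coi.SingleOptimizable):
    def __init__(self, render_mode=None):
        super().__init__(render_mode)
        # Hypothetical helper process that must be shut down manually.
        self._helper = subprocess.Popen(["sleep", "3600"])

    def close(self):
        # Release our resource, then run the base class's clean-up.
        self._helper.terminate()
        self._helper.wait()
        super().close()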

Note

If, for some reason, you are dealing with an optimization problem that doesn’t explicitly subclass Problem, you can use the contextlib.closing adapter:

from contextlib import closing

with closing(MyProblem(...)) as problem:
    optimize(problem)

This ensures that close() is called under all circumstances – even if an exception occurs.

Additional Restrictions

For maximum compatibility, this API puts the following additional restrictions on environments:

  • The observation_space, action_space and optimization_space must all be Boxes. The only exception is if the environment is a GoalEnv: in that case, observation_space must be a Dict with exactly the three expected keys ("observation", "achieved_goal", "desired_goal"), and the three required sub-spaces must be Boxes.

  • If the environment supports any rendering at all, it should support at least the render modes human, ansi and matplotlib_figures. The former two facilitate debugging and stand-alone usage; the latter makes it possible to embed the environment into a GUI.

  • At CERN, the environment metadata must contain a key "cern.machine" with a value of type Machine. It tells users which CERN accelerator the environment belongs to. Outside of CERN, authors are free to omit this key, and institutes may define a category key of their own.

For the convenience of problem authors, this package provides a function check() that verifies these requirements on a best-effort basis. If you package your problem, we recommend adding a unit test to your package that calls this function, and exercising it on every CI job. CERN users are encouraged to consult the Acc-Py guidelines on testing for further information.
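
Such a unit test can be as short as the following sketch. It assumes that check() accepts a problem instance; consult the API reference for the exact signature. The registered name is the one from the first example above:

from cernml import coi


def test_passes_coi_checks():
    # Instantiate the problem the way a host application would and run
    # the best-effort interface checks on it.
    with coi.make("QuadraticSearch-v1") as problem:
        coi.check(problem)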