The Core API
This page describes the various pieces of the common optimization interfaces. You are invited to skip to a section that interests you, or to read the page top to bottom, at your leisure.
Keep in mind that while these interfaces are the most important ones, there are also others that provide important features. See, for example, Making an Optimization Configurable via GUI and Custom Per-Problem Optimizers.
The Interface Hierarchy
“Fig. 1: Inheritance diagram of the core interfaces”
The interfaces are designed in a modular fashion: depending on the algorithms that an optimization problem supports, it either implements SingleOptimizable (for classical single-objective optimization), Env (for reinforcement learning) or both. The Problem interface captures the greatest common denominator, i.e. everything that all interfaces have in common.
As a convenience, this package also provides the OptEnv interface. It is simply an intersection of SingleOptimizable and Env. This means that implementing it is the same as implementing both of its bases. At the same time, every class that implements both base interfaces also implements OptEnv. A demonstration:
>>> import gymnasium
>>> from cernml import coi
...
>>> class Indirect(coi.SingleOptimizable, gymnasium.Env):
...     optimization_space = ...
...     observation_space = ...
...     action_space = ...
...
>>> issubclass(Indirect, coi.OptEnv)
True
Minimal Implementations
This section shows you the absolute bare minimum needed to write any optimization problem at all. The examples are intended to get you off the ground if you are new to this library. They are not interesting optimization problems. Anything non-trivial (e.g. communicating with an external machine) will require some additional steps. See Implementing SingleOptimizable for a more comprehensive tutorial.
Single-Objective Optimization Problems
For a minimal working example, you should inherit from cernml.coi.SingleOptimizable to have it fill in as many defaults as possible. With it as a superclass, you only have to fill in three missing pieces:
- get_initial_params() to give the initial point of an optimization;
- compute_single_objective() as the objective function to be minimized [1];
- optimization_space to specify the problem’s domain, i.e. valid inputs to the objective function. See also Spaces for more information.
You also have to register your class so that the central function cernml.coi.make() can instantiate it. The page on Making Your Code Findable has more information.
This is a minimal, runnable example problem:
import numpy as np
from gymnasium.spaces import Box

from cernml import coi


class Quadratic(coi.SingleOptimizable):
    # This class doesn't do any rendering, but it's still useful to pass
    # this parameter on in case you want to add rendering later.
    def __init__(self, render_mode=None):
        # The inherited initializer checks for us that `render_mode` is
        # valid, and saves it as `self.render_mode`.
        super().__init__(render_mode)

        # Here, we define our problem's domain. Since the space is
        # constant, we could've defined `optimization_space = Box(...)`
        # at class scope as well.
        self.optimization_space = Box(-1.0, 1.0, shape=(5,))

        # The goal to be found by the optimizer. Randomized on each call
        # to `get_initial_params()`.
        self.goal = np.zeros(5)

    # Defining the x_0 for our optimization problem. `seed` allows
    # fixing random-number generation (RNG), `options` is a free-form
    # dict that we can use for customization.
    def get_initial_params(self, *, seed=None, options=None):
        # The inherited function seeds an RNG `self.np_random` for us.
        super().get_initial_params(seed=seed)

        # We bind these attributes here to keep our code short.
        space = self.optimization_space
        rng = self.np_random

        # Randomize the goal we want to move to and the initial point.
        # We use `np_random` so that if the user passes `seed`, the
        # problem is completely deterministic.
        self.goal = rng.uniform(space.low, space.high, size=space.shape)
        return rng.uniform(space.low, space.high, size=space.shape)

    # Our objective function is simply the Euclidean distance between
    # the two points.
    def compute_single_objective(self, params):
        return np.linalg.norm(self.goal - params)


# Never forget to register your optimization problem!
coi.register("QuadraticSearch-v1", entry_point=Quadratic)
Single-Objective Function Optimization Problems
See also
- Optimizing Points on an LSA Function
User guide page on function optimization problems.
For a minimal working example, you should inherit from cernml.coi.FunctionOptimizable to have it fill in as many defaults as possible. With it as a superclass, you only have to fill in three missing pieces:
- get_initial_params() to give the initial point for each individual optimization;
- compute_function_objective() as the objective function to be minimized [1];
- get_optimization_space() to specify the domain, i.e. valid inputs to the objective function. See also Spaces for more information.
You also have to register your class so that the central function cernml.coi.make() can instantiate it. The page on Making Your Code Findable has more information.
This is a minimal, runnable example problem:
import numpy as np
from gymnasium.spaces import Box

from cernml import coi


class StraightLineSteering(coi.FunctionOptimizable):
    # This class doesn't do any rendering, but it's still useful to pass
    # this parameter on in case you want to add rendering later.
    def __init__(self, render_mode=None):
        # The inherited initializer checks for us that `render_mode` is
        # valid, and saves it as `self.render_mode`.
        super().__init__(render_mode)

        # Our problem has a number of disturbances, initialized in
        # `get_initial_params()`. Each one deviates our trajectory
        # either to the left (negative values) or to the right (positive
        # values). Our goal is to keep the trajectory as close to zero
        # as possible.
        self.disturbances = {}

    # Our problem is particularly simple and has the same optimization
    # space everywhere.
    def get_optimization_space(self, cycle_time):
        return Box(-1, 1, shape=())

    # Defining the x_0 for our optimization problem. `seed` allows
    # fixing random-number generation (RNG), `options` is a free-form
    # dict that we can use for customization.
    def get_initial_params(self, cycle_time, *, seed=None, options=None):
        # The inherited function seeds an RNG `self.np_random` for us.
        super().get_initial_params(cycle_time, seed=seed)

        # Check that the given cycle time is allowed.
        if not 0.0 < cycle_time < 1500.0:
            raise ValueError(f"cycle time out of bounds: {cycle_time!r}")

        # Initialize the disturbances here. We want the RNG to have been
        # seeded already.
        if not self.disturbances:
            self.disturbances = {
                self.np_random.integers(0, 1500): self.np_random.normal()
                for _ in range(3)
            }

        return np.array(self.disturbances.get(int(cycle_time), 0.0))

    # Our objective function is the integrated deviation from the ideal
    # trajectory. Because each segment is a straight line, we simply
    # calculate according to the trapezoidal rule.
    def compute_function_objective(self, cycle_time, params):
        # Apply the given parameters.
        self.disturbances[int(cycle_time)] = float(params.item())
        # Calculate the loss function by iterating over all disturbances
        # in order.
        integral = 0.0
        prev_time = 0.0
        prev_pos = 0.0
        trajectory = 0.0
        for time, disturbance in sorted(self.disturbances.items()):
            pos = trajectory * (time - prev_time)
            integral += 0.5 * (pos + prev_pos) * (time - prev_time)
            trajectory += disturbance
            prev_time = time
            prev_pos = pos
        time = 1500
        pos = trajectory * (time - prev_time)
        integral += 0.5 * (pos + prev_pos) * (time - prev_time)
        # Cost function is the square because negative deviations are
        # just as bad as positive ones.
        return integral**2


# Never forget to register your optimization problem!
coi.register("StraightLineSteering-v1", entry_point=StraightLineSteering)
Reinforcement Learning Environments
For a minimal working example, you inherit from gymnasium.Env and fill in the four missing pieces:
- reset() to initialize the environment for a new episode and receive an initial observation;
- step() to take an action in the current episode;
- observation_space to specify the domain of observations that are returned by reset() and step();
- action_space to specify the domain of actions that are accepted by step(). See also Spaces for more information.
You also have to register your class so that the central function cernml.coi.make() can instantiate it. The page on Making Your Code Findable has more information.
This is a minimal, runnable example problem:
import numpy as np
from gymnasium import Env
from gymnasium.spaces import Box

from cernml import coi


class Quadratic(Env):
    # This class doesn't do any rendering, but it's still useful to
    # accept this parameter in case you want to add rendering later.
    def __init__(self, render_mode=None):
        # The `render_mode` attribute is defined by `Env`.
        self.render_mode = render_mode

        # Here, we define our problem's domain. The observations that we
        # receive are 2×5 arrays containing the goal and the current
        # position …
        self.observation_space = Box(-5.0, 5.0, shape=(2, 5))

        # … and the actions are 5D arrays containing the direction where
        # to walk on each step.
        self.action_space = Box(-1.0, 1.0, shape=(5,))

        # The environment state is the position where we are, and the
        # goal where we should go.
        self.position = np.zeros(5)
        self.goal = np.zeros(5)

    # Defining the initial state for each episode. `seed` allows fixing
    # random-number generation (RNG), `options` is a free-form dict that
    # we can use for customization.
    def reset(self, *, seed=None, options=None):
        # The inherited function seeds an RNG `self.np_random` for us.
        super().reset(seed=seed)

        # We bind these attributes here to keep our code short.
        rng = self.np_random
        space = self.observation_space

        # Randomize the goal we want to move to and the initial point.
        # We use `np_random` so that if the user passes `seed`, the
        # problem is completely deterministic. Row 0 of the observation
        # space bounds the goal, row 1 bounds the position, so each
        # draw is a 5D point.
        self.goal = rng.uniform(space.low[0], space.high[0])
        self.position = rng.uniform(space.low[1], space.high[1])

        # `Env` expects us to return `obs` (with the shape and limits
        # given by `observation_space`) and a free-form *info* dict,
        # which may contain metrics or debugging or logging info.
        obs = np.stack((self.goal, self.position))
        info = {}
        return obs, info

    # The state transition function. Accepts an action and returns
    # a 5-tuple of: observation, reward for this step, two boolean flags
    # that indicate whether the episode is over, and an info dict like
    # in `reset()`.
    def step(self, action):
        # Update our internal state and ensure everything stays within
        # its limits.
        self.position += action
        self.position = np.clip(
            self.position,
            self.observation_space.low[1],
            self.observation_space.high[1],
        )

        # We use the negative distance from the goal as reward. (Higher
        # rewards are better, unlike with `SingleOptimizable`.) We end
        # the episode when sufficiently close to the goal.
        distance = np.linalg.norm(self.goal - self.position)
        obs = np.stack((self.goal, self.position))
        reward = -distance
        terminated = distance < 0.01
        truncated = False
        info = {}
        return obs, reward, terminated, truncated, info


# Never forget to register your optimization problem!
coi.register("QuadraticSearch-v2", entry_point=Quadratic)
Running Your Optimization Problem
See also
- Control Flow of Optimization Problems
User guide page with detailed information on each kind of execution loop.
Optimization problems, no matter whether based on Env, SingleOptimizable or another interface, are expected to be run as plugins in a host application. While the Geoff project maintains a reference implementation of such a host application, institutes and users are encouraged to write their own host applications, tailored to their specific needs and re-using components of the broader Geoff project as necessary.
Typically, host applications end up implementing one kind or another of execution loop that executes an algorithm (e.g. a numerical optimizer or an RL policy) on a given problem. Minimal execution loops for the different kinds of problems (which might be useful for debugging) may look like this:
# Execution loop for SingleOptimizable problems.
# `get_optimizer()` is a placeholder for however the host application
# obtains its optimizer.
from gymnasium.spaces import Box
from numpy import clip

from cernml import coi

problem = coi.make("MySingleOptimizableProblem-v0")
assert isinstance(problem, coi.SingleOptimizable)
with problem:
    # Fetch initial state.
    optimizer = get_optimizer()
    space = problem.optimization_space
    assert isinstance(space, Box)
    initial = params = problem.get_initial_params()
    best = (float("inf"), initial)

    while not optimizer.is_done():
        # Update optimum. Compare by loss only; comparing the parameter
        # arrays on a tie would raise an error.
        loss = problem.compute_single_objective(params)
        best = min(best, (float(loss), params), key=lambda entry: entry[0])

        # Fetch next set of parameters.
        params = optimizer.step(loss)
        params = clip(params, space.low, space.high)

    if optimizer.has_failed():
        # Restore initial state.
        problem.compute_single_objective(initial)
    else:
        # Restore best state.
        problem.compute_single_objective(best[1])
# Execution loop for FunctionOptimizable problems.
# `get_optimizer()`, `request_skeleton_points()` and `OptFailed` are
# placeholders to be provided by the host application.
from gymnasium.spaces import Box
from numpy import clip

from cernml import coi

problem = coi.make("MyFunctionOptimizableProblem-v0")
assert isinstance(problem, coi.FunctionOptimizable)
with problem:
    # Select skeleton points.
    skeleton_points = problem.override_skeleton_points()
    if skeleton_points is None:
        skeleton_points = request_skeleton_points()

    # Keep track of which points we have modified and which not.
    restore_on_failure = []

    try:
        for time in skeleton_points:
            # Fetch initial state.
            optimizer = get_optimizer()
            space = problem.get_optimization_space(time)
            assert isinstance(space, Box)
            initial = params = problem.get_initial_params(time)
            best = (float("inf"), initial)
            restore_on_failure.append((time, initial))

            while not optimizer.is_done():
                # Update optimum. Compare by loss only; comparing the
                # parameter arrays on a tie would raise an error.
                loss = problem.compute_function_objective(time, params)
                best = min(
                    best, (float(loss), params), key=lambda entry: entry[0]
                )

                # Fetch next set of parameters.
                params = optimizer.step(loss)
                params = clip(params, space.low, space.high)

            if optimizer.has_failed():
                raise OptFailed(f"optimizer failed at t={time}")
            else:
                # Restore best state.
                problem.compute_function_objective(time, best[1])
    except:
        # If anything fails, restore initial state not only for the
        # current skeleton point, but all previous ones as well.
        while restore_on_failure:
            time, params = restore_on_failure.pop()
            problem.compute_function_objective(time, params)
        raise
# Execution loop for Env problems.
# `get_policy()` and `get_num_episodes()` are placeholders to be
# provided by the host application.
from gymnasium import Env
from gymnasium.spaces import Box
from numpy import clip

from cernml import coi

policy = get_policy()
num_episodes = get_num_episodes()

# Limit steps per episode to prevent infinite loops.
env = coi.make("MyEnv-v0", max_episode_steps=10)
assert isinstance(env, Env)
with env:
    ac_space = env.action_space
    assert isinstance(ac_space, Box)

    for _ in range(num_episodes):
        terminated = truncated = False
        obs, info = env.reset()
        while not (terminated or truncated):
            action = policy.predict(obs)
            action = clip(action, ac_space.low, ac_space.high)
            obs, reward, terminated, truncated, info = env.step(action)
While these examples are very bare-bones, various libraries already provide pre-packaged execution loops with a number of additional conveniences:
- Stable Baselines 3 supports the Env API, and RL environments can be passed directly to the various agent.learn() methods; in addition, the package provides a function evaluate_policy() to solve a problem with a given agent or policy.
- cernml-rltools provides a module cernml.rltools.envloop with an older and more general-purpose implementation of the environment interaction loop.
- cernml-coi-optimizers provides a uniform interface for solvers of SingleOptimizable problems. Its general-purpose solve() function is directly compatible with the COI.
In addition, many optimizers like scipy.optimize.minimize() and Py-BOBYQA are able to consume SingleOptimizable with only minor adjustments.
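The bare-bones loop for SingleOptimizable problems leaves the optimizer unspecified. To make it concrete, here is a self-contained sketch that uses only the standard library. `ToyProblem` and `RandomSearch` are hypothetical stand-ins written for this example: they mimic the method names used in the loop (`get_initial_params()`, `compute_single_objective()`, `is_done()`, `step()`) but are not part of any of the packages named above, and plain lists take the place of NumPy arrays and `Box`.

```python
import random


class ToyProblem:
    """Hypothetical stand-in for a SingleOptimizable with a 2D box domain."""

    low = [-1.0, -1.0]
    high = [1.0, 1.0]

    def get_initial_params(self):
        return [0.9, -0.9]

    def compute_single_objective(self, params):
        # Squared distance from the origin; the minimum is at (0, 0).
        return sum(x * x for x in params)


def clip(params, low, high):
    """Clamp each parameter into its [low, high] interval."""
    return [min(max(x, lo), hi) for x, lo, hi in zip(params, low, high)]


class RandomSearch:
    """Hypothetical optimizer: perturbs the best point found so far."""

    def __init__(self, initial, low, high, max_steps=200, scale=0.1, seed=0):
        self._rng = random.Random(seed)
        self._low, self._high = low, high
        self._best = (float("inf"), list(initial))
        self._candidate = list(initial)
        self._steps_left = max_steps
        self._scale = scale

    def is_done(self):
        return self._steps_left <= 0

    def step(self, loss):
        # `loss` belongs to the candidate returned by the previous call.
        if loss < self._best[0]:
            self._best = (loss, self._candidate)
        self._steps_left -= 1
        # Propose a new candidate near the best known point.
        proposal = [x + self._rng.gauss(0.0, self._scale) for x in self._best[1]]
        self._candidate = clip(proposal, self._low, self._high)
        return self._candidate


def run(problem, optimizer):
    """Evaluate, track the best point, and ask for the next candidate."""
    params = problem.get_initial_params()
    best = (float("inf"), params)
    while not optimizer.is_done():
        loss = problem.compute_single_objective(params)
        best = min(best, (loss, params), key=lambda entry: entry[0])
        params = clip(optimizer.step(loss), problem.low, problem.high)
    return best


problem = ToyProblem()
initial = problem.get_initial_params()
optimizer = RandomSearch(initial, problem.low, problem.high)
best_loss, best_params = run(problem, optimizer)
```

Because the initial point is evaluated first, the returned loss can never be worse than the loss at the initial point. A real host application would additionally restore the best or initial state on the machine, as the loops above do.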
Spaces
Optimization is always executed over a certain numeric domain, i.e. a space of allowed values. These domains are encapsulated by Gym’s concept of a Space. While Gym provides many different kinds of spaces (discrete, continuous, aggregate, …), the COI only supports Box at this time. This restriction may be lifted in the future, depending on user feedback.
The interfaces make use of spaces as follows:
SingleOptimizable.optimization_space
the domain of valid inputs to compute_single_objective();
Env.action_space
the domain of valid inputs to step();
Env.observation_space
the domain of valid observations returned by reset() and step().
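For a box-shaped domain, membership tests and clipping reduce to per-coordinate comparisons against a lower and an upper bound. The sketch below illustrates this with plain Python lists. `contains()` and `clip()` are hypothetical helpers written for this example; a real `Box` exposes its bounds as the arrays `low` and `high`, as used in the examples on this page.

```python
def contains(low, high, point):
    """Return True if `point` lies inside the box spanned by `low` and `high`."""
    if not (len(low) == len(high) == len(point)):
        # A point of the wrong dimension is never part of the domain.
        return False
    return all(lo <= x <= hi for lo, x, hi in zip(low, point, high))


def clip(low, high, point):
    """Move each coordinate of `point` to the nearest value inside the box."""
    return [min(max(x, lo), hi) for lo, x, hi in zip(low, point, high)]


# A 3D box with the same limits in every dimension.
low = [-1.0, -1.0, -1.0]
high = [1.0, 1.0, 1.0]

inside = [0.0, 0.5, -1.0]
outside = [0.0, 1.5, -2.0]

print(contains(low, high, inside))
print(contains(low, high, outside))
print(clip(low, high, outside))
```

Clipping is what the minimal execution loops do before handing parameters or actions to a problem, so that the problem never sees values outside of its declared domain.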
Naming Your Quantities
In many cases, your objective function and parameters directly correspond to machine parameters. For example, many optimization problems might only scale their parameters and otherwise send them unmodified to the machine via JAPC. Similarly, the objective function might only be a rescaled or inverted reading from a detector on the accelerator.
In such cases, it is useful to declare the meaning of your quantities. A host application may use this to annotate its graphs of the parameters and objective function. The SingleOptimizable class provides three attributes for this purpose:
from cernml import coi


class SomeProblem(coi.SingleOptimizable):
    objective_name = "RMS BPM Position (mm)"
    param_names = [
        "CORRECTOR.10",
        "CORRECTOR.20",
        "CORRECTOR.30",
        "CORRECTOR.40",
    ]
    constraint_names = [
        "BCT Intensity",
    ]

    def compute_single_objective(self, params):
        for name, value in zip(self.param_names, params):
            self._japc.setParam(f"logical.{name}/K", value)
        ...
Note that these three values need not be defined inside the class scope. You are free to define them inside your __init__() method or change them at run-time. This is useful because some optimization problems might be configurable in the exact devices they talk to.
You are free not to define these attributes at all. In this case, the host application will see the inherited default values and assume no particular meaning of your quantities.
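As an illustration of how a host application might consume these attributes, the following sketch derives axis labels for a plot. The helper `axis_labels()` and both classes are hypothetical stand-ins written for this example. The sketch assumes that a problem without names exposes empty values, which is an assumption about the inherited defaults and not confirmed by this page.

```python
class NamedProblem:
    """Hypothetical stand-in that only carries the naming attributes."""

    objective_name = "RMS BPM Position (mm)"
    param_names = ["CORRECTOR.10", "CORRECTOR.20"]


class UnnamedProblem:
    """Hypothetical stand-in; assumes empty values mean "no name given"."""

    objective_name = ""
    param_names = []


def axis_labels(problem, num_params):
    """Return (objective label, parameter labels) with generic fallbacks."""
    names = list(getattr(problem, "param_names", ()))
    if len(names) != num_params:
        # No names, or a mismatch with the domain: use generic labels.
        names = [f"Parameter {i + 1}" for i in range(num_params)]
    objective = getattr(problem, "objective_name", "") or "Objective"
    return objective, names


print(axis_labels(NamedProblem(), 2))
print(axis_labels(UnnamedProblem(), 3))
```

Reading the attributes from the instance at plot time, and not once from the class, keeps the labels correct for problems that set or change their names at run-time.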
Metadata
Every optimization problem should have a class attribute called Problem.metadata, which is a dict with string keys. The dict should be defined at the class level and immutable [2]. It communicates fundamental properties of the class and how a host application can use it.
While the API reference contains the full definition of the Standard Metadata Keys, the following is an abridged version:
"render_modes"
the render modes that the optimization problem understands (see Rendering);
"cern.machine"
the accelerator that an optimization problem is associated with (see cernml.coi.Machine);
"cern.japc"
a boolean flag indicating whether the problem’s constructor expects an argument named japc of type PyJapc;
"cern.cancellable"
a boolean flag indicating whether the problem’s constructor expects a cancellation token (see Cancellation).
[2] While authors of optimization problems are strongly encouraged to make metadata immutable and class-scoped, host applications cannot rely on this. Edge cases are known where the attribute is either instance-scoped or the dict is swapped out for another. No cases are known where an existing dict is modified in-place.
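The sketch below shows how a host application might read these flags defensively, and one way an author could guard a metadata mapping against accidental in-place modification. `SomeProblem` and `constructor_needs()` are hypothetical and written for this example. The argument name `cancellation_token` is an assumption, and whether the real base classes accept a `MappingProxyType` in place of a plain dict is not confirmed here.

```python
from types import MappingProxyType


class SomeProblem:
    """Hypothetical stand-in for a Problem subclass."""

    # A read-only view: item assignment on it raises TypeError.
    metadata = MappingProxyType(
        {
            "render_modes": ("human", "ansi"),
            "cern.japc": True,
            "cern.cancellable": False,
        }
    )


def constructor_needs(problem_class):
    """Return the names of optional constructor arguments to supply."""
    # Tolerate classes without metadata and metadata without the keys.
    metadata = getattr(problem_class, "metadata", {})
    needs = []
    if metadata.get("cern.japc", False):
        needs.append("japc")
    if metadata.get("cern.cancellable", False):
        # Assumed argument name, see the note above.
        needs.append("cancellation_token")
    return needs


print(constructor_needs(SomeProblem))
```

Looking the flags up with `.get()` and a default of `False` means that a problem which omits a key is treated like one that does not request the corresponding argument.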
Rendering
The metadata entry "render_modes" allows a problem to declare that its internal state can be visualized. It should be a list of strings where each string is a supported render mode. Host applications may pick one of these strings and pass it to the problem’s render() method. For this to work, render modes need to have well-defined semantics.
The following render modes are standardized by either Gym or this package:
"human"
The default mode, for interactive use. This should e.g. open a window and display the problem’s current state in it. Displaying the window should not block control flow.
"ansi"
Return a text-only representation of the problem. This may contain e.g. terminal control codes for color effects.
"rgb_array"
Return a Numpy array representing color image data.
"matplotlib_figures"
Return a list of Matplotlib Figure objects, suitable for embedding into a GUI application.
See the render() docs for a full spec of each render mode.
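As a minimal sketch of the "ansi" mode, the following stand-in class declares its supported render modes in its metadata and returns a text representation of its state. `Counter` is hypothetical and written for this example. It follows the convention used by the minimal implementations on this page, where the render mode is passed to the constructor and stored as `render_mode`; the validation shown here is an assumption about what an inherited initializer would do.

```python
class Counter:
    """Hypothetical stand-in problem whose state is a single integer."""

    metadata = {"render_modes": ["ansi"]}

    def __init__(self, render_mode=None):
        # Reject render modes that the metadata does not declare.
        supported = self.metadata["render_modes"]
        if render_mode is not None and render_mode not in supported:
            raise ValueError(f"unsupported render mode: {render_mode!r}")
        self.render_mode = render_mode
        self.count = 0

    def increment(self):
        self.count += 1

    def render(self):
        if self.render_mode is None:
            # Rendering was not requested.
            return None
        if self.render_mode == "ansi":
            # Text-only representation of the current state.
            return f"count = {self.count}"
        raise NotImplementedError(self.render_mode)


counter = Counter(render_mode="ansi")
counter.increment()
print(counter.render())
```

A host application would first consult `metadata["render_modes"]` to find out which modes it may request.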
Closing
Some optimization problems have to acquire certain resources in order to perform their tasks. Examples include:
- spawning processes,
- starting threads,
- subscribing to JAPC parameters.
While Python garbage-collects objects which are no longer accessible (including Problem instances), some of these resources require manual function calls in order to be properly cleaned up.
If such is the case for an optimization problem, it should override the close() method and define all such actions in it. A host application is required to call close() when it has no more need for an optimization problem.
All classes that inherit from Problem are automatically context managers that can be used in with blocks. Whenever the with block is exited, close() gets called automatically.
Note
If, for some reason, you are dealing with an optimization problem that doesn’t explicitly subclass Problem, you can use the contextlib.closing adapter:
from contextlib import closing

with closing(MyProblem(...)) as problem:
    optimize(problem)
This ensures that close() is called under all circumstances, even if an exception occurs.
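To illustrate the mechanism, the following sketch shows one way a class can tie `close()` to the `with` statement. `ClosableProblem` is a hypothetical stand-in written for this example; it is not the actual implementation of `Problem`.

```python
class ClosableProblem:
    """Hypothetical stand-in showing one way to tie `close()` to `with`."""

    def __init__(self):
        self.closed = False

    def close(self):
        # A real problem would release processes, threads or
        # subscriptions here.
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Implicitly returning None lets any exception propagate.


# Normal exit from the block.
with ClosableProblem() as problem:
    pass

# Exit via an exception: `close()` is still called.
try:
    with ClosableProblem() as failing:
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

print(problem.closed, failing.closed)
```

In both cases the resources are released as soon as control leaves the block, without waiting for garbage collection.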
Additional Restrictions
For maximum compatibility, this API puts the following additional restrictions on environments:
- The observation_space, action_space and optimization_space must all be Boxes. The only exception is if the environment is a GoalEnv: in that case, observation_space must be Dict (with exactly the three expected keys) and the three required sub-spaces must be Boxes.
- If the environment supports any rendering at all, it should support at least the render modes "human", "ansi" and "matplotlib_figures". The former two facilitate debugging and stand-alone usage, the latter makes it possible to embed the environment into a GUI.
- At CERN, the environment metadata must contain a key "cern.machine" with a value of type Machine. It tells users which CERN accelerator the environment belongs to. Outside of CERN, authors are free to omit this key and institutes are allowed to define a category key of their own.
For the convenience of problem authors, this package provides a function check() that verifies these requirements on a best-effort basis. If you package your problem, we recommend adding a unit test to your package that calls this function, and running that test in every CI job. CERN users are encouraged to consult the Acc-Py guidelines on testing for further information.