# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - API and implementation and/or documentation
# :author: Damien Guenot - 26 avr. 2016
# :author: Francois Gallard, refactoring
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Driver library.
A driver library aims to solve an :class:`.OptimizationProblem`
using a particular algorithm from a particular family of numerical methods.
This algorithm will be in charge of evaluating the objective and constraints
functions at different points of the design space, using the
:meth:`.DriverLib.execute` method.
The most famous kinds of numerical methods to solve an optimization problem
are optimization algorithms and design of experiments (DOE). A DOE driver
browses the design space agnostically, i.e. without taking into
account the function evaluations. On the contrary, an optimization algorithm
uses this information to make the journey through design space
as relevant as possible in order to reach as soon as possible the optimum.
These families are implemented in :class:`.DOELibrary`
and :class:`.OptimizationLibrary`.
"""
from __future__ import annotations
import io
import logging
import string
from dataclasses import dataclass
from time import time
from typing import Callable
from typing import ClassVar
from typing import List
from typing import Union
import tqdm
from numpy import ndarray
from numpy import ones
from numpy import where
from numpy import zeros
from tqdm.utils import _unicode
from tqdm.utils import disp_len
from gemseo.algos.algo_lib import AlgoLib
from gemseo.algos.algo_lib import AlgorithmDescription
from gemseo.algos.design_space import DesignSpace
from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.algos.opt_result import OptimizationResult
from gemseo.algos.stop_criteria import DesvarIsNan
from gemseo.algos.stop_criteria import FtolReached
from gemseo.algos.stop_criteria import FunctionIsNan
from gemseo.algos.stop_criteria import MaxIterReachedException
from gemseo.algos.stop_criteria import MaxTimeReached
from gemseo.algos.stop_criteria import TerminationCriterion
from gemseo.algos.stop_criteria import XtolReached
from gemseo.utils.string_tools import MultiLineString
DriverLibOptionType = Union[str, float, int, bool, List[str], ndarray]
LOGGER = logging.getLogger(__name__)
[docs]class TqdmToLogger(io.StringIO):
"""Redirect tqdm output to the gemseo logger."""
[docs] def write(self, buf):
"""Write buffer."""
buf = buf.strip(string.whitespace)
if buf:
LOGGER.info(buf)
[docs]@dataclass
class DriverDescription(AlgorithmDescription):
"""The description of a driver."""
handle_integer_variables: bool = False
"""Whether the optimization algorithm handles integer variables."""
require_gradient: bool = False
"""Whether the optimization algorithm requires the gradient."""
[docs]class ProgressBar(tqdm.tqdm):
"""Extend tqdm progress bar with better time units.
Use hour, day or week for slower processes.
"""
@staticmethod
def __convert_rate(total, elapsed):
rps = float(total) / elapsed
if rps >= 0:
rate = rps
unit = "sec"
rpm = rps * 60.0
if rpm < 60.0:
rate = rpm
unit = "min"
rph = rpm * 60.0
if rph < 60.0:
rate = rph
unit = "hour"
rpd = rph * 24.0
if rpd < 24.0:
rate = rpd
unit = "day"
return rate, f" it/{unit}"
[docs] def status_printer(
self, file: io.TextIOWrapper | io.StringIO
) -> Callable[[str], None]:
"""Overload the status_printer method to avoid the use of closures.
Args:
file: Specifies where to output the progress messages.
Returns:
The function to print the status in the progress bar.
"""
self._last_len = [0]
return self._print_status
def _print_status(self, s):
len_s = disp_len(s)
self.fp.write(
_unicode("\r{}{}".format(s, (" " * max(self._last_len[0] - len_s, 0))))
)
self.fp.flush()
self._last_len[0] = len_s
def __getstate__(self):
state = self.__dict__.copy()
# A file-like stream cannot be pickled.
del state["fp"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
# Set back the file-like stream to its state as done in tqdm.__init__.
self.fp = tqdm.utils.DisableOnWriteError(TqdmToLogger(), tqdm_instance=self)
[docs]class DriverLib(AlgoLib):
"""Abstract class for DOE & optimization libraries interfaces.
Lists available methods in the library for the proposed
problem to be solved.
To integrate an optimization package, inherit from this class
and put your file in gemseo.algos.doe or gemseo.algo.opt packages.
"""
USER_DEFINED_GRADIENT = OptimizationProblem.USER_GRAD
COMPLEX_STEP_METHOD = OptimizationProblem.COMPLEX_STEP
FINITE_DIFF_METHOD = OptimizationProblem.FINITE_DIFFERENCES
DIFFERENTIATION_METHODS = [
USER_DEFINED_GRADIENT,
COMPLEX_STEP_METHOD,
FINITE_DIFF_METHOD,
]
INEQ_TOLERANCE = "ineq_tolerance"
EQ_TOLERANCE = "eq_tolerance"
MAX_TIME = "max_time"
USE_DATABASE_OPTION = "use_database"
NORMALIZE_DESIGN_SPACE_OPTION = "normalize_design_space"
_NORMALIZE_DS = True
ROUND_INTS_OPTION = "round_ints"
EVAL_OBS_JAC_OPTION = "eval_obs_jac"
MAX_DS_SIZE_PRINT = 40
_ACTIVATE_PROGRESS_BAR_OPTION_NAME = "activate_progress_bar"
"""The name of the option to activate the progress bar in the optimization log."""
activate_progress_bar: ClassVar[bool] = True
"""Whether to activate the progress bar in the optimization log."""
def __init__(self):
# Library settings and check
super().__init__()
self.__progress_bar = None
self.__activate_progress_bar = self.activate_progress_bar
self.__max_iter = 0
self.__iter = 0
self._start_time = None
self._max_time = None
self.__message = None
[docs] def deactivate_progress_bar(self) -> None:
"""Deactivate the progress bar."""
self.__progress_bar = None
[docs] def init_iter_observer(
self,
max_iter: int,
message: str,
) -> None:
"""Initialize the iteration observer.
It will handle the stopping criterion and the logging of the progress bar.
Args:
max_iter: The maximum number of iterations.
message: The message to display at the beginning.
Raises:
ValueError: If the `max_iter` is not greater than or equal to one.
"""
if max_iter < 1:
raise ValueError(f"max_iter must be >=1, got {max_iter}")
self.__max_iter = max_iter
self.__iter = 0
self.__message = message
if self.__activate_progress_bar:
self.__progress_bar = ProgressBar(
total=self.__max_iter,
desc=self.__message,
ascii=False,
bar_format="... {percentage:3.0f}%|{bar}{r_bar}",
file=TqdmToLogger(),
)
else:
self.deactivate_progress_bar()
self._start_time = time()
self.problem.max_iter = max_iter
def __set_progress_bar_objective_value(self, x_vect: ndarray | None) -> None:
"""Set the objective value in the progress bar.
Args:
x_vect: The design variables values.
If None, consider the objective at the last iteration.
"""
if x_vect is None:
value = self.problem.objective.last_eval
else:
value = self.problem.database.get_f_of_x(
self.problem.objective.name, x_vect
)
if value is not None:
# if maximization problem: take the opposite
if (
not self.problem.minimize_objective
and not self.problem.use_standardized_objective
):
value = -value
self.__progress_bar.set_postfix(refresh=False, obj=value)
self.__progress_bar.update()
else:
self.__progress_bar.update()
[docs] def new_iteration_callback(self, x_vect: ndarray | None = None) -> None:
"""Callback called at each new iteration, i.e. every time a design vector that
is not already in the database is proposed by the optimizer.
Iterate the progress bar, implement the stop criteria.
Args:
x_vect: The design variables values. If None, use the values of the
last iteration.
Raises:
MaxTimeReached: If the elapsed time is greater than the maximum
execution time.
"""
# First check if the max_iter is reached and update the progress bar
self.__iter += 1
self.problem.current_iter = self.__iter
if self._max_time > 0:
delta_t = time() - self._start_time
if delta_t > self._max_time:
raise MaxTimeReached()
if self.__progress_bar is not None:
self.__set_progress_bar_objective_value(x_vect)
[docs] def finalize_iter_observer(self) -> None:
"""Finalize the iteration observer."""
if self.__progress_bar is not None:
self.__progress_bar.close()
def _pre_run(
self,
problem: OptimizationProblem,
algo_name: str,
**options: DriverLibOptionType,
) -> None:
"""To be overridden by subclasses.
Specific method to be executed just before _run method call.
Args:
problem: The optimization problem.
algo_name: The name of the algorithm.
**options: The options of the algorithm,
see the associated JSON file.
"""
self._max_time = options.get(self.MAX_TIME, 0.0)
LOGGER.info("%s", problem)
if problem.design_space.dimension <= self.MAX_DS_SIZE_PRINT:
log = MultiLineString()
log.indent()
log.add("over the design space:")
for line in str(problem.design_space).split("\n")[1:]:
log.add(line)
LOGGER.info("%s", log)
LOGGER.info("Solving optimization problem with algorithm %s:", algo_name)
def _post_run(self, problem, algo_name, result, **options): # pylint: disable=W0613
"""To be overridden by subclasses Specific method to be executed just after _run
method call.
Args:
problem: The problem to be solved.
algo_name: The name of the algorithm.
result: The result of the run, e.g. an :class:`.OptimizationResult`.
**options: The options of the algorithm.
"""
opt_result_str = result._strings
LOGGER.info("%s", opt_result_str[0])
if result.constraints_values:
if result.is_feasible:
LOGGER.info("%s", opt_result_str[1])
else:
LOGGER.warning("%s", opt_result_str[1])
LOGGER.info("%s", opt_result_str[2])
problem.solution = result
if result.x_opt is not None:
problem.design_space.set_current_value(result)
if problem.design_space.dimension <= self.MAX_DS_SIZE_PRINT:
log = MultiLineString()
log.indent()
log.indent()
for line in str(problem.design_space).split("\n"):
log.add(line)
LOGGER.info("%s", log)
def _check_integer_handling(
self,
design_space: DesignSpace,
force_execution: bool,
) -> None:
"""Check if the algo handles integer variables.
The user may force the execution if needed, in this case a warning is logged.
Args:
design_space: The design space of the problem.
force_execution: Whether to force the execution of the algorithm when
the problem includes integer variables and the algo does not handle
them.
Raises:
ValueError: If `force_execution` is set to `False` and
the algo does not handle integer variables and the
design space includes at least one integer variable.
"""
if (
design_space.has_integer_variables()
and not self.descriptions[self.algo_name].handle_integer_variables
):
if not force_execution:
raise ValueError(
"Algorithm {} is not adapted to the problem, it does not handle "
"integer variables.\n"
"Execution may be forced setting the 'skip_int_check' "
"argument to 'True'.".format(self.algo_name)
)
else:
LOGGER.warning(
"Forcing the execution of an algorithm that does not handle "
"integer variables."
)
[docs] def execute(
self,
problem: OptimizationProblem,
algo_name: str | None = None,
eval_obs_jac: bool = False,
skip_int_check: bool = False,
**options: DriverLibOptionType,
) -> OptimizationResult:
"""Execute the driver.
Args:
problem: The problem to be solved.
algo_name: The name of the algorithm.
If None, use the algo_name attribute
which may have been set by the factory.
eval_obs_jac: Whether to evaluate the Jacobian of the observables.
skip_int_check: Whether to skip the integer variable handling check
of the selected algorithm.
**options: The options for the algorithm.
Returns:
The optimization result.
Raises:
ValueError: If `algo_name` was not either set by the factory or given
as an argument.
"""
self.problem = problem
if algo_name is not None:
self.algo_name = algo_name
if self.algo_name is None:
raise ValueError(
"Algorithm name must be either passed as "
"argument or set by the attribute 'algo_name'."
)
self._check_algorithm(self.algo_name, problem)
self._check_integer_handling(problem.design_space, skip_int_check)
activate_progress_bar = options.pop(
self._ACTIVATE_PROGRESS_BAR_OPTION_NAME, None
)
if activate_progress_bar is not None:
self.__activate_progress_bar = activate_progress_bar
options = self._update_algorithm_options(**options)
self.internal_algo_name = self.descriptions[
self.algo_name
].internal_algorithm_name
problem.check()
problem.preprocess_functions(
is_function_input_normalized=options.get(
self.NORMALIZE_DESIGN_SPACE_OPTION, self._NORMALIZE_DS
),
use_database=options.get(self.USE_DATABASE_OPTION, True),
round_ints=options.get(self.ROUND_INTS_OPTION, True),
eval_obs_jac=eval_obs_jac,
)
try: # Term criteria such as max iter or max_time can be triggered in pre_run
self._pre_run(problem, self.algo_name, **options)
result = self._run(**options)
except TerminationCriterion as error:
result = self._termination_criterion_raised(error)
self.finalize_iter_observer()
self.problem.clear_listeners()
self._post_run(problem, algo_name, result, **options)
return result
def _process_specific_option(self, options, option_key):
"""Process one option as a special treatment, at the beginning of the general
treatment and checks of _process_options.
Args:
options: The options as preprocessed by _process_options.
option_key: The current option key to process.
"""
if option_key == self.INEQ_TOLERANCE:
self.problem.ineq_tolerance = options[option_key]
del options[option_key]
elif option_key == self.EQ_TOLERANCE:
self.problem.eq_tolerance = options[option_key]
del options[option_key]
def _termination_criterion_raised(self, error): # pylint: disable=W0613
"""Retrieve the best known iterate when max iter has been reached.
Args:
error: The obtained error from the algorithm.
"""
if isinstance(error, TerminationCriterion):
message = ""
if isinstance(error, MaxIterReachedException):
message = "Maximum number of iterations reached."
elif isinstance(error, FunctionIsNan):
message = "Function value or gradient or constraint is NaN, "
message += "and problem.stop_if_nan is set to True."
elif isinstance(error, DesvarIsNan):
message = "Design variables are NaN."
elif isinstance(error, XtolReached):
message = "Successive iterates of the design variables "
message += "are closer than xtol_rel or xtol_abs."
elif isinstance(error, FtolReached):
message = "Successive iterates of the objective function "
message += "are closer than ftol_rel or ftol_abs."
elif isinstance(error, MaxTimeReached):
message = f"Maximum time reached: {self._max_time} seconds."
message += " GEMSEO Stopped the driver"
else:
message = error.args[0]
result = self.get_optimum_from_database(message)
return result
[docs] def get_optimum_from_database(self, message=None, status=None):
"""Retrieves the optimum from the database and builds an optimization result
object from it."""
problem = self.problem
if len(problem.database) == 0:
return OptimizationResult(
optimizer_name=self.algo_name,
message=message,
status=status,
n_obj_call=0,
)
x_0 = problem.database.get_x_by_iter(0)
# compute the best feasible or infeasible point
f_opt, x_opt, is_feas, c_opt, c_opt_grad = problem.get_optimum()
if (
f_opt is not None
and not problem.minimize_objective
and not problem.use_standardized_objective
):
f_opt = -f_opt
if x_opt is None:
optimum_index = None
else:
optimum_index = problem.database.get_index_of(x_opt)
return OptimizationResult(
x_0=x_0,
x_opt=x_opt,
f_opt=f_opt,
optimizer_name=self.algo_name,
message=message,
status=status,
n_obj_call=problem.objective.n_calls,
is_feasible=is_feas,
constraints_values=c_opt,
constraints_grad=c_opt_grad,
optimum_index=optimum_index,
)
def _get_options(self, **options):
"""Retrieve the options of the library. To be overloaded by subclasses. Used to
define default values for options using keyword arguments.
Args:
**options: The options of the driver.
"""
raise NotImplementedError()
def _run(self, **options):
"""Run the algorithm, to be overloaded by subclasses.
Args:
**options: The options of the driver.
"""
raise NotImplementedError()
[docs] def is_algo_requires_grad(self, algo_name):
"""Returns True if the algorithm requires a gradient evaluation.
Args:
algo_name: The name of the algorithm.
"""
if algo_name not in self.descriptions:
raise ValueError(f"Algorithm {algo_name} is not available.")
return self.descriptions[algo_name].require_gradient
[docs] def get_x0_and_bounds_vects(self, normalize_ds):
"""Gets x0, bounds, normalized or not depending on algo options, all as numpy
arrays.
Args:
normalize_ds: Whether to normalize the input variables
that are not integers,
according to the normalization policy of the design space.
Returns:
The current value, the lower bounds and the upper bounds.
"""
design_space = self.problem.design_space
l_b = design_space.get_lower_bounds()
u_b = design_space.get_upper_bounds()
# remove normalization from options for algo
if normalize_ds:
norm_array = design_space.dict_to_array(design_space.normalize)
l_b = where(norm_array, zeros(norm_array.shape), l_b)
u_b = where(norm_array, ones(norm_array.shape), u_b)
current_x = self.problem.get_x0_normalized(cast_to_real=True)
else:
current_x = self.problem.design_space.get_current_value(
complex_to_real=True
)
return current_x, l_b, u_b
[docs] def ensure_bounds(self, orig_func, normalize=True):
"""Project the design vector onto the design space before execution.
Args:
orig_func: The original function.
normalize: Whether to use the normalized design space.
Returns:
A function calling the original function
with the input data projected onto the design space.
"""
def wrapped_func(x_vect):
x_proj = self.problem.design_space.project_into_bounds(x_vect, normalize)
return orig_func(x_proj)
return wrapped_func