# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - API and implementation and/or documentation
# :author: Damien Guenot - 26 avr. 2016
# :author: Francois Gallard, refactoring
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
Driver library
==============
A driver library aims to solve an :class:`.DriverLib`
using a particular algorithm from a particular family of numerical methods.
This algorithm will be in charge of evaluating the objective and constraints
functions at different points of the design space, using the
:meth:`.DriverLib.execute` method.
The most famous kinds of numerical methods to solve an optimization problem
are optimization algorithms and design of experiments (DOE). A DOE driver
browses the design space agnostically, i.e. without taking into
account the function evaluations. On the contrary, an optimization algorithm
uses this information to make the journey through design space
as relevant as possible in order to reach as soon as possible the optimum.
These families are implemented in :class:`.DOELibrary`
and :class:`.OptimizationLibrary`.
"""
from __future__ import division, unicode_literals
import inspect
import io
import logging
import string
from os.path import dirname, exists, join
from time import time
import tqdm
from numpy import ones_like, where, zeros_like
from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.algos.opt_result import OptimizationResult
from gemseo.algos.stop_criteria import (
DesvarIsNan,
FtolReached,
FunctionIsNan,
MaxIterReachedException,
MaxTimeReached,
TerminationCriterion,
XtolReached,
)
from gemseo.core.grammar import InvalidDataException
from gemseo.core.json_grammar import JSONGrammar
from gemseo.utils.source_parsing import get_options_doc
LOGGER = logging.getLogger(__name__)
[docs]class TqdmToLogger(io.StringIO):
"""Redirect tqdm output to the gemseo logger."""
[docs] def write(self, buf):
"""Write buffer."""
buf = buf.strip(string.whitespace)
if buf:
LOGGER.info(buf)
[docs]class ProgressBar(tqdm.tqdm):
"""Extend tqdm progress bar with better time units.
Use hour, day or week for slower processes.
"""
@staticmethod
def __convert_rate(total, elapsed):
rps = float(total) / elapsed
if rps >= 0:
rate = rps
unit = "sec"
rpm = rps * 60.0
if rpm < 60.0:
rate = rpm
unit = "min"
rph = rpm * 60.0
if rph < 60.0:
rate = rph
unit = "hour"
rpd = rph * 24.0
if rpd < 24.0:
rate = rpd
unit = "day"
return rate, " it/{}".format(unit)
[docs]class DriverLib(object):
"""Abstract class for DOE & optimization libraries interfaces.
Lists available methods in the library for the proposed
problem to be solved.
To integrate an optimization package, inherit from this class
and put your file in gemseo.algos.doe or gemseo.algo.opt packages.
"""
USER_DEFINED_GRADIENT = OptimizationProblem.USER_GRAD
COMPLEX_STEP_METHOD = OptimizationProblem.COMPLEX_STEP
FINITE_DIFF_METHOD = OptimizationProblem.FINITE_DIFFERENCES
DIFFERENTIATION_METHODS = [
USER_DEFINED_GRADIENT,
COMPLEX_STEP_METHOD,
FINITE_DIFF_METHOD,
]
LIB = "lib"
INTERNAL_NAME = "internal_algo_name"
OPTIONS_DIR = "options"
OPTIONS_MAP = {}
PROBLEM_TYPE = "problem_type"
REQUIRE_GRAD = "require_grad"
HANDLE_EQ_CONS = "handle_equality_constraints"
HANDLE_INEQ_CONS = "handle_inequality_constraints"
POSITIVE_CONSTRAINTS = "positive_constraints"
INEQ_TOLERANCE = "ineq_tolerance"
EQ_TOLERANCE = "eq_tolerance"
MAX_TIME = "max_time"
USE_DATABASE_OPTION = "use_database"
NORMALIZE_DESIGN_SPACE_OPTION = "normalize_design_space"
ROUND_INTS_OPTION = "round_ints"
WEBSITE = "website"
DESCRIPTION = "description"
MAX_DS_SIZE_PRINT = 40
def __init__(self):
"""Constructor."""
# Library settings and check
self.lib_dict = {}
self.algo_name = None
self.internal_algo_name = None
self.problem = None
self.opt_grammar = None
self.__progress_bar = None
self.__max_iter = 0
self.__iter = 0
self._start_time = None
self._max_time = None
[docs] def init_options_grammar(self, algo_name):
"""Initializes the options grammar.
:param algo_name: name of the algorithm
"""
# Try algo name convention which has the prioryty over
# library options
basename = algo_name.upper() + "_options.json"
lib_dir = inspect.getfile(self.__class__)
opt_dir = join(dirname(lib_dir), self.OPTIONS_DIR)
algo_schema_file = join(opt_dir, basename)
if exists(algo_schema_file):
schema_file = algo_schema_file
else:
# Try to load library options convention by default
basename = self.__class__.__name__.upper() + "_options.json"
lib_schema_file = join(opt_dir, basename)
if exists(lib_schema_file):
schema_file = lib_schema_file
else:
msg = "Options grammar file " + algo_schema_file
msg += " for algorithm: " + algo_name
msg += " not found. And library options grammar file "
msg += lib_schema_file + " not found either."
raise ValueError(msg)
descr_dict = get_options_doc(self.__class__._get_options)
self.opt_grammar = JSONGrammar(
algo_name, schema_file=schema_file, descriptions=descr_dict
)
return self.opt_grammar
@property
def algorithms(self):
"""Return the available algorithms."""
return list(self.lib_dict.keys())
[docs] def init_iter_observer(self, max_iter, message):
"""Initialize the iteration observer.
It will handle the stopping criterion and the logging of the progress bar.
:param max_iter: maximum number of calls
:param message: message to display at the beginning
"""
if max_iter < 1:
raise ValueError("max_iter must be >=1, got {}".format(max_iter))
self.__max_iter = max_iter
self.__iter = len(self.problem.database)
self.__progress_bar = ProgressBar(
total=self.__max_iter,
desc=message,
ascii=False,
file=TqdmToLogger(),
)
def __set_progress_bar_objective_value(self):
value = self.problem.objective.last_eval
if value is not None:
# if maximization problem: take the opposite
if not self.problem.minimize_objective:
value = -value
self.__progress_bar.set_postfix(refresh=False, obj=value)
[docs] def new_iteration_callback(self):
"""Callback called at each new iteration, ie every time a design vector that is
not already in the database is proposed by the optimizer.
Iterates the progress bar, implements the stop criteria.
"""
# First check if the max_iter is reached and update the progress bar
self.__iter += 1
if self.__iter > self.__max_iter:
raise MaxIterReachedException()
if self._max_time > 0:
delta_t = time() - self._start_time
if delta_t > self._max_time:
raise MaxTimeReached()
self.__set_progress_bar_objective_value()
self.__progress_bar.update()
[docs] def finalize_iter_observer(self):
"""Finalize the iteration observer."""
self.__set_progress_bar_objective_value()
self.__progress_bar.close()
def _pre_run(self, problem, algo_name, **options):
"""To be overridden by subclasses Specific method to be executed just before
_run method call.
:param problem: the problem to be solved
:param algo_name: name of the algorithm
:param options: the options dict for the algorithm, see associated JSON file
"""
self._start_time = time()
self._max_time = options.get(self.MAX_TIME, 0.0)
def _post_run(self, problem, algo_name, result, **options): # pylint: disable=W0613
"""To be overridden by subclasses Specific method to be executed just after _run
method call.
:param problem: the problem to be solved
:param algo_name: name of the algorithm
:param result: result of the run such as an OptimizationResult
:param options: the options dict for the algorithm, see associated JSON file
"""
LOGGER.info("%s", result)
problem.solution = result
if result.x_opt is not None:
problem.design_space.set_current_x(result)
if problem.design_space.dimension <= self.MAX_DS_SIZE_PRINT:
LOGGER.info("%s", problem.design_space)
[docs] def driver_has_option(self, option_key):
"""Checks if the option key exists.
:param option_key: the name of the option
:return: True if the option is in the grammar
"""
return self.opt_grammar.is_data_name_existing(option_key)
def _process_options(self, **options):
"""After _get_options is called, the options are converted to algorithm specific
options, and checked.
:param options: driver options
"""
for option_key in list(options.keys()): # Copy keys on purpose
# Remove extra options added in the _get_option method of the
# driver
if not self.driver_has_option(option_key):
del options[option_key]
elif option_key == self.INEQ_TOLERANCE:
self.problem.ineq_tolerance = options[option_key]
del options[option_key]
elif option_key == self.EQ_TOLERANCE:
self.problem.eq_tolerance = options[option_key]
del options[option_key]
elif options[option_key] is None:
del options[option_key]
try:
self.opt_grammar.load_data(options)
except InvalidDataException:
raise ValueError("Invalid options for algorithm " + self.opt_grammar.name)
for option_key in list(options.keys()): # Copy keys on purpose
lib_option_key = self.OPTIONS_MAP.get(option_key)
# Overload with specific keys
if lib_option_key is not None:
options[lib_option_key] = options[option_key]
if lib_option_key != option_key:
del options[option_key]
return options
def _check_ignored_options(self, options):
"""Check that the user did not passed options that do not exist for this driver.
Raises a warning if it is the case
:param options: options dict
"""
for option_key in options:
if not self.driver_has_option(option_key):
msg = "Driver " + self.algo_name + " has no option " + option_key
msg += ", option is ignored !"
LOGGER.warning(msg)
[docs] def execute(self, problem, algo_name=None, **options):
"""Executes the driver.
:param problem: the problem to be solved
:param algo_name: name of the algorithm
if None, use self.algo_name
which may have been set by the factory (Default value = None)
:param options: the options dict for the algorithm
"""
self.problem = problem
if algo_name is not None:
self.algo_name = algo_name
if self.algo_name is None:
raise ValueError(
"Algorithm name must be either passed as "
+ "argument or set by the attribute self.algo_name"
)
self.__check_algorithm(self.algo_name, problem)
self.init_options_grammar(self.algo_name)
use_database = options.get(self.USE_DATABASE_OPTION, True)
normalize = options.get(self.NORMALIZE_DESIGN_SPACE_OPTION, True)
round_ints = options.get(self.ROUND_INTS_OPTION, True)
self._check_ignored_options(options)
options = self._get_options(**options)
self.internal_algo_name = self.lib_dict[self.algo_name][self.INTERNAL_NAME]
problem.check()
problem.preprocess_functions(
normalize=normalize, use_database=use_database, round_ints=round_ints
)
try: # Term criteria such as max iter or max_time can be triggered in pre_run
self._pre_run(problem, self.algo_name, **options)
result = self._run(**options)
except TerminationCriterion as error:
result = self._termination_criterion_raised(error)
self.finalize_iter_observer()
self.problem.clear_listeners()
self._post_run(problem, algo_name, result, **options)
return result
def _termination_criterion_raised(self, error): # pylint: disable=W0613
"""Retrieve the best known iterate when max iter has been reached.
:param error: the obtained error from the algorithm
"""
if isinstance(error, TerminationCriterion):
message = ""
if isinstance(error, MaxIterReachedException):
message = "Maximum number of iterations reached."
elif isinstance(error, FunctionIsNan):
message = "Function value or gradient or constraint is NaN, "
message += "and problem.stop_if_nan is set to True."
elif isinstance(error, DesvarIsNan):
message = "Design variables are NaN."
elif isinstance(error, XtolReached):
message = "Successive iterates of the design variables "
message += "are closer than xtol_rel or xtol_abs."
elif isinstance(error, FtolReached):
message = "Successive iterates of the objective function "
message += "are closer than ftol_rel or ftol_abs."
elif isinstance(error, MaxTimeReached):
message = "Maximum time reached : {} seconds.".format(self._max_time)
message += " GEMSEO Stopped the driver"
else:
message = error.args[0]
result = self.get_optimum_from_database(message)
return result
[docs] def get_optimum_from_database(self, message=None, status=None):
"""Retrieves the optimum from the database and builds an optimization result
object from it.
:param message: Default value = None)
:param status: Default value = None)
"""
problem = self.problem
if len(problem.database) == 0:
return OptimizationResult(
optimizer_name=self.algo_name,
message=message,
status=status,
n_obj_call=0,
)
x_0 = problem.database.get_x_by_iter(0)
# compute the best feasible or infeasible point
f_opt, x_opt, is_feas, c_opt, c_opt_grad = problem.get_optimum()
if f_opt is not None and not problem.minimize_objective:
f_opt = -f_opt
return OptimizationResult(
x_0=x_0,
x_opt=x_opt,
f_opt=f_opt,
optimizer_name=self.algo_name,
message=message,
status=status,
n_obj_call=problem.objective.n_calls,
is_feasible=is_feas,
constraints_values=c_opt,
constraints_grad=c_opt_grad,
)
def _get_options(self, **options):
"""Retrieves the options of the library To be overloaded by subclasses Used to
define default values for options using keyword arguments.
:param options: options of the driver
"""
raise NotImplementedError()
def _run(self, **options):
"""Runs the algorithm, to be overloaded by subclasses.
:param options: the options dict for the algorithm
"""
raise NotImplementedError()
def __check_algorithm(self, algo_name, problem):
"""Checks that algorithm required by user is available and adapted to the
problem. Set optimization library Set algorithm name according to optimization
library requirements.
:param algo_name : name of algorithm
:type algo_name: str
:param problem: optimization problem
:type problem: OptimizationProblem
"""
# Check that the algorithm is available
if algo_name not in self.lib_dict:
raise KeyError(
"Requested optimization algorithm"
+ " %s is not in list of available algorithms %s"
% (algo_name, list(self.lib_dict.keys()))
)
# Check that the algorithm is suited to the problem
algo_dict = self.lib_dict[self.algo_name]
if not self.is_algorithm_suited(algo_dict, problem):
raise ValueError(
"Algorithm {} is not adapted to the problem.".format(algo_name)
)
@staticmethod
def _display_result(result):
"""Displays the optimization result.
:param result: the result to display
"""
LOGGER.info("Algorithm execution finished, result is:")
LOGGER.info(result)
[docs] @staticmethod
def is_algorithm_suited(algo_dict, problem):
"""Checks if the algorithm is suited to the problem according to its algo dict.
:param algo_dict: the algorithm characteristics
:param problem: the opt_problem to be solved
"""
raise NotImplementedError()
[docs] def filter_adapted_algorithms(self, problem):
"""Filters the algorithms capable of solving the problem.
:param problem: the opt_problem to be solved
:returns: the list of adapted algorithms names
"""
available = []
for algo_name, algo_dict in self.lib_dict.items():
if self.is_algorithm_suited(algo_dict, problem):
available.append(algo_name)
return available
[docs] def is_algo_requires_grad(self, algo_name):
"""Returns True if the algorithm requires a gradient evaluation.
:param algo_name: name of the algorithm
"""
lib_alg = self.lib_dict.get(algo_name, None)
if lib_alg is None:
raise ValueError("Algorithm " + str(algo_name) + " is not available !")
return lib_alg.get(self.REQUIRE_GRAD, False)
[docs] def get_x0_and_bounds_vects(self, normalize_ds):
"""Gets x0, bounds, normalized or not depending on algo options, all as numpy
arrays.
:param normalize_ds: if True, normalizes all input vars
that are not integers, according to design space
normalization policy
:returns: x, lower bounds, upper bounds
"""
dspace = self.problem.design_space
l_b = dspace.get_lower_bounds()
u_b = dspace.get_upper_bounds()
# remove normalization from options for algo
if normalize_ds:
norm_dict = dspace.normalize
norm_array = dspace.dict_to_array(norm_dict)
xvec = self.problem.get_x0_normalized()
l_b = where(norm_array, zeros_like(xvec), l_b)
u_b = where(norm_array, ones_like(xvec), u_b)
else:
xvec = self.problem.design_space.get_current_x()
return xvec, l_b, u_b
[docs] def ensure_bounds(self, orig_func, normalize=True):
"""Project the design vector onto the design space before execution.
:param orig_func: the original function
:param normalize: if True, use the normalized design space
:returns: the wrapped function
"""
def wrapped_func(x_vect):
x_proj = self.problem.design_space.project_into_bounds(x_vect, normalize)
return orig_func(x_proj)
return wrapped_func