# Copyright 2022 Airbus SAS
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Gabriel Max DE MENDONÇA ABRANTES
# François Gallard
# Lluis ARMENGOL GARCIA
# Luca SARTORI
"""Pymoo optimization library wrapper."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import Final
from typing import Union
from gemseo.algos.opt.optimization_library import OptimizationAlgorithmDescription
from gemseo.algos.opt.optimization_library import OptimizationLibrary
from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.algos.opt_result import OptimizationResult
from gemseo.algos.opt_result_multiobj import MultiObjectiveOptimizationResult
from numpy import inf
from numpy import ndarray
from numpy import prod as np_prod
from numpy import size as np_size
from numpy import unique as np_unique
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.algorithms.moo.rnsga3 import RNSGA3
from pymoo.algorithms.moo.unsga3 import UNSGA3
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.core.crossover import Crossover
from pymoo.core.mutation import Mutation
from pymoo.core.sampling import Sampling
from pymoo.core.selection import Selection
from pymoo.operators.mutation.pm import PolynomialMutation
from pymoo.optimize import minimize
from pymoo.util.ref_dirs import get_reference_directions
from gemseo_pymoo.algos.opt.core.pymoo_problem_adapater import PymooProblem
from gemseo_pymoo.algos.stop_criteria import DesignSpaceExploredException
from gemseo_pymoo.algos.stop_criteria import HyperVolumeToleranceReached
from gemseo_pymoo.algos.stop_criteria import MaxGenerationsReached
if TYPE_CHECKING:
from gemseo.algos.stop_criteria import TerminationCriterion
from pymoo.core.operator import Operator
from pymoo.core.population import Population
from pymoo.util.reference_direction import MultiLayerReferenceDirectionFactory
from pymoo.util.reference_direction import ReferenceDirectionFactory
LOGGER = logging.getLogger(__name__)
EvolutionaryOperatorTypes = Union[Crossover, Mutation, Sampling, Selection]
[docs]
@dataclass
class PymooAlgorithmDescription(OptimizationAlgorithmDescription):
"""The description of an optimization algorithm from the pymoo library."""
library_name: str = "pymoo"
handle_integer_variables: bool = True
require_grad: bool = False
handle_equality_constraints: bool = False
handle_inequality_constraints: bool = True
positive_constraints: bool = False
problem_type: OptimizationProblem.ProblemType = (
OptimizationProblem.ProblemType.NON_LINEAR
)
[docs]
class PymooOpt(OptimizationLibrary):
"""Pymoo optimization library interface.
See :class:`gemseo.algos.opt.optimization_library.OptimizationLibrary`.
"""
N_PROCESSES: Final[str] = "n_processes"
"""The tag for the number of processes to use."""
MAX_GEN: Final[str] = "max_gen"
"""The tag for the maximum number of generations allowed."""
HV_TOL_REL: Final[str] = "hv_tol_rel"
"""The tag for the relative tolerance used in the hypervolume convergence check."""
HV_TOL_ABS: Final[str] = "hv_tol_abs"
"""The tag for the absolute tolerance used in the hypervolume convergence check."""
STOP_CRIT_N_HV: Final[str] = "stop_crit_n_hv"
"""The tag for the number of generations to account for in the hypervolume check."""
__PYMOO_WEBPAGE: Final[str] = "https://www.pymoo.org"
"""The pymoo webpage."""
__PYMOO_PREFIX: Final[str] = "PYMOO_"
"""The prefix added to the internal algorithm's name."""
CROSSOVER_OPERATOR: Final[str] = "crossover"
"""The crossover operator's name."""
MUTATION_OPERATOR: Final[str] = "mutation"
"""The mutation operator's name."""
SAMPLING_OPERATOR: Final[str] = "sampling"
"""The sampling operator's name."""
SELECTION_OPERATOR: Final[str] = "selection"
"""The selection operator's name."""
EVOLUTIONARY_OPERATORS: Final[dict[str, Operator]] = {
CROSSOVER_OPERATOR: Crossover,
MUTATION_OPERATOR: Mutation,
SAMPLING_OPERATOR: Sampling,
SELECTION_OPERATOR: Selection,
}
"""A dictionary with all evolutionary operators available as keys and their Pymoo
classes as values."""
PYMOO_GA: Final[str] = "PYMOO_GA"
"""The GEMSEO alias for the Genetic Algorithm."""
PYMOO_NSGA2: Final[str] = "PYMOO_NSGA2"
"""The GEMSEO alias for the Non-dominated Sorting Genetic Algorithm II."""
PYMOO_NSGA3: Final[str] = "PYMOO_NSGA3"
"""The GEMSEO alias for the Non-dominated Sorting Genetic Algorithm III."""
PYMOO_UNSGA3: Final[str] = "PYMOO_UNSGA3"
"""The GEMSEO alias for the Unified NSGA-III."""
PYMOO_RNSGA3: Final[str] = "PYMOO_RNSGA3"
"""The GEMSEO alias for the Reference Point Based NSGA-III."""
__PYMOO_ = "PYMOO_"
__PYMOO_METADATA: Final[dict[str, tuple[str, str]]] = {
PYMOO_GA: ("Genetic Algorithm", "soo/ga.html#GA:-Genetic-Algorithm"),
PYMOO_NSGA2: (
"Non-dominated Sorting Genetic Algorithm II",
"moo/nsga2.html#nb-nsga2",
),
PYMOO_NSGA3: (
"Non-dominated Sorting Genetic Algorithm III",
"moo/nsga3.html#nb-nsga3",
),
PYMOO_UNSGA3: ("Unified NSGA3", "moo/unsga3.html#nb-unsga3"),
PYMOO_RNSGA3: ("Reference Point Based NSGA3", "moo/rnsga3.html#nb-rnsga3"),
}
"""The description and webpage link of the pymoo algorithms."""
LIBRARY_NAME: Final[str] = "pymoo"
"""The library's name."""
pymoo_n_gen: int = 10000000
"""The pymoo's termination criterion based on the number of generations."""
_stop_crit_n_hv: int = 5
"""The number of generations to account for in the hypervolume convergence check."""
_ds_size: int
"""The design space size."""
_RESULT_CLASS: ClassVar[type[OptimizationResult]] = MultiObjectiveOptimizationResult
"""The class used to present the result of the optimization."""
def __init__(self) -> None:
"""Constructor.
Generate the library dict, which contains the list
of algorithms with their characteristics:
- does it require gradient
- does it handle equality constraints
- does it handle inequality constraints
"""
super().__init__()
# The design space size (useful for finite discrete problems).
self._ds_size = inf
# See https://www.pymoo.org/misc/constraints.html for eq constraints.
for algo_name, algo_value in self.__PYMOO_METADATA.items():
internal_name = algo_name.replace(self.__PYMOO_, "")
self.descriptions[algo_name] = PymooAlgorithmDescription(
algorithm_name=internal_name,
internal_algorithm_name=internal_name,
description=algo_value[0],
website=f"{self.__PYMOO_WEBPAGE}/algorithms/{algo_value[1]}",
handle_multiobjective=internal_name != "GA",
)
def _check_mo_handling(
self,
algo_name: str,
opt_problem: OptimizationProblem,
) -> None:
"""Check if the algorithm is capable of handling the optimization problem.
All algorithms implemented are capable of handling multi-objective problems,
except the :class:`~pymoo.algorithms.soo.nonconvex.ga.GA` one.
Args:
algo_name: The name of the algorithm.
opt_problem: The problem to be solved.
Raises:
ValueError: If the algo cannot handle the problem to be solved.
"""
if (
opt_problem.objective.dim > 1
and not self.descriptions[algo_name].handle_multiobjective
):
msg = (
f"Requested optimization algorithm {self.algo_name} can not handle "
"multiple objectives."
)
raise ValueError(msg)
def _get_options(
self,
max_iter: int = 999,
max_gen: int = 10000000,
ftol_rel: float = 1e-9,
ftol_abs: float = 1e-9,
xtol_rel: float = 1e-9,
xtol_abs: float = 1e-9,
hv_tol_rel: float = 1e-9,
hv_tol_abs: float = 1e-9,
stop_crit_n_x: int = 3,
stop_crit_n_hv: int = 5,
normalize_design_space: bool = True,
eq_tolerance: float = 1e-2,
ineq_tolerance: float = 1e-4,
ref_dirs: ndarray | None = None,
pop_size: int = 100,
sampling: Sampling | Population | None = None,
selection: Selection | None = None,
mutation: Mutation | None = None,
crossover: Crossover | None = None,
eliminate_duplicates: bool = True,
n_offsprings: int | None = None,
seed: int = 1,
pop_per_ref_point: int = 1,
mu: float = 0.1,
ref_points: ndarray | None = None,
n_partitions: int = 20,
n_points: int | None = None,
partitions: ndarray | None = None,
scaling_1: float | None = None,
scaling_2: float | None = None,
**options: Any,
) -> dict[str, Any]:
r"""Set the options default values.
To get the best and up-to-date information about algorithms options, go to
pymoo's algorithms `documentation <https://pymoo.org/algorithms/index.html>`_
Args:
max_iter: The maximum number of iterations, i.e. unique calls to f(x).
max_gen: The maximum number of generations.
ftol_rel: A stop criterion, the relative tolerance on the objective
function. If abs(f(xk)-f(xk+1))/abs(f(xk))<= ftol_rel: stop.
ftol_abs: A stop criterion, the absolute tolerance on the objective
function. If abs(f(xk)-f(xk+1))<= ftol_rel: stop.
xtol_rel: A stop criterion, the relative tolerance on the design variables.
If norm(xk-xk+1)/norm(xk)<= xtol_rel: stop.
xtol_abs: A stop criterion, absolute tolerance on the design variables.
If norm(xk-xk+1)<= xtol_abs: stop.
hv_tol_rel: A stop criterion, the relative tolerance on the hypervolume
convergence check. If norm(xk-xk+1)/norm(xk)<= hv_tol_rel: stop.
hv_tol_abs: A stop criterion, absolute tolerance on the hypervolume
convergence check. If norm(xk-xk+1)<= hv_tol_abs: stop.
stop_crit_n_x: The number of design vectors to account for during the
criteria check.
stop_crit_n_hv: The number of generations to account for during the
criterion check on the hypervolume indicator.
normalize_design_space: If True, scale the variables to the range [0, 1].
eq_tolerance: The equality tolerance.
ineq_tolerance: The inequality tolerance.
ref_dirs: The reference directions.
pop_size: The population size.
sampling: The sampling process that generates the initial population.
If None, the algorithm's default is used.
selection: The mating selection operator.
If None, the algorithm's default is used.
mutation: The mutation operator.
If None, the algorithm's default is used.
crossover: The crossover operator used to create offsprings.
If None, the algorithm's default is used.
eliminate_duplicates: If True, eliminate duplicates after merging
the parent and the offspring population.
n_offsprings: Number of offspring that are created through mating.
If None, it will be set equal to the population size.
seed: The random seed to be used.
pop_per_ref_point: The size of the population used for each reference point.
mu: The scaling of the reference lines used during survival selection.
Increasing mu will generate solutions with a larger spread.
ref_points: The reference points (Aspiration Points) as a NumPy array
where each row represents a point and each column a variable.
n_partitions: The number of gaps between two consecutive points
along an objective axis.
n_points: The number of points on the unit simplex.
partitions: The custom partitions.
scaling_1: The scaling of the first simplex.
scaling_2: The scaling of the second simplex.
**options: The other algorithm options.
Notes:
The pymoo library allows the user to define custom operators
to manage the processes of sampling, crossover, mutation and selection.
In case no info regarding these operators is provided, pymoo's
default will be used. Nevertheless, be aware that they may not be
suitable for problems containing integer design variables.
For details about each operator's options,
refer to https://pymoo.org/operators/index.html
Example:
Let us consider an optimization problem where
all the design variables are discrete (integers).
The following operators could be provided among the options:
>>> from pymoo.operators.crossover.sbx import SBX
>>> from pymoo.operators.sampling.rnd import IntegerRandomSampling
>>> from pymoo.operators.selection.rnd import RandomSelection
>>> from pymoo.operators.repair.rounding import RoundingRepair
>>> class MyIntegerMutationOperator(Mutation):
...
... def _do(self, problem, x, **kwargs_):
... # my integer mutation strategy
...
...
>>> operators = dict(
... selection=RandomSelection(),
... sampling=IntegerRandomSampling(),
... crossover=SBX(prob=0.9, eta=30, repair=RoundingRepair()),
... mutation=MyIntegerMutationOperator(),
... )
"""
return self._process_options(
max_iter=max_iter,
max_gen=max_gen,
ftol_rel=ftol_rel,
ftol_abs=ftol_abs,
xtol_rel=xtol_rel,
xtol_abs=xtol_abs,
hv_tol_rel=hv_tol_rel,
hv_tol_abs=hv_tol_abs,
stop_crit_n_x=stop_crit_n_x,
stop_crit_n_hv=stop_crit_n_hv,
normalize_design_space=normalize_design_space,
ineq_tolerance=ineq_tolerance,
eq_tolerance=eq_tolerance,
selection=selection,
sampling=sampling,
mutation=mutation,
crossover=crossover,
eliminate_duplicates=eliminate_duplicates,
n_offsprings=n_offsprings,
seed=seed,
mu=mu,
ref_points=ref_points,
pop_per_ref_point=pop_per_ref_point,
pop_size=pop_size,
ref_dirs=ref_dirs,
n_partitions=n_partitions,
n_points=n_points,
partitions=partitions,
scaling_1=scaling_1,
scaling_2=scaling_2,
**options,
)
@staticmethod
def _check_operator_suitable(
lower_bounds: ndarray,
upper_bounds: ndarray,
operator_instance: EvolutionaryOperatorTypes,
operator_class: Operator,
) -> None:
"""Check operator suitability according to design variables type and bounds.
Args:
lower_bounds: The design variables' lower bounds.
upper_bounds: The design variables' upper bounds.
operator_instance: The pymoo operator instance.
operator_class: The specific class of operator that the
``operator_instance`` that is being checked should be an instance of.
Raises:
TypeError: If the ``operator_instance`` is not an instance of the required
``operator_class``.
ValueError: If ``operator_instance`` refers to the mutation operator
:class:`~pymoo.operators.mutation.pm.PolynomialMutation` and at least
one design variable has equal lower and upper bounds.
"""
if not isinstance(operator_instance, operator_class):
msg = (
f"{operator_instance.__class__.__name__} must be an instance of "
f"{operator_class.__class__.__name__} or inherit from it."
)
raise TypeError(msg)
# Anticipate 'division by zero' errors when using PolynomialMutation,
# which occurs when we have equal lower and upper bounds.
if isinstance(operator_instance, PolynomialMutation) and any(
upper_bounds == lower_bounds
):
msg = (
"PolynomialMutation cannot handle equal lower and upper bounds.\n"
"Consider setting those design variables as constants of your problem."
)
raise ValueError(msg)
def _get_ref_dirs(
self,
ref_dirs_name: str,
**ref_dirs_options: dict[str, str],
) -> ReferenceDirectionFactory | MultiLayerReferenceDirectionFactory:
r"""Return the reference directions.
Get the reference directions using
:meth:`pymoo.factory.get_reference_directions` and based on the mandatory
argument ``ref_dirs_name``.
See `Reference Directions <https://pymoo.org/misc/reference_directions.html>`_
Args:
ref_dirs_name: The reference directions name.
**ref_dirs_options: The reference directions options.
Returns:
The reference directions. If ``ref_dirs_name`` is unknown,
``Riesz s-Energy`` is returned.
Raises:
ValueError: If multiple partitions are provided
for a single-objective problem.
"""
n_obj = self.problem.objective.dim
if ref_dirs_name == "das-dennis":
n_partitions = ref_dirs_options.pop("n_partitions")
return get_reference_directions(
ref_dirs_name, n_obj, n_partitions=n_partitions
)
if ref_dirs_name == "multi-layer":
n_partitions = ref_dirs_options.pop("n_partitions")
scaling_1 = ref_dirs_options.pop("scaling_1")
scaling_2 = ref_dirs_options.pop("scaling_2")
return get_reference_directions(
ref_dirs_name,
get_reference_directions(
"das-dennis", n_obj, n_partitions=n_partitions, scaling=scaling_1
),
get_reference_directions(
"das-dennis", n_obj, n_partitions=n_partitions, scaling=scaling_2
),
)
if ref_dirs_name == "layer-energy":
partitions = ref_dirs_options.pop("partitions")
if n_obj == 1 and np_size(partitions) > 1:
msg = (
"For a single-objective problem, "
"the partitions array must be of size 1!"
)
raise ValueError(msg)
return get_reference_directions(ref_dirs_name, n_obj, partitions)
# By default, return Riesz s-Energy (in case of an unknown name is provided).
n_points = ref_dirs_options.pop("n_points")
seed = ref_dirs_options.pop("seed")
return get_reference_directions(ref_dirs_name, n_obj, n_points, seed=seed)
def _pre_run(
self, problem: OptimizationProblem, algo_name: str, **options: Any
) -> None:
"""Take into account a new check for multi-objectives handling.
Args:
problem: The problem to be solved.
algo_name: The name of the algorithm.
**options: The options for the algorithm, see associated JSON file.
"""
super()._pre_run(problem, algo_name, **options)
self._check_mo_handling(algo_name, problem)
self._stop_crit_n_hv = options.get(self.STOP_CRIT_N_HV)
def _run(
self, **options: Any
) -> OptimizationResult | MultiObjectiveOptimizationResult:
"""Run the algorithm.
Args:
**options: The options for the algorithm.
Returns:
The optimization result.
Raises:
ValueError: If the algorithm's name is not valid.
"""
# Remove normalization from algorithm's options.
normalize_ds = options.pop(self.NORMALIZE_DESIGN_SPACE_OPTION, True)
# Instantiate the pymoo Problem.
pymoo_problem_options = {
self.N_PROCESSES: options.pop(self.N_PROCESSES, 1),
self.MAX_GEN: options.pop(self.MAX_GEN),
self.HV_TOL_REL: options.pop(self.HV_TOL_REL),
self.HV_TOL_ABS: options.pop(self.HV_TOL_ABS),
self.STOP_CRIT_N_HV: options.pop(self.STOP_CRIT_N_HV),
}
pymoo_problem = PymooProblem(
self.problem, normalize_ds, self, **pymoo_problem_options
)
# Problem type (continuous, discrete, mixed).
type_var_unique = np_unique(pymoo_problem.data["type_var"])
# Termination criterion based on the size of the design space.
# It is required to avoid being stuck when dealing with discrete variables,
# because GEMSEO does not count iterations already stored in database.
if len(type_var_unique) == 1 and type_var_unique[0] == "integer":
l_b, u_b = pymoo_problem.bounds()
self._ds_size = int(np_prod(u_b - l_b + 1))
if self._ds_size < self.problem.max_iter:
self.problem.add_callback(self._check_design_space_exploration)
else:
self._ds_size = inf
evol_operators = {}
for operator_name, operator_class in self.EVOLUTIONARY_OPERATORS.items():
operator_instance = options.pop(operator_name)
if operator_instance is not None:
self._check_operator_suitable(
pymoo_problem.xl,
pymoo_problem.xu,
operator_instance,
operator_class,
)
evol_operators[operator_name] = operator_instance
elif (
operator_name != self.SELECTION_OPERATOR
and "integer" in type_var_unique
):
msg = (
"Pymoo's default %s operator may not be suitable "
"for integer variables."
)
LOGGER.warning(msg, operator_name)
# Common GeneticAlgorithm parameters.
common_options = {
"eliminate_duplicates": options.pop("eliminate_duplicates"),
"n_offsprings": options.pop("n_offsprings"),
}
algo_name = self.__PYMOO_PREFIX + self.internal_algo_name
if algo_name == self.PYMOO_RNSGA3:
algorithm = RNSGA3(
ref_points=options.pop("ref_points"),
pop_per_ref_point=options.pop("pop_per_ref_point"),
mu=options.pop("mu"),
**evol_operators,
**common_options,
)
elif algo_name == self.PYMOO_UNSGA3:
ref_dirs_name = options.pop("ref_dirs_name")
directions = self._get_ref_dirs(ref_dirs_name, **options)
algorithm = UNSGA3(
ref_dirs=directions,
**evol_operators,
**common_options,
)
elif algo_name == self.PYMOO_NSGA3:
ref_dirs_name = options.pop("ref_dirs_name")
directions = self._get_ref_dirs(ref_dirs_name, **options)
algorithm = NSGA3(
ref_dirs=directions,
**evol_operators,
**common_options,
)
elif algo_name == self.PYMOO_NSGA2:
algorithm = NSGA2(
pop_size=options.pop("pop_size"),
**evol_operators,
**common_options,
)
elif algo_name == self.PYMOO_GA:
algorithm = GA(
pop_size=options.pop("pop_size"),
**evol_operators,
**common_options,
)
else: # pragma: no cover
# GEMSEO will check in advance if the algorithm is supported.
msg = f"Algorithm not supported : {self.internal_algo_name}"
raise ValueError(msg)
res = minimize(
pymoo_problem,
algorithm=algorithm,
termination=("n_gen", self.pymoo_n_gen),
return_least_infeasible=True,
**options,
)
return self.get_optimum_from_database(res.message, res.success)
[docs]
def get_optimum_from_database(
self, message: str | None = None, status: int | None = None
) -> OptimizationResult | MultiObjectiveOptimizationResult:
"""Retrieve the optimum from the database.
Override the super class method in order to return an
:class:`~gemseo.algos.opt_result.OptimizationResult` instance adapted for
multi-objective results (see
:class:`~gemseo_pymoo.algos.opt_result_mo.MultiObjectiveOptimizationResult`).
Args:
message: The message from the optimizer.
status: The status from the optimizer.
Returns:
An optimization result object based on the optimum found.
"""
# Single-objective problem.
if self.problem.objective.dim == 1:
return OptimizationResult.from_optimization_problem(
self.problem,
message=message,
status=status,
optimizer_name=self.algo_name,
)
return super().get_optimum_from_database(message, status)
def _check_design_space_exploration(self, design_variables: ndarray) -> None:
"""Check on the design space exploration.
Args:
design_variables: The design variables vector.
Raises:
DesignSpaceExploredException: If the design space
has been completely explored.
"""
if self.problem.current_iter == self._ds_size:
raise DesignSpaceExploredException
def _termination_criterion_raised(
self, error: TerminationCriterion
) -> OptimizationResult | MultiObjectiveOptimizationResult:
"""Retrieve the best known iterate when max iter has been reached.
Takes into account some termination criteria specific for multi-objective
and/or mixed variables problems:
- :class:`~gemseo_pymoo.algos.stop_criteria.DesignSpaceExploredException`
- :class:`~gemseo_pymoo.algos.stop_criteria.MaxGenerationsReached`.
Args:
error: The obtained error from the algorithm.
Returns:
An optimization result object.
"""
if isinstance(error, DesignSpaceExploredException):
message = (
f"All {self._ds_size} points of the design space have been explored. "
f"GEMSEO stopped the driver."
)
return self.get_optimum_from_database(message)
if isinstance(error, MaxGenerationsReached):
message = (
"Maximum number of generations reached. GEMSEO stopped the driver."
)
return self.get_optimum_from_database(message)
if isinstance(error, HyperVolumeToleranceReached):
message = (
f"{self._stop_crit_n_hv} successive iterates of the hypervolume "
"indicator are closer than hv_tol_rel or hv_tol_abs. "
"GEMSEO stopped the driver."
)
return self.get_optimum_from_database(message)
return super()._termination_criterion_raised(error)
def _log_result(self) -> None:
if self.problem.objective.dim == 1:
super()._log_result()
else:
LOGGER.info("%s", self.problem.solution)