"""Modified Normal Boundary Intersection (mNBI) algorithm.
Based on :cite:`shukla2007normal`.
from __future__ import annotations
import logging
from copy import deepcopy
from dataclasses import dataclass
from itertools import combinations
from multiprocessing import Manager
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import Final
from typing import NamedTuple
from typing import Union
from numpy import append
from numpy import argwhere
from numpy import array
from numpy import block
from numpy import column_stack
from numpy import concatenate
from numpy import dot
from numpy import eye
from numpy import hstack
from numpy import linspace
from numpy import newaxis
from numpy import ones
from numpy import vstack
from numpy import zeros
from numpy import zeros_like
from numpy.linalg import LinAlgError
from numpy.linalg import solve
from scipy.optimize import linprog
from gemseo.algos.database import Database
from gemseo.algos.design_space import DesignSpace
from gemseo.algos.doe.doe_factory import DOEFactory
from gemseo.algos.opt._mnbi.constraint_function_wrapper import ConstraintFunctionWrapper
from gemseo.algos.opt._mnbi.function_component_extractor import (
from gemseo.algos.opt._mnbi.sub_optim_constraint import SubOptimConstraint
from gemseo.algos.opt.opt_factory import OptimizersFactory
from gemseo.algos.opt.optimization_library import OptimizationAlgorithmDescription
from gemseo.algos.opt.optimization_library import OptimizationLibrary
from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.algos.opt_result_multiobj import MultiObjectiveOptimizationResult
from gemseo.core.mdofunctions.mdo_function import MDOFunction
from gemseo.core.mdofunctions.mdo_function import NotImplementedCallable
from gemseo.utils.constants import READ_ONLY_EMPTY_DICT
from gemseo.utils.multiprocessing.execution import execute
from collections.abc import Mapping
from pathlib import Path
from gemseo.algos.doe.doe_library import DOELibraryOptionType
from gemseo.algos.driver_library import DriverLibOptionType
from gemseo.algos.opt_problem import OptimizationResult
from gemseo.typing import RealArray
LOGGER = logging.getLogger(__name__)
MNBIOptionsType = Union[bool, int, float]
class IndividualSubOptimOutput(NamedTuple):
"""An output from a sub optimization."""
f_min: RealArray
"""The value of f at the design value minimizing f_i."""
x_min: RealArray
"""The value of the design variables minimizing f_i."""
database: Database
"""The database of the main problem."""
n_calls: int
"""The number of calls to f."""
class BetaSubOptimOutput(NamedTuple):
"""An output from a beta sub optimization."""
f_min: RealArray
"""The coordinates in the objective space of the sub-optimization result."""
x_min: RealArray
"""The coordinates in the design space of the sub-optimization result."""
w: RealArray
"""The vector w used to compute the values of beta that can be skipped in the
following sub-optimizations."""
database: Database
"""The database of the main problem."""
n_calls: int
"""The number of calls to the main objective function by the optimizer during the
class MNBIAlgorithmDescription(OptimizationAlgorithmDescription):
"""The description of the MNBI optimization algorithm."""
handle_equality_constraints: bool = True
handle_inequality_constraints: bool = True
handle_integer_variables: bool = True
handle_multiobjective: bool = True
library_name: str = "MNBI"
class MNBI(OptimizationLibrary):
r"""MNBI optimizer.
This algorithm computes the Pareto front of a multi-objective optimization problem
by decomposing it into a series of constrained single-objective problems.
Considering the following problem:
.. math::
& \min_{x \in D} f(x),\\
& g(x) \leq 0,\\
& h(x) = 0
the algorithm first finds the individual optima :math:`(x_i^\ast)_{i=1..m}`
of the :math:`m` components of the objective function :math:`f`.
The corresponding anchor points :math:`(f(x_i^\ast))_{i=1..m}`
are stored in a matrix :math:`\Phi`.
The simplex formed by the convex hull of the anchor points
can be expressed as :math:`\Phi \beta`,
where :math:`\beta = \{ (b_1, ..., b_m)^T | \sum_{i=1}^m b_i =1 \}`.
Given a list of vectors :math:`\beta`,
mNBI will solve the following single-objective problems:
.. math::
& \max_{x \in D, t \in \mathbb{R}} t,\\
& \Phi \beta + t \hat{n} \geq f(x),\\
& g(x) \leq 0,\\
& h(x) = 0
where :math:`\hat{n}` is a quasi-normal vector to the :math:`\Phi \beta` simplex
pointing towards the origin of the objective space.
If :math:`(x^{*}, t^{*})` is a solution of this problem,
:math:`x^{*}` is proven to be at least weakly Pareto-dominant.
Let :math:`w = \Phi \beta + t^{*} \hat{n}`,
and :math:`\pi` denote the projection (in the direction of :math:`\hat{n}`)
on the simplex formed by the convex hull of the anchor points.
If not all constraints :math:`\Phi \beta + t^{*} \hat{n} \geq f(x^{*})` are active,
:math:`x^{*}` will weakly dominate the solution of the sub-problem
for all values :math:`\beta_{dom}` that verify:
.. math::
\Phi \beta_{dom} \in \pi[(f(x^{*}) + \mathbb{R}_m^{+})
\cap (w - \mathbb{R}_m^{+})]
Therefore, the corresponding sub-optimizations are redundant
and can be skipped to reduce run time.
_debug_results: Database = Database()
"""The results of the sub-optimizations in debug mode."""
_RESULT_CLASS: ClassVar[type[OptimizationResult]] = MultiObjectiveOptimizationResult
"""The class used to present the result of the optimization."""
__beta_sub_optim: OptimizationProblem | None = None
"""The sub-optimization problem that must be run for each value of beta."""
__debug: bool = False
"""Whether the algorithm is running in debug mode."""
__debug_file_path: str = "debug_Pareto.h5"
"""The output file for the Pareto front in debug mode.
mNBI algorithm normally returns one point of the Pareto front per sub-optimization.
The ``get_optimum_from_database`` method returns all Pareto optimal points in the
database. In debug mode, the optima actually returned by mNBI are saved to an hdf5
file, in order to verify that the algorithm has worked as intended.
__ineq_tolerance: float = 1e-4
"""The tolerance on the inequality constraints."""
__sub_optim_max_iter: int
"""The maximum number of iterations for each run of the sub-optimization problem."""
__n_obj: int
"""The number of objectives of the optimization problem."""
__n_processes: int
"""The number of processes to use when running the sub-optimizations."""
__n_sub_optim: int
"""The number of runs of the sub-optimization problem."""
__n_vect: RealArray
"""The quasi-normal vector to the phi simplex."""
__phi: RealArray
r"""The matrix :math:`\Phi=(f(x^{(1),*}) | \ldots | f(x^{(d),*}))`.
Where :math:`f=(f_1,\ldots,f_d)` is the multi-objective function
and :math:`x^{(i),*}` the design point minimizing :math:`f_i`.
It has the shape ``(n_obj, n_obj)``.
__skip_betas: bool
"""Whether to skip the sub-optimizations corresponding to values of beta for which
the theoretical result has already been found.
This can accelerate the main optimization by avoiding redundant sub-optimizations.
But in cases where some sub-optimizations do not properly converge, some values of
betas will be skipped based on false assumptions, and some parts of the Pareto front
can be incorrectly resolved.
__skippable_domains: list[RealArray]
"""The regions of the phi simplex that can be skipped."""
__sub_optim_algo: str
"""The algorithm used for the sub-optimizations."""
__sub_optim_algo_options: Mapping[str, DriverLibOptionType]
"""The options for the sub-optimization algorithm."""
__utopia: RealArray
r"""The utopia :math:`(f_1(x^{(1),*}), \ldots, f_d(x^{(d),*}))`.
Where :math:`x^{(i),*}` the design point minimizing :math:`f_i`.
It has the shape ``(n_obj, 1)`` and it is the diagonal of :attr:`.__phi`.
__SUB_OPTIM_CONSTRAINT_NAME: Final[str] = "beta_sub_optim_constraint"
def __init__(self) -> None: # noqa:D107
self.descriptions = {
"MNBI": MNBIAlgorithmDescription(
description="Modified Normal Boundary Intersection (mNBI) method",
def _get_options(
max_iter: int,
sub_optim_algo: str,
normalize_design_space: bool = False,
n_sub_optim: int = 1,
sub_optim_algo_options: Mapping[
str, DriverLibOptionType
sub_optim_max_iter: int | None = None,
doe_algo: str = "fullfact",
doe_algo_options: Mapping[str, DOELibraryOptionType] = READ_ONLY_EMPTY_DICT,
n_processes: int = 1,
debug_file_path: str | Path = "debug_history.h5",
skip_betas: bool = True,
debug: bool = False,
) -> dict[str, MNBIOptionsType]:
r"""Set the options default values.
max_iter: The maximum number of iterations.
sub_optim_algo: The optimization algorithm used to solve
the generated sub optimization problems.
sub_optim_algo_options: The options for the optimization
n_sub_optim: The number of sub optimization in addition to
the individual minimums of the objectives. mNBI will
generate n_sub_optim points on the Pareto front
between the "number of objective" individual minimas. This value must be
strictly greater than the number of objectives of the problem.
normalize_design_space: Whether to normalize the design
variables between 0 and 1.
sub_optim_max_iter: Maximum number of iterations of the
sub optimization algorithms. If ``None``, the ``max_iter`` value is
doe_algo: The design of experiments algorithm for the
target points on the Pareto front in problems with more than 2
objectives. A ``fullfactorial`` DOE is used default as these tend to be
low dimensions, usually not more than 3 objectives for a given problem.
doe_algo_options: The options for the DOE algorithm.
n_processes: The maximum simultaneous number of processes
used to parallelize the sub optimizations.
debug: Whether to output a database hdf file containing the
sub optimization optimas only.
debug_file_path: The path to the debug file if debug mode is
The mNBI library options with their values.
sub_optim_algo_options = (
sub_optim_algo_options if sub_optim_algo_options else {}
sub_optim_max_iter = sub_optim_max_iter if sub_optim_max_iter else max_iter
doe_algo_options = doe_algo_options if doe_algo_options else {}
return self._process_options(
def __minimize_objective_components_separately(self) -> None:
"""Minimize the component of the multi-objective function separately.
The results are used to create the __phi and __utopia arrays.
LOGGER.info("Searching for the individual optimum of each objective")
optima = execute(
self.__phi = column_stack([optimum[0] for optimum in optima])
self.__utopia = self.__phi.diagonal()[:, newaxis]
def _minimize_objective_component(self, i: int) -> IndividualSubOptimOutput:
"""Minimize the i-th component f_i of the objective function f=(f1,...,f_d).
i: The index of the component of the objective function.
The value of f at the design value minimizing f_i.
The value of the design variables minimizing f_i.
The database of the main problem. This is returned so that it can be copied
in the database of the main process, to store the evaluations done in each
The number of calls to f.
RuntimeError: If no optimum is found for one of the objectives.
n_calls_start = self.problem.objective.n_calls
design_space = DesignSpace()
opt_problem = OptimizationProblem(design_space)
opt_problem.constraints = list(self.problem.constraints)
objective = FunctionComponentExtractor(self.problem.objective, i)
jac = (
if self.problem.objective.jac is NotImplementedCallable
else objective.compute_jacobian
opt_problem.objective = MDOFunction(objective.compute_output, f"f_{i}", jac=jac)
opt_result = OptimizersFactory().execute(
if not opt_result.is_feasible:
msg = f"No feasible optimum found for the {i}-th objective function."
raise RuntimeError(msg)
x_min = opt_result.x_opt
f_min = self.problem.objective(x_min)
n_calls = self.problem.objective.n_calls - n_calls_start
return IndividualSubOptimOutput(f_min, x_min, self.problem.database, n_calls)
def __copy_database_save_minimum(
self, index: int, outputs: IndividualSubOptimOutput
) -> None:
"""Update the database of the optimization problem with that of the sub-problem.
The sub-problem aims to minimize a component f_i of f=(f_1,...,f_d).
In debug mode,
this function stores the objective vector f(x*) and the design value x*
in a debug history file,
where x* minimizes f_i(x*).
index: The index of the worker used to run the sub-optimization.
Provided by |g| but unused here.
outputs: The outputs of the sub-optimization
returned by ``_minimize_objective_component``.
if self.__n_processes > 1:
# Store the sub-process database in the main database
objective = self.problem.objective
objective.n_calls += outputs.n_calls
for functions in [
for f in functions:
f_hist, x_hist = outputs.database.get_function_history(
f.name, with_x_vect=True
for x_value, f_value in zip(x_hist, f_hist):
self.problem.database.store(x_value, {f.name: f_value})
if self.__debug:
self._debug_results.store(outputs.x_min, {"obj": outputs.f_min})
def _t_extraction_func(x_t: RealArray) -> RealArray:
"""Return the value of ``t`` from a vector concatenating ``x`` and ``t``.
x_t: A vector concatenating ``x`` and ``t``.
The value of ``t``.
return x_t[-1]
def _t_extraction_jac(x_t: RealArray) -> RealArray:
"""Compute the Jacobian of `_t_extraction_func`.
x_t: A vector concatenating ``x`` and ``t``.
The Jacobian of ``_t_extraction_func`` at ``x_t``.
jac = zeros_like(x_t)[newaxis]
jac[0][-1] = 1
return jac
def __create_beta_sub_optim(self) -> None:
"""Create the beta sub-optimization problem.
The goal is to maximize ``t``
w.r.t. the design variables ``x`` and the real variable ``t``.
design_space = deepcopy(self.problem.design_space)
design_space.add_variable("t", value=0.0)
self.__beta_sub_optim = OptimizationProblem(design_space)
self.__beta_sub_optim.objective = MDOFunction(
def _run_beta_sub_optim(self, beta: RealArray) -> BetaSubOptimOutput | tuple[()]:
"""Run the sub-optimization problem for a given value of beta.
If the main problem has two objectives, the sub-optimization starts from the
previous sub-problem result to accelerate convergence, since the optima of two
consecutive sub-problems are probably close to each other.
Otherwise, nothing guaranties that the two optima are close to each other since
the betas are spread randomly, therefore the sub-optimization starts from the
initial point.
beta: The coordinates of the considered point in the phi simplex.
The coordinates in the objective space of the sub-optimization result.
The coordinates in the design space of the sub-optimization result.
The vector w used to compute the values of beta that can be skipped in the
following sub-optimizations. This is computed only if one component of
the beta sub-constraint is inactive. Otherwise, returns None.
The database of the main problem. This is returned so that it can be copied
in the database of the main process, to store the evaluations done in
each sub-process.
The number of calls to the main objective function by the optimizer during
the sub-optimization.
Or an empty tuple if the resulting solution of the sub-optimization is
for the given value of beta is already known.
f = self.problem.objective
n_calls_start = f.n_calls
beta = append(beta, 1 - beta.sum())
phi_beta = dot(self.__phi, beta)
# Check if phi_beta is in the skippable domains.
if self.__skip_betas and self.__is_skippable(phi_beta):
"Skipping beta = %s because the resulting solution is already known.",
return ()
LOGGER.info("Solving mNBI sub-problem for beta = %s", beta)
beta_sub_optim_constraint = SubOptimConstraint(phi_beta, self.__n_vect, f)
jac = (
if not isinstance(f.jac, NotImplementedCallable)
else None
beta_sub_cstr = MDOFunction(
# Reset the design space if there are more than two objective, since two
# successive values of beta are not necessarily close
self.__beta_sub_optim.reset(design_space=self.__n_obj != 2)
wrapped_constraints = []
for g in self.problem.constraints:
wrapped_g = ConstraintFunctionWrapper(g)
jac = (
if not isinstance(g.jac, NotImplementedCallable)
else None
self.__beta_sub_optim.constraints = wrapped_constraints
self.__beta_sub_optim.add_constraint(beta_sub_cstr, cstr_type="ineq")
opt_res = OptimizersFactory().execute(
if not opt_res.is_feasible:
LOGGER.warning("No feasible optimum has been found for beta=%s", beta)
x_min = opt_res.x_opt[:-1]
f_min = f(x_min)
n_calls = f.n_calls - n_calls_start
# If some components of the sub-optim constraint are inactive, return the vector
# w to find the values of beta that can be skipped for the next sub-optims
if self.__skip_betas and any(
< self.__ineq_tolerance
w = phi_beta + opt_res.x_opt[-1] * self.__n_vect
w = None
return BetaSubOptimOutput(f_min, x_min, w, self.problem.database, n_calls)
def __beta_sub_optim_callback(
self, index: int, outputs: BetaSubOptimOutput
) -> None:
"""The callback function called after running a beta sub-optimization.
This function main goal is to copy the sub-problem database into the main
problem's one when running in parallel.
If one component of the beta constraint is inactive, this function will also
find the values of beta that can be skipped in the following sub-optimizations.
In debug mode, this function stores the result of the sub-optimization in a
debug history file.
index: The index of the worker used to run the sub-optimization. Provided by
GEMSEO but unused here.
outputs: The outputs of the sub-optimization.
if not outputs:
database = outputs.database
if self.__n_processes > 1 and database is not None:
# Store the sub-process database in the main process database.
self.problem.objective.n_calls += outputs.n_calls
f_hist, x_hist = database.get_function_history(
self.problem.objective.name, with_x_vect=True
for xi, fi in zip(x_hist, f_hist):
self.problem.database.store(xi, {self.problem.objective.name: fi})
for functions in [self.problem.constraints, self.problem.observables]:
for f in functions:
f_hist, x_hist = database.get_function_history(
f.name, with_x_vect=True
for xi, fi in zip(x_hist, f_hist):
self.problem.database.store(xi, {f.name: fi})
f_min = outputs.f_min
if outputs.w is not None:
self.__find_skippable_domain(f_min, outputs.w)
if self.__debug and f_min is not None:
self._debug_results.store(outputs.x_min, {"obj": f_min})
def __find_skippable_domain(self, f: RealArray, w: RealArray) -> None:
"""Find the domain in the phi simplex that can be skipped based on f and w.
Project the hypervolume of the solution space formed by the intersection of the
two subspaces y >= f and y <= w on the phi simplex.
f: The values of the objective functions at the current optimum, and first
extremity of the hypervolume to be projected.
w: The other extremity of the hypervolume.
fw = column_stack((f, w))
# Find all the corners of the hypervolume
proj_points = []
inds = argwhere(abs(f - w) > self.__ineq_tolerance)
for c in combinations([0, 1], len(inds)):
y = zeros((f.size, 1))
for i, j in enumerate(inds):
y[j, 0] = fw[j, c[i]]
proj_y = self.__project_on_phi_simplex(y)
if proj_y is not None:
if proj_points:
def __project_on_phi_simplex(self, y: RealArray) -> RealArray | None:
"""Project a point on the phi simplex.
Solves the linear system |phi -I_m 0| |alpha| |0|
| 0 I_m -n| | z | = |y|
|1_m 0 0| | k | |1|
m is the number of objective functions
I_m is the identity matrix of size m
1_m is a matrix of ones of shape (1, m)
n is the quasi-normal vector to the phi simplex
z is the coordinates of the projection of y on the phi simplex
alpha is the coordinates of z in the phi simplex
k is the coordinates of z on the y + n line
y: The point to project.
The coordinates in the objective space of the projection of y on the phi
simplex. ``None`` when the projection on the phi simplex failed.
m = self.__n_obj
eye_m = eye(m)
zeros_m1 = zeros((m, 1))
lhs = block([
[self.__phi, -eye_m, zeros_m1],
[zeros((m, m)), eye_m, -self.__n_vect.reshape(m, 1)],
[ones((1, m)), zeros((1, m)), 0],
rhs = vstack((zeros_m1, y, array([[1]])))
z = solve(lhs, rhs)[m : 2 * m, [0]]
except LinAlgError:
"Could not solve the projection system, projection on the phi simplex "
return None
return z
def __is_skippable(self, phi_beta: RealArray) -> bool:
"""Check whether the point phi_beta is in a skippable domain of the phi simplex.
phi_beta: The coordinates of the point of the phi simplex.
Whether the point lies in a skippable domain.
for domain in self.__skippable_domains:
if self.__is_in_convex_hull(phi_beta, domain):
return True
return False
def __is_in_convex_hull(y: RealArray, z: RealArray) -> bool:
"""Check whether the point y is in the convex hull of the points z.
y lies in the convex hull of z if it can be expressed as a convex combination of
z, i.e. if here exists a positive vector w solution of | z | | | |y|
| | |w| = | |
|1_n| | | |1|
where 1_n is a matrix of ones of shape (1, n) and n is the number of points in
The linprog call is used to check if the constraint A_eq w = b_eq is satisfied
(the objective is always zero). The positivity of w's components is enforced by
default in linprog.
y: The coordinates of the point to be checked, of shape (dim, 1).
z: The matrix of coordinates of the points z, of shape (dim, n).
Whether y lies in the convex hull of the points of z.
n_pts = z.shape[1]
a = concatenate([z, ones((1, n_pts))], axis=0)
b = concatenate([y, array([1])], axis=0).flatten()
c = zeros(n_pts)
lp = linprog(c, A_eq=a, b_eq=b, method="highs")
return lp.success
def _pre_run(
self, problem: OptimizationProblem, algo_name: str, **options: Any
) -> None:
"""Processes the options and setups the optimization.
- If the algorithm is being used to solve a mono-objective problem.
- If the given `n_sub_optim` is not strictly greater than the number of
- If the name of one of the constraints of the problem coincides with
the protected name for the sub optimization problems used by mNBI.
super()._pre_run(problem, algo_name, **options)
self.__n_obj = self.problem.objective.dim
self.__n_sub_optim = options.pop("n_sub_optim")
if self.__n_obj == 1:
msg = "MNBI optimizer is not suitable for mono-objective problems."
raise ValueError(msg)
if self.__n_sub_optim <= self.__n_obj:
msg = (
"The number of sub-optimization problems must be "
f"strictly greater than the number of objectives {self.__n_obj}; "
f"got {self.__n_sub_optim}."
raise ValueError(msg)
if any(
c.name == self.__SUB_OPTIM_CONSTRAINT_NAME for c in self.problem.constraints
msg = (
f"The constraint name {self.__SUB_OPTIM_CONSTRAINT_NAME} is protected "
f"when using MNBI optimizer."
raise ValueError(msg)
def _run(self, **options: Any) -> None:
self.__n_processes = options.pop("n_processes")
self._normalize_design_space = options.pop("normalize_design_space")
self.__sub_optim_algo = options.pop("sub_optim_algo")
self.__sub_optim_algo_options = options.pop("sub_optim_algo_options")
self.__debug = options.pop("debug")
self.__debug_file_path = options.pop("debug_file_path")
self.__skip_betas = options.pop("skip_betas")
self.__sub_optim_max_iter = options.pop("sub_optim_max_iter")
self._doe_algo_options = options.pop("doe_algo_options")
self.__n_obj = self.problem.objective.dim
self.__ineq_tolerance = options.get(
self.INEQ_TOLERANCE, self.problem.ineq_tolerance
self.__skippable_domains = Manager().list() if self.__n_processes > 1 else []
if self.__debug:
if self.__n_processes > 1:
LOGGER.info("Running mNBI on %s processes", self.__n_processes)
# Find the individual optimum phi of each objective function and the utopia
# Compute the quasi-normal vector to the simplex formed by the points in phi
self.__n_vect = -dot(self.__phi - self.__utopia, ones(self.__n_obj))
# Create the MDOFunction that runs the beta sub-optimization problem
# Run the beta sub-problem for different values of beta corresponding to
# different points of the phi simplex. The number of runs accounts for the
# individual optimizations already performed.
n_samples = self.__n_sub_optim - self.__n_obj
if self.__n_obj == 2:
betas = linspace(0, 1, n_samples + 2)[1:-1, newaxis]
library = DOEFactory().create(options["doe_algo"])
beta_design_space = DesignSpace()
"beta", size=self.__n_obj - 1, l_b=0.0, u_b=1.0
betas = library.compute_doe(
if self.__debug:
return self.get_optimum_from_database()
def _log_result(self) -> None:
LOGGER.info("%s", self.problem.solution)