Source code for gemseo.formulations.base_formulation
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Francois Gallard
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""The base class for all formulations."""
from __future__ import annotations
import logging
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import Generic
from typing import TypeVar
from numpy import arange
from numpy import empty
from numpy import ndarray
from numpy import zeros
from scipy.sparse import block_array
from gemseo.algos.optimization_problem import OptimizationProblem
from gemseo.core.mdo_functions.function_from_discipline import FunctionFromDiscipline
from gemseo.core.mdo_functions.mdo_function import MDOFunction
from gemseo.core.mdo_functions.taylor_polynomials import compute_linear_approximation
from gemseo.formulations.base_formulation_settings import BaseFormulationSettings
from gemseo.scenarios.scenario_results.scenario_result import ScenarioResult
from gemseo.utils.discipline import check_disciplines_consistency
from gemseo.utils.metaclasses import ABCGoogleDocstringInheritanceMeta
from gemseo.utils.pydantic import create_model
from gemseo.utils.string_tools import convert_strings_to_iterable
if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Sequence
from gemseo.algos.design_space import DesignSpace
from gemseo.core.discipline import Discipline
from gemseo.core.grammars.json_grammar import JSONGrammar
from gemseo.scenarios.base_scenario import BaseScenario
from gemseo.typing import StrKeyMapping
LOGGER = logging.getLogger(__name__)
T = TypeVar("T", bound=BaseFormulationSettings)
[docs]
class BaseFormulation(Generic[T], metaclass=ABCGoogleDocstringInheritanceMeta):
"""Base MDO formulation class to be extended in subclasses for use.
This class creates the :class:`.MDOFunction` instances
computing the constraints, objective and observables
from the disciplines
and add them to the attached :attr:`.optimization_problem`.
It defines the multidisciplinary process, i.e. dataflow and workflow, implicitly.
By default,
- the objective is minimized,
- the type of a constraint is equality,
- the activation value of a constraint is 0.
The link between the instances of :class:`.Discipline`,
the design variables and
the names of the discipline outputs used as constraints, objective and observables
is made with the :class:`.DisciplineAdapterGenerator`,
which generates instances of :class:`.MDOFunction` from the disciplines.
"""
DEFAULT_SCENARIO_RESULT_CLASS_NAME: ClassVar[str] = ScenarioResult.__name__
"""The name of the :class:`.ScenarioResult` class to be used for post-processing."""
optimization_problem: OptimizationProblem
"""The optimization problem generated by the formulation from the disciplines."""
_objective_name: str | Sequence[str]
"""The name(s) of the discipline output(s) used as objective."""
variable_sizes: dict[str, int]
"""The sizes of the design variables and differentiated inputs substitutes."""
__disciplines: tuple[Discipline, ...]
"""The disciplines."""
Settings: ClassVar[type[T]]
"""The Pydantic model class for the settings of the formulation."""
_settings: T
"""The Pydantic model for the settings of the formulation."""
ConstraintType = MDOFunction.ConstraintType
def __init__(
self,
disciplines: Sequence[Discipline],
objective_name: str | Sequence[str],
design_space: DesignSpace,
settings_model: T | None = None,
**settings: Any,
) -> None:
r"""
Args:
disciplines: The disciplines.
objective_name: The name(s) of the discipline output(s) used as objective.
If multiple names are passed, the objective will be a vector.
design_space: The design space.
settings_model: The settings of the formulation as a Pydantic model.
If ``None``, use ``**settings``.
**settings: The settings of the formulation.
This argument is ignored when ``settings_model`` is not ``None``.
""" # noqa: D205, D212, D415
self._settings = create_model(
self.Settings, settings_model=settings_model, **settings
)
self.__disciplines = tuple(disciplines)
self.__check_disciplines()
self._objective_name = objective_name
self.optimization_problem = OptimizationProblem(design_space)
self.variable_sizes = design_space.variable_sizes.copy()
@property
def disciplines(self) -> tuple[Discipline, ...]:
"""The disciplines."""
return self.__disciplines
@property
def differentiated_input_names_substitute(self) -> Sequence[str]:
"""The names of the inputs with respect to which to differentiate the functions.
If empty, consider the variables of their input space.
"""
return self._settings.differentiated_input_names_substitute
def __check_disciplines(self) -> None:
"""Check that two disciplines do not compute the same output."""
disciplines = set(self.disciplines).difference(self.get_sub_scenarios())
if disciplines:
check_disciplines_consistency(disciplines, False, True)
@property
def design_space(self) -> DesignSpace:
"""The design space on which the formulation is applied."""
return self.optimization_problem.design_space
[docs]
@abstractmethod
def add_constraint(
self,
output_name: str | Sequence[str],
constraint_type: ConstraintType = ConstraintType.EQ,
constraint_name: str = "",
value: float = 0,
positive: bool = False,
) -> None:
r"""Add an equality or inequality constraint to the optimization problem.
An equality constraint is written as :math:`c(x)=a`,
a positive inequality constraint is written as :math:`c(x)\geq a`
and a negative inequality constraint is written as :math:`c(x)\leq a`.
This constraint is in addition to those created by the formulation,
e.g. consistency constraints in IDF.
The strategy of repartition of the constraints is defined by the formulation.
Args:
output_name: The name(s) of the outputs computed by :math:`c(x)`.
If several names are given,
a single discipline must provide all outputs.
constraint_type: The type of constraint.
constraint_name: The name of the constraint to be stored.
If empty,
the name of the constraint is generated
from ``output_name``, ``constraint_type``, ``value`` and ``positive``.
value: The value :math:`a`.
positive: Whether the inequality constraint is positive.
"""
[docs]
@abstractmethod
def add_observable(
self,
output_names: str | Sequence[str],
observable_name: str = "",
discipline: Discipline | None = None,
) -> None:
"""Add an observable to the optimization problem.
The repartition strategy of the observable is defined in the formulation class.
Args:
output_names: The name(s) of the output(s) to observe.
observable_name: The name of the observable.
If empty, the output name is used by default.
discipline: The discipline computing the observed outputs.
If ``None``, the discipline is detected from inner disciplines.
"""
[docs]
@abstractmethod
def get_top_level_disciplines(
self, include_sub_formulations: bool = False
) -> tuple[Discipline, ...]:
"""Return the top level disciplines that are executed in the foreground.
A formulation structures the optimization problem
into multiple levels of disciplines.
The top level disciplines map
from the :attr:`.design_space`
to the objective, constraint and observable spaces.
They can be composed of
both user disciplines and process disciplines added by the formulation,
e.g. :class:`.MDOChain`.
These process disciplines may also include
both user disciplines and process disciplines,
and so on.
Args:
include_sub_formulations: Whether to include the top level disciplines
of the formulations that make up the current one.
Returns:
The top level disciplines.
"""
def _get_dv_indices(
self,
names: Iterable[str],
) -> dict[str, tuple[int, int, int]]:
"""Return the indices associated with specific variables.
Args:
names: The names of the variables.
Returns:
For each variable,
a 3-length tuple
whose first dimensions are its first and last indices in the design space
and last dimension is its size.
"""
start = end = 0
sizes = self.variable_sizes
names_to_indices = {}
for name in names:
size = sizes[name]
end += size
names_to_indices[name] = (start, end, size)
start = end
return names_to_indices
[docs]
def unmask_x_swap_order(
self,
masking_data_names: Sequence[str],
x_masked: ndarray,
all_data_names: Iterable[str] = (),
) -> ndarray:
"""Unmask a vector or matrix from names, with respect to other names.
This method eventually swaps the order of the values
if the order of the data names is inconsistent between these sets.
Args:
masking_data_names: The names of the variables
whose values come from ``x_masked`` (the other are zeros).
x_masked: The vector or matrix to unmask.
all_data_names: The names of the variables
whose values the full array will concatenate.
If empty, use the names of all the design variables.
Returns:
The vector or matrix related to the input mask.
Raises:
ValueError: when the sizes of variables are inconsistent.
"""
if not all_data_names:
all_data_names = self.get_optim_variable_names()
names_to_sizes = self.variable_sizes
mask_size = sum(names_to_sizes[name] for name in masking_data_names)
if (n_samples := x_masked.shape[-1] // mask_size) == 1:
return self.__unmask_x_swap_order_if_one_sample(
x_masked, all_data_names, masking_data_names
)
return self.__unmask_x_swap_order_if_several_samples(
x_masked,
all_data_names,
masking_data_names,
mask_size,
n_samples,
)
def __unmask_x_swap_order_if_one_sample(
self,
x_masked: ndarray,
all_data_names: Iterable[str],
masking_data_names: Sequence[str],
) -> ndarray:
"""Unmasking function if there is only one sample.
Args:
x_masked: The array to unmask.
all_data_names: All the variable names.
masking_data_names: The names of the variables to unmask.
Returns:
The unmasked array.
"""
names_to_sizes = self.variable_sizes
x_unmasked = zeros(
(
*x_masked.shape[:-1],
sum(names_to_sizes[name] for name in all_data_names),
),
dtype=x_masked.dtype,
)
indices = self._get_dv_indices(all_data_names)
masked_position = 0
for variable_name in masking_data_names:
unmasked_position, _, size = indices[variable_name]
x_unmasked[..., unmasked_position : unmasked_position + size] = x_masked[
..., masked_position : masked_position + size
]
masked_position += size
return x_unmasked
def __unmask_x_swap_order_if_several_samples(
self,
x_masked: ndarray,
all_data_names: Iterable[str],
masking_data_names: Sequence[str],
mask_size: int,
n_samples: int,
) -> ndarray:
"""Unmasking function if there are several samples.
Args:
x_masked: The array to unmask.
all_data_names: All the variable names.
masking_data_names: The names of the variables to unmask.
mask_size: The size of the mask.
n_samples: The number of samples.
Returns:
The unmasked array.
"""
masked_position = 0
names_to_indices = {
name: index
for index, name in enumerate(all_data_names)
if name in masking_data_names
}
n_variables = len(all_data_names)
arrays = [None] * n_samples * n_variables
names_to_sizes = self.variable_sizes
for variable_name, variable_index in names_to_indices.items():
size = names_to_sizes[variable_name]
a = variable_index - n_variables
b = masked_position - mask_size
for _ in range(n_samples):
a += n_variables
b += mask_size
arrays[a] = x_masked[..., b : b + size]
masked_position += size
return block_array([arrays])
[docs]
def mask_x_swap_order(
self,
masking_data_names: Iterable[str],
x_vect: ndarray,
all_data_names: Iterable[str] = (),
) -> ndarray:
"""Mask a vector from a subset of names, with respect to a set of names.
This method eventually swaps the order of the values
if the order of the data names is inconsistent between these sets.
Args:
masking_data_names: The names of the kept data.
x_vect: The vector to mask.
all_data_names: The set of all names.
If empty, use the design variables stored in the design space.
Returns:
The masked version of the input vector.
Raises:
IndexError: when the sizes of variables are inconsistent.
"""
x_mask = self.get_x_mask_x_swap_order(masking_data_names, all_data_names)
return x_vect[x_mask]
[docs]
def get_x_mask_x_swap_order(
self,
masking_data_names: Iterable[str],
all_data_names: Iterable[str] = (),
) -> ndarray:
"""Mask a vector from a subset of names, with respect to a set of names.
This method eventually swaps the order of the values
if the order of the data names is inconsistent between these sets.
Args:
masking_data_names: The names of the kept data.
all_data_names: The set of all names.
If empty, use the design variables stored in the design space.
Returns:
The masked version of the input vector.
Raises:
ValueError: If the sizes or the sizes of variables are inconsistent.
"""
design_space = self.optimization_problem.design_space
if not all_data_names:
all_data_names = design_space
variable_sizes = {var: design_space.get_size(var) for var in design_space}
total_size = sum(variable_sizes[var] for var in masking_data_names)
indices = self._get_dv_indices(all_data_names)
x_mask = empty(total_size, dtype="int")
i_masked_min = i_masked_max = 0
try:
for key in masking_data_names:
i_min, i_max, loc_size = indices[key]
i_masked_max += loc_size
x_mask[i_masked_min:i_masked_max] = arange(i_min, i_max)
i_masked_min = i_masked_max
except KeyError as err:
msg = (
"Inconsistent inputs of masking. "
f"Key {err} is in masking_data_names {masking_data_names} "
f"but not in provided all_data_names : {all_data_names}!"
)
raise ValueError(msg) from None
return x_mask
def _remove_unused_variables(self) -> None:
"""Remove variables in the design space that are not discipline inputs."""
design_space = self.optimization_problem.design_space
disciplines = self.get_top_level_disciplines()
all_inputs = {var for disc in disciplines for var in disc.io.input_grammar}
for name in design_space.variable_names:
if name not in all_inputs:
design_space.remove_variable(name)
LOGGER.info(
"Variable %s was removed from the Design Space, it is not an input"
" of any discipline.",
name,
)
def _remove_sub_scenario_dv_from_ds(self) -> None:
"""Remove the sub scenarios design variables from the design space."""
for scenario in self.get_sub_scenarios():
for var in scenario.formulation.design_space:
if var in self.optimization_problem.design_space:
self.optimization_problem.design_space.remove_variable(var)
def _build_objective_from_disc(
self,
objective_name: str | Sequence[str],
discipline: Discipline | None = None,
top_level_disc: bool = True,
) -> None:
"""Build the objective function from the discipline able to compute it.
Args:
objective_name: The name(s) of the discipline output(s) used as objective.
If multiple names are passed, the objective will be a vector.
discipline: The discipline computing the objective.
If ``None``, the discipline is detected from the inner disciplines.
top_level_disc: Whether to search the discipline among the top level ones.
"""
objective = FunctionFromDiscipline(
convert_strings_to_iterable(objective_name),
self,
discipline=discipline,
top_level_disc=top_level_disc,
)
if objective.discipline_adapter.is_linear:
objective = compute_linear_approximation(
objective, zeros(objective.discipline_adapter.input_dimension)
)
self.optimization_problem.objective = objective
[docs]
def get_optim_variable_names(self) -> list[str]:
"""Get the optimization unknown names to be provided to the optimizer.
This is different from the design variable names provided by the user,
since it depends on the formulation,
and can include target values for coupling for instance in IDF.
Returns:
The optimization variable names.
"""
return self.optimization_problem.design_space.variable_names
[docs]
def get_x_names_of_disc(
self,
discipline: Discipline,
) -> list[str]:
"""Get the design variables names of a given discipline.
Args:
discipline: The discipline.
Returns:
The names of the design variables.
"""
optim_variable_names = self.get_optim_variable_names()
input_names = discipline.io.input_grammar
return [name for name in optim_variable_names if name in input_names]
[docs]
def get_sub_scenarios(self) -> list[BaseScenario]:
"""List the disciplines that are actually scenarios.
Returns:
The scenarios.
"""
from gemseo.scenarios.base_scenario import BaseScenario
return [disc for disc in self.disciplines if isinstance(disc, BaseScenario)]
def _set_default_input_values_from_design_space(self) -> None:
"""Initialize the top level disciplines from the design space."""
if not self.optimization_problem.design_space.has_current_value:
return
current_x = self.optimization_problem.design_space.get_current_value(
as_dict=True
)
for discipline in self.get_top_level_disciplines():
input_names = discipline.io.input_grammar
to_value = discipline.io.input_grammar.data_converter.convert_array_to_value
discipline.io.input_grammar.defaults.update({
name: to_value(name, value)
for name, value in current_x.items()
if name in input_names
})
[docs]
@classmethod
def get_default_sub_option_values(cls, **options: str) -> StrKeyMapping:
"""Return the default values of the sub-options of the formulation.
When some options of the formulation depend on higher level options,
the default values of these sub-options may be obtained here,
mainly for use in the API.
Args:
**options: The options required to deduce the sub-options grammar.
Returns:
Either ``None`` or the sub-options default values.
"""
return {}
[docs]
@classmethod
def get_sub_options_grammar(cls, **options: str) -> JSONGrammar:
"""Get the sub-options grammar.
When some options of the formulation depend on higher level options,
the schema of the sub-options may be obtained here,
mainly for use in the API.
Args:
**options: The options required to deduce the sub-options grammar.
Returns:
Either ``None`` or the sub-options grammar.
"""
return {}