Source code for gemseo.formulations.base_formulation

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                         documentation
#        :author: Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""The base class for all formulations."""

from __future__ import annotations

import logging
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import Generic
from typing import TypeVar

from numpy import arange
from numpy import empty
from numpy import ndarray
from numpy import zeros
from scipy.sparse import block_array

from gemseo.algos.optimization_problem import OptimizationProblem
from gemseo.core.mdo_functions.function_from_discipline import FunctionFromDiscipline
from gemseo.core.mdo_functions.mdo_function import MDOFunction
from gemseo.core.mdo_functions.taylor_polynomials import compute_linear_approximation
from gemseo.formulations.base_formulation_settings import BaseFormulationSettings
from gemseo.scenarios.scenario_results.scenario_result import ScenarioResult
from gemseo.utils.discipline import check_disciplines_consistency
from gemseo.utils.metaclasses import ABCGoogleDocstringInheritanceMeta
from gemseo.utils.pydantic import create_model
from gemseo.utils.string_tools import convert_strings_to_iterable

if TYPE_CHECKING:
    from collections.abc import Iterable
    from collections.abc import Sequence

    from gemseo.algos.design_space import DesignSpace
    from gemseo.core.discipline import Discipline
    from gemseo.core.grammars.json_grammar import JSONGrammar
    from gemseo.scenarios.base_scenario import BaseScenario
    from gemseo.typing import StrKeyMapping

LOGGER = logging.getLogger(__name__)

T = TypeVar("T", bound=BaseFormulationSettings)


[docs] class BaseFormulation(Generic[T], metaclass=ABCGoogleDocstringInheritanceMeta): """Base MDO formulation class to be extended in subclasses for use. This class creates the :class:`.MDOFunction` instances computing the constraints, objective and observables from the disciplines and add them to the attached :attr:`.optimization_problem`. It defines the multidisciplinary process, i.e. dataflow and workflow, implicitly. By default, - the objective is minimized, - the type of a constraint is equality, - the activation value of a constraint is 0. The link between the instances of :class:`.Discipline`, the design variables and the names of the discipline outputs used as constraints, objective and observables is made with the :class:`.DisciplineAdapterGenerator`, which generates instances of :class:`.MDOFunction` from the disciplines. """ DEFAULT_SCENARIO_RESULT_CLASS_NAME: ClassVar[str] = ScenarioResult.__name__ """The name of the :class:`.ScenarioResult` class to be used for post-processing.""" optimization_problem: OptimizationProblem """The optimization problem generated by the formulation from the disciplines.""" _objective_name: str | Sequence[str] """The name(s) of the discipline output(s) used as objective.""" variable_sizes: dict[str, int] """The sizes of the design variables and differentiated inputs substitutes.""" __disciplines: tuple[Discipline, ...] """The disciplines.""" Settings: ClassVar[type[T]] """The Pydantic model class for the settings of the formulation.""" _settings: T """The Pydantic model for the settings of the formulation.""" ConstraintType = MDOFunction.ConstraintType def __init__( self, disciplines: Sequence[Discipline], objective_name: str | Sequence[str], design_space: DesignSpace, settings_model: T | None = None, **settings: Any, ) -> None: r""" Args: disciplines: The disciplines. objective_name: The name(s) of the discipline output(s) used as objective. If multiple names are passed, the objective will be a vector. design_space: The design space. settings_model: The settings of the formulation as a Pydantic model. If ``None``, use ``**settings``. **settings: The settings of the formulation. This argument is ignored when ``settings_model`` is not ``None``. """ # noqa: D205, D212, D415 self._settings = create_model( self.Settings, settings_model=settings_model, **settings ) self.__disciplines = tuple(disciplines) self.__check_disciplines() self._objective_name = objective_name self.optimization_problem = OptimizationProblem(design_space) self.variable_sizes = design_space.variable_sizes.copy() @property def disciplines(self) -> tuple[Discipline, ...]: """The disciplines.""" return self.__disciplines @property def differentiated_input_names_substitute(self) -> Sequence[str]: """The names of the inputs with respect to which to differentiate the functions. If empty, consider the variables of their input space. """ return self._settings.differentiated_input_names_substitute def __check_disciplines(self) -> None: """Check that two disciplines do not compute the same output.""" disciplines = set(self.disciplines).difference(self.get_sub_scenarios()) if disciplines: check_disciplines_consistency(disciplines, False, True) @property def design_space(self) -> DesignSpace: """The design space on which the formulation is applied.""" return self.optimization_problem.design_space
[docs] @abstractmethod def add_constraint( self, output_name: str | Sequence[str], constraint_type: ConstraintType = ConstraintType.EQ, constraint_name: str = "", value: float = 0, positive: bool = False, ) -> None: r"""Add an equality or inequality constraint to the optimization problem. An equality constraint is written as :math:`c(x)=a`, a positive inequality constraint is written as :math:`c(x)\geq a` and a negative inequality constraint is written as :math:`c(x)\leq a`. This constraint is in addition to those created by the formulation, e.g. consistency constraints in IDF. The strategy of repartition of the constraints is defined by the formulation. Args: output_name: The name(s) of the outputs computed by :math:`c(x)`. If several names are given, a single discipline must provide all outputs. constraint_type: The type of constraint. constraint_name: The name of the constraint to be stored. If empty, the name of the constraint is generated from ``output_name``, ``constraint_type``, ``value`` and ``positive``. value: The value :math:`a`. positive: Whether the inequality constraint is positive. """
[docs] @abstractmethod def add_observable( self, output_names: str | Sequence[str], observable_name: str = "", discipline: Discipline | None = None, ) -> None: """Add an observable to the optimization problem. The repartition strategy of the observable is defined in the formulation class. Args: output_names: The name(s) of the output(s) to observe. observable_name: The name of the observable. If empty, the output name is used by default. discipline: The discipline computing the observed outputs. If ``None``, the discipline is detected from inner disciplines. """
[docs] @abstractmethod def get_top_level_disciplines( self, include_sub_formulations: bool = False ) -> tuple[Discipline, ...]: """Return the top level disciplines that are executed in the foreground. A formulation structures the optimization problem into multiple levels of disciplines. The top level disciplines map from the :attr:`.design_space` to the objective, constraint and observable spaces. They can be composed of both user disciplines and process disciplines added by the formulation, e.g. :class:`.MDOChain`. These process disciplines may also include both user disciplines and process disciplines, and so on. Args: include_sub_formulations: Whether to include the top level disciplines of the formulations that make up the current one. Returns: The top level disciplines. """
def _get_dv_indices( self, names: Iterable[str], ) -> dict[str, tuple[int, int, int]]: """Return the indices associated with specific variables. Args: names: The names of the variables. Returns: For each variable, a 3-length tuple whose first dimensions are its first and last indices in the design space and last dimension is its size. """ start = end = 0 sizes = self.variable_sizes names_to_indices = {} for name in names: size = sizes[name] end += size names_to_indices[name] = (start, end, size) start = end return names_to_indices
[docs] def unmask_x_swap_order( self, masking_data_names: Sequence[str], x_masked: ndarray, all_data_names: Iterable[str] = (), ) -> ndarray: """Unmask a vector or matrix from names, with respect to other names. This method eventually swaps the order of the values if the order of the data names is inconsistent between these sets. Args: masking_data_names: The names of the variables whose values come from ``x_masked`` (the other are zeros). x_masked: The vector or matrix to unmask. all_data_names: The names of the variables whose values the full array will concatenate. If empty, use the names of all the design variables. Returns: The vector or matrix related to the input mask. Raises: ValueError: when the sizes of variables are inconsistent. """ if not all_data_names: all_data_names = self.get_optim_variable_names() names_to_sizes = self.variable_sizes mask_size = sum(names_to_sizes[name] for name in masking_data_names) if (n_samples := x_masked.shape[-1] // mask_size) == 1: return self.__unmask_x_swap_order_if_one_sample( x_masked, all_data_names, masking_data_names ) return self.__unmask_x_swap_order_if_several_samples( x_masked, all_data_names, masking_data_names, mask_size, n_samples, )
def __unmask_x_swap_order_if_one_sample( self, x_masked: ndarray, all_data_names: Iterable[str], masking_data_names: Sequence[str], ) -> ndarray: """Unmasking function if there is only one sample. Args: x_masked: The array to unmask. all_data_names: All the variable names. masking_data_names: The names of the variables to unmask. Returns: The unmasked array. """ names_to_sizes = self.variable_sizes x_unmasked = zeros( ( *x_masked.shape[:-1], sum(names_to_sizes[name] for name in all_data_names), ), dtype=x_masked.dtype, ) indices = self._get_dv_indices(all_data_names) masked_position = 0 for variable_name in masking_data_names: unmasked_position, _, size = indices[variable_name] x_unmasked[..., unmasked_position : unmasked_position + size] = x_masked[ ..., masked_position : masked_position + size ] masked_position += size return x_unmasked def __unmask_x_swap_order_if_several_samples( self, x_masked: ndarray, all_data_names: Iterable[str], masking_data_names: Sequence[str], mask_size: int, n_samples: int, ) -> ndarray: """Unmasking function if there are several samples. Args: x_masked: The array to unmask. all_data_names: All the variable names. masking_data_names: The names of the variables to unmask. mask_size: The size of the mask. n_samples: The number of samples. Returns: The unmasked array. """ masked_position = 0 names_to_indices = { name: index for index, name in enumerate(all_data_names) if name in masking_data_names } n_variables = len(all_data_names) arrays = [None] * n_samples * n_variables names_to_sizes = self.variable_sizes for variable_name, variable_index in names_to_indices.items(): size = names_to_sizes[variable_name] a = variable_index - n_variables b = masked_position - mask_size for _ in range(n_samples): a += n_variables b += mask_size arrays[a] = x_masked[..., b : b + size] masked_position += size return block_array([arrays])
[docs] def mask_x_swap_order( self, masking_data_names: Iterable[str], x_vect: ndarray, all_data_names: Iterable[str] = (), ) -> ndarray: """Mask a vector from a subset of names, with respect to a set of names. This method eventually swaps the order of the values if the order of the data names is inconsistent between these sets. Args: masking_data_names: The names of the kept data. x_vect: The vector to mask. all_data_names: The set of all names. If empty, use the design variables stored in the design space. Returns: The masked version of the input vector. Raises: IndexError: when the sizes of variables are inconsistent. """ x_mask = self.get_x_mask_x_swap_order(masking_data_names, all_data_names) return x_vect[x_mask]
[docs] def get_x_mask_x_swap_order( self, masking_data_names: Iterable[str], all_data_names: Iterable[str] = (), ) -> ndarray: """Mask a vector from a subset of names, with respect to a set of names. This method eventually swaps the order of the values if the order of the data names is inconsistent between these sets. Args: masking_data_names: The names of the kept data. all_data_names: The set of all names. If empty, use the design variables stored in the design space. Returns: The masked version of the input vector. Raises: ValueError: If the sizes or the sizes of variables are inconsistent. """ design_space = self.optimization_problem.design_space if not all_data_names: all_data_names = design_space variable_sizes = {var: design_space.get_size(var) for var in design_space} total_size = sum(variable_sizes[var] for var in masking_data_names) indices = self._get_dv_indices(all_data_names) x_mask = empty(total_size, dtype="int") i_masked_min = i_masked_max = 0 try: for key in masking_data_names: i_min, i_max, loc_size = indices[key] i_masked_max += loc_size x_mask[i_masked_min:i_masked_max] = arange(i_min, i_max) i_masked_min = i_masked_max except KeyError as err: msg = ( "Inconsistent inputs of masking. " f"Key {err} is in masking_data_names {masking_data_names} " f"but not in provided all_data_names : {all_data_names}!" ) raise ValueError(msg) from None return x_mask
def _remove_unused_variables(self) -> None: """Remove variables in the design space that are not discipline inputs.""" design_space = self.optimization_problem.design_space disciplines = self.get_top_level_disciplines() all_inputs = {var for disc in disciplines for var in disc.io.input_grammar} for name in design_space.variable_names: if name not in all_inputs: design_space.remove_variable(name) LOGGER.info( "Variable %s was removed from the Design Space, it is not an input" " of any discipline.", name, ) def _remove_sub_scenario_dv_from_ds(self) -> None: """Remove the sub scenarios design variables from the design space.""" for scenario in self.get_sub_scenarios(): for var in scenario.formulation.design_space: if var in self.optimization_problem.design_space: self.optimization_problem.design_space.remove_variable(var) def _build_objective_from_disc( self, objective_name: str | Sequence[str], discipline: Discipline | None = None, top_level_disc: bool = True, ) -> None: """Build the objective function from the discipline able to compute it. Args: objective_name: The name(s) of the discipline output(s) used as objective. If multiple names are passed, the objective will be a vector. discipline: The discipline computing the objective. If ``None``, the discipline is detected from the inner disciplines. top_level_disc: Whether to search the discipline among the top level ones. """ objective = FunctionFromDiscipline( convert_strings_to_iterable(objective_name), self, discipline=discipline, top_level_disc=top_level_disc, ) if objective.discipline_adapter.is_linear: objective = compute_linear_approximation( objective, zeros(objective.discipline_adapter.input_dimension) ) self.optimization_problem.objective = objective
[docs] def get_optim_variable_names(self) -> list[str]: """Get the optimization unknown names to be provided to the optimizer. This is different from the design variable names provided by the user, since it depends on the formulation, and can include target values for coupling for instance in IDF. Returns: The optimization variable names. """ return self.optimization_problem.design_space.variable_names
[docs] def get_x_names_of_disc( self, discipline: Discipline, ) -> list[str]: """Get the design variables names of a given discipline. Args: discipline: The discipline. Returns: The names of the design variables. """ optim_variable_names = self.get_optim_variable_names() input_names = discipline.io.input_grammar return [name for name in optim_variable_names if name in input_names]
[docs] def get_sub_scenarios(self) -> list[BaseScenario]: """List the disciplines that are actually scenarios. Returns: The scenarios. """ from gemseo.scenarios.base_scenario import BaseScenario return [disc for disc in self.disciplines if isinstance(disc, BaseScenario)]
def _set_default_input_values_from_design_space(self) -> None: """Initialize the top level disciplines from the design space.""" if not self.optimization_problem.design_space.has_current_value: return current_x = self.optimization_problem.design_space.get_current_value( as_dict=True ) for discipline in self.get_top_level_disciplines(): input_names = discipline.io.input_grammar to_value = discipline.io.input_grammar.data_converter.convert_array_to_value discipline.io.input_grammar.defaults.update({ name: to_value(name, value) for name, value in current_x.items() if name in input_names })
[docs] @classmethod def get_default_sub_option_values(cls, **options: str) -> StrKeyMapping: """Return the default values of the sub-options of the formulation. When some options of the formulation depend on higher level options, the default values of these sub-options may be obtained here, mainly for use in the API. Args: **options: The options required to deduce the sub-options grammar. Returns: Either ``None`` or the sub-options default values. """ return {}
[docs] @classmethod def get_sub_options_grammar(cls, **options: str) -> JSONGrammar: """Get the sub-options grammar. When some options of the formulation depend on higher level options, the schema of the sub-options may be obtained here, mainly for use in the API. Args: **options: The options required to deduce the sub-options grammar. Returns: Either ``None`` or the sub-options grammar. """ return {}