Source code for gemseo.core.formulation

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                         documentation
#        :author: Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""The base class for all formulations."""

from __future__ import division, unicode_literals

import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)

from gemseo.algos.design_space import DesignSpace
from gemseo.core.discipline import MDODiscipline
from gemseo.core.execution_sequence import ExecutionSequence
from gemseo.core.json_grammar import JSONGrammar

if TYPE_CHECKING:
    from gemseo.core.scenario import Scenario

import six
from custom_inherit import DocInheritMeta
from numpy import array, copy, in1d, ndarray, where, zeros
from six import string_types

from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.core.mdofunctions.function_from_discipline import FunctionFromDiscipline
from gemseo.core.mdofunctions.function_generator import MDOFunctionGenerator
from gemseo.core.mdofunctions.mdo_function import MDOFunction

LOGGER = logging.getLogger(__name__)


@six.add_metaclass(
    DocInheritMeta(
        abstract_base_class=True,
        style="google_with_merge",
        include_special_methods=True,
    )
)
class MDOFormulation(object):
    """Abstract MDO formulation class to be extended in subclasses for use.

    This class creates the objective function and constraints from the disciplines.
    It defines the process implicitly.

    By default,

    - the objective function is minimized,
    - the type of a constraint is equality,
    - the activation value of a constraint is 0.

    The link between the instances of :class:`.MDODiscipline`,
    the name of the objective function
    and the names of the constraints
    is made with the :class:`.MDOFunctionGenerator`,
    which generates instances of :class:`.MDOFunction` from the disciplines.
    """

    NAME = "MDOFormulation"

    def __init__(
        self,
        disciplines,  # type: Sequence[MDODiscipline]
        objective_name,  # type: str
        design_space,  # type: DesignSpace
        maximize_objective=False,  # type: bool
        grammar_type=MDODiscipline.JSON_GRAMMAR_TYPE,  # type: str
        **options  # type: Any
    ):  # type: (...) -> None # pylint: disable=W0613
        """
        Args:
            disciplines: The disciplines.
            objective_name: The name of the objective function.
            design_space: The design space.
            maximize_objective: If True, the objective function is maximized.
            grammar_type: The type of the input and output grammars,
                either :attr:`.MDODiscipline.JSON_GRAMMAR_TYPE`
                or :attr:`.MDODiscipline.SIMPLE_GRAMMAR_TYPE`.
            **options: The options of the formulation.
        """
        self.check_disciplines(disciplines)
        self.disciplines = disciplines
        self._objective_name = objective_name
        self.opt_problem = OptimizationProblem(design_space)
        self._maximize_objective = maximize_objective
        self.__grammar_type = grammar_type

    @property
    def _grammar_type(self):  # type: (...) -> str
        """The type of the input and output grammars."""
        return self.__grammar_type

    @property
    def design_space(self):  # type: (...) -> DesignSpace
        """The design space on which the formulation is applied."""
        return self.opt_problem.design_space

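    # --- Illustrative sketch (editorial addition, not part of the module) ---
    # A minimal concrete subclass, in the spirit of the simplest formulations:
    # it builds the objective from the disciplines at construction time using
    # the :meth:`._build_objective_from_disc` helper defined further below.
    #
    #     class MinimalFormulation(MDOFormulation):
    #         """A hypothetical formulation optimizing the disciplines as-is."""
    #
    #         def __init__(self, disciplines, objective_name, design_space, **options):
    #             super(MinimalFormulation, self).__init__(
    #                 disciplines, objective_name, design_space, **options
    #             )
    #             self._build_objective_from_disc(objective_name)
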
    @staticmethod
    def check_disciplines(
        disciplines,  # type: Any
    ):  # type: (...) -> None
        """Check that the disciplines are provided as a list.

        Args:
            disciplines: The disciplines.
        """
        if not disciplines or not isinstance(disciplines, list):
            raise TypeError("Disciplines must be provided to the formulation as a list")

    @staticmethod
    def _check_add_cstr_input(
        output_name,  # type: str
        constraint_type,  # type: str
    ):  # type: (...) -> List[str]
        """Check the output name and constraint type passed to :meth:`.add_constraint`.

        Args:
            output_name: The name of the output to be used as a constraint.
                For instance, if g_1 is given and constraint_type="eq",
                g_1=0 will be added as a constraint to the optimizer.
            constraint_type: The type of constraint,
                either "eq" for an equality constraint
                or "ineq" for an inequality constraint.

        Returns:
            The names of the outputs to be used as constraints.
        """
        if constraint_type not in [MDOFunction.TYPE_EQ, MDOFunction.TYPE_INEQ]:
            raise ValueError(
                "Constraint type must be either 'eq' or 'ineq',"
                " got: %s instead" % constraint_type
            )
        if isinstance(output_name, list):
            outputs_list = output_name
        else:
            outputs_list = [output_name]
        return outputs_list

    def add_constraint(
        self,
        output_name,  # type: str
        constraint_type=MDOFunction.TYPE_EQ,  # type: str
        constraint_name=None,  # type: Optional[str]
        value=None,  # type: Optional[float]
        positive=False,  # type: bool
    ):  # type: (...) -> None
        """Add a user constraint.

        A user constraint is a design constraint
        in addition to the formulation-specific constraints,
        such as the targets (a.k.a. consistency constraints) in IDF.

        How the constraints are distributed among the disciplines
        is defined by the formulation class.

        Args:
            output_name: The name of the output to be used as a constraint.
                For instance, if g_1 is given and constraint_type="eq",
                g_1=0 will be added as a constraint to the optimizer.
            constraint_type: The type of constraint,
                either "eq" for an equality constraint
                or "ineq" for an inequality constraint.
            constraint_name: The name of the constraint to be stored.
                If None, the name is generated from the output name.
            value: The activation value of the constraint.
                If None, the value is equal to 0.
            positive: If True, the inequality constraint is positive.
        """
        outputs_list = self._check_add_cstr_input(output_name, constraint_type)
        mapped_cstr = FunctionFromDiscipline(outputs_list, self, top_level_disc=True)
        mapped_cstr.f_type = constraint_type
        if constraint_name is not None:
            mapped_cstr.name = constraint_name
        self.opt_problem.add_constraint(mapped_cstr, value=value, positive=positive)

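    # Illustrative sketch (editorial addition), assuming a discipline computing
    # an output named "g_1":
    #
    #     >>> formulation.add_constraint("g_1", constraint_type="ineq", positive=True)
    #
    # This declares the positive inequality constraint on g_1; with the defaults
    # (constraint_type="eq", value=None), g_1 = 0 would be added instead.
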
    def add_observable(
        self,
        output_names,  # type: Union[str, Sequence[str]]
        observable_name=None,  # type: Optional[str]
        discipline=None,  # type: Optional[MDODiscipline]
    ):  # type: (...) -> None
        """Add an observable to the optimization problem.

        How the observable is retrieved from the disciplines
        is defined by the formulation class.

        Args:
            output_names: The name(s) of the output(s) to observe.
            observable_name: The name of the observable.
            discipline: The discipline computing the observed outputs.
                If None, the discipline is detected from the inner disciplines.
        """
        if isinstance(output_names, string_types):
            output_names = [output_names]
        obs_fun = FunctionFromDiscipline(
            output_names, self, top_level_disc=True, discipline=discipline
        )
        if observable_name is not None:
            obs_fun.name = observable_name
        obs_fun.f_type = MDOFunction.TYPE_OBS
        self.opt_problem.add_observable(obs_fun)

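    # Illustrative sketch (editorial addition), assuming a discipline computing
    # outputs named "y_1" and "y_2"; the observable is evaluated and recorded
    # alongside the objective without constraining the optimization:
    #
    #     >>> formulation.add_observable(["y_1", "y_2"], observable_name="y")
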
    def get_top_level_disc(self):  # type: (...) -> List[MDODiscipline]
        """Return the disciplines whose inputs are required to run the scenario.

        A formulation seeks to evaluate the objective function and the constraints
        from the inputs.
        It structures the optimization problem into multiple levels of disciplines.
        The disciplines directly depending on these inputs
        are called the top level disciplines.

        By default, this method returns all the disciplines.
        This method can be overloaded by subclasses.

        Returns:
            The top level disciplines.
        """
        return self.disciplines

    @staticmethod
    def _get_mask_from_datanames(
        all_data_names,  # type: ndarray
        masked_data_names,  # type: ndarray
    ):  # type: (...) -> ndarray
        """Get the indices of the masked data names among all the data names.

        A boolean array of the size of all_data_names is first built,
        with True values where masked_data_names appear in all_data_names;
        the indices of its True entries are then returned.

        Args:
            all_data_names: The main array for the mask.
            masked_data_names: The array which masks all_data_names.

        Returns:
            The indices of masked_data_names in all_data_names,
            as returned by numpy.where.
        """
        places = in1d(all_data_names, masked_data_names)
        return where(places)

    def _get_generator_from(
        self,
        output_names,  # type: Iterable[str]
        top_level_disc=False,  # type: bool
    ):  # type: (...) -> MDOFunctionGenerator
        """Create a generator of :class:`.MDOFunction` from the names of the outputs.

        Find a discipline which computes all the provided outputs
        and build the associated MDOFunctionGenerator.

        Args:
            output_names: The names of the outputs.
            top_level_disc: If True, search the outputs among the top level disciplines.

        Returns:
            A generator of :class:`.MDOFunction` instances.
        """
        if top_level_disc:
            search_among = self.get_top_level_disc()
        else:
            search_among = self.disciplines
        for discipline in search_among:
            if discipline.is_all_outputs_existing(output_names):
                return MDOFunctionGenerator(discipline)
        raise ValueError(
            "No discipline known by formulation %s"
            " has all outputs named %s" % (type(self).__name__, output_names)
        )

    def _get_generator_with_inputs(
        self,
        input_names,  # type: Iterable[str]
        top_level_disc=False,  # type: bool
    ):  # type: (...) -> MDOFunctionGenerator
        """Create a generator of :class:`.MDOFunction` from the names of the inputs.

        Find a discipline which has all the provided inputs
        and build the associated MDOFunctionGenerator.

        Args:
            input_names: The names of the inputs.
            top_level_disc: If True, search the inputs among the top level disciplines.

        Returns:
            A generator of :class:`.MDOFunction` instances.
        """
        if top_level_disc:
            search_among = self.get_top_level_disc()
        else:
            search_among = self.disciplines
        for discipline in search_among:
            if discipline.is_all_inputs_existing(input_names):
                return MDOFunctionGenerator(discipline)
        raise ValueError(
            "No discipline known by formulation %s"
            " has all inputs named %s" % (type(self).__name__, input_names)
        )

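    # Illustrative sketch (editorial addition): the behavior of the in1d/where
    # pair used by _get_mask_from_datanames above. Note that numpy.where applied
    # to a boolean array returns the *indices* of the True entries, not the
    # boolean mask itself:
    #
    #     >>> from numpy import array, in1d, where
    #     >>> all_names = array(["x_1", "x_2", "y_1"])
    #     >>> in1d(all_names, array(["x_2"]))
    #     array([False,  True, False])
    #     >>> where(in1d(all_names, array(["x_2"])))
    #     (array([1]),)
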
    def mask_x(
        self,
        masking_data_names,  # type: Iterable[str]
        x_vect,  # type: ndarray
        all_data_names=None,  # type: Optional[Iterable[str]]
    ):  # type: (...) -> ndarray
        """Mask a vector from a subset of names, with respect to a set of names.

        Args:
            masking_data_names: The names of the data to keep.
            x_vect: The vector of floats to mask.
            all_data_names: The set of all the names.
                If None, use the design variables stored in the design space.

        Returns:
            The masked vector, i.e. the entries of x_vect
            corresponding to the variables in masking_data_names.
        """
        if all_data_names is None:
            all_data_names = self.get_optim_variables_names()

        i_min = 0
        x_mask = array([False] * x_vect.size)
        for key in all_data_names:
            var_length = self._get_dv_length(key)
            i_max = i_min + var_length
            if len(x_vect) < i_max:
                raise ValueError(
                    "Inconsistent input array of shape %s"
                    " for the design variable %s of length %s"
                    % (x_vect.shape, key, var_length)
                )
            if key in masking_data_names:
                x_mask[i_min:i_max] = True
            i_min = i_max

        return x_vect[x_mask]

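    # Illustrative sketch (editorial addition), assuming a formulation whose
    # design space declares a hypothetical "x" (size 2) followed by "y" (size 1):
    #
    #     >>> formulation.mask_x(["y"], array([1.0, 2.0, 3.0]))
    #     array([3.])
    #
    # The entries of x_vect matching the variables in masking_data_names are
    # kept, in the order of all_data_names.
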
    def unmask_x(
        self,
        masking_data_names,  # type: Iterable[str]
        x_masked,  # type: ndarray
        all_data_names=None,  # type: Optional[Iterable[str]]
        x_full=None,  # type: Optional[ndarray]
    ):  # type: (...) -> ndarray
        """Unmask a vector from a subset of names, with respect to a set of names.

        Args:
            masking_data_names: The names of the kept data.
            x_masked: The masked vector to unmask.
            all_data_names: The set of all the names.
                If None, use the design variables stored in the design space.
            x_full: The default values for the full vector.
                If None, use the zero vector.

        Returns:
            The unmasked vector.
        """
        return self.unmask_x_swap_order(
            masking_data_names, x_masked, all_data_names, x_full
        )

    def _get_dv_length(
        self,
        variable_name,  # type: str
    ):  # type: (...) -> int
        """Retrieve the length of a variable.

        This method relies on the size declared in the design space.

        Args:
            variable_name: The name of the variable.

        Returns:
            The size of the variable.
        """
        return self.opt_problem.design_space.get_size(variable_name)

    def _get_x_mask_swap(
        self,
        masking_data_names,  # type: Iterable[str]
        all_data_names=None,  # type: Optional[Iterable[str]]
    ):  # type: (...) -> Tuple[Dict[str, Tuple[int, int]], int, int]
        """Get a mask from a subset of names, with respect to a set of names.

        This method swaps the order of the values if necessary,
        i.e. when the order of the data names differs between these sets.

        Args:
            masking_data_names: The names of the data to keep.
            all_data_names: The set of all the names.
                If None, use the design variables stored in the design space.

        Returns:
            A mask, as well as the dimension of the restricted variable space
            and the dimension of the original variable space.
            The mask is a dictionary indexed by the names of the variables
            coming from the subset.
            For a given name,
            the value is a tuple
            whose first component is its lowest dimension in the original space
            and whose second component is the lowest dimension
            of the variable that follows it in the original space.
        """
        if all_data_names is None:
            all_data_names = self.get_optim_variables_names()

        i_min = 0
        x_values_dict = {}
        n_x = 0
        for key in all_data_names:
            i_max = i_min + self._get_dv_length(key)
            if key in masking_data_names:
                x_values_dict[key] = (i_min, i_max)
                n_x += i_max - i_min
            i_min = i_max

        return x_values_dict, n_x, i_max

    def unmask_x_swap_order(
        self,
        masking_data_names,  # type: Iterable[str]
        x_masked,  # type: ndarray
        all_data_names=None,  # type: Optional[Iterable[str]]
        x_full=None,  # type: Optional[ndarray]
    ):  # type: (...) -> ndarray
        """Unmask a vector from a subset of names, with respect to a set of names.

        This method swaps the order of the values if necessary,
        i.e. when the order of the data names differs between these sets.

        Args:
            masking_data_names: The names of the kept data.
            x_masked: The masked vector to unmask.
            all_data_names: The set of all the names.
                If None, use the design variables stored in the design space.
            x_full: The default values for the full vector.
                If None, use the zero vector.

        Returns:
            The unmasked vector.
        """
        if all_data_names is None:
            all_data_names = self.get_optim_variables_names()
        x_values_dict, _, len_x = self._get_x_mask_swap(
            masking_data_names, all_data_names
        )
        if x_full is None:
            x_unmask = zeros(len_x, dtype=x_masked.dtype)
        else:
            x_unmask = copy(x_full)

        i_x = 0
        for key in all_data_names:
            if key in x_values_dict:
                i_min, i_max = x_values_dict[key]
                n_x = i_max - i_min
                if x_masked.size < i_x + n_x:
                    raise ValueError(
                        "Inconsistent data shapes:\n"
                        "trying to unmask the data %s of length %s\n"
                        "from values of length %s" % (key, n_x, x_masked.size)
                    )
                x_unmask[i_min:i_max] = x_masked[i_x : i_x + n_x]
                i_x += n_x
        return x_unmask

    def mask_x_swap_order(
        self,
        masking_data_names,  # type: Iterable[str]
        x_vect,  # type: ndarray
        all_data_names=None,  # type: Optional[Iterable[str]]
    ):  # type: (...) -> ndarray
        """Mask a vector from a subset of names, with respect to a set of names.

        This method swaps the order of the values if necessary,
        i.e. when the order of the data names differs between these sets.

        Args:
            masking_data_names: The names of the kept data.
            x_vect: The vector to mask.
            all_data_names: The set of all the names.
                If None, use the design variables stored in the design space.

        Returns:
            The masked version of the input vector.
        """
        if all_data_names is None:
            all_data_names = self.get_optim_variables_names()
        x_values_dict, n_x, _ = self._get_x_mask_swap(
            masking_data_names, all_data_names
        )
        x_masked = zeros(n_x, dtype=x_vect.dtype)
        i_max = 0
        i_min = 0
        for key in masking_data_names:
            if key not in x_values_dict:
                raise ValueError(
                    "Inconsistent masking inputs:"
                    " the key %s is in masking_data_names %s"
                    " but not in all_data_names %s"
                    % (key, masking_data_names, all_data_names)
                )
            value = x_values_dict[key]
            i_max += value[1] - value[0]
            len_x = len(x_vect)
            if len(x_masked) < i_max or len_x <= value[0] or len_x < value[1]:
                raise ValueError(
                    "Inconsistent input array %s of shape %s"
                    " for the data named %s of size %s"
                    % (x_vect, x_vect.shape, key, i_max)
                )
            x_masked[i_min:i_max] = x_vect[value[0] : value[1]]
            i_min = i_max

        return x_masked

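    # Illustrative sketch (editorial addition), with the same hypothetical
    # design space as above ("x" of size 2, then "y" of size 1). Unlike mask_x,
    # mask_x_swap_order follows the order of masking_data_names, while
    # unmask_x_swap_order scatters the masked values back to their original
    # positions, padding the rest with zeros by default:
    #
    #     >>> formulation.mask_x_swap_order(["y", "x"], array([1.0, 2.0, 3.0]))
    #     array([3., 1., 2.])
    #     >>> formulation.unmask_x_swap_order(["y"], array([9.0]))
    #     array([0., 0., 9.])
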
    def _remove_unused_variables(self):  # type: (...) -> None
        """Remove the variables in the design space that are not discipline inputs."""
        design_space = self.opt_problem.design_space
        disciplines = self.get_top_level_disc()
        all_inputs = set(
            var for disc in disciplines for var in disc.get_input_data_names()
        )
        for name in set(design_space.variables_names):
            if name not in all_inputs:
                design_space.remove_variable(name)

    def _remove_sub_scenario_dv_from_ds(self):  # type: (...) -> None
        """Remove the sub-scenarios' design variables from the design space."""
        for scenario in self.get_sub_scenarios():
            loc_vars = scenario.design_space.variables_names
            for var in loc_vars:
                if var in self.design_space.variables_names:
                    self.design_space.remove_variable(var)

    def _build_objective_from_disc(
        self,
        objective_name,  # type: str
        discipline=None,  # type: Optional[MDODiscipline]
        top_level_disc=True,  # type: bool
    ):  # type: (...) -> None
        """Build the objective function from the discipline able to compute it.

        Args:
            objective_name: The name of the objective function.
            discipline: The discipline computing the objective function.
                If None, the discipline is detected from the inner disciplines.
            top_level_disc: If True, search the discipline among the top level ones.
        """
        if isinstance(objective_name, string_types):
            objective_name = [objective_name]
        obj_mdo_fun = FunctionFromDiscipline(
            objective_name, self, discipline, top_level_disc
        )
        obj_mdo_fun.f_type = MDOFunction.TYPE_OBJ
        self.opt_problem.objective = obj_mdo_fun
        if self._maximize_objective:
            self.opt_problem.change_objective_sign()

    def get_optim_variables_names(self):  # type: (...) -> List[str]
        """Get the names of the optimization unknowns to be provided to the optimizer.

        This is different from the design variable names provided by the user,
        since it depends on the formulation
        and can include, for instance, coupling targets in IDF.

        Returns:
            The names of the optimization variables.
        """
        return self.opt_problem.design_space.variables_names

    def get_x_names_of_disc(
        self,
        discipline,  # type: MDODiscipline
    ):  # type: (...) -> List[str]
        """Get the design variable names of a given discipline.

        Args:
            discipline: The discipline.

        Returns:
            The names of the design variables.
        """
        optim_variables_names = self.get_optim_variables_names()
        input_names = discipline.get_input_data_names()
        return [name for name in optim_variables_names if name in input_names]

    def get_sub_disciplines(self):  # type: (...) -> List[MDODiscipline]
        """Accessor to the sub-disciplines.

        This method lists the sub-scenarios' disciplines,
        as well as the disciplines that are not scenarios.

        Returns:
            The sub-disciplines.
        """
        sub_disc = []

        def add_to_sub(
            disc_list,  # type: Iterable[MDODiscipline]
        ):  # type: (...) -> None
            """Add the disciplines to the sub-disciplines if not already added.

            Args:
                disc_list: The disciplines.
            """
            for disc in disc_list:
                if disc not in sub_disc:
                    sub_disc.append(disc)

        for discipline in self.disciplines:
            if hasattr(discipline, "disciplines"):
                add_to_sub(discipline.disciplines)
            else:
                add_to_sub([discipline])

        return sub_disc

    def get_sub_scenarios(self):  # type: (...) -> List[Scenario]
        """List the disciplines that are actually scenarios.

        Returns:
            The scenarios.
        """
        return [disc for disc in self.disciplines if disc.is_scenario()]

    def _set_defaultinputs_from_ds(self):  # type: (...) -> None
        """Initialize the default inputs of the top level disciplines from the design space."""
        if not self.opt_problem.design_space.has_current_x():
            return
        x_dict = self.opt_problem.design_space.get_current_x_dict()
        for disc in self.get_top_level_disc():
            inputs = disc.get_input_data_names()
            curr_x_disc = {name: x_dict[name] for name in inputs if name in x_dict}
            disc.default_inputs.update(curr_x_disc)

    def get_expected_workflow(
        self,
    ):  # type: (...) -> List[Union[ExecutionSequence, Tuple[ExecutionSequence]]]
        """Get the expected sequence of execution of the disciplines.

        This method is used for the XDSM representation
        and can be overloaded by subclasses.

        For instance:

        * [A, B] denotes the execution of A, then the execution of B;
        * (A, B) denotes the concurrent execution of A and B;
        * [A, (B, C), D] denotes the execution of A,
          then the concurrent execution of B and C,
          then the execution of D.

        Returns:
            A sequence of elements which are either
            an :class:`.ExecutionSequence`
            or a tuple of :class:`.ExecutionSequence` for concurrent execution.
        """
        raise NotImplementedError()

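    # Illustrative sketch (editorial addition): a purely serial formulation
    # might override this method along the following lines. The
    # ExecutionSequenceFactory helper and its serial() builder are assumptions
    # about the surrounding library, not part of this module:
    #
    #     def get_expected_workflow(self):
    #         from gemseo.core.execution_sequence import ExecutionSequenceFactory
    #         return ExecutionSequenceFactory.serial(self.disciplines)
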
    def get_expected_dataflow(
        self,
    ):  # type: (...) -> List[Tuple[MDODiscipline, MDODiscipline, List[str]]]
        """Get the expected data exchange sequence.

        This method is used for the XDSM representation
        and can be overloaded by subclasses.

        Returns:
            The expected sequence of data exchange
            where the i-th item is described by the starting discipline,
            the ending discipline and the coupling variables.
        """
        raise NotImplementedError()

    @classmethod
    def get_default_sub_options_values(
        cls, **options  # type: str
    ):  # type: (...) -> Optional[Dict] # pylint: disable=W0613
        """Get the default values of the sub-options of the formulation.

        When some options of the formulation depend on higher level options,
        the default values of these sub-options may be obtained here,
        mainly for use in the API.

        Args:
            **options: The options required to deduce the sub-options grammar.

        Returns:
            Either None or the default values of the sub-options.
        """

    @classmethod
    def get_sub_options_grammar(
        cls, **options  # type: str
    ):  # type: (...) -> Optional[JSONGrammar] # pylint: disable=W0613
        """Get the sub-options grammar.

        When some options of the formulation depend on higher level options,
        the schema of the sub-options may be obtained here,
        mainly for use in the API.

        Args:
            **options: The options required to deduce the sub-options grammar.

        Returns:
            Either None or the sub-options grammar.
        """