# Source code for gemseo.formulations.bilevel

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""A Bi-level formulation."""
from __future__ import division, unicode_literals

import logging
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union

from gemseo.algos.design_space import DesignSpace
from gemseo.core.chain import MDOChain, MDOParallelChain
from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.core.discipline import MDODiscipline
from gemseo.core.execution_sequence import ExecutionSequence
from gemseo.core.formulation import MDOFormulation
from gemseo.core.function import MDOFunction
from gemseo.core.json_grammar import JSONGrammar
from gemseo.core.mdo_scenario import MDOScenarioAdapter
from gemseo.core.scenario import Scenario
from gemseo.mda.mda import MDA
from gemseo.mda.mda_factory import MDAFactory

LOGGER = logging.getLogger(__name__)


class BiLevel(MDOFormulation):
    """A bi-level formulation.

    This formulation draws an optimization architecture
    that involves multiple optimization problems to be solved
    to obtain the solution of the MDO problem.

    Here, at each iteration on the global design variables,
    the bi-level MDO formulation implementation performs:

    1. a first MDA to compute the coupling variables
       (skipped when there are no strongly coupled disciplines),
    2. several disciplinary optimizations on the local design variables
       (chained sequentially, or run in parallel when requested),
    3. a second MDA to update the coupling variables.
    """

    # Names of the levels at which a constraint can be applied.
    SYSTEM_LEVEL = "system"
    SUBSCENARIOS_LEVEL = "sub-scenarios"
    LEVELS = (SYSTEM_LEVEL, SUBSCENARIOS_LEVEL)

    def __init__(
        self,
        disciplines,  # type: Sequence[MDODiscipline]
        objective_name,  # type: str
        design_space,  # type: DesignSpace
        maximize_objective=False,  # type: bool
        mda_name="MDAChain",  # type: str
        parallel_scenarios=False,  # type: bool
        multithread_scenarios=True,  # type: bool
        apply_cstr_tosub_scenarios=True,  # type: bool
        apply_cstr_to_system=True,  # type: bool
        reset_x0_before_opt=False,  # type: bool
        **mda_options  # type: Any
    ):  # type: (...) -> None
        """Build the MDAs, the bi-level chain and the objective on top of it.

        Args:
            disciplines: The disciplines, forwarded to the parent formulation.
            objective_name: The name of the objective,
                forwarded to the parent formulation.
            design_space: The design space, forwarded to the parent formulation;
                its variables names are stored as the shared design variables.
            maximize_objective: Forwarded as-is to the parent formulation.
            mda_name: The name of the MDA class to be used.
            parallel_scenarios: If True, the sub-scenarios are run in parallel.
            multithread_scenarios: If True and parallel_scenarios=True,
                the sub-scenarios are run in parallel using multi-threading;
                if False and parallel_scenarios=True, multi-processing is used.
            apply_cstr_tosub_scenarios: If True, the :meth:`.add_constraint`
                method adds the constraint to the optimization problem
                of the sub-scenario capable of computing the constraint.
            apply_cstr_to_system: If True, the :meth:`.add_constraint` method
                adds the constraint to the optimization problem
                of the system scenario.
            reset_x0_before_opt: If True, restart the sub optimizations
                from the initial guesses, otherwise warm start them.
            **mda_options: The options passed to the MDA at construction.
        """
        super(BiLevel, self).__init__(
            disciplines,
            objective_name,
            design_space,
            maximize_objective=maximize_objective,
        )
        self._shared_dv = list(design_space.variables_names)
        self.mda1 = None
        self.mda2 = None
        self.reset_x0_before_opt = reset_x0_before_opt
        self.scenario_adapters = []
        self.chain = None
        self._mda_factory = MDAFactory()
        self._apply_cstr_to_system = apply_cstr_to_system
        self._apply_cstr_tosub_scenarios = apply_cstr_tosub_scenarios
        self.__parallel_scenarios = parallel_scenarios
        self._multithread_scenarios = multithread_scenarios
        self.couplstr = MDOCouplingStructure(self.get_sub_disciplines())

        # The order matters: the chain needs the MDAs, the design space
        # cleanup needs the second MDA, and the objective needs the chain.
        # Create MDA
        self._build_mdas(mda_name, **mda_options)

        # Create MDOChain : MDA1 -> sub scenarios -> MDA2
        self._build_chain()

        # Cleanup design space
        self._update_design_space()

        # Builds the objective function on top of the chain
        self._build_objective_from_disc(self._objective_name)

    def _build_scenario_adapters(
        self,
        output_functions=False,  # type: bool
        pass_nonshared_var=False,  # type: bool
        adapter_class=MDOScenarioAdapter,  # type:MDOScenarioAdapter
        **adapter_options
    ):  # type: (...) -> List[MDOScenarioAdapter]
        """Build the MDOScenarioAdapter required for each sub scenario.

        This is used to build the self.chain.

        Args:
            output_functions: If True, then the optimization functions
                are outputs of the adapter.
            pass_nonshared_var: If True, the non-shared design variables
                are inputs of the scenarios adapters.
            adapter_class: The class of the adapters.
            **adapter_options: The options for the adapters initialization.

        Returns:
            The adapters for the sub-scenarios.
        """
        # coupled sub-disciplines
        couplings = self.couplstr.strong_couplings()
        mda2_inpts = self._get_mda2_inputs()
        mda1_outputs = self.mda1.get_output_data_names() if self.mda1 else []

        # These name sets do not depend on the sub-scenario: compute them once.
        # All couplings of the scenarios are taken from the MDA,
        # plus the shared variables from the system scenario driver.
        upstream_names = set(couplings) | set(self._shared_dv) | set(mda1_outputs)
        downstream_names = set(couplings + mda2_inpts)

        adapters = []
        for scenario in self.get_sub_scenarios():
            # Get the I/O names of the sub-scenario top-level disciplines
            top_disc = scenario.formulation.get_top_level_disc()
            top_inputs = [
                inpt for disc in top_disc for inpt in disc.get_input_data_names()
            ]
            top_outputs = [
                outpt for disc in top_disc for outpt in disc.get_output_data_names()
            ]

            sc_allins = list(set(top_inputs) & upstream_names)
            if pass_nonshared_var:
                nonshared_var = scenario.design_space.variables_names
                sc_allins = list(
                    set(sc_allins) | (set(top_inputs) & set(nonshared_var))
                )

            # Output couplings of scenario are given to MDA for speedup
            if output_functions:
                opt_problem = scenario.formulation.opt_problem
                sc_outvars = opt_problem.objective.outvars
                sc_constraints = opt_problem.get_constraints_names()
                sc_out_coupl = sc_outvars + sc_constraints
            else:
                sc_out_coupl = list(set(top_outputs) & downstream_names)

            # Add private variables from disciplinary scenario design space
            sc_allouts = sc_out_coupl + scenario.design_space.variables_names
            adapters.append(
                adapter_class(scenario, sc_allins, sc_allouts, **adapter_options)
            )
        return adapters

    @staticmethod
    def _get_mda2_inputs():  # type: (...) -> List[str]
        """Return the inputs of the second MDA.

        This base implementation returns an empty list.

        Returns:
            The inputs of the second MDA.
        """
        return []

    @staticmethod
    def _get_required_mda_name(
        options,  # type: Dict[str, str]
    ):  # type: (...) -> str
        """Extract the mandatory ``mda_name`` entry from formulation options.

        Args:
            options: The options of the formulation.

        Returns:
            The name of the MDA class.

        Raises:
            ValueError: If ``mda_name`` is missing or None.
        """
        main_mda = options.get("mda_name")
        if main_mda is None:
            raise ValueError(
                "'mda_name' option is required to deduce the sub options of BiLevel !"
            )
        return main_mda

    @classmethod
    def get_sub_options_grammar(
        cls, **options  # type: str
    ):  # type: (...) -> JSONGrammar
        """Return the options grammar of the MDA named by ``mda_name``.

        Args:
            **options: The options of the formulation;
                ``mda_name`` is required.

        Returns:
            The options grammar provided by the MDA factory for this MDA.

        Raises:
            ValueError: If ``mda_name`` is missing or None.
        """
        main_mda = cls._get_required_mda_name(options)
        factory = MDAFactory().factory
        return factory.get_options_grammar(main_mda)

    @classmethod
    def get_default_sub_options_values(
        cls, **options  # type: str
    ):  # type: (...) -> Optional[Dict[str,Optional[Union[str,int,float,bool]]]]
        """Return the default options values of the MDA named by ``mda_name``.

        Args:
            **options: The options of the formulation;
                ``mda_name`` is required.

        Returns:
            The default options values provided by the MDA factory for this MDA.

        Raises:
            ValueError: If ``mda_name`` is missing or None.
        """
        main_mda = cls._get_required_mda_name(options)
        factory = MDAFactory().factory
        return factory.get_default_options_values(main_mda)

    def _build_mdas(
        self,
        mda_name,  # type: str
        **mda_options  # type:Optional[Union[str,int,float,bool]]
    ):  # type: (...) -> None
        """Build the two MDAs placed before and after the sub-scenarios.

        The first MDA is built on the strongly coupled disciplines only,
        with warm start enabled; it is left to None, with a warning,
        when there is no strongly coupled discipline.
        The second MDA is built on all the sub-disciplines,
        with warm start disabled.

        Args:
            mda_name: The class name of the MDA.
            **mda_options: The options passed to the MDA.
        """
        disc_mda1 = self.couplstr.strongly_coupled_disciplines()
        if disc_mda1:
            self.mda1 = self._mda_factory.create(mda_name, disc_mda1, **mda_options)
            self.mda1.warm_start = True
        else:
            LOGGER.warning(
                "No strongly coupled disciplines detected, "
                " MDA1 is deactivated in the BiLevel formulation"
            )

        disc_mda2 = self.get_sub_disciplines()
        self.mda2 = self._mda_factory.create(mda_name, disc_mda2, **mda_options)
        self.mda2.warm_start = False

    def _build_chain_dis_sub_opts(
        self,
    ):  # type: (...) -> Tuple[List[MDA], List[MDOScenarioAdapter]]
        """Initialize the chain of disciplines and the sub-scenarios.

        Returns:
            The list starting the chain
            (the first MDA if it exists, otherwise empty)
            and the sub-scenarios adapters.
        """
        chain_dis = [] if self.mda1 is None else [self.mda1]
        return chain_dis, self.scenario_adapters

    def _build_chain(self):  # type: (...) -> None
        """Build the chain on top of which all functions are built.

        This chain is: MDA -> MDOScenarios -> MDA.

        When the sub-optimizations are warm started and a first MDA exists,
        the execution of this MDA is wrapped so that it first receives
        the current values of the local design variables of the sub-scenarios.
        """
        # Build the scenario adapters to be chained with MDAs
        adapter_opt = {"reset_x0_before_opt": self.reset_x0_before_opt}
        self.scenario_adapters = self._build_scenario_adapters(**adapter_opt)
        chain_dis, sub_opts = self._build_chain_dis_sub_opts()

        if self.__parallel_scenarios:
            use_threading = self._multithread_scenarios
            par_chain = MDOParallelChain(sub_opts, use_threading=use_threading)
            chain_dis += [par_chain, self.mda2]
        else:
            # Chain MDA -> scenarios exec -> MDA
            chain_dis += sub_opts + [self.mda2]

        self.chain = MDOChain(chain_dis, name="bilevel_chain")

        if not self.reset_x0_before_opt and self.mda1 is not None:
            run_mda1_orig = self.mda1._run

            def _run_mda():
                """Redefine mda1 execution to warm start the chain with previous
                x_local opt."""
                # TODO : Define a pre run method to be overloaded in MDA maybe
                # Or use observers at the system driver level to pass the local
                # vars
                for scenario in self.get_sub_scenarios():
                    x_loc_d = scenario.design_space.get_current_x_dict()
                    for indata, x_loc in x_loc_d.items():
                        # Only forward the values that MDA1 takes as inputs
                        # and that are actually set.
                        if self.mda1.is_input_existing(indata) and x_loc is not None:
                            self.mda1.local_data[indata] = x_loc
                return run_mda1_orig()

            self.mda1._run = _run_mda

    def _update_design_space(self):  # type: (...) -> None
        """Update the design space of the system-level problem.

        In this order: set the default inputs from the design space,
        remove the design variables of the sub-scenarios,
        remove the coupling variables and remove the unused variables.
        """
        self._set_defaultinputs_from_ds()
        self._remove_sub_scenario_dv_from_ds()
        self._remove_couplings_from_ds()
        self._remove_unused_variables()

    def _remove_couplings_from_ds(self):  # type: (...) -> None
        """Remove the coupling variables from the design space."""
        if hasattr(self.mda2, "strong_couplings"):
            # Otherwise, the MDA2 may be a user provided MDA
            # Which manages the couplings internally
            couplings = self.mda2.strong_couplings
            design_space = self.opt_problem.design_space
            for coupling in couplings:
                if coupling in design_space.variables_names:
                    design_space.remove_variable(coupling)

    def get_top_level_disc(self):  # type: (...) -> List[MDODiscipline]
        """Return the top-level disciplines, i.e. the bi-level chain alone.

        Returns:
            A single-item list containing the chain.
        """
        return [self.chain]

    def get_expected_workflow(
        self,
    ):  # type: (...) -> List[ExecutionSequence,Tuple[ExecutionSequence]]
        """Return the expected workflow, as given by the bi-level chain.

        Returns:
            The expected workflow of the chain.
        """
        return self.chain.get_expected_workflow()

    def get_expected_dataflow(
        self,
    ):  # type: (...) -> List[Tuple[MDODiscipline,MDODiscipline,List[str]]]
        """Return the expected data flow, as given by the bi-level chain.

        Returns:
            The expected data flow of the chain.
        """
        return self.chain.get_expected_dataflow()

    def add_constraint(
        self,
        output_name,  # type: str
        constraint_type=MDOFunction.TYPE_EQ,  # type: str
        constraint_name=None,  # type: Optional[str]
        value=None,  # type: Optional[float]
        positive=False,  # type: bool
        levels=None,  # type: Optional[Sequence[str]]
    ):  # type: (...) -> None
        """Add a constraint to the formulation.

        Args:
            output_name: The name of the output to be used as a constraint.
            constraint_type: The type of constraint,
                either "eq" for equality constraint
                or "ineq" for inequality constraint.
            constraint_name: The name of the constraint to be stored,
                If None, the name is generated from the output name.
            value: The value of activation of the constraint.
                If None, the value is equal to 0.
            positive: If True, the inequality constraint is positive.
            levels: The levels at which the constraint is to be added
                (a list or tuple of items of Bilevel.LEVELS).
                By default the policy set at the initialization
                of the formulation is enforced.

        Raises:
            ValueError: If ``levels`` is neither a list nor a tuple,
                or contains items that are not in Bilevel.LEVELS.
        """
        if levels is None:
            # If the constraint levels are not specified
            # the initial policy is enforced.
            to_system = self._apply_cstr_to_system
            to_sub_scenarios = self._apply_cstr_tosub_scenarios
        elif not isinstance(levels, (list, tuple)) or not set(levels) <= set(
            BiLevel.LEVELS
        ):
            # Tuples are accepted so that BiLevel.LEVELS itself can be passed.
            raise ValueError(
                "Constraint levels must be a sublist of {}".format(BiLevel.LEVELS)
            )
        elif not levels:
            LOGGER.warning("Empty list of constraint levels, constraint not added")
            return
        else:
            # Otherwise the constraint is applied at the specified levels.
            to_system = BiLevel.SYSTEM_LEVEL in levels
            to_sub_scenarios = BiLevel.SUBSCENARIOS_LEVEL in levels

        if to_system:
            self._add_system_level_constraint(
                output_name, constraint_type, constraint_name, value, positive
            )
        if to_sub_scenarios:
            self._add_sub_level_constraint(
                output_name, constraint_type, constraint_name, value, positive
            )

    def _add_system_level_constraint(
        self,
        output_name,  # type: str
        constraint_type=MDOFunction.TYPE_EQ,  # type: str
        constraint_name=None,  # type: Optional[str]
        value=None,  # type: Optional[float]
        positive=False,  # type: bool
    ):  # type: (...) -> None
        """Add a constraint at the system level.

        This delegates to the ``add_constraint`` method of the parent class.

        Args:
            output_name: The name of the output to be used as a constraint.
                For instance, if g_1 is given and constraint_type="eq",
                g_1=0 will be added as a constraint to the optimizer.
            constraint_type: The type of constraint,
                either "eq" for equality constraint
                or "ineq" for inequality constraint.
            constraint_name: The name of the constraint to be stored,
                If None, the name is generated from the output name.
            value: The value of activation of the constraint.
                If None, the value is equal to 0.
            positive: If True, the inequality constraint is positive.
        """
        super(BiLevel, self).add_constraint(
            output_name, constraint_type, constraint_name, value, positive
        )

    def _add_sub_level_constraint(
        self,
        output_name,  # type: str
        constraint_type=MDOFunction.TYPE_EQ,  # type: str
        constraint_name=None,  # type: Optional[str]
        value=None,  # type: Optional[float]
        positive=False,  # type: bool
    ):  # type: (...) -> None
        """Add a constraint at the sub-scenarios level.

        The constraint is added to every sub-scenario
        whose top-level disciplines compute the requested outputs.

        Args:
            output_name: The name of the output to be used as a constraint.
                For instance, if g_1 is given and constraint_type="eq",
                g_1=0 will be added as a constraint to the optimizer.
            constraint_type: The type of constraint,
                either "eq" for equality constraint
                or "ineq" for inequality constraint.
            constraint_name: The name of the constraint to be stored,
                If None, the name is generated from the output name.
            value: The value of activation of the constraint.
                If None, the value is equal to 0.
            positive: If True, the inequality constraint is positive.

        Raises:
            ValueError: If no sub-scenario computes the requested outputs.
        """
        added = False
        outputs_list = self._check_add_cstr_input(output_name, constraint_type)
        for scen in self.get_sub_scenarios():
            if self._scenario_computes_outputs(scen, outputs_list):
                scen.add_constraint(
                    outputs_list, constraint_type, constraint_name, value, positive
                )
                added = True
        if not added:
            raise ValueError(
                "No sub scenario has an output named {} "
                "cannot create such a constraint.".format(output_name)
            )

    @staticmethod
    def _scenario_computes_outputs(
        scenario,  # type: Scenario
        output_names,  # type: Iterable[str]
    ):  # type: (...) -> bool
        """Check if the top level disciplines compute the given outputs.

        Args:
            scenario: The scenario to be tested.
            output_names: The names of the variable to check.

        Returns:
            True if at least one top level discipline of the scenario
            computes all the given outputs.
        """
        return any(
            disc.is_all_outputs_existing(output_names)
            for disc in scenario.formulation.get_top_level_disc()
        )