Source code for gemseo.problems.scalable.data_driven.problem

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#         documentation
#        :author:  Matthias De Lozzo
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""Scalable MDO problem.

This module implements the concept of scalable problem by means of the
:class:`.ScalableProblem` class.

Given

- an MDO scenario based on a set of sampled disciplines
  with a particular problem dimension,
- a new problem dimension (= number of inputs and outputs),

a scalable problem:

1. makes each discipline scalable based on the new problem dimension,
2. creates the corresponding MDO scenario.

Then, this MDO scenario can be executed and post-processed.

We can repeat this tasks for different sizes of variables
and compare the scalability, which is the dependence of the scenario results
on the problem dimension.

.. seealso:: :class:`.MDODiscipline`, :class:`.ScalableDiscipline`
   and :class:`.Scenario`
"""

from __future__ import annotations

import logging
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING

from numpy import array
from numpy import full
from numpy import ones
from numpy import where
from numpy import zeros
from numpy.random import default_rng

from gemseo import SEED
from gemseo import create_design_space
from gemseo import create_scenario
from gemseo import generate_coupling_graph
from gemseo import generate_n2_plot
from gemseo.algos.design_space import DesignSpace
from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.disciplines.utils import get_all_inputs
from gemseo.mda.mda_factory import MDAFactory
from gemseo.problems.scalable.data_driven.discipline import ScalableDiscipline
from gemseo.utils.string_tools import MultiLineString

if TYPE_CHECKING:
    from collections.abc import Iterable
    from collections.abc import Sequence

    from gemseo.core.discipline import MDODiscipline
    from gemseo.core.scenario import Scenario
    from gemseo.datasets.io_dataset import IODataset

LOGGER = logging.getLogger(__name__)


[docs] class ScalableProblem: """Scalable problem.""" def __init__( self, datasets: Iterable[IODataset], design_variables, objective_function, eq_constraints=None, ineq_constraints=None, maximize_objective: bool = False, sizes=None, **parameters, ) -> None: """Constructor. :param list(Dataset) datasets: disciplinary datasets. :param list(str) design_variables: list of design variable names :param str objective_function: objective function :param list(str) eq_constraints: equality constraints. Default: None. :param list(str) eq_constraints: inequality constraints. Default: None. :param bool maximize_objective: maximize objective. Default: False. :param dict sizes: sizes of input and output variables. If ``None``, use the original sizes. Default: None. :param parameters: optional parameters for the scalable model. """ self.disciplines = [dataset.name for dataset in datasets] self.data = {dataset.name: dataset for dataset in datasets} self.inputs = { dataset.name: dataset.get_variable_names(dataset.INPUT_GROUP) for dataset in datasets } self.outputs = { dataset.name: dataset.get_variable_names(dataset.OUTPUT_GROUP) for dataset in datasets } self.varsizes = {} for dataset in datasets: self.varsizes.update(dataset.variable_names_to_n_components) self.design_variables = design_variables self.objective_function = objective_function self.ineq_constraints = ineq_constraints self.eq_constraints = eq_constraints self.maximize_objective = maximize_objective self.scaled_disciplines = [] self.scaled_sizes = {} self._build_scalable_disciplines(sizes, **parameters) self.scenario = None def __str__(self) -> str: """String representation of information about the scalable problem. :return: scalable problem description :rtype: str """ disciplines = ", ".join(self.disciplines) design_variables = None if self.design_variables is not None: design_variables = ", ".join(self.design_variables) ineq_constraints = None if self.ineq_constraints is not None: ineq_constraints = ", ".join(self.ineq_constraints) eq_constraints = None if self.eq_constraints is not None: eq_constraints = ", ".join(self.eq_constraints) sizes = [name + f" ({size})" for name, size in self.scaled_sizes.items()] sizes = ", ".join(sizes) optimize = "maximize" if self.maximize_objective else "minimize" msg = MultiLineString() msg.add("MDO problem") msg.indent() msg.add("Disciplines: {}", disciplines) msg.add("Design variables: {}", design_variables) msg.add("Objective function: {} (to {})", self.objective_function, optimize) msg.add("Inequality constraints: {}", ineq_constraints) msg.add("Equality constraints: {}", eq_constraints) msg.add("Sizes: {}", sizes) return str(msg)
[docs] def plot_n2_chart(self, save: bool = True, show: bool = False) -> None: """Plot a N2 chart. :param bool save: save plot. Default: True. :param bool show: show plot. Default: False. """ generate_n2_plot(self.scaled_disciplines, save=save, show=show)
[docs] def plot_coupling_graph(self) -> None: """Plot a coupling graph.""" generate_coupling_graph(self.scaled_disciplines)
[docs] def plot_1d_interpolations( self, save: bool = True, show: bool = False, step: float = 0.01, varnames: Sequence[str] | None = None, directory: Path | str = ".", png: bool = False, ): """Plot 1d interpolations. :param bool save: save plot. Default: True. :param bool show: show plot. Default: False. :param bool step: Step to evaluate the 1d interpolation function Default: 0.01. :param list(str) varnames: names of the variable to plot; if None, all variables are plotted. Default: None. :param str directory: directory path. Default: '.'. :param bool png: if True, the file format is PNG. Otherwise, use PDF. Default: False. """ directory = Path(directory) directory.mkdir(exist_ok=True) file_paths = [] for scalable_discipline in self.scaled_disciplines: func = scalable_discipline.scalable_model.plot_1d_interpolations file_names = func(save, show, step, varnames, directory, png) file_paths += [directory / file_name for file_name in file_names] return file_paths
[docs] def plot_dependencies( self, save: bool = True, show: bool = False, directory: str = "." ): """Plot dependency matrices. :param bool save: save plot (default: True) :param bool show: show plot (default: False) :param str directory: directory path (default: '.') """ fnames = [] for scalable_discipline in self.scaled_disciplines: scalable_model = scalable_discipline.scalable_model plot_dependency = scalable_model.plot_dependency fname = plot_dependency( add_levels=True, save=save, show=show, directory=directory ) fnames.append(fname) return fnames
def _build_scalable_disciplines(self, sizes=None, **parameters) -> None: """Build scalable disciplines. :param dict sizes: dictionary whose keys are variable names and variables sizes. :param parameters: options. """ copied_parameters = deepcopy(parameters) for disc in self.disciplines: varnames = self.inputs[disc] + self.outputs[disc] sizes = sizes or {} new_varsizes = { varname: sizes.get(varname, self.varsizes[varname]) for varname in varnames } if "group_dep" in parameters: copied_parameters["group_dep"] = parameters["group_dep"][disc] if "fill_factor" in parameters: copied_parameters["fill_factor"] = parameters["fill_factor"][disc] self.scaled_disciplines.append( ScalableDiscipline( "ScalableDiagonalModel", self.data[disc], new_varsizes, **copied_parameters, ) ) self.scaled_sizes.update(deepcopy(new_varsizes))
[docs] def create_scenario( self, formulation: str = "DisciplinaryOpt", scenario_type: str = "MDO", start_at_equilibrium: bool = False, active_probability: float = 0.1, feasibility_level: float = 0.5, **options, ) -> Scenario: """Create a :class:`.Scenario` from the scalable disciplines. Args: formulation: The MDO formulation to use for the scenario. scenario_type: The type of scenario, either ``MDO`` or ``DOE``. start_at_equilibrium: Whether to start at equilibrium using a preliminary MDA. active_probability: The probability to set the inequality constraints as active at the initial step of the optimization. feasibility_level: The offset of satisfaction for inequality constraints. **options: The formulation options. Returns: The :class:`.Scenario` from the scalable disciplines. """ equilibrium = {} if start_at_equilibrium: equilibrium = self.__get_equilibrium() disciplines = self.scaled_disciplines design_space = self._create_design_space(disciplines, formulation) if formulation == "BiLevel": self.scenario = self._create_bilevel_scenario(disciplines, **options) else: self.scenario = create_scenario( disciplines, formulation, self.objective_function, deepcopy(design_space), scenario_type=scenario_type, maximize_objective=self.maximize_objective, **options, ) self.__add_ineq_constraints(active_probability, feasibility_level, equilibrium) self.__add_eq_constraints(equilibrium) return self.scenario
def _create_bilevel_scenario( self, disciplines: Iterable[MDODiscipline], **sub_scenario_options ) -> Scenario: """Create a bilevel scenario from disciplines. :param list(MDODiscipline) disciplines: list of MDODiscipline """ cpl_structure = MDOCouplingStructure(disciplines) st_cpl_disciplines = cpl_structure.strongly_coupled_disciplines wk_cpl_disciplines = cpl_structure.weakly_coupled_disciplines() obj = self.objective_function max_obj = self.maximize_objective # Construction of the subsystem scenarios sub_scenarios = [] sub_inputs = [] for discipline in st_cpl_disciplines: cplt_disciplines = list(set(disciplines) - {discipline}) sub_disciplines = [discipline, *wk_cpl_disciplines] design_space = DesignSpace() inputs = get_all_inputs([discipline]) all_inputs = get_all_inputs(cplt_disciplines) inputs = list(set(inputs) - set(all_inputs)) sub_inputs += inputs for name in inputs: design_space.add_variable( name, self.scaled_sizes[name], "float", 0.0, 1.0, 0.5 ) sub_scenarios.append( create_scenario( disciplines=sub_disciplines, formulation="DisciplinaryOpt", objective_name=obj, design_space=design_space, maximize_objective=max_obj, ) ) sub_scenarios[-1].default_inputs = sub_scenario_options # Construction of the system scenario all_inputs = get_all_inputs(disciplines) inputs = list(set(all_inputs) - set(sub_inputs)) design_space = DesignSpace() for name in inputs: design_space.add_variable( name, self.scaled_sizes[name], "float", 0.0, 1.0, 0.5 ) sub_disciplines = sub_scenarios + wk_cpl_disciplines return create_scenario( disciplines=sub_disciplines, formulation="BiLevel", objective_name=obj, design_space=design_space, maximize_objective=max_obj, mda_name="MDAJacobi", tolerance=1e-8, ) def _create_design_space( self, disciplines=None, formulation: str = "DisciplinaryOpt" ) -> DesignSpace: """Create a design space into the unit hypercube. :param list(MDODiscipline) disciplines: list of MDODiscipline :param str formulation: MDO formulation (default: 'DisciplinaryOpt') """ design_space = create_design_space() for name in self.design_variables: size = self.scaled_sizes[name] design_space.add_variable( name, size=size, var_type="float", l_b=zeros(size), u_b=ones(size), value=full(size, 0.5), ) if formulation == "IDF": coupling_structure = MDOCouplingStructure(disciplines) all_couplings = set(coupling_structure.all_couplings) for name in all_couplings: size = self.scaled_sizes[name] design_space.add_variable( name, size=size, var_type="float", l_b=zeros(size), u_b=ones(size), value=full(size, 0.5), ) return design_space def __get_equilibrium(self, mda_name: str = "MDAJacobi", **options): """Get the equilibrium point from an MDA method. :param str mda_name: MDA name (default: 'MDAJacobi') :return: equilibrium point :rtype: dict """ LOGGER.info("Build a preliminary MDA to start at equilibrium") factory = MDAFactory() mda = factory.create(mda_name, self.scaled_disciplines, **options) if len(mda.strong_couplings) == 0: mda = factory.create("MDAQuasiNewton", self.scaled_disciplines, **options) return mda.execute() def __add_ineq_constraints( self, active_probability, feasibility_level, equilibrium ) -> None: """Add inequality constraints. :param float active_probability: probability to set the inequality constraints as active at initial step of the optimization :param float feasibility_level: offset of satisfaction for inequality constraints :param dict equilibrium: starting point at equilibrium """ if not hasattr(feasibility_level, "__len__"): feasibility_level = { constraint: feasibility_level for constraint in self.ineq_constraints } for constraint, alphai in feasibility_level.items(): if constraint in list(equilibrium.keys()): sample = default_rng(SEED).random(len(equilibrium[constraint])) val = equilibrium[constraint] taui = where( sample < active_probability, val, alphai + (1 - alphai) * val ) else: taui = 0.0 self.scenario.add_constraint(constraint, "ineq", value=taui) def __add_eq_constraints(self, equilibrium) -> None: """Add equality constraints. :param dict equilibrium: starting point at equilibrium """ for constraint in self.eq_constraints: cstr_value = equilibrium.get(constraint, array([0.0]))[0] self.scenario.add_constraint(constraint, "eq", value=cstr_value)
[docs] def exec_time(self, do_sum: bool = True): """Get total execution time per discipline. :param bool do_sum: sum over disciplines (default: True) :return: execution time :rtype: list(float) or float """ exec_time = [discipline.exec_time for discipline in self.scenario.disciplines] if do_sum: exec_time = sum(exec_time) return exec_time
@property def n_calls_top_level(self): """Get number of top level disciplinary calls per discipline. :return: number of top level disciplinary calls per discipline :rtype: list(int) or int """ disciplines = self.scenario.formulation.get_top_level_disc() return {discipline.name: discipline.n_calls for discipline in disciplines} @property def n_calls_linearize_top_level(self): """Get number of top level disciplinary calls per discipline. :return: number of top level disciplinary calls per discipline :rtype: list(int) or int """ disciplines = self.scenario.formulation.get_top_level_disc() return { discipline.name: discipline.n_calls_linearize for discipline in disciplines } @property def n_calls(self): """Get number of disciplinary calls per discipline. :return: number of disciplinary calls per discipline :rtype: list(int) or int """ return { discipline.name: discipline.n_calls for discipline in self.scenario.disciplines } @property def n_calls_linearize(self): """Get number of disciplinary calls per discipline. :return: number of disciplinary calls per discipline :rtype: list(int) or int """ return { discipline.name: discipline.n_calls_linearize for discipline in self.scenario.disciplines } @property def status(self): """Get the status of the scenario.""" return self.scenario.optimization_result.status @property def is_feasible(self): """Get the feasibility property of the scenario.""" return self.scenario.optimization_result.is_feasible