# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Matthias De Lozzo
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Scalable MDO problem.
This module implements the concept of scalable problem by means of the
:class:`.ScalableProblem` class.
Given
- an MDO scenario based on a set of sampled disciplines
with a particular problem dimension,
- a new problem dimension (= number of inputs and outputs),
a scalable problem:
1. makes each discipline scalable based on the new problem dimension,
2. creates the corresponding MDO scenario.
Then, this MDO scenario can be executed and post-processed.
We can repeat these tasks for different sizes of variables
and compare the scalability, which is the dependence of the scenario results
on the problem dimension.
.. seealso:: :class:`.MDODiscipline`, :class:`.ScalableDiscipline`
and :class:`.Scenario`
"""
from __future__ import annotations
import logging
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from numpy import array
from numpy import full
from numpy import ones
from numpy import where
from numpy import zeros
from numpy.random import default_rng
from gemseo import create_design_space
from gemseo import create_scenario
from gemseo import generate_coupling_graph
from gemseo import generate_n2_plot
from gemseo.algos.design_space import DesignSpace
from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.disciplines.utils import get_all_inputs
from gemseo.mda.mda_factory import MDAFactory
from gemseo.problems.scalable.data_driven.discipline import ScalableDiscipline
from gemseo.utils.seeder import SEED
from gemseo.utils.string_tools import MultiLineString
if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Mapping
from collections.abc import Sequence
from numpy._typing import NDArray
from gemseo.core.discipline import MDODiscipline
from gemseo.core.scenario import Scenario
from gemseo.datasets.io_dataset import IODataset
LOGGER = logging.getLogger(__name__)
[docs]
class ScalableProblem:
    """Scalable problem.

    From one input-output dataset per discipline,
    build one scalable discipline per dataset with the requested variable sizes,
    then create a scenario from these scalable disciplines
    with :meth:`.create_scenario`.
    """
def __init__(
    self,
    datasets: Iterable[IODataset],
    design_variables: Iterable[str],
    objective_function: str,
    eq_constraints: Iterable[str] | None = None,
    ineq_constraints: Iterable[str] | None = None,
    maximize_objective: bool = False,
    sizes: Mapping[str, int] | None = None,
    **parameters: Any,
) -> None:
    """
    Args:
        datasets: One input-output dataset per discipline.
            The name of a dataset is used as the name of the discipline.
        design_variables: The names of the design variables.
        objective_function: The name of the objective.
        eq_constraints: The names of the equality constraints, if any.
        ineq_constraints: The names of the inequality constraints, if any.
        maximize_objective: Whether to maximize the objective.
        sizes: The sizes of the inputs and outputs.
            If ``None``, use the original sizes.
        **parameters: The optional parameters of the scalable model.
            The ``"group_dep"`` and ``"fill_factor"`` entries, if any,
            are indexed by discipline name.
    """  # noqa: D205, D212, D415
    # ``datasets`` is traversed several times below; materialize it once so
    # that a single-pass iterable (e.g. a generator) is not exhausted after
    # the first traversal, which would silently leave the other attributes empty.
    datasets = list(datasets)
    self.disciplines = [dataset.name for dataset in datasets]
    self.data = {dataset.name: dataset for dataset in datasets}
    self.inputs = {
        dataset.name: dataset.get_variable_names(dataset.INPUT_GROUP)
        for dataset in datasets
    }
    self.outputs = {
        dataset.name: dataset.get_variable_names(dataset.OUTPUT_GROUP)
        for dataset in datasets
    }
    # Original sizes of all the variables, merged over the datasets.
    self.varsizes = {}
    for dataset in datasets:
        self.varsizes.update(dataset.variable_names_to_n_components)
    self.design_variables = design_variables
    self.objective_function = objective_function
    self.ineq_constraints = ineq_constraints
    self.eq_constraints = eq_constraints
    self.maximize_objective = maximize_objective
    # Filled by _build_scalable_disciplines.
    self.scaled_disciplines = []
    self.scaled_sizes = {}
    self._build_scalable_disciplines(sizes, **parameters)
    # Set by create_scenario.
    self.scenario = None
def __str__(self) -> str:
    """Return a multi-line summary of the problem.

    The summary lists the disciplines, the design variables, the objective
    and its optimization sense, the constraints and the scaled sizes.
    Unset (``None``) name collections are reported as ``None``.
    """

    def join_names(names):
        # Keep ``None`` as is so that it is displayed as "None".
        return None if names is None else ", ".join(names)

    sense = "minimize"
    if self.maximize_objective:
        sense = "maximize"
    size_summary = ", ".join(
        f"{name} ({size})" for name, size in self.scaled_sizes.items()
    )
    text = MultiLineString()
    text.add("MDO problem")
    text.indent()
    text.add("Disciplines: {}", ", ".join(self.disciplines))
    text.add("Design variables: {}", join_names(self.design_variables))
    text.add("Objective function: {} (to {})", self.objective_function, sense)
    text.add("Inequality constraints: {}", join_names(self.ineq_constraints))
    text.add("Equality constraints: {}", join_names(self.eq_constraints))
    text.add("Sizes: {}", size_summary)
    return str(text)
[docs]
def plot_n2_chart(self, save: bool = True, show: bool = False) -> None:
    """Draw the N2 chart of the scalable disciplines.

    Args:
        save: Whether to save the figure.
        show: Whether to display the figure.
    """
    scalable_disciplines = self.scaled_disciplines
    generate_n2_plot(scalable_disciplines, save=save, show=show)
[docs]
def plot_coupling_graph(self) -> None:
    """Draw the coupling graph of the scalable disciplines."""
    scalable_disciplines = self.scaled_disciplines
    generate_coupling_graph(scalable_disciplines)
[docs]
def plot_1d_interpolations(
    self,
    save: bool = True,
    show: bool = False,
    step: float = 0.01,
    varnames: Sequence[str] | None = None,
    directory: Path | str = ".",
    png: bool = False,
) -> list[Path]:
    """Plot 1d interpolations.

    Args:
        save: Whether to save the figure.
        show: Whether to display the figure.
        step: The step to evaluate the 1d interpolation function.
        varnames: The names of the variable to plot.
            If ``None``, all the variables are plotted.
        directory: The directory path.
            It is created if missing, including its missing parents.
        png: Whether to use PNG file format instead of PDF.

    Returns:
        The file names returned by the scalable models,
        each prefixed with ``directory``.
    """
    directory = Path(directory)
    # ``parents=True``: a nested output directory such as "out/plots"
    # would otherwise raise FileNotFoundError when "out" does not exist yet.
    directory.mkdir(parents=True, exist_ok=True)
    file_paths = []
    for scalable_discipline in self.scaled_disciplines:
        func = scalable_discipline.scalable_model.plot_1d_interpolations
        file_names = func(save, show, step, varnames, directory, png)
        file_paths.extend(directory / file_name for file_name in file_names)
    return file_paths
[docs]
def plot_dependencies(
    self, save: bool = True, show: bool = False, directory: str = "."
):
    """Plot the dependency matrix of every scalable discipline.

    Args:
        save: Whether to save the figure.
        show: Whether to display the figure.
        directory: The directory path.

    Returns:
        The values returned by the ``plot_dependency`` method
        of the scalable models, one per scalable discipline.
    """
    return [
        discipline.scalable_model.plot_dependency(
            add_levels=True, save=save, show=show, directory=directory
        )
        for discipline in self.scaled_disciplines
    ]
def _build_scalable_disciplines(
    self, sizes: Mapping[str, int] | None = None, **parameters: Any
) -> None:
    """Build one scalable discipline per dataset.

    Each scalable discipline is appended to ``scaled_disciplines``
    and the sizes of its variables are stored in ``scaled_sizes``.

    Args:
        sizes: The sizes of the inputs and outputs.
            A variable missing from ``sizes`` keeps its original size.
        **parameters: The options of the scalable disciplines.
            The ``"group_dep"`` and ``"fill_factor"`` entries, if any,
            are indexed by discipline name.
    """
    requested_sizes = sizes or {}
    model_options = deepcopy(parameters)
    # Options given per discipline: only the entry of the current discipline
    # is passed to its scalable model.
    per_discipline_keys = [
        key for key in ("group_dep", "fill_factor") if key in parameters
    ]
    for discipline_name in self.disciplines:
        io_names = self.inputs[discipline_name] + self.outputs[discipline_name]
        scaled = {
            io_name: requested_sizes.get(io_name, self.varsizes[io_name])
            for io_name in io_names
        }
        for key in per_discipline_keys:
            model_options[key] = parameters[key][discipline_name]
        scalable_discipline = ScalableDiscipline(
            "ScalableDiagonalModel",
            self.data[discipline_name],
            scaled,
            **model_options,
        )
        self.scaled_disciplines.append(scalable_discipline)
        self.scaled_sizes.update(deepcopy(scaled))
[docs]
def create_scenario(
    self,
    formulation: str = "DisciplinaryOpt",
    scenario_type: str = "MDO",
    start_at_equilibrium: bool = False,
    active_probability: float = 0.1,
    feasibility_level: float = 0.5,
    **options,
) -> Scenario:
    """Create a :class:`.Scenario` from the scalable disciplines.

    The scenario is stored in the ``scenario`` attribute
    and the inequality and equality constraints are added to it.

    Args:
        formulation: The MDO formulation to use for the scenario.
        scenario_type: The type of scenario, either ``MDO`` or ``DOE``.
            It is not used when ``formulation`` is ``"BiLevel"``.
        start_at_equilibrium: Whether to start at equilibrium using a preliminary
            MDA.
        active_probability: The probability to set the inequality constraints as
            active at the initial step of the optimization.
        feasibility_level: The offset of satisfaction for inequality
            constraints.
        **options: The formulation options.
            When ``formulation`` is ``"BiLevel"``,
            they are passed to the bi-level scenario builder instead.

    Returns:
        The :class:`.Scenario` from the scalable disciplines.
    """
    # Without a preliminary MDA, there is no equilibrium value to start from.
    equilibrium = self.__get_equilibrium() if start_at_equilibrium else {}
    scalable_disciplines = self.scaled_disciplines
    unit_design_space = self._create_design_space(scalable_disciplines, formulation)
    if formulation != "BiLevel":
        scenario = create_scenario(
            scalable_disciplines,
            formulation,
            self.objective_function,
            deepcopy(unit_design_space),
            scenario_type=scenario_type,
            maximize_objective=self.maximize_objective,
            **options,
        )
    else:
        scenario = self._create_bilevel_scenario(scalable_disciplines, **options)
    self.scenario = scenario
    self.__add_ineq_constraints(active_probability, feasibility_level, equilibrium)
    self.__add_eq_constraints(equilibrium)
    return self.scenario
def _create_bilevel_scenario(
    self, disciplines: Iterable[MDODiscipline], **sub_scenario_options
) -> Scenario:
    """Create a bi-level scenario from disciplines.

    One ``"DisciplinaryOpt"`` sub-scenario is created per strongly coupled
    discipline; its design variables are the inputs of this discipline
    that are not inputs of any other discipline.
    The remaining inputs are the design variables of the system scenario.
    All the design variables are floats in [0, 1] with 0.5 as current value.

    Args:
        disciplines: The disciplines.
        **sub_scenario_options: The options of the sub-scenarios,
            set as the ``default_inputs`` of each sub-scenario.

    Returns:
        A scenario using a bi-level formulation.
    """
    # The disciplines are traversed several times: materialize them once
    # so that a single-pass iterable is supported.
    disciplines = list(disciplines)
    discipline_set = set(disciplines)
    cpl_structure = MDOCouplingStructure(disciplines)
    st_cpl_disciplines = cpl_structure.strongly_coupled_disciplines
    # NOTE(review): called as a method whereas ``strongly_coupled_disciplines``
    # is read as an attribute - confirm against the MDOCouplingStructure API.
    wk_cpl_disciplines = cpl_structure.weakly_coupled_disciplines()
    obj = self.objective_function
    max_obj = self.maximize_objective

    # Construction of the subsystem scenarios
    sub_scenarios = []
    sub_inputs = []
    for discipline in st_cpl_disciplines:
        # Same members as ``set(disciplines) - {discipline}``,
        # but in the order of ``disciplines`` rather than in hash order.
        others = discipline_set - {discipline}
        cplt_disciplines = [
            other for other in dict.fromkeys(disciplines) if other in others
        ]
        sub_disciplines = [discipline, *wk_cpl_disciplines]
        design_space = DesignSpace()
        other_inputs = set(get_all_inputs(cplt_disciplines))
        # Ordered de-duplication: the order of the design variables must not
        # depend on the randomized hashes of the variable names.
        inputs = [
            name
            for name in dict.fromkeys(get_all_inputs([discipline]))
            if name not in other_inputs
        ]
        sub_inputs += inputs
        for name in inputs:
            design_space.add_variable(
                name, self.scaled_sizes[name], "float", 0.0, 1.0, 0.5
            )
        sub_scenarios.append(
            create_scenario(
                sub_disciplines,
                "DisciplinaryOpt",
                obj,
                design_space,
                maximize_objective=max_obj,
            )
        )
        sub_scenarios[-1].default_inputs = sub_scenario_options

    # Construction of the system scenario
    sub_input_names = set(sub_inputs)
    inputs = [
        name
        for name in dict.fromkeys(get_all_inputs(disciplines))
        if name not in sub_input_names
    ]
    design_space = DesignSpace()
    for name in inputs:
        design_space.add_variable(
            name, self.scaled_sizes[name], "float", 0.0, 1.0, 0.5
        )
    sub_disciplines = sub_scenarios + wk_cpl_disciplines
    return create_scenario(
        sub_disciplines,
        "BiLevel",
        obj,
        design_space,
        maximize_objective=max_obj,
        mda_name="MDAJacobi",
        tolerance=1e-8,
    )
def _create_design_space(
    self, disciplines: Sequence[MDODiscipline], formulation: str = "DisciplinaryOpt"
) -> DesignSpace:
    """Create a design space into the unit hypercube.

    The design space contains the design variables and,
    when ``formulation`` is ``"IDF"``, the coupling variables.
    Each variable is a float vector with 0 as lower bound,
    1 as upper bound and 0.5 as current value.

    Args:
        disciplines: The disciplines.
            Only used when ``formulation`` is ``"IDF"``.
        formulation: The name of the formulation.

    Returns:
        The design space.
    """
    design_space = create_design_space()
    names = list(self.design_variables)
    if formulation == "IDF":
        all_couplings = MDOCouplingStructure(disciplines).all_couplings
        # Ordered de-duplication instead of ``set``: the order of the
        # coupling variables in the design space must not depend on the
        # randomized hashes of their names.
        names.extend(dict.fromkeys(all_couplings))
    for name in names:
        size = self.scaled_sizes[name]
        design_space.add_variable(
            name,
            size=size,
            var_type="float",
            l_b=zeros(size),
            u_b=ones(size),
            value=full(size, 0.5),
        )
    return design_space
def __get_equilibrium(
    self, mda_name: str = "MDAJacobi", **options: Any
) -> dict[str, NDArray[float]]:
    """Compute an equilibrium point by executing an MDA.

    Args:
        mda_name: The name of the MDA.
        **options: The options passed to the MDA factory.

    Returns:
        The data returned by the execution of the MDA.
    """
    LOGGER.info("Build a preliminary MDA to start at equilibrium")
    mda_factory = MDAFactory()
    scalable_disciplines = self.scaled_disciplines
    solver = mda_factory.create(mda_name, scalable_disciplines, **options)
    # Without strong couplings, a quasi-Newton MDA is used instead.
    if len(solver.strong_couplings) == 0:
        solver = mda_factory.create(
            "MDAQuasiNewton", scalable_disciplines, **options
        )
    return solver.execute()
def __add_ineq_constraints(
    self,
    active_probability: float,
    feasibility_level: float | Mapping[str, float],
    equilibrium: Mapping[str, NDArray[float]],
) -> None:
    """Add the inequality constraints.

    Args:
        active_probability: The probability to set the inequality constraints
            as active at initial step of the optimization.
        feasibility_level: The offset of satisfaction
            for the inequality constraints.
            Either a single value used for all the inequality constraints
            or a mapping from constraint names to values
            (any object with a ``__len__`` is handled as such a mapping).
        equilibrium: The starting point at equilibrium.
            A constraint missing from it is added with 0 as value.
    """
    if not hasattr(feasibility_level, "__len__"):
        # ``ineq_constraints`` is ``None`` by default:
        # handle it as "no inequality constraint" instead of raising a TypeError.
        feasibility_level = dict.fromkeys(
            self.ineq_constraints or (), feasibility_level
        )
    for constraint, alphai in feasibility_level.items():
        if constraint in equilibrium:
            val = equilibrium[constraint]
            # The generator is re-created from SEED for each constraint,
            # so the same random sequence is drawn for every constraint.
            sample = default_rng(SEED).random(len(val))
            # Active components keep the equilibrium value as threshold;
            # the others get a threshold offset by the feasibility level.
            taui = where(
                sample < active_probability, val, alphai + (1 - alphai) * val
            )
        else:
            taui = 0.0
        self.scenario.add_constraint(constraint, constraint_type="ineq", value=taui)
def __add_eq_constraints(self, equilibrium: Mapping[str, NDArray[float]]) -> None:
    """Add equality constraints.

    The value of a constraint is the first component of its equilibrium value,
    or 0 when the constraint is missing from ``equilibrium``.

    Args:
        equilibrium: The starting point at equilibrium.
    """
    # ``eq_constraints`` is ``None`` by default:
    # handle it as "no equality constraint" instead of raising a TypeError.
    for constraint in self.eq_constraints or ():
        value = equilibrium.get(constraint, array([0.0]))[0]
        self.scenario.add_constraint(constraint, value=value)
[docs]
def exec_time(self, do_sum: bool = True) -> float | list[float]:
    """Get the execution time of the disciplines of the scenario.

    Args:
        do_sum: Whether to sum the disciplinary execution times.

    Returns:
        Either the sum of the disciplinary execution times
        or the list of the execution times, one per discipline.
    """
    times = [discipline.exec_time for discipline in self.scenario.disciplines]
    return sum(times) if do_sum else times
@property
def n_calls_top_level(self) -> dict[str, int]:
    """The number of calls of each top-level discipline, by discipline name."""
    formulation = self.scenario.formulation
    return {
        top_level.name: top_level.n_calls
        for top_level in formulation.get_top_level_disc()
    }
@property
def n_calls_linearize_top_level(self) -> dict[str, int]:
    """The number of linearizations of each top-level discipline, by discipline name."""
    formulation = self.scenario.formulation
    return {
        top_level.name: top_level.n_calls_linearize
        for top_level in formulation.get_top_level_disc()
    }
@property
def n_calls(self) -> dict[str, int]:
    """The number of calls of each discipline of the scenario, by discipline name."""
    scenario_disciplines = self.scenario.disciplines
    return {member.name: member.n_calls for member in scenario_disciplines}
@property
def n_calls_linearize(self) -> dict[str, int]:
    """The number of linearizations of each discipline of the scenario, by discipline name."""
    scenario_disciplines = self.scenario.disciplines
    return {
        member.name: member.n_calls_linearize for member in scenario_disciplines
    }
@property
def status(self) -> int:
    """The status stored in the optimization result of the scenario."""
    result = self.scenario.optimization_result
    return result.status
@property
def is_feasible(self) -> bool:
    """Whether the optimization result of the scenario is feasible."""
    result = self.scenario.optimization_result
    return result.is_feasible