Source code for gemseo.problems.scalable.data_driven.problem

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#         documentation
#        :author:  Matthias De Lozzo
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""
Scalable MDO problem
====================

This module implements the concept of scalable problem by means of the
:class:.ScalableProblem class.

Given

- a MDO scenario based on a set of sampled disciplines
with a particular problem dimension,
- a new problem dimension (= number of inputs and outputs),

a scalable problem:

1. makes each discipline scalable based on the new problem dimension,
2. creates the corresponding MDO scenario.

Then, this MDO scenario can be executed and post-processed.

We can repeat this tasks for different sizes of variables
and compare the scalability, which is the dependence of the scenario results
on the problem dimension.

.. seealso:: :class:.MDODiscipline, :class:.ScalableDiscipline
and :class:.Scenario
"""
from __future__ import annotations

import logging
import os
from copy import deepcopy

from numpy import array
from numpy import full
from numpy import ones
from numpy import random
from numpy import where
from numpy import zeros

from gemseo.algos.design_space import DesignSpace
from gemseo.api import create_design_space
from gemseo.api import create_scenario
from gemseo.api import generate_coupling_graph
from gemseo.api import generate_n2_plot
from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.core.scenario import Scenario
from gemseo.disciplines.utils import get_all_inputs
from gemseo.mda.mda_factory import MDAFactory
from gemseo.problems.scalable.data_driven.discipline import ScalableDiscipline
from gemseo.utils.string_tools import MultiLineString

LOGGER = logging.getLogger(__name__)

[docs]class ScalableProblem:
"""Scalable problem."""

def __init__(
self,
datasets,
design_variables,
objective_function,
eq_constraints=None,
ineq_constraints=None,
maximize_objective=False,
sizes=None,
**parameters,
):
"""Constructor.

:param list(Dataset) datasets: disciplinary datasets.
:param list(str) design_variables: list of design variable names
:param str objective_function: objective function
:param list(str) eq_constraints: equality constraints. Default: None.
:param list(str) eq_constraints: inequality constraints. Default: None.
:param bool maximize_objective: maximize objective. Default: False.
:param dict sizes: sizes of input and output variables.
If None, use the original sizes. Default: None.
:param parameters: optional parameters for the scalable model.
"""
self.disciplines = [dataset.name for dataset in datasets]
self.data = {dataset.name: dataset for dataset in datasets}
self.inputs = {
dataset.name: dataset.get_names(dataset.INPUT_GROUP) for dataset in datasets
}
self.outputs = {
dataset.name: dataset.get_names(dataset.OUTPUT_GROUP)
for dataset in datasets
}
self.varsizes = {}
for dataset in datasets:
self.varsizes.update(dataset.sizes)
self.design_variables = design_variables
self.objective_function = objective_function
self.ineq_constraints = ineq_constraints
self.eq_constraints = eq_constraints
self.maximize_objective = maximize_objective
self.scaled_disciplines = []
self.scaled_sizes = {}
self._build_scalable_disciplines(sizes, **parameters)
self.scenario = None

def __str__(self):
"""String representation of information about the scalable problem.

:return: scalable problem description
:rtype: str
"""
disciplines = ", ".join(self.disciplines)
design_variables = None
if self.design_variables is not None:
design_variables = ", ".join(self.design_variables)
ineq_constraints = None
if self.ineq_constraints is not None:
ineq_constraints = ", ".join(self.ineq_constraints)
eq_constraints = None
if self.eq_constraints is not None:
eq_constraints = ", ".join(self.eq_constraints)
sizes = [name + f" ({size})" for name, size in self.scaled_sizes.items()]
sizes = ", ".join(sizes)
optimize = "maximize" if self.maximize_objective else "minimize"
msg = MultiLineString()
msg.indent()
msg.add("Objective function: {} (to {})", self.objective_function, optimize)
return str(msg)

[docs]    def plot_n2_chart(self, save=True, show=False):
"""Plot a N2 chart.

:param bool save: save plot. Default: True.
:param bool show: show plot. Default: False.
"""
generate_n2_plot(self.scaled_disciplines, save=save, show=show)

[docs]    def plot_coupling_graph(self):
"""Plot a coupling graph."""
generate_coupling_graph(self.scaled_disciplines)

[docs]    def plot_1d_interpolations(
self, save=True, show=False, step=0.01, varnames=None, directory=".", png=False
):
"""Plot 1d interpolations.

:param bool save: save plot. Default: True.
:param bool show: show plot. Default: False.
:param bool step: Step to evaluate the 1d interpolation function
Default: 0.01.
:param list(str) varnames: names of the variable to plot;
if None, all variables are plotted. Default: None.
:param str directory: directory path. Default: '.'.
:param bool png: if True, the file format is PNG. Otherwise, use PDF.
Default: False.
"""
if not os.path.exists(directory):
os.mkdir(directory)
allfnames = []
for scalable_discipline in self.scaled_disciplines:
func = scalable_discipline.scalable_model.plot_1d_interpolations
fnames = func(save, show, step, varnames, directory, png)
allfnames = allfnames + [os.path.join(directory, fname) for fname in fnames]
return allfnames

[docs]    def plot_dependencies(self, save=True, show=False, directory="."):
"""Plot dependency matrices.

:param bool save: save plot (default: True)
:param bool show: show plot (default: False)
:param str directory: directory path (default: '.')
"""
fnames = []
for scalable_discipline in self.scaled_disciplines:
scalable_model = scalable_discipline.scalable_model
plot_dependency = scalable_model.plot_dependency
fname = plot_dependency(
)
fnames.append(fname)
return fnames

def _build_scalable_disciplines(self, sizes=None, **parameters):
"""Build scalable disciplines.

:param dict sizes: dictionary whose keys are variable names
and variables sizes.
:param parameters: options.
"""
copied_parameters = deepcopy(parameters)
for disc in self.disciplines:
varnames = self.inputs[disc] + self.outputs[disc]
sizes = sizes or {}
new_varsizes = {
varname: sizes.get(varname, self.varsizes[varname])
for varname in varnames
}
if "group_dep" in parameters:
copied_parameters["group_dep"] = parameters["group_dep"][disc]
if "fill_factor" in parameters:
copied_parameters["fill_factor"] = parameters["fill_factor"][disc]
self.scaled_disciplines.append(
ScalableDiscipline(
"ScalableDiagonalModel",
self.data[disc],
new_varsizes,
**copied_parameters,
)
)
self.scaled_sizes.update(deepcopy(new_varsizes))

[docs]    def create_scenario(
self,
formulation: str = "DisciplinaryOpt",
scenario_type: str = "MDO",
start_at_equilibrium: bool = False,
active_probability: float = 0.1,
feasibility_level: float = 0.5,
**options,
) -> Scenario:
"""Create a :class:.Scenario from the scalable disciplines.

Args:
formulation: The MDO formulation to use for the scenario.
scenario_type: The type of scenario, either MDO or DOE.
start_at_equilibrium: Whether to start at equilibrium using a preliminary
MDA.
active_probability: The probability to set the inequality constraints as
active at the initial step of the optimization.
feasibility_level: The offset of satisfaction for inequality
constraints.
**options: The formulation options.

Returns:
The :class:.Scenario from the scalable disciplines.
"""
equilibrium = {}
if start_at_equilibrium:
equilibrium = self.__get_equilibrium()

disciplines = self.scaled_disciplines
design_space = self._create_design_space(disciplines, formulation)
if formulation == "BiLevel":
self.scenario = self._create_bilevel_scenario(disciplines, **options)
else:
self.scenario = create_scenario(
disciplines,
formulation,
self.objective_function,
deepcopy(design_space),
scenario_type=scenario_type,
maximize_objective=self.maximize_objective,
**options,
)
return self.scenario

def _create_bilevel_scenario(self, disciplines, **sub_scenario_options):
"""Create a bilevel scenario from disciplines.

:param list(MDODiscipline) disciplines: list of MDODiscipline
"""
cpl_structure = MDOCouplingStructure(disciplines)
st_cpl_disciplines = cpl_structure.strongly_coupled_disciplines
wk_cpl_disciplines = cpl_structure.weakly_coupled_disciplines()
obj = self.objective_function
max_obj = self.maximize_objective

# Construction of the subsystem scenarios
sub_scenarios = []
sub_inputs = []
for discipline in st_cpl_disciplines:
cplt_disciplines = list(set(disciplines) - {discipline})
sub_disciplines = [discipline] + wk_cpl_disciplines
design_space = DesignSpace()
inputs = get_all_inputs([discipline])
all_inputs = get_all_inputs(cplt_disciplines)
inputs = list(set(inputs) - set(all_inputs))
sub_inputs += inputs
for name in inputs:
name, self.scaled_sizes[name], "float", 0.0, 1.0, 0.5
)
sub_scenarios.append(
create_scenario(
disciplines=sub_disciplines,
formulation="DisciplinaryOpt",
objective_name=obj,
design_space=design_space,
maximize_objective=max_obj,
)
)
sub_scenarios[-1].default_inputs = sub_scenario_options

# Construction of the system scenario
all_inputs = get_all_inputs(disciplines)
inputs = list(set(all_inputs) - set(sub_inputs))
design_space = DesignSpace()
for name in inputs:
name, self.scaled_sizes[name], "float", 0.0, 1.0, 0.5
)
sub_disciplines = sub_scenarios + wk_cpl_disciplines
system_scenario = create_scenario(
disciplines=sub_disciplines,
formulation="BiLevel",
objective_name=obj,
design_space=design_space,
maximize_objective=max_obj,
mda_name="MDAJacobi",
tolerance=1e-8,
)
return system_scenario

def _create_design_space(self, disciplines=None, formulation="DisciplinaryOpt"):
"""Create a design space into the unit hypercube.

:param list(MDODiscipline) disciplines: list of MDODiscipline
:param str formulation: MDO formulation (default: 'DisciplinaryOpt')
"""
design_space = create_design_space()
for name in self.design_variables:
size = self.scaled_sizes[name]
name,
size=size,
var_type="float",
l_b=zeros(size),
u_b=ones(size),
value=full(size, 0.5),
)

if formulation == "IDF":
coupling_structure = MDOCouplingStructure(disciplines)
all_couplings = set(coupling_structure.all_couplings)
for name in all_couplings:
size = self.scaled_sizes[name]
name,
size=size,
var_type="float",
l_b=zeros(size),
u_b=ones(size),
value=full(size, 0.5),
)

return design_space

def __get_equilibrium(self, mda_name="MDAJacobi", **options):
"""Get the equilibrium point from a MDA method.

:param str mda_name: MDA name (default: 'MDAJacobi')
:return: equilibrium point
:rtype: dict
"""
LOGGER.info("Build a preliminary MDA to start at equilibrium")
factory = MDAFactory()
mda = factory.create(mda_name, self.scaled_disciplines, **options)
if len(mda.strong_couplings) == 0:
mda = factory.create("MDAQuasiNewton", self.scaled_disciplines, **options)
return mda.execute()

self, active_probability, feasibility_level, equilibrium
):

:param float active_probability: probability to set the inequality
constraints as active at initial step of the optimization
:param float feasibility_level: offset of satisfaction for inequality
constraints
:param dict equilibrium: starting point at equilibrium
"""
if not hasattr(feasibility_level, "__len__"):
feasibility_level = {
constraint: feasibility_level for constraint in self.ineq_constraints
}
for constraint, alphai in feasibility_level.items():
if constraint in list(equilibrium.keys()):
sample = random.rand(len(equilibrium[constraint]))
val = equilibrium[constraint]
taui = where(
sample < active_probability, val, alphai + (1 - alphai) * val
)
else:
taui = 0.0

:param dict equilibrium: starting point at equilibrium
"""
for constraint in self.eq_constraints:
cstr_value = equilibrium.get(constraint, array([0.0]))[0]

[docs]    def exec_time(self, do_sum=True):
"""Get total execution time per discipline.

:param bool do_sum: sum over disciplines (default: True)
:return: execution time
:rtype: list(float) or float
"""
exec_time = [discipline.exec_time for discipline in self.scenario.disciplines]
if do_sum:
exec_time = sum(exec_time)
return exec_time

@property
def n_calls_top_level(self):
"""Get number of top level disciplinary calls per discipline.

:return: number of top level disciplinary calls per discipline
:rtype: list(int) or int
"""
disciplines = self.scenario.formulation.get_top_level_disc()
n_calls = {discipline.name: discipline.n_calls for discipline in disciplines}
return n_calls

@property
def n_calls_linearize_top_level(self):
"""Get number of top level disciplinary calls per discipline.

:return: number of top level disciplinary calls per discipline
:rtype: list(int) or int
"""
disciplines = self.scenario.formulation.get_top_level_disc()
n_calls = {
discipline.name: discipline.n_calls_linearize for discipline in disciplines
}
return n_calls

@property
def n_calls(self):
"""Get number of disciplinary calls per discipline.

:return: number of disciplinary calls per discipline
:rtype: list(int) or int
"""
n_calls = {
discipline.name: discipline.n_calls
for discipline in self.scenario.disciplines
}
return n_calls

@property
def n_calls_linearize(self):
"""Get number of disciplinary calls per discipline.

:return: number of disciplinary calls per discipline
:rtype: list(int) or int
"""
tmp = {
discipline.name: discipline.n_calls_linearize
for discipline in self.scenario.disciplines
}
return tmp

@property
def status(self):
"""Get the status of the scenario."""
return self.scenario.optimization_result.status

@property
def is_feasible(self):
"""Get the feasibility property of the scenario."""
return self.scenario.optimization_result.is_feasible