# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# Copyright 2024 Capgemini
# Contributors:
# INITIAL AUTHORS - API and implementation and/or documentation
# :author: Francois Gallard, Charlie Vanaret
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Base class for all Multi-disciplinary Design Analyses (MDA)."""
from __future__ import annotations
import logging
from abc import abstractmethod
from enum import auto
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import Final
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
from strenum import LowercaseStrEnum
from gemseo.caches.simple_cache import SimpleCache
from gemseo.core._process_flow.base_process_flow import BaseProcessFlow
from gemseo.core._process_flow.execution_sequences.loop import LoopExecSequence
from gemseo.core.coupling_structure import CouplingStructure
from gemseo.core.coupling_structure import DependencyGraph
from gemseo.core.derivatives.jacobian_assembly import JacobianAssembly
from gemseo.core.discipline import Discipline
from gemseo.core.process_discipline import ProcessDiscipline
from gemseo.utils.constants import READ_ONLY_EMPTY_DICT
from gemseo.utils.matplotlib_figure import save_show_figure
from gemseo.utils.pydantic import create_model
if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Sequence
from pathlib import Path
from matplotlib.figure import Figure
from numpy.typing import NDArray
from gemseo.core.discipline.base_discipline import _CacheType
from gemseo.mda.base_mda_settings import BaseMDASettings
from gemseo.typing import StrKeyMapping
from gemseo.utils.matplotlib_figure import FigSizeType
LOGGER = logging.getLogger(__name__)
class _BaseMDAProcessFlow(BaseProcessFlow):
"""The process data and execution flow."""
def get_disciplines_in_data_flow(self) -> list[Discipline]:
return [self._node]
def get_execution_flow(self) -> LoopExecSequence: # noqa:D102
return LoopExecSequence(self._node, super().get_execution_flow())
def get_data_flow( # noqa:D102
self,
) -> list[tuple[Discipline, Discipline, list[str]]]:
disciplines = super().get_disciplines_in_data_flow()
graph = DependencyGraph([self._node, *disciplines])
res = self._get_disciplines_couplings(graph)
for discipline in self._node.disciplines:
res.extend(discipline.get_process_flow().get_data_flow())
return res
def _get_disciplines_couplings(
self, graph: DependencyGraph
) -> list[tuple[str, str, list[str]]]:
"""Return the couplings between disciplines.
Args:
graph: The dependency graph of the disciplines.
Returns:
The couplings between disciplines associated to the edges of the graph.
For each edge,
the first component corresponds to the *from* discipline,
the second component corresponds to the *to* discipline,
the third component corresponds to the list of coupling variables.
"""
return graph.get_disciplines_couplings()
[docs]
class BaseMDA(ProcessDiscipline):
"""A base class for multidisciplinary analysis (MDA)."""
NORMALIZED_RESIDUAL_NORM: Final[str] = "MDA residuals norm"
default_cache_type: ClassVar[_CacheType] = ProcessDiscipline.CacheType.SIMPLE
_linearize_on_last_state: ClassVar[bool] = True
"""Whether to update the local data from the input data before linearizing."""
# TODO: use generics to handle the type of the settings
# TODO: API: rename to settings_class.
Settings: ClassVar[type[BaseMDASettings]]
"""The Pydantic model for the settings."""
settings: BaseMDASettings
"""The settings of the MDA."""
assembly: JacobianAssembly
"""The Jacobian assembly."""
coupling_structure: CouplingStructure
"""The coupling structure to be used by the MDA."""
residual_history: list[float]
"""The history of the MDA residuals."""
reset_history_each_run: bool
"""Whether to reset the history of MDA residuals before each run."""
# TODO: API: remove
norm0: float | None
"""The reference residual, if any."""
# TODO: API: change to normalized_residual_norm
normed_residual: float
"""The normed residual."""
matrix_type: JacobianAssembly.JacobianType
"""The type of the matrix."""
lin_cache_tol_fact: float
"""The tolerance factor to cache the Jacobian."""
_current_iter: int
"""The iteration number."""
_scaling: ResidualScaling
"""The scaling method applied to MDA residuals for convergence monitoring."""
_scaling_data: float | list[tuple[slice, float]] | NDArray[float] | None
"""The data required to perform the scaling of the MDA residuals."""
_starting_indices: list[int]
"""The indices of the residual history where a new execution starts."""
[docs]
class ResidualScaling(LowercaseStrEnum):
"""The scaling method applied to MDA residuals for convergence monitoring."""
NO_SCALING = auto()
r"""The residual vector is not scaled. The MDA is considered converged when its
Euclidean norm satisfies,
.. math::
\|R_k\|_2 \leq \text{tol}.
"""
INITIAL_RESIDUAL_NORM = auto()
r"""The :math:k`-th residual vector is scaled by the Euclidean norm of the
initial residual (if not null, else it is not scaled). The MDA is considered
converged when its Euclidean norm satisfies,
.. math::
\frac{ \|R_k\|_2 }{ \|R_0\|_2 } \leq \text{tol}.
"""
INITIAL_SUBRESIDUAL_NORM = auto()
r"""The :math:k`-th residual vector is scaled discipline-wise. The sub-residual
associated wich each discipline is scaled by the Euclidean norm of the initial
sub-residual (if not null, else it is not scaled). The MDA is considered
converged when the Euclidean norm of each sub-residual satisfies,
.. math::
\max_i \left| \frac{\|r^i_k\|_2}{\|r^i_0\|_2} \right| \leq \text{tol}.
"""
N_COUPLING_VARIABLES = auto()
r"""The :math:k`-th residual vector is scaled using the number of coupling
variables. The MDA is considered converged when its Euclidean norm satisfies,
.. math::
\frac{ \|R_k\|_2 }{ \sqrt{n_\text{coupl.}} } \leq \text{tol}.
"""
INITIAL_RESIDUAL_COMPONENT = auto()
r"""The :math:k`-th residual is scaled component-wise. Each component is scaled
by the corresponding component of the initial residual (if not null, else it is
not scaled). The MDA is considered converged when each component satisfies,
.. math::
\max_i \left| \frac{(R_k)_i}{(R_0)_i} \right| \leq \text{tol}.
"""
SCALED_INITIAL_RESIDUAL_COMPONENT = auto()
r"""The :math:k`-th residual vector is scaled component-wise and by the number
coupling variables. If :math:`\div` denotes the component-wise division between
two vectors, then the MDA is considered converged when the residual vector
satisfies,
.. math::
\frac{1}{\sqrt{n_\text{coupl.}}} \| R_k \div R_0 \|_2 \leq \text{tol}.
"""
def __init__(
self,
disciplines: Sequence[Discipline],
settings_model: BaseMDASettings | None = None,
**settings: Any,
) -> None:
"""
Args:
disciplines: The disciplines from which to compute the MDA.
settings_model: The MDA settings as a Pydantic model.
If ``None``, use ``**settings``.
**settings: The MDA settings.
These arguments are ignored when ``settings_model`` is not ``None``.
""" # noqa:D205 D212 D415
self.settings = create_model(
self.Settings,
settings_model=settings_model,
**settings,
)
super().__init__(disciplines, name=self.settings.name)
self.coupling_structure = (
CouplingStructure(disciplines)
if self.settings.coupling_structure is None
else self.settings.coupling_structure
)
self.assembly = JacobianAssembly(self.coupling_structure)
self.residual_history = []
self._starting_indices = []
self.reset_history_each_run = False
self._scaling = self.ResidualScaling.INITIAL_RESIDUAL_NORM
self._scaling_data = None
# Don't erase coupling values before calling _compute_jacobian
self.norm0 = None
self._current_iter = 0
self.normed_residual = 1.0
self._input_couplings = []
self._non_numeric_array_variables = []
self.matrix_type = JacobianAssembly.JacobianType.MATRIX
# By default, don't use an approximate cache for linearization
self.lin_cache_tol_fact = 0.0
self._initialize_grammars()
self.io.output_grammar.update_from_names([self.NORMALIZED_RESIDUAL_NORM])
self._check_consistency()
self._check_coupling_types()
@property
def scaling(self) -> ResidualScaling:
"""The scaling method applied to MDA residuals for convergence monitoring."""
return self._scaling
@scaling.setter
def scaling(self, scaling: ResidualScaling) -> None:
# This setter will be overloaded in certain child classes.
self._scaling = scaling
def _initialize_grammars(self) -> None:
"""Define the grammars as the union of the disciplines' grammars."""
for discipline in self._disciplines:
self.io.input_grammar.update(discipline.io.input_grammar)
self.io.output_grammar.update(discipline.io.output_grammar)
def _check_consistency(self) -> None:
"""Check if there are not more than one equation per variable.
For instance if a strong coupling is not also a self coupling, or if outputs are
defined multiple times.
"""
strong_c_disc = self.coupling_structure.get_strongly_coupled_disciplines(
add_self_coupled=False
)
also_strong = [
disc
for disc in strong_c_disc
if self.coupling_structure.is_self_coupled(disc)
]
if also_strong:
for disc in also_strong:
in_outs = sorted(
set(disc.io.input_grammar) & set(disc.io.output_grammar)
)
LOGGER.warning(
"Self coupling variables in discipline %s are: %s.",
disc.name,
in_outs,
)
also_strong_n = sorted(disc.name for disc in also_strong)
LOGGER.warning(
"The following disciplines contain self-couplings and strong couplings:"
" %s. This is not a problem as long as their self-coupling variables "
"are not strongly coupled to another discipline.",
also_strong_n,
)
all_outs = {}
multiple_outs = []
for disc in self._disciplines:
for out in disc.io.output_grammar:
if out in all_outs:
multiple_outs.append(out)
all_outs[out] = disc
if multiple_outs:
LOGGER.warning(
"The following outputs are defined multiple times: %s.",
sorted(multiple_outs),
)
def _compute_input_coupling_names(self) -> None:
"""Compute the strong couplings that are inputs of the MDA."""
self._input_couplings = sorted(
set(self.coupling_structure.strong_couplings).intersection(
self.io.input_grammar
)
)
def _get_differentiated_io(
self,
compute_all_jacobians: bool = False,
) -> tuple[tuple[str, ...], tuple[str, ...]]:
if compute_all_jacobians:
strong_cpl = set(self.coupling_structure.strong_couplings)
inputs = set(self.io.input_grammar)
outputs = self.io.output_grammar
# Don't linearize wrt
inputs -= strong_cpl & inputs
# Don't do this with output couplings because
# their derivatives wrt design variables may be needed
# outputs = outputs - (strong_cpl & outputs)
else:
inputs, outputs = Discipline._get_differentiated_io(self)
if self.NORMALIZED_RESIDUAL_NORM in outputs:
outputs = list(outputs)
outputs.remove(self.NORMALIZED_RESIDUAL_NORM)
# Filter the non-numeric arrays
inputs = [
input_
for input_ in inputs
if self.io.input_grammar.data_converter.is_numeric(input_)
]
outputs = [
output
for output in outputs
if self.io.output_grammar.data_converter.is_numeric(output)
]
return inputs, outputs
def _check_coupling_types(self) -> None:
"""Check that the coupling variables are numeric.
If non-numeric array coupling variables are present, they will be filtered and
not taken into account in the MDA residual. Yet, this method warns the user that
some of the coupling variables are non-numeric arrays, in case of this event
follows an improper setup.
"""
not_arrays = set()
for coupling_name in self.coupling_structure.all_couplings:
for discipline in self._disciplines:
for grammar in (
discipline.io.input_grammar,
discipline.io.output_grammar,
):
if (
coupling_name in grammar
and not grammar.data_converter.is_numeric(coupling_name)
):
not_arrays.add(coupling_name)
break
self._non_numeric_array_variables = not_arrays
if not_arrays:
LOGGER.debug(
"The coupling variable(s) %s is/are not an array of numeric values.",
not_arrays,
)
def _compute_jacobian(
self,
input_names: Iterable[str] = (),
output_names: Iterable[str] = (),
) -> None:
# Do not re-execute disciplines if inputs error is beyond self tol
# Apply a safety factor on this (mda is a loop, inputs
# of first discipline
# have changed at convergence, therefore the cache is not exactly
# the same as the current value
exec_cache_tol = self.lin_cache_tol_fact * self.settings.tolerance
residual_variables = {}
for disc in self._disciplines:
residual_variables.update(disc.io.residual_to_state_variable)
couplings_adjoint = sorted(
set(self.coupling_structure.all_couplings).difference(
self._non_numeric_array_variables
)
- residual_variables.keys()
- set(residual_variables.values())
)
output_names = list(
set(output_names).difference(self._non_numeric_array_variables)
)
input_names = list(
set(input_names).difference(self._non_numeric_array_variables)
)
linear_solver_settings = self.settings.linear_solver_settings.model_dump()
linear_solver_settings["rtol"] = self.settings.linear_solver_tolerance
self.jac = self.assembly.total_derivatives(
self.io.data,
output_names,
input_names,
couplings_adjoint,
mode=self.linearization_mode,
matrix_type=self.matrix_type,
use_lu_fact=self.settings.use_lu_fact,
exec_cache_tol=exec_cache_tol,
execute=exec_cache_tol == 0.0,
residual_variables=residual_variables,
**linear_solver_settings,
)
def _prepare_io_for_check_jacobian(
self,
input_names: Iterable[str],
output_names: Iterable[str],
) -> tuple[Iterable[str], Iterable[str]]:
# Strong couplings are not linearized.
input_names, output_names = super()._prepare_io_for_check_jacobian(
input_names, output_names
)
input_names = list(input_names)
output_names = list(output_names)
for coupling in self.coupling_structure.all_couplings:
if coupling in output_names:
output_names.remove(coupling)
if coupling in input_names:
input_names.remove(coupling)
if self.NORMALIZED_RESIDUAL_NORM in output_names:
output_names.remove(self.NORMALIZED_RESIDUAL_NORM)
# Remove non-numeric arrays that cannot be differentiated.
# TODO: use self._non_numeric_array_variables,
# also factorize with _compute_jacobian?
input_names = [
input_
for input_ in input_names
if self.io.input_grammar.data_converter.is_numeric(input_)
]
output_names = [
output
for output in output_names
if self.io.output_grammar.data_converter.is_numeric(output)
]
return input_names, output_names
[docs]
def plot_residual_history(
self,
show: bool = False,
save: bool = True,
n_iterations: int | None = None,
logscale: tuple[float, float] = (),
filename: Path | str = "",
fig_size: FigSizeType = (),
) -> Figure:
"""Generate a plot of the residual history.
The first iteration of each new execution is marked with a red dot.
Args:
show: Whether to display the plot on screen.
save: Whether to save the plot as a PDF file.
n_iterations: The number of iterations on the *x* axis.
If ``None``, use all the iterations.
logscale: The limits of the *y* axis.
If empty, do not change the limits of the *y* axis.
filename: The name of the file to save the figure.
If empty, use "{mda.name}_residual_history.pdf".
fig_size: The width and height of the figure in inches, e.g. `(w, h)`.
Returns:
The figure, to be customized if not closed.
"""
fig = plt.figure()
fig_ax = fig.add_subplot(1, 1, 1)
history_length = len(self.residual_history)
n_iterations = n_iterations or history_length
if n_iterations > history_length:
msg = (
"Requested %s iterations but the residual history contains only %s, "
"plotting all the residual history."
)
LOGGER.info(msg, n_iterations, history_length)
n_iterations = history_length
# red dot for first iteration
colors = [
"red" if index in self._starting_indices else "black"
for index in range(n_iterations)
]
fig_ax.scatter(
list(range(n_iterations)),
self.residual_history[:n_iterations],
s=20,
color=colors,
zorder=2,
)
fig_ax.plot(
self.residual_history[:n_iterations], linestyle="-", c="k", zorder=1
)
fig_ax.axhline(y=self.settings.tolerance, c="blue", linewidth=0.5, zorder=0)
fig_ax.set_title(f"{self.name}: residual plot")
fig_ax.set_yscale("log")
fig_ax.set_xlabel(r"iterations", fontsize=14)
fig_ax.set_xlim([-1, n_iterations])
fig_ax.get_xaxis().set_major_locator(MaxNLocator(integer=True))
fig_ax.set_ylabel(r"$\log(||residuals||/||y_0||)$", fontsize=14)
if logscale:
fig_ax.set_ylim(logscale)
if save and not filename:
filename = f"{self.name}_residual_history.pdf"
save_show_figure(fig, show, filename, fig_size=fig_size)
return fig
def _prepare_warm_start(self) -> None:
"""Load the previous couplings values to local data."""
cached_outputs = self.cache.last_entry.outputs
if not cached_outputs:
return
if isinstance(self.cache, SimpleCache):
self.io.data.update(dict(self.__get_cached_outputs(cached_outputs)))
else:
# Non simple caches require NumPy arrays.
to_value = self.io.input_grammar.data_converter.convert_array_to_value
for input_name, input_value in self.__get_cached_outputs(cached_outputs):
self.io.update_output_data({
input_name: to_value(input_name, input_value)
})
def __get_cached_outputs(self, cached_outputs) -> Iterator[Any]:
"""Return an iterator over the input couplings names and value in cache.
Args:
cached_outputs: The cached outputs.
Returns:
The names and value of the input couplings in cache.
"""
for input_name in self._input_couplings:
input_value = cached_outputs.get(input_name)
if input_value is not None:
yield input_name, input_value
def _execute(self) -> None: # noqa: D103
self._current_iter = 0
if self._pre_solve():
self._solve()
def _pre_solve(self) -> bool:
"""Prepare to solve the MDA.
Returns:
Whether the MDA can be solved.
"""
if self.settings.warm_start:
self._prepare_warm_start()
return True
@abstractmethod
def _solve(self) -> None:
"""Solve the MDA."""
def _execute_disciplines_and_update_local_data(
self, input_data: StrKeyMapping = READ_ONLY_EMPTY_DICT
) -> None:
"""Execute the disciplines and update the local data with their output data.
Args:
input_data: The input data to execute the disciplines.
If empty, use the local data.
"""