# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - API and implementation and/or documentation
# :author: Francois Gallard
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""A Gauss-Seidel algorithm for solving MDAs."""
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from gemseo.mda.base_mda import BaseProcessFlow
from gemseo.mda.base_mda import _BaseMDAProcessFlow
from gemseo.mda.base_mda_solver import BaseMDASolver
from gemseo.mda.gauss_seidel_settings import MDAGaussSeidel_Settings
from gemseo.utils.constants import READ_ONLY_EMPTY_DICT
if TYPE_CHECKING:
from collections.abc import Sequence
from gemseo.core.coupling_structure import DependencyGraph
from gemseo.core.discipline import Discipline
from gemseo.typing import StrKeyMapping
class _ProcessFlow(_BaseMDAProcessFlow):
"""The process data and execution flow."""
def _get_disciplines_couplings(
self, graph: DependencyGraph
) -> list[tuple[str, str, list[str]]]:
couplings_results = []
disc_already_seen = set()
disciplines = BaseProcessFlow.get_disciplines_in_data_flow(self)
for disc in disciplines:
couplings_with_mda_to_be_removed = set()
predecessors = (
set(graph.graph.predecessors(disc)) - {self._node} & disc_already_seen
)
for predecessor in sorted(predecessors, key=lambda p: p.name):
current_couplings = graph.graph.get_edge_data(predecessor, disc)["io"]
couplings_results.append((predecessor, disc, sorted(current_couplings)))
couplings_with_mda_to_be_removed.update(current_couplings)
in_data = graph.graph.get_edge_data(self._node, disc)
if in_data:
couplings_with_mda = in_data["io"] - couplings_with_mda_to_be_removed
if couplings_with_mda:
couplings_results.append((
self._node,
disc,
sorted(couplings_with_mda),
))
out_data = graph.graph.get_edge_data(disc, self._node)
if out_data:
couplings_results.append((
disc,
self._node,
sorted(out_data["io"]),
))
disc_already_seen.add(disc)
return couplings_results
[docs]
class MDAGaussSeidel(BaseMDASolver):
r"""Perform an MDA using the Gauss-Seidel algorithm.
This algorithm is a fixed point iteration method to solve systems of non-linear
equations of the form,
.. math::
\left\{
\begin{matrix}
F_1(x_1, x_2, \dots, x_n) = 0 \\
F_2(x_1, x_2, \dots, x_n) = 0 \\
\vdots \\
F_n(x_1, x_2, \dots, x_n) = 0
\end{matrix}
\right.
Beginning with :math:`x_1^{(0)}, \dots, x_n^{(0)}`, the iterates are obtained by
performing **sequentially** the following :math:`n` steps.
**Step 1:** knowing :math:`x_2^{(i)}, \dots, x_n^{(i)}`, compute :math:`x_1^{(i+1)}`
by solving,
.. math::
r_1\left( x_1^{(i+1)} \right) =
F_1(x_1^{(i+1)}, x_2^{(i)}, \dots, x_n^{(i)}) = 0.
**Step** :math:`k \leq n`: knowing :math:`x_1^{(i+1)}, \dots, x_{k-1}^{(i+1)}` on
one hand, and :math:`x_{k+1}^{(i)}, \dots, x_n^{(i)}` on the other hand, compute
:math:`x_1^{(i+1)}` by solving,
.. math::
r_k\left( x_k^{(i+1)} \right) = F_1(x_1^{(i+1)}, \dots, x_{k-1}^{(i+1)},
x_k^{(i+1)}, x_{k+1}^{(i)}, \dots, x_n^{(i)}) = 0.
These :math:`n` steps account for one iteration of the Gauss-Seidel method.
"""
Settings: ClassVar[type[MDAGaussSeidel_Settings]] = MDAGaussSeidel_Settings
"""The pydantic model for the settings."""
settings: MDAGaussSeidel_Settings
"""The settings of the MDA"""
_process_flow_class: ClassVar[type[BaseProcessFlow]] = _ProcessFlow
def __init__( # noqa: D107
self,
disciplines: Sequence[Discipline],
settings_model: MDAGaussSeidel_Settings | None = None,
**settings: Any,
) -> None:
super().__init__(disciplines, settings_model=settings_model, **settings)
self._compute_input_coupling_names()
self._set_resolved_variables(self.coupling_structure.strong_couplings)
if self.settings.max_mda_iter == 0:
del self.output_grammar[self.NORMALIZED_RESIDUAL_NORM]
def _initialize_grammars(self) -> None:
"""Define the input and output grammars from the disciplines' ones."""
for discipline in self.disciplines:
self.input_grammar.update(
discipline.input_grammar, excluded_names=self.output_grammar.keys()
)
self.output_grammar.update(discipline.output_grammar)
def _execute_disciplines_and_update_local_data(
self, input_data: StrKeyMapping = READ_ONLY_EMPTY_DICT
) -> None:
input_data = input_data or self.io.data
for discipline in self.disciplines:
discipline.execute(input_data)
self.io.data.update(discipline.io.get_output_data())
def _execute(self) -> None:
super()._execute()
self._execute_disciplines_and_update_local_data()
if self.settings.max_mda_iter == 0:
return
while True:
local_data_before_execution = self.io.data.copy()
self._execute_disciplines_and_update_local_data()
self._compute_residuals(local_data_before_execution)
if self._stop_criterion_is_reached:
break
updated_couplings = self._sequence_transformer.compute_transformed_iterate(
self.get_current_resolved_variables_vector(),
self.get_current_resolved_residual_vector(),
)
self._update_local_data_from_array(updated_couplings)