# Source code for gemseo.mda.jacobi

# Copyright 2021 IRT Saint Exupéry,
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Francois Gallard
"""A Jacobi algorithm for solving MDAs."""
from __future__ import annotations

import logging
from copy import deepcopy
from multiprocessing import cpu_count
from typing import Any
from typing import Mapping
from typing import Sequence

from numpy import atleast_2d
from numpy import concatenate
from numpy import dot
from numpy import ndarray
from numpy.linalg import lstsq

from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.core.discipline import MDODiscipline
from gemseo.core.execution_sequence import ExecutionSequenceFactory
from gemseo.core.execution_sequence import LoopExecSequence
from gemseo.core.parallel_execution import DiscParallelExecution
from gemseo.mda.mda import MDA
from gemseo.utils.data_conversion import split_array_to_dict_of_arrays

LOGGER = logging.getLogger(__name__)
N_CPUS = cpu_count()

class MDAJacobi(MDA):
    """Perform an MDA analysis using a Jacobi algorithm.

    This algorithm is an iterative technique to solve the linear system:

    .. math::

       Ax = b

    by decomposing the matrix :math:`A`
    into the sum of a diagonal matrix :math:`D`
    and the remainder :math:`R`.

    The new iterate is given by:

    .. math::

       x_{k+1} = D^{-1}(b-Rx_k)
    """

    SECANT_ACCELERATION = "secant"
    M2D_ACCELERATION = "m2d"

    _ATTR_TO_SERIALIZE = MDA._ATTR_TO_SERIALIZE + (
        "parallel_execution",
        "sizes",
        "acceleration",
        "n_processes",
    )

    def __init__(
        self,
        disciplines: Sequence[MDODiscipline],
        max_mda_iter: int = 10,
        name: str | None = None,
        n_processes: int = N_CPUS,
        acceleration: str = M2D_ACCELERATION,
        tolerance: float = 1e-6,
        linear_solver_tolerance: float = 1e-12,
        use_threading: bool = True,
        warm_start: bool = False,
        use_lu_fact: bool = False,
        grammar_type: str = MDODiscipline.JSON_GRAMMAR_TYPE,
        coupling_structure: MDOCouplingStructure | None = None,
        log_convergence: bool = False,
        linear_solver: str = "DEFAULT",
        # Fixed annotation: the default is None, so the type must be Optional.
        linear_solver_options: Mapping[str, Any] | None = None,
    ) -> None:
        """
        Args:
            n_processes: The maximum simultaneous number of threads,
                if ``use_threading`` is True, or processes otherwise,
                used to parallelize the execution.
            acceleration: The type of acceleration to be used
                to extrapolate the residuals
                and save CPU time by reusing the information
                from the last iterations,
                either ``None``, ``"m2d"``, or ``"secant"``;
                ``"m2d"`` is faster but uses the last 2 iterations.
            use_threading: Whether to use threads instead of processes
                to parallelize the execution;
                multiprocessing will copy (serialize) all the disciplines,
                while threading will share all the memory.
                This is important to note
                if you want to execute the same discipline multiple times,
                in which case you shall use multiprocessing.
        """
        self.n_processes = n_processes
        super().__init__(
            disciplines,
            max_mda_iter=max_mda_iter,
            name=name,
            tolerance=tolerance,
            linear_solver_tolerance=linear_solver_tolerance,
            warm_start=warm_start,
            use_lu_fact=use_lu_fact,
            grammar_type=grammar_type,
            coupling_structure=coupling_structure,
            log_convergence=log_convergence,
            linear_solver=linear_solver,
            linear_solver_options=linear_solver_options,
        )
        self._set_default_inputs()
        self._compute_input_couplings()
        self.acceleration = acceleration
        # Histories of the residuals (x_{k+1} - x_k) and of the computed
        # couplings G(x_k), used by the acceleration methods.
        self._dx_n = []
        self._g_x_n = []
        # Mapping from coupling name to array size; lazily filled at the
        # first acceleration step.
        self.sizes = None
        self.parallel_execution = DiscParallelExecution(
            disciplines,
            n_processes,
            use_threading,
            exceptions_to_re_raise=(ValueError,),
        )

    def _compute_input_couplings(self) -> None:
        """Compute all the coupling variables that are inputs of the MDA.

        This must be overloaded here because the Jacobi algorithm induces a
        delay between the couplings: the strong couplings may be fully
        resolved but the weak ones may need one more iteration. The base MDA
        class uses strong couplings only, which is not satisfying here if all
        disciplines are not strongly coupled.
        """
        if len(self.coupling_structure.strongly_coupled_disciplines) == len(
            self.disciplines
        ):
            # Every discipline is strongly coupled: the base behavior is fine.
            return super()._compute_input_couplings()

        inputs = self.get_input_data_names()
        # Use ALL couplings (weak and strong), not only the strong ones.
        all_couplings = self.coupling_structure.all_couplings
        self._input_couplings = set(all_couplings) & set(inputs)

    def execute_all_disciplines(
        self,
        input_local_data: Mapping[str, ndarray],
    ) -> None:
        """Execute all the disciplines, possibly in parallel.

        Each discipline receives its own deep copy of the input data (Jacobi
        iterations evaluate every discipline at the same point), then the MDA
        local data is updated with every discipline's outputs.

        Args:
            input_local_data: The input data of the disciplines.
        """
        self.reset_disciplines_statuses()
        if self.n_processes > 1:
            self.parallel_execution.execute(
                [deepcopy(input_local_data) for _ in range(len(self.disciplines))]
            )
        else:
            for discipline in self.disciplines:
                discipline.execute(deepcopy(input_local_data))

        for discipline in self.disciplines:
            self.local_data.update(discipline.get_output_data())

    def get_expected_workflow(self) -> LoopExecSequence:
        """Return the expected execution sequence of the MDA.

        The disciplines run in parallel when more than one process is used,
        serially otherwise, inside a loop owned by the MDA.
        """
        if self.n_processes > 1:
            sub_workflow = ExecutionSequenceFactory.parallel(self.disciplines)
        else:
            sub_workflow = ExecutionSequenceFactory.serial(self.disciplines)
        return ExecutionSequenceFactory.loop(self, sub_workflow)

    def _run(self) -> None:
        """Execute all disciplines in a loop until outputs converge.

        Stops when:

        .. math::

            ||outputs-previous output||/||first outputs|| < self.tolerance
        """
        if self.warm_start:
            self._couplings_warm_start()
        self._dx_n = []
        self._g_x_n = []
        # First evaluation of all the disciplines.
        current_couplings = self._current_input_couplings()
        self.execute_all_disciplines(deepcopy(self.local_data))
        new_couplings = self._current_input_couplings()
        self._dx_n.append(new_couplings - current_couplings)
        self._g_x_n.append(new_couplings)

        self._compute_residual(
            current_couplings,
            new_couplings,
            log_normed_residual=self._log_convergence,
        )
        current_couplings = new_couplings

        while not self._stop_criterion_is_reached:
            self.execute_all_disciplines(deepcopy(self.local_data))
            new_couplings = self._current_input_couplings()

            self._compute_residual(
                current_couplings,
                new_couplings,
                log_normed_residual=self._log_convergence,
            )
            # NOTE: "nex" is a historical typo for "next"; kept for
            # backward compatibility with subclasses.
            x_np1 = self._compute_nex_iterate(current_couplings, new_couplings)
            current_couplings = x_np1

    def _compute_nex_iterate(
        self,
        current_couplings: ndarray,
        new_couplings: ndarray,
    ) -> dict[str, ndarray]:
        """Compute the next iterate given the evaluation of the couplings.

        If requested, compute the convergence acceleration term according to
        the secant or m2d methods.

        See: Iterative residual-based vector methods to accelerate fixed
        point iterations, Isabelle Ramiere, Thomas Helfer.

        Args:
            current_couplings: The input couplings of the disciplines given
                for evaluation at the last iterations.
            new_couplings: The computed couplings of the disciplines at the
                last iterations.

        Returns:
            The next iterate.
        """
        self._dx_n.append(new_couplings - current_couplings)
        self._g_x_n.append(new_couplings)

        coupl_names = self._input_couplings
        if self.sizes is None:
            self.sizes = {key: self.local_data[key].size for key in coupl_names}

        dxn = self._dx_n[-1]
        dxn_1 = self._dx_n[-2]
        g_n = self._g_x_n[-1]
        gn_1 = self._g_x_n[-2]
        x_np1 = new_couplings
        if self.acceleration == self.SECANT_ACCELERATION:
            x_np1 = self._compute_secant_acc(dxn, dxn_1, g_n, gn_1)
        elif self.acceleration == self.M2D_ACCELERATION:
            # m2d needs 3 history points; duplicate the oldest one until the
            # history is long enough.
            if len(self._dx_n) >= 3:
                dxn_2 = self._dx_n[-3]
                dgn_2 = self._g_x_n[-3]
            else:
                dxn_2 = self._dx_n[-2]
                dgn_2 = self._g_x_n[-2]
            x_np1 = self._compute_m2d_acc(dxn, dxn_1, dxn_2, g_n, gn_1, dgn_2)

        if len(self._dx_n) > 3:
            # Forget too old history: only the last 3 points are ever used.
            self._dx_n = self._dx_n[-3:]
            self._g_x_n = self._g_x_n[-3:]

        new_c = split_array_to_dict_of_arrays(x_np1, self.sizes, coupl_names)
        self.local_data.update(new_c)
        return x_np1

    @staticmethod
    def _minimize_2md(
        dxn: ndarray,
        dxn_1: ndarray,
        dxn_2: ndarray,
    ) -> ndarray:
        """Compute the extrapolation coefficients of the m2d method.

        Minimize the sub-problem in the 2-delta method. Use a least squares
        solver to find the minimizer of:

        .. math::

             dxn - x[0] * (dxn - dxn_1) - x[1] * (dxn_1 - dxn_2)

        Args:
            dxn: The delta couplings at last iteration.
            dxn_1: The delta couplings at last iteration-1.
            dxn_2: The delta couplings at last iteration-2.

        Returns:
            The extrapolation coefficients of the 2-delta method.
        """
        mat = concatenate((atleast_2d(dxn - dxn_1), atleast_2d(dxn_1 - dxn_2)))
        return lstsq(mat.T, dxn, rcond=None)[0]

    @staticmethod
    def _compute_secant_acc(
        dxn: ndarray,
        dxn_1: ndarray,
        cgn: ndarray,
        cgn_1: ndarray,
    ) -> ndarray:
        """Compute the next iterate according to the secant method.

        From the paper: "Iterative residual-based vector methods to
        accelerate fixed point iterations", Isabelle Ramiere, Thomas Helfer
        (secant acceleration: page 15 equation (41)).

        Args:
            dxn: The delta couplings at last iteration.
            dxn_1: The delta couplings at last iteration-1.
            cgn: The computed couplings at last iteration.
            cgn_1: The computed couplings at last iteration-1.

        Returns:
            The next iterate.
        """
        d_dxn = dxn - dxn_1
        # NOTE(review): if d_dxn is the zero vector (already converged), this
        # divides by zero and yields nan — presumably never reached because
        # the stop criterion triggers first; confirm before hardening.
        acc = (cgn - cgn_1) * dot(d_dxn, dxn) / dot(d_dxn, d_dxn)
        return cgn - acc

    def _compute_m2d_acc(
        self,
        dxn: ndarray,
        dxn_1: ndarray,
        dxn_2: ndarray,
        g_n: ndarray,
        gn_1: ndarray,
        gn_2: ndarray,
    ) -> ndarray:
        """Compute the 2-delta acceleration.

        From the paper: "Iterative residual-based vector methods to
        accelerate fixed point iterations", Isabelle Ramiere, Thomas Helfer,
        page 22 eq (50).

        Args:
            dxn: The delta couplings at last iteration.
            dxn_1: The delta couplings at last iteration-1.
            dxn_2: The delta couplings at last iteration-2.
            g_n: The computed couplings at last iteration.
            gn_1: The computed couplings at last iteration-1.
            gn_2: The computed couplings at last iteration-2.

        Returns:
            The next iterate.
        """
        lamba_min = self._minimize_2md(dxn, dxn_1, dxn_2)
        acc = lamba_min[0] * (g_n - gn_1) + lamba_min[1] * (gn_1 - gn_2)
        return g_n - acc