Source code for gemseo.mda.jacobi

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""A Jacobi algorithm for solving MDAs."""
from __future__ import division, unicode_literals

import logging
from copy import deepcopy
from multiprocessing import cpu_count
from typing import Any, Mapping, Optional, Sequence

from numpy import atleast_2d, concatenate, dot, ndarray
from numpy.linalg import lstsq

from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.core.discipline import MDODiscipline
from gemseo.core.execution_sequence import ExecutionSequenceFactory, LoopExecSequence
from gemseo.core.parallel_execution import DiscParallelExecution
from gemseo.mda.mda import MDA
from gemseo.utils.data_conversion import DataConversion

LOGGER = logging.getLogger(__name__)
N_CPUS = cpu_count()


class MDAJacobi(MDA):
    """Perform an MDA analysis using the Jacobi algorithm.

    This algorithm is an iterative technique to solve the linear system:

    .. math::

        Ax = b

    by decomposing the matrix :math:`A`
    into the sum of a diagonal matrix :math:`D`
    and the remainder :math:`R`.

    The new iterate is given by:

    .. math::

        x_{k+1} = D^{-1}(b - Rx_k)
    """

    SECANT_ACCELERATION = "secant"
    M2D_ACCELERATION = "m2d"

    _ATTR_TO_SERIALIZE = MDA._ATTR_TO_SERIALIZE + (
        "parallel_execution",
        "sizes",
        "acceleration",
        "n_processes",
    )

    def __init__(
        self,
        disciplines,  # type: Sequence[MDODiscipline]
        max_mda_iter=10,  # type: int
        name=None,  # type: Optional[str]
        n_processes=N_CPUS,  # type: int
        acceleration=M2D_ACCELERATION,  # type: str
        tolerance=1e-6,  # type: float
        linear_solver_tolerance=1e-12,  # type: float
        use_threading=True,  # type: bool
        warm_start=False,  # type: bool
        use_lu_fact=False,  # type: bool
        grammar_type=MDODiscipline.JSON_GRAMMAR_TYPE,  # type: str
        coupling_structure=None,  # type: Optional[MDOCouplingStructure]
        log_convergence=False,  # type: bool
        linear_solver="DEFAULT",  # type: str
        linear_solver_options=None,  # type: Mapping[str,Any]
    ):  # type: (...) -> None
        """
        Args:
            n_processes: The maximum number of processors on which to run.
            acceleration: The type of acceleration to be used
                to extrapolate the residuals
                and save CPU time by reusing the information from the last iterations,
                either ``None``, ``"m2d"`` or ``"secant"``;
                ``"m2d"`` is faster but uses the last two iterations.
            use_threading: Whether to use threads instead of processes
                to parallelize the execution;
                multiprocessing will copy (serialize) all the disciplines,
                while threading will share all the memory.
                In particular, if you want to execute the same discipline
                multiple times, you shall use multiprocessing.
        """
        self.n_processes = n_processes
        super(MDAJacobi, self).__init__(
            disciplines,
            max_mda_iter=max_mda_iter,
            name=name,
            tolerance=tolerance,
            linear_solver_tolerance=linear_solver_tolerance,
            warm_start=warm_start,
            use_lu_fact=use_lu_fact,
            grammar_type=grammar_type,
            coupling_structure=coupling_structure,
            log_convergence=log_convergence,
            linear_solver=linear_solver,
            linear_solver_options=linear_solver_options,
        )
        self._set_default_inputs()
        self._compute_input_couplings()
        self.acceleration = acceleration
        self._dx_n = []
        self._g_x_n = []
        self.sizes = None
        self.parallel_execution = DiscParallelExecution(
            disciplines, n_processes, use_threading
        )

    def _compute_input_couplings(self):  # type: (...) -> None
        """Compute all the coupling variables that are inputs of the MDA.

        This must be overloaded here because the Jacobi algorithm
        induces a delay between the couplings:
        the strong couplings may be fully resolved
        while the weak ones may need one more iteration.
        The base MDA class uses the strong couplings only,
        which is not satisfying here if all disciplines are not strongly coupled.
        """
        if len(self.coupling_structure.strongly_coupled_disciplines()) == len(
            self.disciplines
        ):
            return super(MDAJacobi, self)._compute_input_couplings()

        inputs = self.get_input_data_names()
        strong_cpl = self.coupling_structure.get_all_couplings()
        self._input_couplings = set(strong_cpl) & set(inputs)
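
    # Illustrative usage sketch (not part of the original module), assuming the
    # Sellar example disciplines shipped with GEMSEO (the module path may differ
    # between GEMSEO versions):
    #
    #     from gemseo.problems.sellar.sellar import Sellar1, Sellar2
    #
    #     mda = MDAJacobi(
    #         [Sellar1(), Sellar2()],
    #         max_mda_iter=30,
    #         tolerance=1e-8,
    #         acceleration=MDAJacobi.M2D_ACCELERATION,
    #         n_processes=1,
    #     )
    #     output_data = mda.execute()
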
    def execute_all_disciplines(
        self,
        input_local_data,  # type: Mapping[str,ndarray]
    ):  # type: (...) -> None
        """Execute all the disciplines.

        Args:
            input_local_data: The input data of the disciplines.
        """
        self.reset_disciplines_statuses()
        if self.n_processes > 1:
            n_disc = len(self.disciplines)
            inputs_copy_list = [deepcopy(input_local_data) for _ in range(n_disc)]
            self.parallel_execution.execute(inputs_copy_list)
        else:
            for disc in self.disciplines:
                disc.execute(deepcopy(input_local_data))

        outputs = [discipline.get_output_data() for discipline in self.disciplines]
        for data in outputs:
            self.local_data.update(data)

    def get_expected_workflow(self):  # type: (...) -> LoopExecSequence
        sub_workflow = ExecutionSequenceFactory.serial(self.disciplines)
        if self.n_processes > 1:
            sub_workflow = ExecutionSequenceFactory.parallel(self.disciplines)
        return ExecutionSequenceFactory.loop(self, sub_workflow)

    def _run(self):  # type: (...) -> None
        r"""Execute all the disciplines in a loop until the outputs converge.

        Stop when:

        .. math::

            \frac{\|\text{outputs} - \text{previous outputs}\|}
            {\|\text{initial outputs}\|} < \text{tolerance}
        """
        if self.warm_start:
            self._couplings_warm_start()
        self._dx_n = []
        self._g_x_n = []

        # Execute the disciplines a first time to get the initial residual.
        current_couplings = self._current_input_couplings()
        self.execute_all_disciplines(deepcopy(self.local_data))
        new_couplings = self._current_input_couplings()
        self._dx_n.append(new_couplings - current_couplings)
        self._g_x_n.append(new_couplings)

        # store initial residual
        current_iter = 1
        self._compute_residual(
            current_couplings,
            new_couplings,
            current_iter,
            first=True,
            log_normed_residual=self._log_convergence,
        )
        current_couplings = new_couplings

        while not self._termination(current_iter):
            self.execute_all_disciplines(deepcopy(self.local_data))
            new_couplings = self._current_input_couplings()

            # store current residual
            current_iter += 1
            self._compute_residual(
                current_couplings,
                new_couplings,
                current_iter,
                log_normed_residual=self._log_convergence,
            )
            x_np1 = self._compute_nex_iterate(current_couplings, new_couplings)
            current_couplings = x_np1

    def _compute_nex_iterate(
        self,
        current_couplings,  # type: ndarray
        new_couplings,  # type: ndarray
    ):  # type: (...) -> ndarray
        """Compute the next iterate given the evaluation of the couplings.

        If an acceleration method is selected,
        compute the convergence acceleration term
        according to the secant or m2d method.

        See:
        Iterative residual-based vector methods to accelerate fixed point iterations,
        Isabelle Ramiere, Thomas Helfer.

        Args:
            current_couplings: The input couplings of the disciplines
                given for evaluation at the last iteration.
            new_couplings: The couplings computed by the disciplines
                at the last iteration.

        Returns:
            The next iterate.
        """
        self._dx_n.append(new_couplings - current_couplings)
        self._g_x_n.append(new_couplings)
        coupl_names = self._input_couplings
        if self.sizes is None:
            self.sizes = {key: self.local_data[key].size for key in coupl_names}

        dxn = self._dx_n[-1]
        dxn_1 = self._dx_n[-2]
        g_n = self._g_x_n[-1]
        gn_1 = self._g_x_n[-2]

        x_np1 = new_couplings
        if self.acceleration == self.SECANT_ACCELERATION:
            x_np1 = self._compute_secant_acc(dxn, dxn_1, g_n, gn_1)
        elif self.acceleration == self.M2D_ACCELERATION:
            if len(self._dx_n) >= 3:
                dxn_2 = self._dx_n[-3]
                dgn_2 = self._g_x_n[-3]
            else:
                dxn_2 = self._dx_n[-2]
                dgn_2 = self._g_x_n[-2]
            x_np1 = self._compute_m2d_acc(dxn, dxn_1, dxn_2, g_n, gn_1, dgn_2)

        if len(self._dx_n) > 3:
            # Keep only the last three iterations.
            self._dx_n = self._dx_n[-3:]
            self._g_x_n = self._g_x_n[-3:]

        new_c = DataConversion.array_to_dict(x_np1, coupl_names, self.sizes)
        self.local_data.update(new_c)
        return x_np1

    @staticmethod
    def _minimize_2md(
        dxn,  # type: ndarray
        dxn_1,  # type: ndarray
        dxn_2,  # type: ndarray
    ):  # type: (...) -> ndarray
        r"""Solve the least-squares sub-problem of the m2d (2-delta) method.

        Use a least-squares solver to find the coefficients
        :math:`(x_0, x_1)` minimizing:

        .. math::

            \|\Delta x_n - x_0 (\Delta x_n - \Delta x_{n-1})
            - x_1 (\Delta x_{n-1} - \Delta x_{n-2})\|

        Args:
            dxn: The delta couplings at the last iteration.
            dxn_1: The delta couplings at the last iteration-1.
            dxn_2: The delta couplings at the last iteration-2.

        Returns:
            The extrapolation coefficients of the 2-delta method.
        """
        mat = concatenate((atleast_2d(dxn - dxn_1), atleast_2d(dxn_1 - dxn_2)))
        return lstsq(mat.T, dxn, rcond=None)[0]

    @staticmethod
    def _compute_secant_acc(
        dxn,  # type: ndarray
        dxn_1,  # type: ndarray
        cgn,  # type: ndarray
        cgn_1,  # type: ndarray
    ):  # type: (...) -> ndarray
        """Compute the next iterate according to the secant method.

        From the paper:
        "Iterative residual-based vector methods to accelerate fixed point iterations",
        Isabelle Ramiere, Thomas Helfer
        (secant acceleration: page 15, equation (41)).

        Args:
            dxn: The delta couplings at the last iteration.
            dxn_1: The delta couplings at the last iteration-1.
            cgn: The computed couplings at the last iteration.
            cgn_1: The computed couplings at the last iteration-1.

        Returns:
            The next iterate.
        """
        d_dxn = dxn - dxn_1
        acc = (cgn - cgn_1) * dot(d_dxn, dxn) / dot(d_dxn, d_dxn)
        return cgn - acc

    def _compute_m2d_acc(
        self,
        dxn,  # type: ndarray
        dxn_1,  # type: ndarray
        dxn_2,  # type: ndarray
        g_n,  # type: ndarray
        gn_1,  # type: ndarray
        gn_2,  # type: ndarray
    ):  # type: (...) -> ndarray
        """Compute the 2-delta (m2d) acceleration.

        From the paper:
        "Iterative residual-based vector methods to accelerate fixed point iterations",
        Isabelle Ramiere, Thomas Helfer
        (2-delta acceleration: page 22, equation (50)).

        Args:
            dxn: The delta couplings at the last iteration.
            dxn_1: The delta couplings at the last iteration-1.
            dxn_2: The delta couplings at the last iteration-2.
            g_n: The computed couplings at the last iteration.
            gn_1: The computed couplings at the last iteration-1.
            gn_2: The computed couplings at the last iteration-2.

        Returns:
            The next iterate.
        """
        lambda_min = self._minimize_2md(dxn, dxn_1, dxn_2)
        acc = lambda_min[0] * (g_n - gn_1) + lambda_min[1] * (gn_1 - gn_2)
        return g_n - acc
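

# Illustrative sketch (not part of the original module): the linear Jacobi
# splitting x_{k+1} = D^{-1} (b - R x_k) that the class docstring uses as an
# analogy, written with plain NumPy for a small diagonally dominant system.
if __name__ == "__main__":
    import numpy as np

    a_mat = np.array([[4.0, 1.0], [2.0, 5.0]])
    b_vec = np.array([1.0, 2.0])
    d_inv = 1.0 / np.diag(a_mat)  # inverse of the diagonal part D
    r_mat = a_mat - np.diag(np.diag(a_mat))  # remainder R = A - D
    x_k = np.zeros_like(b_vec)
    for _ in range(50):
        # Jacobi update: x_{k+1} = D^{-1} (b - R x_k)
        x_k = d_inv * (b_vec - r_mat.dot(x_k))
    # x_k now approximates the solution of A x = b.
    assert np.allclose(a_mat.dot(x_k), b_vec, atol=1e-8)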