Source code for gemseo.mda.newton

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Charlie Vanaret, Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""A set of Newton algorithm variants for solving MDAs.

Root finding methods include:

- `Newton-Raphson <https://en.wikipedia.org/wiki/Newton%27s_method>`__
- `quasi-Newton methods <https://en.wikipedia.org/wiki/Quasi-Newton_method>`__

Each of these methods is implemented by a class in this module.
Both inherit from a common abstract cache.
"""
from __future__ import annotations

import logging
from copy import deepcopy
from typing import Any
from typing import Mapping
from typing import Sequence

from numpy import ndarray
from numpy.linalg import norm
from scipy.optimize import root

from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.core.discipline import MDODiscipline
from gemseo.mda.mda import MDA
from gemseo.utils.data_conversion import concatenate_dict_of_arrays_to_array
from gemseo.utils.data_conversion import update_dict_of_arrays_from_array

# from gemseo.core.parallel_execution import DisciplinesParallelExecution
LOGGER = logging.getLogger(__name__)


[docs]class MDARoot(MDA): """Abstract class implementing MDAs based on (Quasi-)Newton methods.""" _ATTR_TO_SERIALIZE = MDA._ATTR_TO_SERIALIZE + ("strong_couplings",) def __init__( self, disciplines: Sequence[MDODiscipline], max_mda_iter: int = 10, name: str | None = None, grammar_type: str = MDODiscipline.JSON_GRAMMAR_TYPE, tolerance: float = 1e-6, linear_solver_tolerance: float = 1e-12, warm_start: bool = False, use_lu_fact: bool = False, coupling_structure: MDOCouplingStructure | None = None, log_convergence: bool = False, linear_solver: str = "DEFAULT", linear_solver_options: Mapping[str, Any] = None, ) -> None: self.tolerance = 1e-6 self.max_mda_iter = 10 super().__init__( disciplines, max_mda_iter=max_mda_iter, name=name, grammar_type=grammar_type, tolerance=tolerance, linear_solver_tolerance=linear_solver_tolerance, warm_start=warm_start, use_lu_fact=use_lu_fact, coupling_structure=coupling_structure, log_convergence=log_convergence, linear_solver=linear_solver, linear_solver_options=linear_solver_options, ) self._set_default_inputs() self._compute_input_couplings() # parallel execution # ================================================================== # if 1 < n_processes: # self.parallel_execution = DisciplinesParallelExecution( # self.disciplines, n_processes) # else: # self.parallel_execution = None # ================================================================== def _initialize_grammars(self) -> None: for disciplines in self.disciplines: self.input_grammar.update(disciplines.input_grammar) self.output_grammar.update(disciplines.output_grammar)
[docs] def execute_all_disciplines( self, input_local_data: Mapping[str, ndarray], ) -> None: """Execute all self.disciplines. Args: input_local_data: The input data of the disciplines. """ # Set status of sub disciplines # if self.parallel_execution is not None: # self.disciplines = self.parallel_execution # .execute(input_local_data) # else: for discipline in self.disciplines: discipline.reset_statuses_for_run() discipline.execute(deepcopy(input_local_data)) outputs = [discipline.get_output_data() for discipline in self.disciplines] for data in outputs: self.local_data.update(data)
[docs]class MDANewtonRaphson(MDARoot): r"""Newton solver for MDA. The `Newton-Raphson method <https://en.wikipedia.org/wiki/Newton%27s_method>`__ is parameterized by a relaxation factor :math:`\alpha \in (0, 1]` to limit the length of the steps taken along the Newton direction. The new iterate is given by: .. math:: x_{k+1} = x_k - \alpha f'(x_k)^{-1} f(x_k) """ _ATTR_TO_SERIALIZE = MDARoot._ATTR_TO_SERIALIZE + ( "assembly", "relax_factor", "linear_solver", "linear_solver_options", "matrix_type", ) def __init__( self, disciplines: Sequence[MDODiscipline], max_mda_iter: int = 10, relax_factor: float = 0.99, name: str | None = None, grammar_type: str = MDODiscipline.JSON_GRAMMAR_TYPE, linear_solver: str = "DEFAULT", tolerance: float = 1e-6, linear_solver_tolerance: float = 1e-12, warm_start: bool = False, use_lu_fact: bool = False, coupling_structure: MDOCouplingStructure | None = None, log_convergence: bool = False, linear_solver_options: Mapping[str, Any] = None, ): """ Args: relax_factor: The relaxation factor in the Newton step. """ super().__init__( disciplines, max_mda_iter=max_mda_iter, name=name, grammar_type=grammar_type, tolerance=tolerance, linear_solver_tolerance=linear_solver_tolerance, warm_start=warm_start, use_lu_fact=use_lu_fact, linear_solver=linear_solver, linear_solver_options=linear_solver_options, coupling_structure=coupling_structure, log_convergence=log_convergence, ) self.relax_factor = self.__check_relax_factor(relax_factor) self.linear_solver = linear_solver @staticmethod def __check_relax_factor( relax_factor: float, ) -> float: """Check that the relaxation factor in the Newton step is in (0, 1]. Args: relax_factor: The relaxation factor. """ if relax_factor <= 0.0 or relax_factor > 1: raise ValueError( "Newton relaxation factor should belong to (0, 1] " "(current value: {}).".format(relax_factor) ) return relax_factor def _newton_step(self) -> None: """Execute the full Newton step. Compute the increment :math:`-[dR/dW]^{-1}.R` and run the disciplines. """ newton_dict = self.assembly.compute_newton_step( self.local_data, self.strong_couplings, self.relax_factor, self.linear_solver, matrix_type=self.matrix_type, **self.linear_solver_options, ) # update current solution with Newton step exec_data = deepcopy(self.local_data) for c_var, c_step in newton_dict.items(): exec_data[c_var] += c_step self.reset_disciplines_statuses() self.execute_all_disciplines(exec_data) def _run(self) -> None: if self.warm_start: self._couplings_warm_start() # execute the disciplines current_couplings = self._current_input_couplings() self.reset_disciplines_statuses() self.execute_all_disciplines(self.local_data) new_couplings = self._current_input_couplings() self._compute_residual( current_couplings, new_couplings, log_normed_residual=self.log_convergence, ) current_couplings = new_couplings while not self._stop_criterion_is_reached: self._newton_step() new_couplings = self._current_input_couplings() self._compute_residual( current_couplings, new_couplings, log_normed_residual=self.log_convergence, ) current_couplings = new_couplings
[docs]class MDAQuasiNewton(MDARoot): """Quasi-Newton solver for MDA. `Quasi-Newton methods <https://en.wikipedia.org/wiki/Quasi-Newton_method>`__ include numerous variants ( `Broyden <https://en.wikipedia.org/wiki/Broyden%27s_method>`__, `Levenberg-Marquardt <https://en.wikipedia.org/wiki/Levenberg%E2%80%93Marquardt_algorithm>`__, ...). The name of the variant should be provided as a parameter :code:`method` of the class. The new iterate is given by: .. math:: x_{k+1} = x_k - \\rho_k B_k f(x_k) where :math:`\\rho_k` is a coefficient chosen in order to minimize the convergence and :math:`B_k` is an approximation of the inverse of the Jacobian :math:`Df(x_k)^{-1}`. """ # quasi-Newton methods HYBRID = "hybr" LEVENBERG_MARQUARDT = "lm" BROYDEN1 = "broyden1" BROYDEN2 = "broyden2" ANDERSON = "anderson" LINEAR_MIXING = "linearmixing" DIAG_BROYDEN = "diagbroyden" EXCITING_MIXING = "excitingmixing" KRYLOV = "krylov" DF_SANE = "df-sane" QUASI_NEWTON_METHODS = [ HYBRID, LEVENBERG_MARQUARDT, BROYDEN1, BROYDEN2, ANDERSON, LINEAR_MIXING, DIAG_BROYDEN, EXCITING_MIXING, KRYLOV, DF_SANE, ] _ATTR_TO_SERIALIZE = MDARoot._ATTR_TO_SERIALIZE + ( "method", "use_gradient", "assembly", "normed_residual", ) def __init__( self, disciplines: Sequence[MDODiscipline], max_mda_iter: int = 10, name: str | None = None, grammar_type: str = MDODiscipline.JSON_GRAMMAR_TYPE, method: str = HYBRID, use_gradient: bool = False, tolerance: float = 1e-6, linear_solver_tolerance: float = 1e-12, warm_start: bool = False, use_lu_fact: bool = False, coupling_structure: MDOCouplingStructure | None = None, linear_solver: str = "DEFAULT", linear_solver_options: Mapping[str, Any] = None, ): """ Args: method: The name of the method in scipy root finding, among :attr:`.QUASI_NEWTON_METHODS`. use_gradient: Whether to use the analytic gradient of the discipline. Raises: ValueError: If the method is not a valid quasi-Newton method. """ super().__init__( disciplines, max_mda_iter=max_mda_iter, name=name, grammar_type=grammar_type, tolerance=tolerance, linear_solver_tolerance=linear_solver_tolerance, warm_start=warm_start, use_lu_fact=use_lu_fact, linear_solver=linear_solver, linear_solver_options=linear_solver_options, coupling_structure=coupling_structure, ) if method not in self.QUASI_NEWTON_METHODS: msg = f"Method '{method}' is not a valid quasi-Newton method." raise ValueError(msg) self.method = method self.use_gradient = use_gradient self.local_residual_history = [] self.last_outputs = None # used for computing the residual history def _solver_options(self) -> dict[str, float | int]: """Determine options for the solver, based on the resolution method.""" options = {} if self.method in [ self.BROYDEN1, self.BROYDEN2, self.ANDERSON, self.LINEAR_MIXING, self.DIAG_BROYDEN, self.EXCITING_MIXING, self.KRYLOV, ]: options["ftol"] = self.tolerance options["maxiter"] = self.max_mda_iter elif self.method in [self.LEVENBERG_MARQUARDT]: options["xtol"] = self.tolerance options["maxiter"] = self.max_mda_iter elif self.method in [self.DF_SANE]: options["fatol"] = self.tolerance options["maxfev"] = self.max_mda_iter elif self.method in [self.HYBRID]: options["xtol"] = self.tolerance options["maxfev"] = self.max_mda_iter return options def _methods_with_callback(self) -> list[str]: """Determine whether resolution method accepts a callback function.""" return [self.BROYDEN1, self.BROYDEN2] def _run(self) -> dict[str, ndarray]: if self.warm_start: self._couplings_warm_start() self.reset_disciplines_statuses() self.execute_all_disciplines(deepcopy(self.local_data)) couplings = self.strong_couplings if not couplings: msg = ( "MDAQuasiNewton found no strong couplings. Executed all" "disciplines once." ) LOGGER.warning(msg) return self.local_data options = self._solver_options() self.current_iter = 0 def fun( x_vect: ndarray, ) -> ndarray: """Evaluate all residuals, possibly in parallel. Args: x_vect: The value of the design variables. """ self.current_iter += 1 # transform input vector into a dict input_values = update_dict_of_arrays_from_array( self.local_data, couplings, x_vect ) # compute all residuals self.reset_disciplines_statuses() self.execute_all_disciplines(input_values) residuals = self.assembly.residuals(input_values, couplings).real # if residuals.size == 1: # Weak couplings already treated # return residuals[0] return residuals jac = None if self.use_gradient: for discipline in self.disciplines: # Tells the discipline what to linearize outs = discipline.get_output_data_names() to_linearize = set(outs) & set(couplings) discipline.add_differentiated_outputs(list(to_linearize)) inpts = discipline.get_input_data_names() to_linearize = set(inpts) & set(couplings) discipline.add_differentiated_inputs(list(to_linearize)) # linearize the residuals def jacobian( x_vect: ndarray, ) -> ndarray: """Linearize all residuals. Args: x_vect: The value of the design variables. """ # transform input vector into a dict input_values = update_dict_of_arrays_from_array( self.local_data, couplings, x_vect ) # linearize all residuals self.reset_disciplines_statuses() for discipline in self.disciplines: discipline.linearize(input_values) # assemble the system n_couplings = 0 for coupling in couplings: discipline = self.coupling_structure.find_discipline(coupling) size = list(discipline.jac[coupling].values())[0].shape[0] n_couplings += size self.assembly.compute_sizes(couplings, couplings, couplings) dresiduals = self.assembly.dres_dvar( couplings, couplings, n_couplings, n_couplings ).todense() return dresiduals jac = jacobian # initial solution y_0 = concatenate_dict_of_arrays_to_array(self.local_data, couplings).real # callback function to retrieve the residual at iteration k norm_0 = norm(y_0.real) if self.reset_history_each_run: self.residual_history = [] # callback function to store residuals self.last_outputs = y_0 if self.method in self._methods_with_callback(): def callback( y_k: ndarray, _, ) -> None: """Store the current residual in the history. Args: y_k: The coupling variables. _: ignored """ self.residual_history.append( norm((y_k - self.last_outputs).real) / norm_0 ) self.last_outputs = y_k else: callback = None # solve the system y_opt = root( fun, x0=y_0, method=self.method, jac=jac, callback=callback, options=options ) self._warn_convergence_criteria() # transform optimal vector into a dict self.local_data = update_dict_of_arrays_from_array( self.local_data, couplings, y_opt.x ) return self.local_data