Source code for gemseo.mda.quasi_newton

# Copyright 2021 IRT Saint Exupéry,
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Charlie Vanaret, Francois Gallard
"""A set of quasi Newton algorithm variants for solving MDAs.

`quasi-Newton methods <>`__
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from numpy import array
from scipy.optimize import root

from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.core.discipline import MDODiscipline
from gemseo.mda.root import MDARoot
from gemseo.utils.data_conversion import split_array_to_dict_of_arrays
from gemseo.utils.data_conversion import update_dict_of_arrays_from_array

    from typing import Any
    from typing import Mapping
    from typing import Sequence
    from numpy.typing import NDArray
    from gemseo.core.discipline_data import DisciplineData

LOGGER = logging.getLogger(__name__)

[docs]class MDAQuasiNewton(MDARoot): r"""Quasi-Newton solver for MDA. `Quasi-Newton methods <>`__ include numerous variants ( `Broyden <>`__, `Levenberg-Marquardt <>` __, ...). The name of the variant should be provided via the :code:`method` parameter of the class. The new iterate is given by: .. math:: x_{k+1} = x_k - \\rho_k B_k f(x_k) where :math:`\\rho_k` is a coefficient chosen in order to minimize the convergence and :math:`B_k` is an approximation of :math:`Df(x_k)^{-1}`, the inverse of the Jacobian of :math:`f` at :math:`x_k`. """ # Available quasi-Newton methods HYBRID = "hybr" LEVENBERG_MARQUARDT = "lm" BROYDEN1 = "broyden1" BROYDEN2 = "broyden2" ANDERSON = "anderson" LINEAR_MIXING = "linearmixing" DIAG_BROYDEN = "diagbroyden" EXCITING_MIXING = "excitingmixing" KRYLOV = "krylov" DF_SANE = "df-sane" # TODO: API: use enums. QUASI_NEWTON_METHODS = [ HYBRID, LEVENBERG_MARQUARDT, BROYDEN1, BROYDEN2, ANDERSON, LINEAR_MIXING, DIAG_BROYDEN, EXCITING_MIXING, KRYLOV, DF_SANE, ] def __init__( self, disciplines: Sequence[MDODiscipline], max_mda_iter: int = 10, name: str | None = None, grammar_type: MDODiscipline.GrammarType = MDODiscipline.GrammarType.JSON, method: str = HYBRID, use_gradient: bool = False, tolerance: float = 1e-6, linear_solver_tolerance: float = 1e-12, warm_start: bool = False, use_lu_fact: bool = False, coupling_structure: MDOCouplingStructure | None = None, linear_solver: str = "DEFAULT", linear_solver_options: Mapping[str, Any] | None = None, ) -> None: """ Args: method: The name of the method in scipy root finding, among :attr:`.QUASI_NEWTON_METHODS`. use_gradient: Whether to use the analytic gradient of the discipline. Raises: ValueError: If the method is not a valid quasi-Newton method. """ # noqa:D205 D212 D415 self.method = method super().__init__( disciplines, max_mda_iter=max_mda_iter, name=name, grammar_type=grammar_type, tolerance=tolerance, linear_solver_tolerance=linear_solver_tolerance, warm_start=warm_start, use_lu_fact=use_lu_fact, linear_solver=linear_solver, linear_solver_options=linear_solver_options, coupling_structure=coupling_structure, ) if method not in self.QUASI_NEWTON_METHODS: raise ValueError(f"Method '{method}' is not a valid quasi-Newton method.") self.use_gradient = use_gradient self.local_residual_history = [] self.last_outputs = None # used for computing the residual history # TODO: API: prepend verb. def _solver_options(self) -> dict[str, float | int]: """Determine options for the solver, based on the resolution method.""" options = {} if self.method in [ self.BROYDEN1, self.BROYDEN2, self.ANDERSON, self.LINEAR_MIXING, self.DIAG_BROYDEN, self.EXCITING_MIXING, self.KRYLOV, ]: options["ftol"] = self.tolerance options["maxiter"] = self.max_mda_iter elif self.method in [self.LEVENBERG_MARQUARDT]: options["xtol"] = self.tolerance options["maxiter"] = self.max_mda_iter elif self.method in [self.DF_SANE]: options["fatol"] = self.tolerance options["maxfev"] = self.max_mda_iter elif self.method in [self.HYBRID]: options["xtol"] = self.tolerance options["maxfev"] = self.max_mda_iter return options # TODO: API: prepend verb. def _methods_with_callback(self) -> list[str]: """Determine whether resolution method accepts a callback function.""" return [self.BROYDEN1, self.BROYDEN2] def _run(self) -> DisciplineData: if self.warm_start: self._couplings_warm_start() self.reset_disciplines_statuses() self.execute_all_disciplines(self.local_data) self._compute_coupling_sizes() if not self.strong_couplings: msg = ( "MDAQuasiNewton found no strong couplings. Executed all" "disciplines once." ) LOGGER.warning(msg) self.local_data[self.RESIDUALS_NORM] = array([0.0]) return self.local_data options = self._solver_options() self.current_iter = 0 def fun( x_vect: NDArray, ) -> NDArray: """Evaluate all residuals, possibly in parallel. Args: x_vect: The value of the design variables. """ self.current_iter += 1 # transform input vector into a dict input_values = update_dict_of_arrays_from_array( self.local_data, self.strong_couplings, x_vect ) # compute all residuals self.reset_disciplines_statuses() self.execute_all_disciplines(input_values) return self.assembly.residuals(input_values, self.strong_couplings).real jac = None if self.use_gradient: self.assembly.set_newton_differentiated_ios(self.strong_couplings) def jacobian( x_vect: NDArray, ) -> NDArray: """Linearize all residuals. Args: x_vect: The value of the design variables. """ # transform input vector into a dict self.local_data.update( split_array_to_dict_of_arrays( x_vect, self._coupling_sizes, self.strong_couplings ) ) # linearize all residuals self.reset_disciplines_statuses() for discipline in self.disciplines: discipline.linearize(self.local_data) # assemble the system self.assembly.compute_sizes( self.strong_couplings, self.strong_couplings, self.strong_couplings ) dresiduals = self.assembly.assemble_jacobian( self.strong_couplings, self.strong_couplings, is_residual=True ) return dresiduals.toarray().real jac = jacobian # initial solution self.current_couplings = self._current_working_couplings().real # callback function to retrieve the residual at iteration k if self.reset_history_each_run: self.residual_history = [] # callback function to store residuals if self.method in self._methods_with_callback(): def callback( new_couplings: NDArray, _, ) -> None: """Store the current residual in the history. Args: new_couplings: The new coupling variables. _: ignored """ self._compute_residual(self.current_couplings, new_couplings) self.current_couplings = new_couplings else: callback = None # solve the system y_opt = root( fun, x0=self.current_couplings, method=self.method, jac=jac, callback=callback, tol=self.tolerance, options=options, ) self._warn_convergence_criteria() # transform optimal vector into a dict self.local_data.update( split_array_to_dict_of_arrays( y_opt.x, self._coupling_sizes, self.strong_couplings ) ) if self.method in self._methods_with_callback(): self.local_data[self.RESIDUALS_NORM] = array([self.normed_residual]) return self.local_data def _initialize_grammars(self) -> None: for disciplines in self.disciplines: self.input_grammar.update(disciplines.input_grammar) self.output_grammar.update(disciplines.output_grammar) if self.method in self._methods_with_callback(): self._add_residuals_norm_to_output_grammar()