Source code for gemseo.mda.newton

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Charlie Vanaret, Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""
A set of Newton algorithm variants for solving MDAs
***************************************************

Root finding methods include:

- `Newton-Raphson <https://en.wikipedia.org/wiki/Newton%27s_method>`__
- `quasi-Newton methods <https://en.wikipedia.org/wiki/Quasi-Newton_method>`__

Each of these methods is implemented by a class in this module.
Both inherits from a common abstract cache.
"""
from __future__ import absolute_import, division, print_function, unicode_literals

from builtins import str, super
from copy import deepcopy

from future import standard_library
from numpy.linalg import norm
from scipy.optimize import root

from gemseo import LOGGER
from gemseo.core.discipline import MDODiscipline
from gemseo.mda.mda import MDA
from gemseo.utils.data_conversion import DataConversion

standard_library.install_aliases()


[docs]class MDARoot(MDA): """Abstract class implementing MDAs based on (Quasi-)Newton methods.""" def __init__( self, disciplines, max_mda_iter=10, name=None, grammar_type=MDODiscipline.JSON_GRAMMAR_TYPE, tolerance=1e-6, linear_solver_tolerance=1e-12, warm_start=False, use_lu_fact=False, norm0=None, ): """Constructor. :param disciplines: the disciplines list :type disciplines: list(MDODiscipline) :param max_mda_iter: maximum number of iterations :type max_mda_iter: int :param grammar_type: the type of grammar to use for IO declaration either JSON_GRAMMAR_TYPE or SIMPLE_GRAMMAR_TYPE :type grammar_type: str :param tolerance: tolerance of the iterative direct coupling solver, norm of the current residuals divided by initial residuals norm shall be lower than the tolerance to stop iterating :type tolerance: float :param name: the name of the chain :type name: str :param linear_solver_tolerance: Tolerance of the linear solver in the adjoint equation :type linear_solver_tolerance: float :param warm_start: if True, the second iteration and ongoing start from the previous coupling solution :type warm_start: bool :param use_lu_fact: if True, when using adjoint/forward differenciation, store a LU factorization of the matrix to solve faster multiple RHS problem :type use_lu_fact: bool """ self.tolerance = 1e-6 self.max_mda_iter = 10 super(MDARoot, self).__init__( disciplines, max_mda_iter=max_mda_iter, name=name, grammar_type=grammar_type, tolerance=tolerance, linear_solver_tolerance=linear_solver_tolerance, warm_start=warm_start, use_lu_fact=use_lu_fact, norm0=norm0, ) self._initialize_grammars() self._set_default_inputs() self._compute_input_couplings() # parallel execution # ================================================================== # if 1 < n_processes: # self.parallel_execution = DisciplinesParallelExecution( # self.disciplines, n_processes) # else: # self.parallel_execution = None # ================================================================== def _initialize_grammars(self): """Define all inputs and outputs of the chain.""" for disciplines in self.disciplines: self.input_grammar.update_from(disciplines.input_grammar) self.output_grammar.update_from(disciplines.output_grammar)
[docs] def execute_all_disciplines(self, input_local_data): """Execute all self.disciplines. :param input_local_data: the input data of the disciplines """ # Set status of sub disciplines # if self.parallel_execution is not None: # self.disciplines = self.parallel_execution # .execute(input_local_data) # else: for discipline in self.disciplines: discipline.reset_statuses_for_run() discipline.execute(deepcopy(input_local_data)) outputs = [discipline.get_output_data() for discipline in self.disciplines] for data in outputs: self.local_data.update(data)
def _run(self): """Run the MDA. To be implemented in concrete classes. """ raise NotImplementedError()
[docs]class MDANewtonRaphson(MDARoot): r"""Newton solver for MDA. The `Newton-Raphson method <https://en.wikipedia.org/wiki/Newton%27s_method>`__ is parameterized by a relaxation factor :math:`\alpha \in (0, 1]` to limit the length of the steps taken along the Newton direction. The new iterate is given by: .. math:: x_{k+1} = x_k - \alpha f'(x_k)^{-1} f(x_k) """ def __init__( self, disciplines, max_mda_iter=10, relax_factor=0.99, name=None, grammar_type=MDODiscipline.JSON_GRAMMAR_TYPE, linear_solver="lgmres", tolerance=1e-6, linear_solver_tolerance=1e-12, warm_start=False, use_lu_fact=False, norm0=None, ): """Constructor. :param disciplines: list of disciplines :type disciplines: list(MDODiscipline) :param max_mda_iter: maximum number of iterations :type max_mda_iter: int :param relax_factor: relaxation factor in the Newton step (default 1.) :type relax_factor: float :param name: name :type name: str :param grammar_type: the type of grammar to use for IO declaration either JSON_GRAMMAR_TYPE or SIMPLE_GRAMMAR_TYPE :type grammar_type: str :param linear_solver: linear solver used to compute the Newton step :type linear_solver: str :param n_processes: number of processes running the MDA. if >1, the run is parallel :type n_processes: int :param tolerance: tolerance of the iterative direct coupling solver, norm of the current residuals divided by initial residuals norm shall be lower than the tolerance to stop iterating :type tolerance: float :param warm_start: if True, the second iteration and ongoing start from the previous coupling solution :type warm_start: bool :param linear_solver_tolerance: Tolerance of the linear solver in the adjoint equation :type linear_solver_tolerance: float :param use_lu_fact: if True, when using adjoint/forward differenciation, store a LU factorization of the matrix to solve faster multiple RHS problem :type use_lu_fact: bool :param norm0: reference value of the norm of the residual to compute the decrease stop criteria. Iterations stops when norm(residual)/norm0<tolerance :type norm0: float """ super(MDANewtonRaphson, self).__init__( disciplines, max_mda_iter=max_mda_iter, name=name, grammar_type=grammar_type, tolerance=tolerance, linear_solver_tolerance=linear_solver_tolerance, warm_start=warm_start, use_lu_fact=use_lu_fact, norm0=norm0, ) self.relax_factor = self.__check_relax_factor(relax_factor) self.linear_solver = linear_solver @staticmethod def __check_relax_factor(relax_factor): """Check that the relaxation factor in the Newton step is in (0, 1]. :param relax_factor: relaxation factor """ if relax_factor <= 0.0 or relax_factor > 1: raise ValueError( "Newton relaxation factor should belong to " + "(0, 1] (current value: " + str(relax_factor) + ")" ) return relax_factor def _newton_step(self): """Execute the full newton step. Computes increment : -[dR/dW]**-1 . R Runs disciplines """ newton_dict = self.assembly.compute_newton_step( self.local_data, self.strong_couplings, self.relax_factor, self.linear_solver, ) # update current solution with Newton step exec_data = deepcopy(self.local_data) for c_var, c_step in newton_dict.items(): exec_data[c_var] += c_step self.reset_disciplines_statuses() self.execute_all_disciplines(exec_data) def _run(self): """Run the MDA. Run the disciplines in a sequential way until the difference between outputs is under tolerance :returns: the local data """ if self.warm_start: self._couplings_warm_start() # execute the disciplines current_couplings = self._current_input_couplings() self.reset_disciplines_statuses() self.execute_all_disciplines(self.local_data) new_couplings = self._current_input_couplings() # store initial residual current_iter = 1 self._compute_residual( current_couplings, new_couplings, current_iter, first=True ) current_couplings = new_couplings while not self._termination(current_iter): self._newton_step() new_couplings = self._current_input_couplings() # store current residual current_iter += 1 self._compute_residual(current_couplings, new_couplings, current_iter) current_couplings = new_couplings
[docs]class MDAQuasiNewton(MDARoot): """Quasi-Newton solver for MDA. `Quasi-Newton methods <https://en.wikipedia.org/wiki/Quasi-Newton_method>`__ include numerous variants ( `Broyden <https://en.wikipedia.org/wiki/Broyden%27s_method>`__, `Levenberg-Marquardt <https://en.wikipedia.org/wiki/Levenberg%E2%80%93Marquardt_algorithm>`__, ...). The name of the variant should be provided as a parameter :code:`method` of the class. The new iterate is given by: .. math:: x_{k+1} = x_k - \\rho_k B_k f(x_k) where :math:`\\rho_k` is a coefficient chosen in order to minimize the convergence and :math:`B_k` is an approximation of the inverse of the Jacobian :math:`Df(x_k)^{-1}`. """ # quasi-Newton methods HYBRID = "hybr" LEVENBERG_MARQUARDT = "lm" BROYDEN1 = "broyden1" BROYDEN2 = "broyden2" ANDERSON = "anderson" LINEAR_MIXING = "linearmixing" DIAG_BROYDEN = "diagbroyden" EXCITING_MIXING = "excitingmixing" KRYLOV = "krylov" DF_SANE = "df-sane" QUASI_NEWTON_METHODS = [ HYBRID, LEVENBERG_MARQUARDT, BROYDEN1, BROYDEN2, ANDERSON, LINEAR_MIXING, DIAG_BROYDEN, EXCITING_MIXING, KRYLOV, DF_SANE, ] def __init__( self, disciplines, max_mda_iter=10, name=None, grammar_type=MDODiscipline.JSON_GRAMMAR_TYPE, method=HYBRID, use_gradient=False, tolerance=1e-6, linear_solver_tolerance=1e-12, warm_start=False, use_lu_fact=False, norm0=None, ): """Constructor. :param disciplines: the disciplines list :type disciplines: list(MDODiscipline) :param max_mda_iter: maximum number of iterations :type max_mda_iter: int :param name: name :type name: str :param grammar_type: the type of grammar to use for IO declaration either JSON_GRAMMAR_TYPE or SIMPLE_GRAMMAR_TYPE :type grammar_type: str :param method: method name in scipy root finding, among self.QUASI_NEWTON_METHODS :type method: str :param use_gradient: if True, used the analytic gradient of the discipline :type use_gradient: bool :param tolerance: tolerance of the iterative direct coupling solver, norm of the current residuals divided by initial residuals norm shall be lower than the tolerance to stop iterating :type tolerance: float :param linear_solver_tolerance: Tolerance of the linear solver in the adjoint equation :type linear_solver_tolerance: float :param warm_start: if True, the second iteration and ongoing start from the previous coupling solution :type warm_start: bool :param use_lu_fact: if True, when using adjoint/forward differenciation, store a LU factorization of the matrix to solve faster multiple RHS problem :type use_lu_fact: bool :param norm0: reference value of the norm of the residual to compute the decrease stop criteria. Iterations stops when norm(residual)/norm0<tolerance :type norm0: float """ super(MDAQuasiNewton, self).__init__( disciplines, max_mda_iter=max_mda_iter, name=name, grammar_type=grammar_type, tolerance=tolerance, linear_solver_tolerance=linear_solver_tolerance, warm_start=warm_start, use_lu_fact=use_lu_fact, norm0=norm0, ) if method not in self.QUASI_NEWTON_METHODS: msg = "Method " + method + " is not a valid quasi-Newton method" raise ValueError(msg) self.method = method self.use_gradient = use_gradient self.local_residual_history = [] self.last_outputs = None # used for computing the residual history def _solver_options(self): """Determine options for the solver, based on the resolution method.""" options = {} if self.method in [ self.BROYDEN1, self.BROYDEN2, self.ANDERSON, self.LINEAR_MIXING, self.DIAG_BROYDEN, self.EXCITING_MIXING, self.KRYLOV, ]: options["ftol"] = self.tolerance options["maxiter"] = self.max_mda_iter elif self.method in [self.LEVENBERG_MARQUARDT]: options["xtol"] = self.tolerance options["maxiter"] = self.max_mda_iter elif self.method in [self.DF_SANE]: options["fatol"] = self.tolerance options["maxfev"] = self.max_mda_iter elif self.method in [self.HYBRID]: options["xtol"] = self.tolerance options["maxfev"] = self.max_mda_iter return options def _methods_with_callback(self): """Determine whether resolution method accepts a callback function.""" return [self.BROYDEN1, self.BROYDEN2] def _run(self): """Run the MDA. Runs the disciplines in a sequential way until the difference between outputs is under tolerance :returns: the local data """ if self.warm_start: self._couplings_warm_start() self.reset_disciplines_statuses() self.execute_all_disciplines(deepcopy(self.local_data)) couplings = self.strong_couplings if not couplings: msg = ( "MDAQuasiNewton found no strong couplings. Executed all" "disciplines once." ) LOGGER.warning(msg) return self.local_data options = self._solver_options() def fun(x_vect): """Evaluate all residuals, possibly in parallel. :param x_vect: design variables """ # transform input vector into a dict input_values = DataConversion.update_dict_from_array( self.local_data, couplings, x_vect ) # compute all residuals self.reset_disciplines_statuses() self.execute_all_disciplines(input_values) residuals = self.assembly.residuals(input_values, couplings).real # if residuals.size == 1: # Weak couplings already treated # return residuals[0] return residuals jac = None if self.use_gradient: for discipline in self.disciplines: # Tells the discipline what to linearize outs = discipline.get_output_data_names() to_linearize = set(outs) & set(couplings) discipline.add_differentiated_outputs(list(to_linearize)) inpts = discipline.get_input_data_names() to_linearize = set(inpts) & set(couplings) discipline.add_differentiated_inputs(list(to_linearize)) # linearize the residuals def jacobian(x_vect): """Linearize all the residuals. :param x_vect:design variables """ # transform input vector into a dict input_values = DataConversion.update_dict_from_array( self.local_data, couplings, x_vect ) # linearize all residuals self.reset_disciplines_statuses() for discipline in self.disciplines: discipline.linearize(input_values) # assemble the system n_couplings = 0 for coupling in couplings: discipline = self.coupling_structure.find_discipline(coupling) size = list(discipline.jac[coupling].values())[0].shape[0] n_couplings += size self.assembly.compute_sizes(couplings, couplings, couplings) dresiduals = self.assembly.dres_dvar( couplings, couplings, n_couplings, n_couplings ).todense() return dresiduals jac = jacobian # initial solution y_0 = DataConversion.dict_to_array(self.local_data, couplings).real # callback function to retrieve the residual at iteration k norm_0 = norm(y_0.real) if self.reset_history_each_run: self.residual_history = [] # callback function to store residuals self.last_outputs = y_0 if self.method in self._methods_with_callback(): def callback(y_k, _): """Store the current residual in the history. :param y_k: couling variables :param _: ignored """ delta = norm((y_k - self.last_outputs).real) / norm_0 self.residual_history.append((delta, 0)) # iter number? self.last_outputs = y_k else: callback = None # solve the system y_opt = root( fun, x0=y_0, method=self.method, jac=jac, callback=callback, options=options ) # transform optimal vector into a dict self.local_data = DataConversion.update_dict_from_array( self.local_data, couplings, y_opt.x ) return self.local_data