Source code for gemseo.algos.linear_solvers.lib_scipy_linalg

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry,
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Francois Gallard

"""Wrappers for scipy's linear solvers."""

from __future__ import division, unicode_literals

import logging
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union

from numpy import find_common_type, ndarray
from scipy.sparse import spmatrix
from scipy.sparse.base import issparse
from scipy.sparse.linalg import LinearOperator, bicg, bicgstab, gmres, lgmres, qmr, splu

from gemseo.algos.linear_solvers.linear_solver_lib import LinearSolverLib

LOGGER = logging.getLogger(__name__)

[docs]class ScipyLinalgAlgos(LinearSolverLib): """Wrapper for scipy linalg sparse linear solvers. Attributes: save_fpath (str): The path to the file to saved the problem when it is not converged and the option save_when_fail is active. methods_map (Dict[str, Callable]): The mapping between the solver names and the solvers methods in scipy.sparse. """ BASE_INFO_MSG = "scipy linear solver algorithm stop info: " OPTIONS_MAP = { "max_iter": "maxiter", "preconditioner": "M", "store_outer_av": "store_outer_Av", } LGMRES_SPEC_OPTS = ( "inner_m", "outer_k", "outer_v", "store_outer_av", "prepend_outer_v", ) __WEBSITE = "{}.html" __WEBPAGE = "scipy.sparse.linalg.{}" __WEBPAGES = { "BICG": __WEBPAGE.format("bicg"), "GMRES": __WEBPAGE.format("gmres"), "LGMRES": __WEBPAGE.format("lgmres"), "QMR": __WEBPAGE.format("qmr"), "BICGSTAB": __WEBPAGE.format("bicgstab"), "DEFAULT": __WEBPAGE.format("splu"), } def __init__(self): # type: (...) -> None super(ScipyLinalgAlgos, self).__init__() self.methods_map = { "LGMRES": lgmres, "GMRES": gmres, "BICG": bicg, "QMR": qmr, "BICGSTAB": bicgstab, "DEFAULT": self._run_default_solver, } self.lib_dict = { name: self.get_default_properties(name) for name in self.methods_map.keys() } self.lib_dict["DEFAULT"]["description"] = ( "This starts by LGMRES, but if it fails, " "switches to GMRES, then direct method super LU factorization." )
[docs] @classmethod def get_default_properties( cls, algo_name # type: str ): # type: (...) -> Dict[str, Union[bool,str]] """Return the properties of the algorithm. It states if it requires symmetric, or positive definite matrices for instance. Args: algo_name: The algorithm name. Returns: The properties of the solver. """ return { cls.LHS_MUST_BE_POSITIVE_DEFINITE: False, cls.LHS_MUST_BE_SYMMETRIC: False, cls.LHS_CAN_BE_LINEAR_OPERATOR: True, cls.INTERNAL_NAME: algo_name, "description": "Linear solver implemented in the SciPy library.", "website": cls.__WEBSITE.format(algo_name), }
def _get_options( self, max_iter=1000, # type: int preconditioner=None, # type: Optional[Union[ndarray, LinearOperator]] tol=1e-12, # type: float atol=None, # type: Optional[float] x0=None, # type: Optional[ndarray] use_ilu_precond=True, # type: bool inner_m=30, # type: int outer_k=3, # type: int outer_v=None, # type: Optional[List[Tuple]] store_outer_av=True, # type: bool prepend_outer_v=False, # type: bool save_when_fail=False, # type: bool store_residuals=False, # type: bool ): # type: (...) -> Dict[str, Any] """Check the options and sets the default values. See Args: max_iter: The maximum number of iterations. preconditioner: The preconditionner, approximation of RHS^-1. If None, no preconditioner is used. tol: The relative tolerance for convergence, norm( <= max(tol*norm(LHS), atol). atol: The absolute tolerance for convergence, norm( <= max(tol*norm(LHS), atol). x0: The initial guess for the solution. M{sparse matrix, dense matrix, LinearOperator}. If None, solvers usually start from the null vector. inner_m int: The number of inner GMRES iterations per outer iteration. outer_k: The number of vectors to carry between inner GMRES iterations. outer_v: The data used to augment the Krylov subspace. store_outer_A: Whether LGMRES should store also A*v in addition to the vectors v in outer_v. prepend_outer_v: Whether to put outer_v augmentation vectors before the Krylov iterates. use_ilu_precond: Whether to use superLU to compute an incomplete LU factorization as preconditioner. save_when_fail: Whether to save the linear system to the disk when the solver failed to converge. store_residuals: Whether to store the residuals convergence history. Returns: The options. Raises: ValueError: When the LHR and RHS shapes are inconsistent, or when the preconditioner options are inconsistent. """ if preconditioner is not None and ( preconditioner.shape != self.problem.lhs.shape ): msg = "Inconsistent Preconditioner shape: %s != %s" raise ValueError(msg.format(preconditioner.shape, self.problem.lhs.shape)) if x0 is not None and len(x0) != len(self.problem.rhs): msg = "Inconsistent initial guess shape: %s != %s" raise ValueError(msg.format(x0.shape, self.problem.rhs.shape)) if use_ilu_precond and preconditioner is not None: raise ValueError( "Use either 'use_ilu_precond' or provide 'preconditioner'," " but not both." ) return self._process_options( max_iter=max_iter, preconditioner=preconditioner, tol=tol, atol=atol, x0=x0, inner_m=inner_m, outer_k=outer_k, outer_v=outer_v, store_outer_av=store_outer_av, prepend_outer_v=prepend_outer_v, use_ilu_precond=use_ilu_precond, save_when_fail=save_when_fail, store_residuals=store_residuals, ) def _run( self, **options # type: Union[None, bool, int, float, ndarray] ): # type: (...) -> ndarray """Run the algorithm. Args: **options: The options for the algorithm. Returns: The solution of the problem. """ if issparse(self.problem.rhs): self.problem.rhs = self.problem.rhs.toarray() rhs = self.problem.rhs lhs = self.problem.lhs opts_solver = options.copy() c_dtype = None if rhs.dtype != lhs.dtype and not isinstance(lhs, LinearOperator): c_dtype = find_common_type([rhs.dtype, lhs.dtype], []) if lhs.dtype != c_dtype: lhs = lhs.astype(c_dtype) if rhs.dtype != c_dtype: rhs = rhs.astype(c_dtype) if opts_solver["use_ilu_precond"] and not isinstance(lhs, LinearOperator): opts_solver["M"] = self._build_ilu_preconditioner(lhs, c_dtype) del opts_solver["use_ilu_precond"] del opts_solver["store_residuals"] method = self.methods_map[self.algo_name] if options.get("store_residuals", False): opts_solver["callback"] = self.__store_residuals opts_solver.pop("save_when_fail") self.problem.solution, info = method(lhs, rhs, **opts_solver) self._check_solver_info(info, options) return self.problem.solution def __store_residuals( self, current_x # type: ndarray ): # type: (...) -> ndarray """Store the current iteration residuals. Args: current_x: The current solution. """ self.problem.solution = current_x self.problem.compute_residuals(True, True) def _check_solver_info( self, info, # type: int options, # type: Mapping[str, Any] ): # type: (...) -> bool """Check the info returned by the solver. Args: info: The info value, negative, 0 or positive depending on status. options: The options passed to the solver. Returns: Whether the solver converged. Raises: RuntimeError: If the inputs are illegal for the solver. """ self.problem.is_converged = info == 0 if info > 0: if self.problem.solution is not None: LOGGER.warning( "%s, residual = %s", self.BASE_INFO_MSG, self.problem.compute_residuals(True), ) LOGGER.warning("info = %s", info) else: LOGGER.warning("Failed to converge") return False # check the dimensions if info < 0: raise RuntimeError( self.BASE_INFO_MSG + "illegal input or breakdown" ", options = %s", options, ) return True def _run_default_solver( self, lhs, # type: Union[ndarray, spmatrix, LinearOperator] rhs, # type: ndarray **options # type: Any ): # type: (...) -> Tuple[ndarray, int] """Run the default solver. This starts by LGMRES, but if it fails, switches to GMRES, then direct method super LU factorization. Args: lhs: The left hand side of the equation (matrix). rhs: The right hand side of the equation. **options: The user options. Returns: The last solution found and the info. """ # Try LGMRES first best_sol, info = lgmres(A=lhs, b=rhs, **options) min_res = self.problem.compute_residuals(True, current_x=best_sol) if self._check_solver_info(info, options): # If converged, stop return best_sol, info else: # Otherwise try GMRES min_res = self.problem.compute_residuals(True, current_x=best_sol) for k in self.LGMRES_SPEC_OPTS: # Adapt options if k in options: del options[k] if min_res < 1.0: options["x0"] = best_sol sol, info = gmres(A=lhs, b=rhs, **options) res = self.problem.compute_residuals(True, current_x=sol) if res < min_res: best_sol = sol options["x0"] = best_sol if self._check_solver_info(info, options): # pragma: no cover return best_sol, info # In this case previous runs failed, trying direct method # based on super LU a_fact = splu(lhs) sol = a_fact.solve(rhs) res = self.problem.compute_residuals(True, current_x=best_sol) if res < options["tol"]: # pragma: no cover best_sol = sol info = 0 self.problem.is_converged = True else: info = 1 return best_sol, info