Source code for gemseo.core.derivatives.jacobian_assembly

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                         documentation
#        :author: Francois Gallard, Charlie Vanaret
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""Coupled derivatives calculations."""

from __future__ import annotations

import itertools
import logging
from collections import defaultdict
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import NamedTuple

import matplotlib.pyplot as plt
from numpy import concatenate
from numpy import empty
from numpy import fill_diagonal
from numpy import ndarray
from numpy import zeros
from numpy.linalg import norm
from scipy.sparse import bmat
from scipy.sparse import csc_matrix
from scipy.sparse import csr_matrix
from scipy.sparse import eye
from scipy.sparse import vstack
from scipy.sparse.linalg import LinearOperator
from scipy.sparse.linalg import factorized
from strenum import StrEnum

from gemseo.algos.linear_solvers.factory import LinearSolverLibraryFactory
from gemseo.algos.linear_solvers.linear_problem import LinearProblem
from gemseo.core.derivatives import derivation_modes
from gemseo.core.derivatives.jacobian_operator import JacobianOperator
from gemseo.core.derivatives.mda_derivatives import traverse_add_diff_io_mda
from gemseo.utils.compatibility.scipy import sparse_classes
from gemseo.utils.constants import READ_ONLY_EMPTY_DICT
from gemseo.utils.matplotlib_figure import save_show_figure

if TYPE_CHECKING:
    from collections.abc import Callable
    from collections.abc import Collection
    from collections.abc import Iterable
    from collections.abc import Iterator
    from collections.abc import Mapping
    from typing import TypeAlias

    from scipy.sparse import dok_matrix

    from gemseo.core.coupling_structure import CouplingStructure
    from gemseo.core.discipline import Discipline
    from gemseo.typing import RealArray
    from gemseo.typing import RealOrComplexArray
    from gemseo.typing import RealOrComplexArrayT
    from gemseo.typing import StrKeyMapping

LOGGER = logging.getLogger(__name__)


[docs] def none_factory() -> None: """Returns None... To be used for defaultdict """
[docs] def default_dict_factory() -> dict[Any, None]: """Instantiates a defaultdict(None) object.""" return defaultdict(none_factory)
# TODO: API: extract to a specific module
[docs] class AssembledJacobianOperator(LinearOperator): # type: ignore[misc] # because missing types """Representation of the assembled Jacobian as a SciPy ``LinearOperator``.""" __functions: Iterable[str] """The names of functions to differentiate.""" __variables: Iterable[str] """The names of variables with respect to which differentiate.""" __is_residual: bool """Whether the functions are residuals.""" __get_jacobian_generator: Callable[ [Iterable[str], Iterable[str], bool], Iterator[tuple[RealOrComplexArray, JacobianAssembly.JacobianPosition]], ] """The method to iterate over the relevant Jacobians, given the provided variables and functions.""" def __init__( self, functions: Iterable[str], variables: Iterable[str], n_functions: int, n_variables: int, get_jacobian_generator: Callable[ [Iterable[str], Iterable[str], bool], Iterator[tuple[RealOrComplexArray, JacobianAssembly.JacobianPosition]], ], is_residual: bool = False, ) -> None: """ Args: functions: The functions to differentiate. variables: The differentiation variables. n_functions: The number of functions components. n_variables: The number of variables components. get_jacobian_generator: The method to iterate over the Jacobians associated with the provided functions and variables. is_residual: Whether the functions are residuals. """ # noqa: D205, D212, D415 super().__init__(shape=(n_functions, n_variables), dtype=float) self.__functions = functions self.__variables = variables self.__is_residual = is_residual self.__get_jacobian_generator = get_jacobian_generator def _matvec(self, x: RealOrComplexArrayT) -> RealOrComplexArrayT: """The matrix-vector product involving the Jacobian ∂f/∂v. Args: x: The vector to apply ∂f/∂v to. Returns: The resulting vector ∂f/∂v x. """ # Initialize the result with appropriate dimension result = zeros(self.shape[0], dtype=x.dtype) jacobian_generator = self.__get_jacobian_generator( self.__functions, self.__variables, self.__is_residual ) for jacobian, position in jacobian_generator: result[position.row_slice] += jacobian.dot(x[position.column_slice]) return result def _rmatvec(self, x: RealOrComplexArrayT) -> RealOrComplexArrayT: """The matrix-vector product involving the transposed Jacobian ∂f/∂v. Args: x: The vector to apply the transpose of ∂f/∂v to. Returns: The resulting vector (∂f/∂v)^T x. """ # Initialize the result with appropriate dimension result = zeros(self.shape[1], dtype=x.dtype) jacobian_generator = self.__get_jacobian_generator( self.__functions, self.__variables, self.__is_residual ) for jacobian, position in jacobian_generator: result[position.column_slice] += jacobian.T.dot(x[position.row_slice]) return result
[docs] class JacobianAssembly: """Assembly of Jacobians. Typically, assemble discipline's Jacobians into a system Jacobian. """ coupling_structure: CouplingStructure """The considered coupling structure.""" sizes: dict[str, int] """The number of elements of a given str.""" disciplines: dict[str, Discipline] """The disciplines, stored using their name.""" __last_diff_inouts: tuple[set[str], set[str]] """The last diff in-outs stored.""" __minimal_couplings: set[str] """The minimal couplings.""" coupled_system: CoupledSystem """The coupled derivative system of residuals.""" __linear_solver_factory: LinearSolverLibraryFactory """The linear solver factory.""" DerivationMode: TypeAlias = derivation_modes.DerivationMode
[docs] class JacobianType(StrEnum): """The available types for the Jacobian matrix.""" LINEAR_OPERATOR = "linear_operator" """Jacobian as a SciPy ``LinearOperator`` implementing the appropriate method to perform matrix-vector products.""" MATRIX = "matrix" """Jacobian matrix in Compressed Sparse Row (CSR) format."""
[docs] class JacobianPosition(NamedTuple): """The position of the discipline's Jacobians within the assembled Jacobian.""" row_slice: slice """The row slice indicating where to position the disciplinary Jacobian within the assembled Jacobian when defined as an array.""" column_slice: slice """The column slice indicating where to position the disciplinary Jacobian within the assembled Jacobian when defined as an array.""" row_index: int """The row index of the disciplinary Jacobian within the assembled Jacobian when defined blockwise.""" column_index: int """The column index of the disciplinary Jacobian within the assembled Jacobian when defined blockwise."""
def __init__(self, coupling_structure: CouplingStructure) -> None: """ Args: coupling_structure: The CouplingStructure associated disciplines that form the coupled system. """ # noqa: D205, D212, D415 self.coupling_structure = coupling_structure self.sizes = {} self.disciplines = {} self.__last_diff_inouts = (set(), set()) self.__minimal_couplings = set() self.coupled_system = CoupledSystem() self.__linear_solver_factory = LinearSolverLibraryFactory(use_cache=True) def _check_inputs( self, functions: Iterable[str], variables: Iterable[str], couplings: Iterable[str], matrix_type: JacobianType, use_lu_fact: bool, ) -> None: """Check the inputs before differentiation. Args: functions: The functions to differentiate. variables: The differentiation variables. couplings: The coupling variables. matrix_type: The type of matrix for linearization. use_lu_fact: Whether to use the LU factorization once for all second members. Raises: ValueError: When the inputs are inconsistent. """ unknown_dvars = set(variables) unknown_outs = set(functions) for discipline in self.coupling_structure.disciplines: inputs = set(discipline.io.input_grammar) outputs = set(discipline.io.output_grammar) unknown_outs -= outputs unknown_dvars -= inputs if unknown_dvars: possible_inputs = [ list(disc.io.input_grammar) for disc in self.coupling_structure.disciplines ] msg = ( "Some of the specified variables are not " "inputs of the disciplines: " f"{unknown_dvars}" " possible inputs are: " f"{possible_inputs}" ) raise ValueError(msg) if unknown_outs: raise ValueError( "Some outputs are not computed by the disciplines:" + str(unknown_outs) + " available outputs are: " + str([ list(disc.io.output_grammar) for disc in self.coupling_structure.disciplines ]) ) for coupling in set(couplings) & set(variables): raise ValueError( "Variable " + str(coupling) + " is both a coupling and a design variable" ) matrix_type = self.JacobianType(matrix_type) if use_lu_fact and matrix_type == self.JacobianType.LINEAR_OPERATOR: msg = ( "Unsupported LU factorization for " "LinearOperators! Please use Sparse matrices" " instead" ) raise ValueError(msg)
[docs] def compute_sizes( self, functions: Iterable[str], variables: Iterable[str], couplings: Iterable[str], residual_variables: Mapping[str, str] = READ_ONLY_EMPTY_DICT, ) -> None: """Compute the number of scalar functions, variables and couplings. Args: functions: The functions to differentiate. variables: The differentiation variables. couplings: The coupling variables. residual_variables: The mapping of residuals of disciplines to their respective state variables. Raises: ValueError: When the size of some variables could not be determined. """ # search for functions/variables/couplings in the Jacobians of the disciplines if residual_variables: outputs = itertools.chain( functions, couplings, residual_variables.keys(), residual_variables.values(), ) else: outputs = itertools.chain(functions, couplings) # functions and coupling and states for output in outputs: discipline = self.coupling_structure.find_discipline(output) self.disciplines[output] = discipline # get an arbitrary Jacobian and compute the number of rows self.sizes[output] = ( discipline.io.output_grammar.data_converter.get_value_size( output, discipline.io.data[output] ) ) # variables for variable in variables: for discipline in self.coupling_structure.disciplines: if variable not in self.sizes: for jacobian in discipline.jac.values(): jacobian_wrt_variable = jacobian.get(variable, None) if jacobian_wrt_variable is not None: self.sizes[variable] = jacobian_wrt_variable.shape[1] self.disciplines[variable] = discipline break if variable not in self.sizes: msg = f"Failed to determine the size of input variable {variable}" raise ValueError(msg)
@classmethod def _get_derivation_mode( cls, mode: DerivationMode, n_variables: int, n_functions: int, ) -> DerivationMode: """Get the differentiation mode. Args: mode: The differentiation mode. n_variables: The number of variables. n_functions: The number of functions. Returns: The differentiation mode. """ if mode != cls.DerivationMode.AUTO: return mode if n_variables <= n_functions: return cls.DerivationMode.DIRECT return cls.DerivationMode.ADJOINT
[docs] def compute_dimension(self, names: Iterable[str]) -> int: """Compute the total number of functions/variables/couplings of the full system. Args: names: The names of the inputs or the outputs. Returns: The dimension if the system. """ return sum(self.sizes[name] for name in names)
def _get_jacobian_generator( self, functions: Iterable[str], variables: Iterable[str], is_residual: bool = False, ) -> Iterator[ tuple[RealOrComplexArray | csr_matrix | JacobianOperator, JacobianPosition] ]: """Iterate over Jacobian matrices. Provide a generator to iterate over the Jacobians associated with each provided pair (function, variable). The generator yields the Jacobian along with its relative position in the to be assembled Jacobian. Args: functions: The functions to differentiate. variables: The differentiation variables. is_residual: Whether the functions are residuals. Yields: A tuple of the form (Jacobian, Position). """ row = 0 # Iterate over outputs for row_index, function in enumerate(functions): column = 0 discipline = self.disciplines[function] if not discipline.jac: continue function_jacobian = discipline.jac[function] # Iterate over inputs for column_index, variable in enumerate(variables): jacobian = function_jacobian.get(variable, None) variable_size = self.sizes[variable] # If residual of the form Yi-Yi, then add -I to the Jacobian if is_residual and function == variable: if jacobian is not None: # Make a copy to avoid in-place modifications jacobian_copy = jacobian.copy() if isinstance(jacobian_copy, ndarray): fill_diagonal(jacobian_copy, jacobian.diagonal() - 1) elif isinstance(jacobian_copy, sparse_classes): jacobian_copy.setdiag(jacobian.diagonal() - 1) elif isinstance(jacobian_copy, JacobianOperator): jacobian_copy = jacobian_copy.shift_identity() jacobian = jacobian_copy else: jacobian = -eye(variable_size, dtype=int) # Yield only if Jacobian exists if jacobian is not None: yield ( jacobian.real, self.JacobianPosition( row_slice=slice(row, row + jacobian.shape[0]), column_slice=slice(column, column + jacobian.shape[1]), row_index=row_index, column_index=column_index, ), ) column += variable_size row += self.sizes[function] def _assemble_jacobian_as_matrix( self, functions: Collection[str], variables: Collection[str], is_residual: bool = False, ) -> csr_matrix: """Form the Jacobian as a sparse matrix in Compressed Sparse Row (CSR) format. The CSR format is well-adapted to perform matrix-vector and matrix-matrix multiplications efficiently. Args: functions: The functions to differentiate. variables: The differentiation variables. is_residual: Whether the functions are residuals. Returns: The Jacobian as a sparse matrix in CSR format. """ # Fill in with zero blocks of appropriate dimension if necessary variable_sizes = [self.sizes[variable_name] for variable_name in variables] function_sizes = [self.sizes[function_name] for function_name in functions] # Initialize the block Jacobian with the appropriate structure total_jacobian: list[list[csr_matrix | None]] = [ [None for _ in variables] for _ in functions ] variable_sizes_0 = variable_sizes[0] for i, function_size in enumerate(function_sizes): total_jacobian[i][0] = csr_matrix((function_size, variable_sizes_0)) function_sizes_0 = function_sizes[0] total_jacobian_0 = total_jacobian[0] for j, variable_size in enumerate(variable_sizes): total_jacobian_0[j] = csr_matrix((function_sizes_0, variable_size)) # Perform the assembly jacobian_generator = self._get_jacobian_generator( functions, variables, is_residual=is_residual ) for jacobian, position in jacobian_generator: if isinstance(jacobian, JacobianOperator): jacobian = jacobian.get_matrix_representation() total_jacobian[position.row_index][position.column_index] = csr_matrix( jacobian.real ) return bmat(total_jacobian, format="csr")
[docs] def assemble_jacobian( self, functions: Collection[str], variables: Collection[str], is_residual: bool = False, jacobian_type: JacobianType = JacobianType.MATRIX, ) -> csr_matrix | AssembledJacobianOperator: """Form the Jacobian as a SciPy ``LinearOperator``. Args: functions: The functions to differentiate. variables: The differentiation variables. is_residual: Whether the functions are residuals. jacobian_type: The type of representation for the Jacobian ∂f/∂v. Returns: The Jacobian ∂f/∂v in the specified type. """ if jacobian_type == self.JacobianType.MATRIX: return self._assemble_jacobian_as_matrix( functions, variables, is_residual, ) if jacobian_type == self.JacobianType.LINEAR_OPERATOR: return AssembledJacobianOperator( functions, variables, self.compute_dimension(functions), self.compute_dimension(variables), self._get_jacobian_generator, is_residual, ) msg = f"Bad jacobian_type: {jacobian_type}" raise ValueError(msg)
def _compute_diff_ios_and_couplings( self, variables: Iterable[str], functions: Iterable[str], states: Iterable[str], coupling_structure: CouplingStructure, ) -> set[str]: """Compute the minimal differentiated inputs, outputs and couplings. This is done form the disciplines that are required to differentiate the functions with respect to the variables. This uses a graph algorithm that computes the dependency process graph and address the "jacobian accumulation" problem with a heuristic but conservative approach. Args: variables: The differentiation variables. functions: The functions to differentiate. states: The state variables. coupling_structure: The coupling structure containing all the disciplines. Returns: The minimal coupling variables set requires to differentiate the functions with respect to the variables. """ diff_ios = (set(variables), set(functions)) if self.__last_diff_inouts != diff_ios: diff_ios_merged = traverse_add_diff_io_mda( coupling_structure, variables, functions ) self.__last_diff_inouts = diff_ios couplings = [ coupl for coupls in diff_ios_merged.values() for coupl in list(coupls[0]) + list(coupls[1]) ] minimal_couplings = set(couplings).intersection( coupling_structure.all_couplings ) # The state variables are not coupling variables, although they are inputs # and outputs of the disciplines with residuals. self.__minimal_couplings = minimal_couplings.difference(states) return self.__minimal_couplings
[docs] def total_derivatives( self, in_data: StrKeyMapping, functions: Collection[str], variables: Collection[str], couplings: Iterable[str], linear_solver: str = "DEFAULT", mode: DerivationMode = DerivationMode.AUTO, matrix_type: JacobianType = JacobianType.MATRIX, use_lu_fact: bool = False, exec_cache_tol: float | None = None, execute: bool = True, residual_variables: Mapping[str, str] = READ_ONLY_EMPTY_DICT, **linear_solver_settings: Any, ) -> dict[str, dict[str, RealOrComplexArray]] | dict[Any, dict[Any, None]]: """Compute the Jacobian of total derivatives of the coupled system. Args: in_data: The input data dict. functions: The functions to differentiate. variables: The differentiation variables. couplings: The coupling variables. linear_solver: The name of the linear solver. mode: The linearization mode (auto, direct or adjoint). matrix_type: The representation of the matrix ∂R/∂y (sparse or linear operator). use_lu_fact: Whether to factorize dres_dy once, unsupported for linear operator mode. exec_cache_tol: The discipline cache tolerance to when calling the linearize method. If ``None``, no tolerance is set (equivalent to tol=0.0). execute: Whether to start by executing the discipline with the input data for which to compute the Jacobian; this allows to ensure that the discipline was executed with the right input data; it can be almost free if the corresponding output data have been stored in the :attr:`.Discipline.cache`. residual_variables: a mapping of residuals of disciplines to their respective state variables. **linear_solver_settings: The options passed to the linear solver factory. Returns: The total coupled derivatives. Raises: ValueError: When the linearization_mode is incorrect. """ if not functions: return defaultdict(default_dict_factory) self._check_inputs(functions, variables, couplings, matrix_type, use_lu_fact) # Retrieve states variables and local residuals if provided states = list(residual_variables.values()) if residual_variables else [] couplings_minimal = self._compute_diff_ios_and_couplings( variables, functions, states, self.coupling_structure, ) # Exclude the non-numeric couplings from the coupling minimal list for discipline in self.coupling_structure.disciplines: inputs_couplings = couplings_minimal.intersection( discipline.io.input_grammar ) couplings_minimal.difference_update( itertools.filterfalse( discipline.io.input_grammar.data_converter.is_numeric, inputs_couplings, ) ) sorted_couplings_minimal = sorted(couplings_minimal) couplings_and_res = sorted_couplings_minimal.copy() couplings_and_states = sorted_couplings_minimal.copy() # linearize all the disciplines if residual_variables: couplings_and_res += residual_variables.keys() couplings_and_states += states for disc in self.coupling_structure.disciplines: if disc.cache is not None and exec_cache_tol is not None: disc.cache.tolerance = exec_cache_tol disc.linearize(in_data, execute=execute) # compute the sizes from the Jacobians self.compute_sizes( functions, variables, sorted_couplings_minimal, residual_variables ) n_variables = self.compute_dimension(variables) n_functions = self.compute_dimension(functions) n_residuals = self.compute_dimension(sorted_couplings_minimal) if residual_variables: n_residuals += self.compute_dimension(residual_variables.keys()) # compute the partial derivatives of the residuals dres_dx = self.assemble_jacobian(couplings_and_res, variables, is_residual=True) # compute the partial derivatives of the interest functions (dfun_dx, dfun_dy) = ({}, {}) for fun in functions: dfun_dx[fun] = self.assemble_jacobian([fun], variables) dfun_dy[fun] = self.assemble_jacobian([fun], couplings_and_res) mode = self._get_derivation_mode(mode, n_variables, n_functions) # compute the total derivatives if mode == self.DerivationMode.DIRECT: # sparse square matrix ∂R/∂y dres_dy = self.assemble_jacobian( couplings_and_res, couplings_and_states, is_residual=True, jacobian_type=matrix_type, ) # compute the coupled derivatives total_derivatives = self.coupled_system.direct_mode( functions, n_variables, n_residuals, dres_dx, dres_dy, dfun_dx, dfun_dy, linear_solver, use_lu_fact=use_lu_fact, **linear_solver_settings, ) elif mode == self.DerivationMode.ADJOINT: # transposed square matrix ∂R/∂y^T dres_dy_t = self.assemble_jacobian( couplings_and_res, couplings_and_states, is_residual=True, jacobian_type=matrix_type, ) # compute the coupled derivatives total_derivatives = self.coupled_system.adjoint_mode( functions, dres_dx, dres_dy_t.T, dfun_dx, dfun_dy, linear_solver, use_lu_fact=use_lu_fact, **linear_solver_settings, ) else: raise ValueError("Incorrect linearization mode " + str(mode)) return self.split_jac(total_derivatives, variables)
[docs] def split_jac( self, coupled_system: Mapping[str, RealOrComplexArray | dok_matrix], variables: Iterable[str], ) -> dict[str, dict[str, RealOrComplexArray | dok_matrix]]: """Split a Jacobian dict into a dict of dict. Args: coupled_system: The derivatives to split. variables: The variables wrt which the differentiation is performed. Returns: The Jacobian. """ j_split = {} for function, function_jac in coupled_system.items(): i_out = 0 sub_jac = {} for variable in variables: size = self.sizes[variable] sub_jac[variable] = function_jac[:, i_out : i_out + size] i_out += size j_split[function] = sub_jac return j_split
[docs] def set_newton_differentiated_ios( self, couplings: Collection[str], ) -> None: """Set the differentiated inputs and outputs for the Newton algorithm. Also ensures that :attr:`.JacobianAssembly.sizes` contains the sizes of all the coupling sizes needed for Newton. Args: couplings: The coupling variables. """ for disc in self.coupling_structure.disciplines: inputs_to_linearize = set(disc.io.input_grammar).intersection(couplings) outputs_to_linearize = set(disc.io.output_grammar).intersection(couplings) # If outputs and inputs to linearize not empty, then linearize if inputs_to_linearize and outputs_to_linearize: disc.add_differentiated_inputs(inputs_to_linearize) disc.add_differentiated_outputs(outputs_to_linearize) self.compute_sizes(couplings, couplings, couplings)
# Newton step computation
[docs] def compute_newton_step( self, in_data: Mapping[str, RealArray], couplings: Collection[str], linear_solver: str = "DEFAULT", matrix_type: JacobianType = JacobianType.MATRIX, residuals: RealOrComplexArray | None = None, resolved_residual_names: Collection[str] = (), **linear_solver_settings: Any, ) -> tuple[RealOrComplexArray, bool]: """Compute the Newton step for the coupled system of disciplines residuals. Args: in_data: The input data. couplings: The coupling variables. linear_solver: The name of the linear solver. matrix_type: The representation of the matrix ∂R/∂y (sparse or linear operator). residuals: The residuals vector, if ``None`` use :attr:`.residuals`. resolved_residual_names: The names of residual variables. **linear_solver_settings: The options passed to the linear solver factory. Returns: The Newton step - relax_factor . [∂R/∂y]^-1 . R as an array of steps for which the order is given by the `couplings` argument. Whether the linear solver converged. """ residual_names = resolved_residual_names or couplings self.compute_sizes(residual_names, couplings, couplings) # compute the partial derivatives of the residuals dres_dy = self.assemble_jacobian( residual_names, couplings, is_residual=True, jacobian_type=matrix_type, ) # form the residuals if residuals is None: residuals = self.residuals(in_data, couplings) # solve the linear system linear_problem = LinearProblem(dres_dy, -residuals) self.__linear_solver_factory.execute( linear_problem, algo_name=linear_solver, **linear_solver_settings ) return linear_problem.solution, linear_problem.is_converged
[docs] def residuals( self, in_data: StrKeyMapping, var_names: Iterable[str] ) -> RealOrComplexArray: """Form the matrix of residuals wrt coupling variables. Given disciplinary explicit calculations Yi(Y0_t,...Yn_t), fill the residual matrix:: [Y0(Y0_t,...Yn_t) - Y0_t] [ ] [Yn(Y0_t,...Yn_t) - Yn_t] Args: in_data: The values prescribed for the calculation of the residuals (Y0_t,...Yn_t). var_names: The names of variables associated with the residuals (R). Returns: The residuals array. """ residuals = [] # Build rows blocks for name in var_names: for discipline in self.coupling_structure.disciplines: if name in discipline.io.output_grammar: to_array = discipline.io.output_grammar.data_converter.convert_value_to_array # noqa: E501 local_data_array = to_array(name, discipline.io.data[name]) in_data_array = to_array(name, in_data[name]) residuals.append(local_data_array - in_data_array) return concatenate(residuals)
[docs] def plot_dependency_jacobian( self, functions: Collection[str], variables: Collection[str], save: bool = True, show: bool = False, filepath: str = "", markersize: float | None = None, ) -> str: """Plot the Jacobian matrix. Nonzero elements of the sparse matrix are represented by blue squares. Args: functions: The functions to plot. variables: The variables. show: Whether the plot is displayed. save: Whether the plot is saved in a PDF file. filepath: The file name to save to. If empty, ``coupled_jacobian.pdf`` is used, otherwise ``coupled_jacobian_ + filepath + .pdf``. markersize: The size of the markers. Returns: The file name. """ self.compute_sizes(functions, variables, []) total_jac = empty(0) # compute the positions of the outputs outputs_positions = {} current_position = 0 for fun in functions: dfun_dx = self.assemble_jacobian([fun], variables) outputs_positions[fun] = current_position current_position += self.sizes[fun] total_jac = vstack((total_jac, dfun_dx)) if len(total_jac) else dfun_dx # compute the positions of the inputs inputs_positions = {} current_position = 0 for variable in variables: inputs_positions[variable] = current_position current_position += self.sizes[variable] fig = plt.figure(figsize=(6.0, 10.0)) ax1 = fig.add_subplot(111) plt.spy(total_jac, markersize=markersize) ax1.set_aspect("auto") plt.yticks(list(outputs_positions.values()), list(outputs_positions.keys())) plt.xticks( list(inputs_positions.values()), list(inputs_positions.keys()), rotation=90 ) if save: if filepath: filename = f"coupled_jacobian_{filepath}.pdf" else: filename = "coupled_jacobian.pdf" else: filename = "" save_show_figure(fig, show, filename) return filename
[docs] class CoupledSystem: """Compute coupled (total) derivatives of a system of residuals. Use several methods: - direct or adjoint - factorized for multiple RHS """ n_linear_resolutions: int """The number of linear resolutions.""" n_direct_modes: int """The number of direct mode calls.""" n_adjoint_modes: int """The number of adjoint mode calls.""" lu_fact: int """The number of LU mode calls (adjoint or direct).""" __linear_solver_factory: LinearSolverLibraryFactory """The linear solver factory.""" linear_problem: LinearProblem | None """The considered linear problem.""" DEFAULT_LINEAR_SOLVER: ClassVar[str] = "DEFAULT" """The default linear solver.""" def __init__(self) -> None: # noqa:D107 self.n_linear_resolutions = 0 self.n_direct_modes = 0 self.n_adjoint_modes = 0 self.lu_fact = 0 self.__linear_solver_factory = LinearSolverLibraryFactory(use_cache=True) self.linear_problem = None
[docs] def direct_mode( self, functions: Iterable[str], n_variables: int, n_couplings: int, dres_dx: dok_matrix | LinearOperator, dres_dy: dok_matrix | LinearOperator, dfun_dx: Mapping[str, dok_matrix], dfun_dy: Mapping[str, dok_matrix], linear_solver: str = DEFAULT_LINEAR_SOLVER, use_lu_fact: bool = False, **linear_solver_settings: Any, ) -> dict[str, dok_matrix]: """Compute the total derivative Jacobian in direct mode. Args: functions: The functions to differentiate. n_variables: The number of variables. n_couplings: The number of couplings. dres_dx: The Jacobian of the residuals wrt the design variables. dres_dy: The Jacobian of the residuals wrt the coupling variables. dfun_dx: The Jacobian of the functions wrt the design variables. dfun_dy: The Jacobian of the functions wrt the coupling variables. linear_solver: The name of the linear solver. use_lu_fact: Whether to factorize dres_dy once. **linear_solver_settings: The optional parameters. Returns: The Jacobian of the total coupled derivatives. """ self.n_direct_modes += 1 if use_lu_fact: return self._direct_mode_lu( functions, n_variables, n_couplings, dres_dx, dres_dy, dfun_dx, dfun_dy ) return self._direct_mode( functions, n_variables, n_couplings, dres_dx, dres_dy, dfun_dx, dfun_dy, linear_solver, **linear_solver_settings, )
[docs] def adjoint_mode( self, functions: Iterable[str], dres_dx: dok_matrix | LinearOperator, dres_dy_t: dok_matrix | LinearOperator, dfun_dx: Mapping[str, dok_matrix], dfun_dy: Mapping[str, dok_matrix], linear_solver: str = DEFAULT_LINEAR_SOLVER, use_lu_fact: bool = False, **linear_solver_settings: Any, ) -> dict[str, RealArray]: """Compute the total derivative Jacobian in adjoint mode. Args: functions: The functions to differentiate. dres_dx: The Jacobian of the residuals wrt the design variables. dres_dy_t: The Jacobian of the residuals wrt the coupling variables. dfun_dx: The Jacobian of the functions wrt the design variables. dfun_dy: The Jacobian of the functions wrt the coupling variables. linear_solver: The name of the linear solver. use_lu_fact: Whether to factorize dres_dy_t once. **linear_solver_settings: The optional parameters. Returns: The Jacobian of total coupled derivatives. """ self.n_adjoint_modes += 1 if use_lu_fact: return self._adjoint_mode_lu( functions, dres_dx, dres_dy_t, dfun_dx, dfun_dy ) return self._adjoint_mode( functions, dres_dx, dres_dy_t, dfun_dx, dfun_dy, linear_solver, **linear_solver_settings, )
def _direct_mode( self, functions: Iterable[str], n_variables: int, n_couplings: int, dres_dx: dok_matrix | LinearOperator, dres_dy: dok_matrix | LinearOperator, dfun_dx: Mapping[str, dok_matrix], dfun_dy: Mapping[str, dok_matrix], linear_solver: str = DEFAULT_LINEAR_SOLVER, **linear_solver_settings: Any, ) -> dict[str, dok_matrix]: """Compute the total derivative Jacobian in direct mode. Args: functions: The functions to differentiate. n_variables: The number of variables. n_couplings: The number of couplings. dres_dx: The Jacobian of the residuals wrt the design variables. dres_dy: The Jacobian of the residuals wrt the coupling variables. dfun_dx: The Jacobian of the functions wrt the design variables. dfun_dy: The Jacobian of the functions wrt the coupling variables. linear_solver: The name of the linear solver. **linear_solver_settings: The optional parameters. Returns: The Jacobian of total coupled derivatives. """ # compute the total derivative dy/dx, independent of the # function to differentiate dy_dx = empty((n_couplings, n_variables)) self.linear_problem = LinearProblem(dres_dy) if linear_solver in {"DEFAULT", "LGMRES"}: # Reinit outerV, and store it for all RHS linear_solver_settings["outer_v"] = [] for var_index in range(n_variables): self.linear_problem.rhs = -dres_dx[:, var_index] self.__linear_solver_factory.execute( self.linear_problem, algo_name=linear_solver, **linear_solver_settings ) dy_dx[:, var_index] = self.linear_problem.solution self.n_linear_resolutions += 1 # assemble the total derivatives of the functions using dy_dx jac = {} for fun in functions: jac[fun] = dfun_dx[fun].toarray() + dfun_dy[fun].dot(dy_dx) return jac def _adjoint_mode( self, functions: Iterable[str], dres_dx: dok_matrix | LinearOperator, dres_dy_t: dok_matrix | LinearOperator, dfun_dx: Mapping[str, dok_matrix], dfun_dy: Mapping[str, dok_matrix], linear_solver: str = DEFAULT_LINEAR_SOLVER, **linear_solver_settings: Any, ) -> dict[str, RealArray]: """Compute the total derivative Jacobian in adjoint mode. Args: functions: The functions to differentiate. dres_dx: The Jacobian of the residuals wrt the design variables. dres_dy: The Jacobian of the residuals wrt the coupling variables. dfun_dx: The Jacobian of the functions wrt the design variables. dfun_dy: The Jacobian of the functions wrt the coupling variables. linear_solver: The name of the linear solver. dres_dy_t: The derivatives of the residuals wrt coupling vars. **linear_solver_settings: The optional parameters. Returns: The Jacobian of total coupled derivatives. """ jac = {} # adjoint vector for each interest function if linear_solver in {"DEFAULT", "LGMRES"}: # Reinit outerV, and store it for all RHS linear_solver_settings["outer_v"] = [] self.linear_problem = LinearProblem(dres_dy_t) for fun in functions: dfunction_dx = dfun_dx[fun] dfunction_dy = dfun_dy[fun] jac[fun] = empty(dfunction_dx.shape) # compute adjoint vector for each component of the function for fun_component in range(dfunction_dy.shape[0]): self.linear_problem.rhs = -dfunction_dy[fun_component, :].T self.__linear_solver_factory.execute( self.linear_problem, algo_name=linear_solver, **linear_solver_settings, ) adjoint = self.linear_problem.solution self.n_linear_resolutions += 1 jac[fun][fun_component, :] = ( dfunction_dx[fun_component, :] + (dres_dx.T.dot(adjoint)).T ) return jac def _direct_mode_lu( self, functions: Iterable[str], n_variables: int, n_couplings: int, dres_dx: dok_matrix | LinearOperator, dres_dy: dok_matrix | LinearOperator, dfun_dx: Mapping[str, dok_matrix], dfun_dy: Mapping[str, dok_matrix], tol: float = 1e-10, ) -> dict[str, dok_matrix]: """Compute the total derivative Jacobian in direct mode. Args: functions: The functions to differentiate. n_variables: The number of variables. n_couplings: The number of couplings. dres_dx: The Jacobian of the residuals wrt the design variables. dres_dy: The Jacobian of the residuals wrt the coupling variables. dfun_dx: The Jacobian of the functions wrt the design variables. dfun_dy: The Jacobian of the functions wrt the coupling variables. tol: Tolerance on the relative residuals norm to consider that the linear system is solved. If not, raises a warning. Returns: The Jacobian of total coupled derivatives. """ # compute the total derivative dy/dx, independent of the # function to differentiate dy_dx = empty((n_couplings, n_variables)) # compute LU decomposition lhs = csc_matrix(dres_dy) solve = factorized(lhs) self.lu_fact += 1 for var_index in range(n_variables): rhs = -dres_dx[:, var_index].todense() sol = solve(rhs) dy_dx[:, var_index] = sol.squeeze() self.n_linear_resolutions += 1 res = norm(lhs.dot(sol) - rhs) / norm(rhs) if res > tol: LOGGER.warning( "The linear system in _direct_mode_lu used to compute the coupled " "derivatives is not well resolved, " "residuals > tolerance : %s > %s ", res, tol, ) # assemble the total derivatives of the functions using dy_dx jac = {} for fun in functions: jac[fun] = dfun_dx[fun].toarray() + dfun_dy[fun].dot(dy_dx) return jac def _adjoint_mode_lu( self, functions: Iterable[str], dres_dx: dok_matrix | LinearOperator, dres_dy_t: dok_matrix | LinearOperator, dfun_dx: Mapping[str, dok_matrix], dfun_dy: Mapping[str, dok_matrix], tol: float = 1e-10, ) -> dict[str, RealArray]: """Compute the total derivative Jacobian in adjoint mode. Args: functions: The functions to differentiate. dres_dx: The Jacobian of the residuals wrt the design variables. dfun_dx: The Jacobian of the functions wrt the design variables. dfun_dy: The Jacobian of the functions wrt the coupling variables. dres_dy_t: The Jacobian of the residuals wrt the coupling variables. tol: Tolerance on the relative residuals norm to consider that the linear system is solved. If not, raises a warning. Returns: The Jacobian of total coupled derivatives. """ jac = {} # compute LU factorization solve = factorized(dres_dy_t) self.lu_fact += 1 # adjoint vector for each interest function for fun in functions: dfunction_dx = dfun_dx[fun] dfunction_dy = dfun_dy[fun] jac[fun] = empty(dfunction_dx.shape) # compute adjoint vector for each component of the function for fun_component in range(dfunction_dy.shape[0]): rhs = -dfunction_dy[fun_component, :].todense().T adjoint = solve(rhs) res = norm(dres_dy_t.dot(adjoint) - rhs) / norm(rhs) if res > tol: LOGGER.warning( "The linear system in _adjoint_mode_lu used to compute the " "coupled " "derivatives is not well resolved, " "residuals > tolerance : %s > %s ", res, tol, ) self.n_linear_resolutions += 1 jac[fun][fun_component, :] = ( dfunction_dx[fun_component, :] + (dres_dx.T.dot(adjoint)).T ) return jac