# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Francois Gallard, Charlie Vanaret
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Coupled derivatives calculations."""
from __future__ import annotations
import itertools
import logging
from collections import defaultdict
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import NamedTuple
import matplotlib.pyplot as plt
from numpy import concatenate
from numpy import empty
from numpy import fill_diagonal
from numpy import ndarray
from numpy import zeros
from numpy.linalg import norm
from scipy.sparse import bmat
from scipy.sparse import csc_matrix
from scipy.sparse import csr_matrix
from scipy.sparse import eye
from scipy.sparse import vstack
from scipy.sparse.linalg import LinearOperator
from scipy.sparse.linalg import factorized
from strenum import StrEnum
from gemseo.algos.linear_solvers.factory import LinearSolverLibraryFactory
from gemseo.algos.linear_solvers.linear_problem import LinearProblem
from gemseo.core.derivatives import derivation_modes
from gemseo.core.derivatives.jacobian_operator import JacobianOperator
from gemseo.core.derivatives.mda_derivatives import traverse_add_diff_io_mda
from gemseo.utils.compatibility.scipy import sparse_classes
from gemseo.utils.constants import READ_ONLY_EMPTY_DICT
from gemseo.utils.matplotlib_figure import save_show_figure
if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Collection
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Mapping
from typing import TypeAlias
from scipy.sparse import dok_matrix
from gemseo.core.coupling_structure import CouplingStructure
from gemseo.core.discipline import Discipline
from gemseo.typing import RealArray
from gemseo.typing import RealOrComplexArray
from gemseo.typing import RealOrComplexArrayT
from gemseo.typing import StrKeyMapping
LOGGER = logging.getLogger(__name__)
[docs]
def none_factory() -> None:
"""Returns None...
To be used for defaultdict
"""
[docs]
def default_dict_factory() -> dict[Any, None]:
"""Instantiates a defaultdict(None) object."""
return defaultdict(none_factory)
# TODO: API: extract to a specific module
[docs]
class AssembledJacobianOperator(LinearOperator): # type: ignore[misc] # because missing types
"""Representation of the assembled Jacobian as a SciPy ``LinearOperator``."""
__functions: Iterable[str]
"""The names of functions to differentiate."""
__variables: Iterable[str]
"""The names of variables with respect to which differentiate."""
__is_residual: bool
"""Whether the functions are residuals."""
__get_jacobian_generator: Callable[
[Iterable[str], Iterable[str], bool],
Iterator[tuple[RealOrComplexArray, JacobianAssembly.JacobianPosition]],
]
"""The method to iterate over the relevant Jacobians, given the provided variables
and functions."""
def __init__(
self,
functions: Iterable[str],
variables: Iterable[str],
n_functions: int,
n_variables: int,
get_jacobian_generator: Callable[
[Iterable[str], Iterable[str], bool],
Iterator[tuple[RealOrComplexArray, JacobianAssembly.JacobianPosition]],
],
is_residual: bool = False,
) -> None:
"""
Args:
functions: The functions to differentiate.
variables: The differentiation variables.
n_functions: The number of functions components.
n_variables: The number of variables components.
get_jacobian_generator: The method to iterate over the Jacobians associated
with the provided functions and variables.
is_residual: Whether the functions are residuals.
""" # noqa: D205, D212, D415
super().__init__(shape=(n_functions, n_variables), dtype=float)
self.__functions = functions
self.__variables = variables
self.__is_residual = is_residual
self.__get_jacobian_generator = get_jacobian_generator
def _matvec(self, x: RealOrComplexArrayT) -> RealOrComplexArrayT:
"""The matrix-vector product involving the Jacobian ∂f/∂v.
Args:
x: The vector to apply ∂f/∂v to.
Returns:
The resulting vector ∂f/∂v x.
"""
# Initialize the result with appropriate dimension
result = zeros(self.shape[0], dtype=x.dtype)
jacobian_generator = self.__get_jacobian_generator(
self.__functions, self.__variables, self.__is_residual
)
for jacobian, position in jacobian_generator:
result[position.row_slice] += jacobian.dot(x[position.column_slice])
return result
def _rmatvec(self, x: RealOrComplexArrayT) -> RealOrComplexArrayT:
"""The matrix-vector product involving the transposed Jacobian ∂f/∂v.
Args:
x: The vector to apply the transpose of ∂f/∂v to.
Returns:
The resulting vector (∂f/∂v)^T x.
"""
# Initialize the result with appropriate dimension
result = zeros(self.shape[1], dtype=x.dtype)
jacobian_generator = self.__get_jacobian_generator(
self.__functions, self.__variables, self.__is_residual
)
for jacobian, position in jacobian_generator:
result[position.column_slice] += jacobian.T.dot(x[position.row_slice])
return result
[docs]
class JacobianAssembly:
"""Assembly of Jacobians.
Typically, assemble discipline's Jacobians into a system Jacobian.
"""
coupling_structure: CouplingStructure
"""The considered coupling structure."""
sizes: dict[str, int]
"""The number of elements of a given str."""
disciplines: dict[str, Discipline]
"""The disciplines, stored using their name."""
__last_diff_inouts: tuple[set[str], set[str]]
"""The last diff in-outs stored."""
__minimal_couplings: set[str]
"""The minimal couplings."""
coupled_system: CoupledSystem
"""The coupled derivative system of residuals."""
__linear_solver_factory: LinearSolverLibraryFactory
"""The linear solver factory."""
DerivationMode: TypeAlias = derivation_modes.DerivationMode
[docs]
class JacobianType(StrEnum):
"""The available types for the Jacobian matrix."""
LINEAR_OPERATOR = "linear_operator"
"""Jacobian as a SciPy ``LinearOperator`` implementing the appropriate method to
perform matrix-vector products."""
MATRIX = "matrix"
"""Jacobian matrix in Compressed Sparse Row (CSR) format."""
[docs]
class JacobianPosition(NamedTuple):
"""The position of the discipline's Jacobians within the assembled Jacobian."""
row_slice: slice
"""The row slice indicating where to position the disciplinary Jacobian within
the assembled Jacobian when defined as an array."""
column_slice: slice
"""The column slice indicating where to position the disciplinary Jacobian
within the assembled Jacobian when defined as an array."""
row_index: int
"""The row index of the disciplinary Jacobian within the assembled Jacobian when
defined blockwise."""
column_index: int
"""The column index of the disciplinary Jacobian within the assembled Jacobian
when defined blockwise."""
def __init__(self, coupling_structure: CouplingStructure) -> None:
"""
Args:
coupling_structure: The CouplingStructure associated disciplines that
form the coupled system.
""" # noqa: D205, D212, D415
self.coupling_structure = coupling_structure
self.sizes = {}
self.disciplines = {}
self.__last_diff_inouts = (set(), set())
self.__minimal_couplings = set()
self.coupled_system = CoupledSystem()
self.__linear_solver_factory = LinearSolverLibraryFactory(use_cache=True)
def _check_inputs(
self,
functions: Iterable[str],
variables: Iterable[str],
couplings: Iterable[str],
matrix_type: JacobianType,
use_lu_fact: bool,
) -> None:
"""Check the inputs before differentiation.
Args:
functions: The functions to differentiate.
variables: The differentiation variables.
couplings: The coupling variables.
matrix_type: The type of matrix for linearization.
use_lu_fact: Whether to use the LU factorization once for all second
members.
Raises:
ValueError: When the inputs are inconsistent.
"""
unknown_dvars = set(variables)
unknown_outs = set(functions)
for discipline in self.coupling_structure.disciplines:
inputs = set(discipline.io.input_grammar)
outputs = set(discipline.io.output_grammar)
unknown_outs -= outputs
unknown_dvars -= inputs
if unknown_dvars:
possible_inputs = [
list(disc.io.input_grammar)
for disc in self.coupling_structure.disciplines
]
msg = (
"Some of the specified variables are not "
"inputs of the disciplines: "
f"{unknown_dvars}"
" possible inputs are: "
f"{possible_inputs}"
)
raise ValueError(msg)
if unknown_outs:
raise ValueError(
"Some outputs are not computed by the disciplines:"
+ str(unknown_outs)
+ " available outputs are: "
+ str([
list(disc.io.output_grammar)
for disc in self.coupling_structure.disciplines
])
)
for coupling in set(couplings) & set(variables):
raise ValueError(
"Variable "
+ str(coupling)
+ " is both a coupling and a design variable"
)
matrix_type = self.JacobianType(matrix_type)
if use_lu_fact and matrix_type == self.JacobianType.LINEAR_OPERATOR:
msg = (
"Unsupported LU factorization for "
"LinearOperators! Please use Sparse matrices"
" instead"
)
raise ValueError(msg)
[docs]
def compute_sizes(
self,
functions: Iterable[str],
variables: Iterable[str],
couplings: Iterable[str],
residual_variables: Mapping[str, str] = READ_ONLY_EMPTY_DICT,
) -> None:
"""Compute the number of scalar functions, variables and couplings.
Args:
functions: The functions to differentiate.
variables: The differentiation variables.
couplings: The coupling variables.
residual_variables: The mapping of residuals of disciplines to their
respective state variables.
Raises:
ValueError: When the size of some variables could not be determined.
"""
# search for functions/variables/couplings in the Jacobians of the disciplines
if residual_variables:
outputs = itertools.chain(
functions,
couplings,
residual_variables.keys(),
residual_variables.values(),
)
else:
outputs = itertools.chain(functions, couplings)
# functions and coupling and states
for output in outputs:
discipline = self.coupling_structure.find_discipline(output)
self.disciplines[output] = discipline
# get an arbitrary Jacobian and compute the number of rows
self.sizes[output] = (
discipline.io.output_grammar.data_converter.get_value_size(
output, discipline.io.data[output]
)
)
# variables
for variable in variables:
for discipline in self.coupling_structure.disciplines:
if variable not in self.sizes:
for jacobian in discipline.jac.values():
jacobian_wrt_variable = jacobian.get(variable, None)
if jacobian_wrt_variable is not None:
self.sizes[variable] = jacobian_wrt_variable.shape[1]
self.disciplines[variable] = discipline
break
if variable not in self.sizes:
msg = f"Failed to determine the size of input variable {variable}"
raise ValueError(msg)
@classmethod
def _get_derivation_mode(
cls,
mode: DerivationMode,
n_variables: int,
n_functions: int,
) -> DerivationMode:
"""Get the differentiation mode.
Args:
mode: The differentiation mode.
n_variables: The number of variables.
n_functions: The number of functions.
Returns:
The differentiation mode.
"""
if mode != cls.DerivationMode.AUTO:
return mode
if n_variables <= n_functions:
return cls.DerivationMode.DIRECT
return cls.DerivationMode.ADJOINT
[docs]
def compute_dimension(self, names: Iterable[str]) -> int:
"""Compute the total number of functions/variables/couplings of the full system.
Args:
names: The names of the inputs or the outputs.
Returns:
The dimension if the system.
"""
return sum(self.sizes[name] for name in names)
def _get_jacobian_generator(
self,
functions: Iterable[str],
variables: Iterable[str],
is_residual: bool = False,
) -> Iterator[
tuple[RealOrComplexArray | csr_matrix | JacobianOperator, JacobianPosition]
]:
"""Iterate over Jacobian matrices.
Provide a generator to iterate over the Jacobians associated with each provided
pair (function, variable). The generator yields the Jacobian along with its
relative position in the to be assembled Jacobian.
Args:
functions: The functions to differentiate.
variables: The differentiation variables.
is_residual: Whether the functions are residuals.
Yields:
A tuple of the form (Jacobian, Position).
"""
row = 0
# Iterate over outputs
for row_index, function in enumerate(functions):
column = 0
discipline = self.disciplines[function]
if not discipline.jac:
continue
function_jacobian = discipline.jac[function]
# Iterate over inputs
for column_index, variable in enumerate(variables):
jacobian = function_jacobian.get(variable, None)
variable_size = self.sizes[variable]
# If residual of the form Yi-Yi, then add -I to the Jacobian
if is_residual and function == variable:
if jacobian is not None:
# Make a copy to avoid in-place modifications
jacobian_copy = jacobian.copy()
if isinstance(jacobian_copy, ndarray):
fill_diagonal(jacobian_copy, jacobian.diagonal() - 1)
elif isinstance(jacobian_copy, sparse_classes):
jacobian_copy.setdiag(jacobian.diagonal() - 1)
elif isinstance(jacobian_copy, JacobianOperator):
jacobian_copy = jacobian_copy.shift_identity()
jacobian = jacobian_copy
else:
jacobian = -eye(variable_size, dtype=int)
# Yield only if Jacobian exists
if jacobian is not None:
yield (
jacobian.real,
self.JacobianPosition(
row_slice=slice(row, row + jacobian.shape[0]),
column_slice=slice(column, column + jacobian.shape[1]),
row_index=row_index,
column_index=column_index,
),
)
column += variable_size
row += self.sizes[function]
def _assemble_jacobian_as_matrix(
self,
functions: Collection[str],
variables: Collection[str],
is_residual: bool = False,
) -> csr_matrix:
"""Form the Jacobian as a sparse matrix in Compressed Sparse Row (CSR) format.
The CSR format is well-adapted to perform matrix-vector and matrix-matrix
multiplications efficiently.
Args:
functions: The functions to differentiate.
variables: The differentiation variables.
is_residual: Whether the functions are residuals.
Returns:
The Jacobian as a sparse matrix in CSR format.
"""
# Fill in with zero blocks of appropriate dimension if necessary
variable_sizes = [self.sizes[variable_name] for variable_name in variables]
function_sizes = [self.sizes[function_name] for function_name in functions]
# Initialize the block Jacobian with the appropriate structure
total_jacobian: list[list[csr_matrix | None]] = [
[None for _ in variables] for _ in functions
]
variable_sizes_0 = variable_sizes[0]
for i, function_size in enumerate(function_sizes):
total_jacobian[i][0] = csr_matrix((function_size, variable_sizes_0))
function_sizes_0 = function_sizes[0]
total_jacobian_0 = total_jacobian[0]
for j, variable_size in enumerate(variable_sizes):
total_jacobian_0[j] = csr_matrix((function_sizes_0, variable_size))
# Perform the assembly
jacobian_generator = self._get_jacobian_generator(
functions, variables, is_residual=is_residual
)
for jacobian, position in jacobian_generator:
if isinstance(jacobian, JacobianOperator):
jacobian = jacobian.get_matrix_representation()
total_jacobian[position.row_index][position.column_index] = csr_matrix(
jacobian.real
)
return bmat(total_jacobian, format="csr")
[docs]
def assemble_jacobian(
self,
functions: Collection[str],
variables: Collection[str],
is_residual: bool = False,
jacobian_type: JacobianType = JacobianType.MATRIX,
) -> csr_matrix | AssembledJacobianOperator:
"""Form the Jacobian as a SciPy ``LinearOperator``.
Args:
functions: The functions to differentiate.
variables: The differentiation variables.
is_residual: Whether the functions are residuals.
jacobian_type: The type of representation for the Jacobian ∂f/∂v.
Returns:
The Jacobian ∂f/∂v in the specified type.
"""
if jacobian_type == self.JacobianType.MATRIX:
return self._assemble_jacobian_as_matrix(
functions,
variables,
is_residual,
)
if jacobian_type == self.JacobianType.LINEAR_OPERATOR:
return AssembledJacobianOperator(
functions,
variables,
self.compute_dimension(functions),
self.compute_dimension(variables),
self._get_jacobian_generator,
is_residual,
)
msg = f"Bad jacobian_type: {jacobian_type}"
raise ValueError(msg)
def _compute_diff_ios_and_couplings(
self,
variables: Iterable[str],
functions: Iterable[str],
states: Iterable[str],
coupling_structure: CouplingStructure,
) -> set[str]:
"""Compute the minimal differentiated inputs, outputs and couplings.
This is done form the
disciplines that are required to differentiate the functions with respect to the
variables.
This uses a graph algorithm that computes the dependency
process graph and address the "jacobian accumulation" problem with a heuristic
but conservative approach.
Args:
variables: The differentiation variables.
functions: The functions to differentiate.
states: The state variables.
coupling_structure: The coupling structure containing all the disciplines.
Returns: The minimal coupling variables set requires to differentiate the
functions with respect to the variables.
"""
diff_ios = (set(variables), set(functions))
if self.__last_diff_inouts != diff_ios:
diff_ios_merged = traverse_add_diff_io_mda(
coupling_structure, variables, functions
)
self.__last_diff_inouts = diff_ios
couplings = [
coupl
for coupls in diff_ios_merged.values()
for coupl in list(coupls[0]) + list(coupls[1])
]
minimal_couplings = set(couplings).intersection(
coupling_structure.all_couplings
)
# The state variables are not coupling variables, although they are inputs
# and outputs of the disciplines with residuals.
self.__minimal_couplings = minimal_couplings.difference(states)
return self.__minimal_couplings
[docs]
def total_derivatives(
self,
in_data: StrKeyMapping,
functions: Collection[str],
variables: Collection[str],
couplings: Iterable[str],
linear_solver: str = "DEFAULT",
mode: DerivationMode = DerivationMode.AUTO,
matrix_type: JacobianType = JacobianType.MATRIX,
use_lu_fact: bool = False,
exec_cache_tol: float | None = None,
execute: bool = True,
residual_variables: Mapping[str, str] = READ_ONLY_EMPTY_DICT,
**linear_solver_settings: Any,
) -> dict[str, dict[str, RealOrComplexArray]] | dict[Any, dict[Any, None]]:
"""Compute the Jacobian of total derivatives of the coupled system.
Args:
in_data: The input data dict.
functions: The functions to differentiate.
variables: The differentiation variables.
couplings: The coupling variables.
linear_solver: The name of the linear solver.
mode: The linearization mode (auto, direct or adjoint).
matrix_type: The representation of the matrix ∂R/∂y (sparse or
linear operator).
use_lu_fact: Whether to factorize dres_dy once,
unsupported for linear operator mode.
exec_cache_tol: The discipline cache tolerance to
when calling the linearize method.
If ``None``, no tolerance is set (equivalent to tol=0.0).
execute: Whether to start by executing the discipline
with the input data for which to compute the Jacobian;
this allows to ensure that the discipline was executed
with the right input data;
it can be almost free if the corresponding output data
have been stored in the :attr:`.Discipline.cache`.
residual_variables: a mapping of residuals of disciplines to
their respective state variables.
**linear_solver_settings: The options passed to the linear solver factory.
Returns:
The total coupled derivatives.
Raises:
ValueError: When the linearization_mode is incorrect.
"""
if not functions:
return defaultdict(default_dict_factory)
self._check_inputs(functions, variables, couplings, matrix_type, use_lu_fact)
# Retrieve states variables and local residuals if provided
states = list(residual_variables.values()) if residual_variables else []
couplings_minimal = self._compute_diff_ios_and_couplings(
variables,
functions,
states,
self.coupling_structure,
)
# Exclude the non-numeric couplings from the coupling minimal list
for discipline in self.coupling_structure.disciplines:
inputs_couplings = couplings_minimal.intersection(
discipline.io.input_grammar
)
couplings_minimal.difference_update(
itertools.filterfalse(
discipline.io.input_grammar.data_converter.is_numeric,
inputs_couplings,
)
)
sorted_couplings_minimal = sorted(couplings_minimal)
couplings_and_res = sorted_couplings_minimal.copy()
couplings_and_states = sorted_couplings_minimal.copy()
# linearize all the disciplines
if residual_variables:
couplings_and_res += residual_variables.keys()
couplings_and_states += states
for disc in self.coupling_structure.disciplines:
if disc.cache is not None and exec_cache_tol is not None:
disc.cache.tolerance = exec_cache_tol
disc.linearize(in_data, execute=execute)
# compute the sizes from the Jacobians
self.compute_sizes(
functions, variables, sorted_couplings_minimal, residual_variables
)
n_variables = self.compute_dimension(variables)
n_functions = self.compute_dimension(functions)
n_residuals = self.compute_dimension(sorted_couplings_minimal)
if residual_variables:
n_residuals += self.compute_dimension(residual_variables.keys())
# compute the partial derivatives of the residuals
dres_dx = self.assemble_jacobian(couplings_and_res, variables, is_residual=True)
# compute the partial derivatives of the interest functions
(dfun_dx, dfun_dy) = ({}, {})
for fun in functions:
dfun_dx[fun] = self.assemble_jacobian([fun], variables)
dfun_dy[fun] = self.assemble_jacobian([fun], couplings_and_res)
mode = self._get_derivation_mode(mode, n_variables, n_functions)
# compute the total derivatives
if mode == self.DerivationMode.DIRECT:
# sparse square matrix ∂R/∂y
dres_dy = self.assemble_jacobian(
couplings_and_res,
couplings_and_states,
is_residual=True,
jacobian_type=matrix_type,
)
# compute the coupled derivatives
total_derivatives = self.coupled_system.direct_mode(
functions,
n_variables,
n_residuals,
dres_dx,
dres_dy,
dfun_dx,
dfun_dy,
linear_solver,
use_lu_fact=use_lu_fact,
**linear_solver_settings,
)
elif mode == self.DerivationMode.ADJOINT:
# transposed square matrix ∂R/∂y^T
dres_dy_t = self.assemble_jacobian(
couplings_and_res,
couplings_and_states,
is_residual=True,
jacobian_type=matrix_type,
)
# compute the coupled derivatives
total_derivatives = self.coupled_system.adjoint_mode(
functions,
dres_dx,
dres_dy_t.T,
dfun_dx,
dfun_dy,
linear_solver,
use_lu_fact=use_lu_fact,
**linear_solver_settings,
)
else:
raise ValueError("Incorrect linearization mode " + str(mode))
return self.split_jac(total_derivatives, variables)
[docs]
def split_jac(
self,
coupled_system: Mapping[str, RealOrComplexArray | dok_matrix],
variables: Iterable[str],
) -> dict[str, dict[str, RealOrComplexArray | dok_matrix]]:
"""Split a Jacobian dict into a dict of dict.
Args:
coupled_system: The derivatives to split.
variables: The variables wrt which the differentiation is performed.
Returns:
The Jacobian.
"""
j_split = {}
for function, function_jac in coupled_system.items():
i_out = 0
sub_jac = {}
for variable in variables:
size = self.sizes[variable]
sub_jac[variable] = function_jac[:, i_out : i_out + size]
i_out += size
j_split[function] = sub_jac
return j_split
[docs]
def set_newton_differentiated_ios(
self,
couplings: Collection[str],
) -> None:
"""Set the differentiated inputs and outputs for the Newton algorithm.
Also ensures that :attr:`.JacobianAssembly.sizes` contains the sizes of all
the coupling sizes needed for Newton.
Args:
couplings: The coupling variables.
"""
for disc in self.coupling_structure.disciplines:
inputs_to_linearize = set(disc.io.input_grammar).intersection(couplings)
outputs_to_linearize = set(disc.io.output_grammar).intersection(couplings)
# If outputs and inputs to linearize not empty, then linearize
if inputs_to_linearize and outputs_to_linearize:
disc.add_differentiated_inputs(inputs_to_linearize)
disc.add_differentiated_outputs(outputs_to_linearize)
self.compute_sizes(couplings, couplings, couplings)
# Newton step computation
[docs]
def compute_newton_step(
self,
in_data: Mapping[str, RealArray],
couplings: Collection[str],
linear_solver: str = "DEFAULT",
matrix_type: JacobianType = JacobianType.MATRIX,
residuals: RealOrComplexArray | None = None,
resolved_residual_names: Collection[str] = (),
**linear_solver_settings: Any,
) -> tuple[RealOrComplexArray, bool]:
"""Compute the Newton step for the coupled system of disciplines residuals.
Args:
in_data: The input data.
couplings: The coupling variables.
linear_solver: The name of the linear solver.
matrix_type: The representation of the matrix ∂R/∂y (sparse or
linear operator).
residuals: The residuals vector, if ``None`` use :attr:`.residuals`.
resolved_residual_names: The names of residual variables.
**linear_solver_settings: The options passed to the linear solver factory.
Returns:
The Newton step - relax_factor . [∂R/∂y]^-1 . R as an array of steps
for which the order is given by the `couplings` argument.
Whether the linear solver converged.
"""
residual_names = resolved_residual_names or couplings
self.compute_sizes(residual_names, couplings, couplings)
# compute the partial derivatives of the residuals
dres_dy = self.assemble_jacobian(
residual_names,
couplings,
is_residual=True,
jacobian_type=matrix_type,
)
# form the residuals
if residuals is None:
residuals = self.residuals(in_data, couplings)
# solve the linear system
linear_problem = LinearProblem(dres_dy, -residuals)
self.__linear_solver_factory.execute(
linear_problem, algo_name=linear_solver, **linear_solver_settings
)
return linear_problem.solution, linear_problem.is_converged
[docs]
def residuals(
self, in_data: StrKeyMapping, var_names: Iterable[str]
) -> RealOrComplexArray:
"""Form the matrix of residuals wrt coupling variables.
Given disciplinary explicit calculations Yi(Y0_t,...Yn_t),
fill the residual matrix::
[Y0(Y0_t,...Yn_t) - Y0_t]
[ ]
[Yn(Y0_t,...Yn_t) - Yn_t]
Args:
in_data: The values prescribed for the calculation
of the residuals (Y0_t,...Yn_t).
var_names: The names of variables associated with the residuals (R).
Returns:
The residuals array.
"""
residuals = []
# Build rows blocks
for name in var_names:
for discipline in self.coupling_structure.disciplines:
if name in discipline.io.output_grammar:
to_array = discipline.io.output_grammar.data_converter.convert_value_to_array # noqa: E501
local_data_array = to_array(name, discipline.io.data[name])
in_data_array = to_array(name, in_data[name])
residuals.append(local_data_array - in_data_array)
return concatenate(residuals)
[docs]
def plot_dependency_jacobian(
self,
functions: Collection[str],
variables: Collection[str],
save: bool = True,
show: bool = False,
filepath: str = "",
markersize: float | None = None,
) -> str:
"""Plot the Jacobian matrix.
Nonzero elements of the sparse matrix are represented by blue squares.
Args:
functions: The functions to plot.
variables: The variables.
show: Whether the plot is displayed.
save: Whether the plot is saved in a PDF file.
filepath: The file name to save to.
If empty, ``coupled_jacobian.pdf`` is used, otherwise
``coupled_jacobian_ + filepath + .pdf``.
markersize: The size of the markers.
Returns:
The file name.
"""
self.compute_sizes(functions, variables, [])
total_jac = empty(0)
# compute the positions of the outputs
outputs_positions = {}
current_position = 0
for fun in functions:
dfun_dx = self.assemble_jacobian([fun], variables)
outputs_positions[fun] = current_position
current_position += self.sizes[fun]
total_jac = vstack((total_jac, dfun_dx)) if len(total_jac) else dfun_dx
# compute the positions of the inputs
inputs_positions = {}
current_position = 0
for variable in variables:
inputs_positions[variable] = current_position
current_position += self.sizes[variable]
fig = plt.figure(figsize=(6.0, 10.0))
ax1 = fig.add_subplot(111)
plt.spy(total_jac, markersize=markersize)
ax1.set_aspect("auto")
plt.yticks(list(outputs_positions.values()), list(outputs_positions.keys()))
plt.xticks(
list(inputs_positions.values()), list(inputs_positions.keys()), rotation=90
)
if save:
if filepath:
filename = f"coupled_jacobian_{filepath}.pdf"
else:
filename = "coupled_jacobian.pdf"
else:
filename = ""
save_show_figure(fig, show, filename)
return filename
[docs]
class CoupledSystem:
"""Compute coupled (total) derivatives of a system of residuals.
Use several methods:
- direct or adjoint
- factorized for multiple RHS
"""
n_linear_resolutions: int
"""The number of linear resolutions."""
n_direct_modes: int
"""The number of direct mode calls."""
n_adjoint_modes: int
"""The number of adjoint mode calls."""
lu_fact: int
"""The number of LU mode calls (adjoint or direct)."""
__linear_solver_factory: LinearSolverLibraryFactory
"""The linear solver factory."""
linear_problem: LinearProblem | None
"""The considered linear problem."""
DEFAULT_LINEAR_SOLVER: ClassVar[str] = "DEFAULT"
"""The default linear solver."""
def __init__(self) -> None: # noqa:D107
self.n_linear_resolutions = 0
self.n_direct_modes = 0
self.n_adjoint_modes = 0
self.lu_fact = 0
self.__linear_solver_factory = LinearSolverLibraryFactory(use_cache=True)
self.linear_problem = None
[docs]
def direct_mode(
self,
functions: Iterable[str],
n_variables: int,
n_couplings: int,
dres_dx: dok_matrix | LinearOperator,
dres_dy: dok_matrix | LinearOperator,
dfun_dx: Mapping[str, dok_matrix],
dfun_dy: Mapping[str, dok_matrix],
linear_solver: str = DEFAULT_LINEAR_SOLVER,
use_lu_fact: bool = False,
**linear_solver_settings: Any,
) -> dict[str, dok_matrix]:
"""Compute the total derivative Jacobian in direct mode.
Args:
functions: The functions to differentiate.
n_variables: The number of variables.
n_couplings: The number of couplings.
dres_dx: The Jacobian of the residuals wrt the design variables.
dres_dy: The Jacobian of the residuals wrt the coupling variables.
dfun_dx: The Jacobian of the functions wrt the design variables.
dfun_dy: The Jacobian of the functions wrt the coupling variables.
linear_solver: The name of the linear solver.
use_lu_fact: Whether to factorize dres_dy once.
**linear_solver_settings: The optional parameters.
Returns:
The Jacobian of the total coupled derivatives.
"""
self.n_direct_modes += 1
if use_lu_fact:
return self._direct_mode_lu(
functions, n_variables, n_couplings, dres_dx, dres_dy, dfun_dx, dfun_dy
)
return self._direct_mode(
functions,
n_variables,
n_couplings,
dres_dx,
dres_dy,
dfun_dx,
dfun_dy,
linear_solver,
**linear_solver_settings,
)
[docs]
def adjoint_mode(
self,
functions: Iterable[str],
dres_dx: dok_matrix | LinearOperator,
dres_dy_t: dok_matrix | LinearOperator,
dfun_dx: Mapping[str, dok_matrix],
dfun_dy: Mapping[str, dok_matrix],
linear_solver: str = DEFAULT_LINEAR_SOLVER,
use_lu_fact: bool = False,
**linear_solver_settings: Any,
) -> dict[str, RealArray]:
"""Compute the total derivative Jacobian in adjoint mode.
Args:
functions: The functions to differentiate.
dres_dx: The Jacobian of the residuals wrt the design variables.
dres_dy_t: The Jacobian of the residuals wrt the coupling variables.
dfun_dx: The Jacobian of the functions wrt the design variables.
dfun_dy: The Jacobian of the functions wrt the coupling variables.
linear_solver: The name of the linear solver.
use_lu_fact: Whether to factorize dres_dy_t once.
**linear_solver_settings: The optional parameters.
Returns:
The Jacobian of total coupled derivatives.
"""
self.n_adjoint_modes += 1
if use_lu_fact:
return self._adjoint_mode_lu(
functions, dres_dx, dres_dy_t, dfun_dx, dfun_dy
)
return self._adjoint_mode(
functions,
dres_dx,
dres_dy_t,
dfun_dx,
dfun_dy,
linear_solver,
**linear_solver_settings,
)
def _direct_mode(
self,
functions: Iterable[str],
n_variables: int,
n_couplings: int,
dres_dx: dok_matrix | LinearOperator,
dres_dy: dok_matrix | LinearOperator,
dfun_dx: Mapping[str, dok_matrix],
dfun_dy: Mapping[str, dok_matrix],
linear_solver: str = DEFAULT_LINEAR_SOLVER,
**linear_solver_settings: Any,
) -> dict[str, dok_matrix]:
"""Compute the total derivative Jacobian in direct mode.
Args:
functions: The functions to differentiate.
n_variables: The number of variables.
n_couplings: The number of couplings.
dres_dx: The Jacobian of the residuals wrt the design variables.
dres_dy: The Jacobian of the residuals wrt the coupling variables.
dfun_dx: The Jacobian of the functions wrt the design variables.
dfun_dy: The Jacobian of the functions wrt the coupling variables.
linear_solver: The name of the linear solver.
**linear_solver_settings: The optional parameters.
Returns:
The Jacobian of total coupled derivatives.
"""
# compute the total derivative dy/dx, independent of the
# function to differentiate
dy_dx = empty((n_couplings, n_variables))
self.linear_problem = LinearProblem(dres_dy)
if linear_solver in {"DEFAULT", "LGMRES"}:
# Reinit outerV, and store it for all RHS
linear_solver_settings["outer_v"] = []
for var_index in range(n_variables):
self.linear_problem.rhs = -dres_dx[:, var_index]
self.__linear_solver_factory.execute(
self.linear_problem, algo_name=linear_solver, **linear_solver_settings
)
dy_dx[:, var_index] = self.linear_problem.solution
self.n_linear_resolutions += 1
# assemble the total derivatives of the functions using dy_dx
jac = {}
for fun in functions:
jac[fun] = dfun_dx[fun].toarray() + dfun_dy[fun].dot(dy_dx)
return jac
def _adjoint_mode(
self,
functions: Iterable[str],
dres_dx: dok_matrix | LinearOperator,
dres_dy_t: dok_matrix | LinearOperator,
dfun_dx: Mapping[str, dok_matrix],
dfun_dy: Mapping[str, dok_matrix],
linear_solver: str = DEFAULT_LINEAR_SOLVER,
**linear_solver_settings: Any,
) -> dict[str, RealArray]:
"""Compute the total derivative Jacobian in adjoint mode.
Args:
functions: The functions to differentiate.
dres_dx: The Jacobian of the residuals wrt the design variables.
dres_dy: The Jacobian of the residuals wrt the coupling variables.
dfun_dx: The Jacobian of the functions wrt the design variables.
dfun_dy: The Jacobian of the functions wrt the coupling variables.
linear_solver: The name of the linear solver.
dres_dy_t: The derivatives of the residuals wrt coupling vars.
**linear_solver_settings: The optional parameters.
Returns:
The Jacobian of total coupled derivatives.
"""
jac = {}
# adjoint vector for each interest function
if linear_solver in {"DEFAULT", "LGMRES"}:
# Reinit outerV, and store it for all RHS
linear_solver_settings["outer_v"] = []
self.linear_problem = LinearProblem(dres_dy_t)
for fun in functions:
dfunction_dx = dfun_dx[fun]
dfunction_dy = dfun_dy[fun]
jac[fun] = empty(dfunction_dx.shape)
# compute adjoint vector for each component of the function
for fun_component in range(dfunction_dy.shape[0]):
self.linear_problem.rhs = -dfunction_dy[fun_component, :].T
self.__linear_solver_factory.execute(
self.linear_problem,
algo_name=linear_solver,
**linear_solver_settings,
)
adjoint = self.linear_problem.solution
self.n_linear_resolutions += 1
jac[fun][fun_component, :] = (
dfunction_dx[fun_component, :] + (dres_dx.T.dot(adjoint)).T
)
return jac
def _direct_mode_lu(
self,
functions: Iterable[str],
n_variables: int,
n_couplings: int,
dres_dx: dok_matrix | LinearOperator,
dres_dy: dok_matrix | LinearOperator,
dfun_dx: Mapping[str, dok_matrix],
dfun_dy: Mapping[str, dok_matrix],
tol: float = 1e-10,
) -> dict[str, dok_matrix]:
"""Compute the total derivative Jacobian in direct mode.
Args:
functions: The functions to differentiate.
n_variables: The number of variables.
n_couplings: The number of couplings.
dres_dx: The Jacobian of the residuals wrt the design variables.
dres_dy: The Jacobian of the residuals wrt the coupling variables.
dfun_dx: The Jacobian of the functions wrt the design variables.
dfun_dy: The Jacobian of the functions wrt the coupling variables.
tol: Tolerance on the relative residuals norm to consider that the linear
system is solved. If not, raises a warning.
Returns:
The Jacobian of total coupled derivatives.
"""
# compute the total derivative dy/dx, independent of the
# function to differentiate
dy_dx = empty((n_couplings, n_variables))
# compute LU decomposition
lhs = csc_matrix(dres_dy)
solve = factorized(lhs)
self.lu_fact += 1
for var_index in range(n_variables):
rhs = -dres_dx[:, var_index].todense()
sol = solve(rhs)
dy_dx[:, var_index] = sol.squeeze()
self.n_linear_resolutions += 1
res = norm(lhs.dot(sol) - rhs) / norm(rhs)
if res > tol:
LOGGER.warning(
"The linear system in _direct_mode_lu used to compute the coupled "
"derivatives is not well resolved, "
"residuals > tolerance : %s > %s ",
res,
tol,
)
# assemble the total derivatives of the functions using dy_dx
jac = {}
for fun in functions:
jac[fun] = dfun_dx[fun].toarray() + dfun_dy[fun].dot(dy_dx)
return jac
def _adjoint_mode_lu(
self,
functions: Iterable[str],
dres_dx: dok_matrix | LinearOperator,
dres_dy_t: dok_matrix | LinearOperator,
dfun_dx: Mapping[str, dok_matrix],
dfun_dy: Mapping[str, dok_matrix],
tol: float = 1e-10,
) -> dict[str, RealArray]:
"""Compute the total derivative Jacobian in adjoint mode.
Args:
functions: The functions to differentiate.
dres_dx: The Jacobian of the residuals wrt the design variables.
dfun_dx: The Jacobian of the functions wrt the design variables.
dfun_dy: The Jacobian of the functions wrt the coupling variables.
dres_dy_t: The Jacobian of the residuals wrt the coupling variables.
tol: Tolerance on the relative residuals norm to consider that the linear
system is solved. If not, raises a warning.
Returns:
The Jacobian of total coupled derivatives.
"""
jac = {}
# compute LU factorization
solve = factorized(dres_dy_t)
self.lu_fact += 1
# adjoint vector for each interest function
for fun in functions:
dfunction_dx = dfun_dx[fun]
dfunction_dy = dfun_dy[fun]
jac[fun] = empty(dfunction_dx.shape)
# compute adjoint vector for each component of the function
for fun_component in range(dfunction_dy.shape[0]):
rhs = -dfunction_dy[fun_component, :].todense().T
adjoint = solve(rhs)
res = norm(dres_dy_t.dot(adjoint) - rhs) / norm(rhs)
if res > tol:
LOGGER.warning(
"The linear system in _adjoint_mode_lu used to compute the "
"coupled "
"derivatives is not well resolved, "
"residuals > tolerance : %s > %s ",
res,
tol,
)
self.n_linear_resolutions += 1
jac[fun][fun_component, :] = (
dfunction_dx[fun_component, :] + (dres_dx.T.dot(adjoint)).T
)
return jac