Source code for gemseo.mda.base_mda_solver

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Copyright 2024 Capgemini
"""The base class for MDA solvers."""

from __future__ import annotations

import logging
from abc import abstractmethod
from copy import copy
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar

from numpy import abs as np_abs
from numpy import array
from numpy import concatenate
from numpy import inf
from numpy import ones
from numpy.linalg import norm

from gemseo.algos.sequence_transformer.composite.relaxation_acceleration import (
    RelaxationAcceleration,
)
from gemseo.mda.base_mda import BaseMDA

if TYPE_CHECKING:
    from collections.abc import Callable
    from collections.abc import Iterable
    from collections.abc import Mapping
    from collections.abc import Sequence

    from numpy import ndarray

    from gemseo.algos.sequence_transformer.acceleration import AccelerationMethod
    from gemseo.core.discipline import Discipline
    from gemseo.mda.base_mda_solver_settings import BaseMDASolverSettings
    from gemseo.typing import MutableStrKeyMapping
    from gemseo.typing import RealArray

LOGGER = logging.getLogger(__name__)


[docs] class BaseMDASolver(BaseMDA): """The base class for MDA solvers.""" Settings: ClassVar[type[BaseMDASolverSettings]] """The Pydantic model for the settings.""" settings: BaseMDASolverSettings """The settings of the MDA.""" __current_residuals: dict[str, ndarray] """The mapping from residual names to current values.""" _sequence_transformer: RelaxationAcceleration """The sequence transformer aimed at improving the convergence rate. The transformation applies a relaxation followed by an acceleration. """ __lower_bound_vector: RealArray | None """The vector of lower bounds.""" __upper_bound_vector: RealArray | None """The vector of upper bounds.""" __resolved_variable_names_to_bounds: dict[ str, tuple[RealArray | None, RealArray | None] ] """The mapping from variable names to lower/upper bounds.""" __resolved_variable_names_to_slices: dict[str, slice] """The mapping from names to slices for converting array to data structures.""" __resolved_variable_names: tuple[str, ...] """The names of the resolved variables. Resolved variables are coupling and state variables (for disciplines that does not solve their own residuals). These variables are modified by the MDA so as to make the corresponding residuals converge towards 0. """ __resolved_residual_names: tuple[str, ...] """The names of the resolved residuals the MDA is solving. Resolved residuals are either related to coupling variables or explicitly attached to state variables (for disciplines that does not solve their own residuals). The MDA is meant to bring these residuals towards zero. """ __n_consecutive_unsuccessful_iterations: int """The number of consecutive unsuccessful iterations.""" __iteration_callbacks: list[Callable[[BaseMDASolver], None]] """The callback functions to be called after each iteration.""" def __init__( # noqa: D107 self, disciplines: Sequence[Discipline], settings_model: BaseMDASolverSettings | None = None, **settings: Any, ) -> None: super().__init__(disciplines, settings_model=settings_model, **settings) self._sequence_transformer = RelaxationAcceleration( self.settings.over_relaxation_factor, self.settings.acceleration_method, ) self.__resolved_variable_names_to_slices = {} self.__resolved_variable_names = () self.__resolved_residual_names = () self.__current_residuals = {} self.__n_consecutive_unsuccessful_iterations = 0 self.__lower_bound_vector = None self.__upper_bound_vector = None self.__resolved_variable_names_to_bounds = {} self.__iteration_callbacks = [] @property def acceleration_method(self) -> AccelerationMethod: """The acceleration method.""" return self._sequence_transformer.acceleration_method @acceleration_method.setter def acceleration_method(self, acceleration_method: AccelerationMethod) -> None: self._sequence_transformer.acceleration_method = acceleration_method @property def over_relaxation_factor(self) -> float: """The over-relaxation factor.""" return self._sequence_transformer.over_relaxation_factor @over_relaxation_factor.setter def over_relaxation_factor(self, over_relaxation_factor: float) -> None: self._sequence_transformer.over_relaxation_factor = over_relaxation_factor @property def lower_bound_vector(self) -> RealArray | None: """The vector of resolved variables lower bound.""" return self.__lower_bound_vector @property def upper_bound_vector(self) -> RealArray | None: """The vector of resolved variables upper bound.""" return self.__upper_bound_vector @property def _resolved_variable_names(self) -> tuple[str, ...]: """The names of the variables (couplings and state) the MDA is solving.""" return self.__resolved_variable_names @property def _resolved_residual_names(self) -> tuple[str, ...]: """The names of the residuals (couplings and residuals) the MDA is solving.""" return self.__resolved_residual_names
[docs] def get_current_resolved_variables_vector(self) -> ndarray: """Return the vector of resolved variables (couplings and state variables).""" return concatenate([ self.io.input_grammar.data_converter.convert_data_to_array( self.__resolved_variable_names, self.io.data, ) ])
[docs] def get_current_resolved_residual_vector(self) -> ndarray: """Return the vector of residuals.""" if not self._resolved_residual_names: return array([]) current_residuals = self.__current_residuals return concatenate( tuple(current_residuals[name] for name in self._resolved_residual_names) )
[docs] def set_bounds( self, variable_names_to_bounds: Mapping[ str, tuple[RealArray | None, RealArray | None] ], ) -> None: """Set the bounds for the resolved variables. Args: variable_names_to_bounds: The mapping from variable names to bounds. """ self.__resolved_variable_names_to_bounds |= { name: bounds for name, bounds in variable_names_to_bounds.items() if name in self._resolved_variable_names } self.__update_bounds_vectors()
def _check_stopping_criteria(self, update_iteration_metrics: bool = True) -> bool: """Check whether a stopping criterion has been reached. Args: update_iteration_metrics: Whether to update the iteration metrics before checking the stopping criteria. Returns: Whether a stopping criterion has been reached. """ if update_iteration_metrics: self.__update_iteration_metrics() self._execute_iteration_callbacks() if self.normed_residual <= self.settings.tolerance: return True if self._current_iter >= self.settings.max_mda_iter: LOGGER.warning( "%s has reached its maximum number of iterations, " "but the normalized residual norm %s is still above the tolerance %s.", self.name, self.normed_residual, self.settings.tolerance, ) return True if ( self.__n_consecutive_unsuccessful_iterations >= self.settings.max_consecutive_unsuccessful_iterations ): LOGGER.warning( "%s has reached its maximum number of unsuccessful iterations, " "but the normalized residual norm %s is still above the tolerance %s.", self.name, self.normed_residual, self.settings.tolerance, ) return True return False def _set_resolved_variables(self, resolved_couplings: Iterable[str]) -> None: """Set the resolved variables and associated residuals. The state variables are added to the provided coupling variable to form the list of resolved variables. The associated residuals are either identical to coupling variable or, for state variables, retrieved from the corresponding discipline. The order of resolved variable names and residuals names is consistent. Args: resolved_couplings: The name of coupling variables resolved by the MDA. """ # Aggregate the residual variables from disciplines residual_variables = {} for discipline in self._disciplines: if discipline.io.state_equations_are_solved: continue residual_variables.update(discipline.io.residual_to_state_variable) state_variables = residual_variables.values() resolved_variables = set(resolved_couplings).union(state_variables) resolved_variables.difference_update(self._non_numeric_array_variables) self.__resolved_variable_names = tuple(sorted(resolved_variables)) # State variable names are replaced with associated residual names residuals = list(self._resolved_variable_names) for key, value in residual_variables.items(): # The order is maintained to guarantee consistency if value in residuals: residuals[residuals.index(value)] = key self.__resolved_residual_names = tuple(residuals) def _compute_names_to_slices(self) -> None: """Compute the mapping of variable names to slices for converting data to array. Two mappings are computed, one for the resolved variables (couplings and state variables), one for the associated residuals. The mappings are cached and computed only once. When possible, the unique grammar (input or output) of a converter that contains all the coupling data is chosen. Otherwise, converters from the 2 grammars are used. """ if self.__resolved_variable_names_to_slices: return self.__resolved_variable_names_to_slices = ( self.io.input_grammar.data_converter.compute_names_to_slices( self._resolved_variable_names, self.io.data, )[0] ) # Initialize the vectors of bounds once the variable sizes are known. total_size = sum( slice_.stop - slice_.start for slice_ in self.__resolved_variable_names_to_slices.values() ) self.__lower_bound_vector = -inf * ones(total_size) self.__upper_bound_vector = +inf * ones(total_size) self.__update_bounds_vectors() def _update_local_data_from_array(self, array_: ndarray) -> None: """Update the local data from an array. Args: array_: An array. """ self.io.data |= self.io.output_grammar.data_converter.convert_array_to_data( array_, self.__resolved_variable_names_to_slices, ) def _compute_normalized_residual_norm(self) -> float: """Compute the normalized residual norm at the current point. Returns: The normalized residual norm. """ residual = self.get_current_resolved_residual_vector() scaling = self.scaling scaling_data = self._scaling_data ResidualScaling = self.ResidualScaling # noqa: N806 if scaling == ResidualScaling.NO_SCALING: normed_residual = float(norm(residual)) elif scaling == ResidualScaling.INITIAL_RESIDUAL_NORM: normed_residual = float(norm(residual)) if scaling_data is None: scaling_data = normed_residual if normed_residual != 0 else 1.0 normed_residual /= scaling_data elif scaling == ResidualScaling.N_COUPLING_VARIABLES: if scaling_data is None: scaling_data = residual.size**0.5 normed_residual = norm(residual) / scaling_data elif scaling == ResidualScaling.INITIAL_SUBRESIDUAL_NORM: if scaling_data is None: scaling_data = [] for slice_ in self.__resolved_variable_names_to_slices.values(): initial_norm = float(norm(residual[slice_])) initial_norm = initial_norm if initial_norm != 0.0 else 1.0 scaling_data.append((slice_, initial_norm)) normalized_norms = [] for current_slice, initial_norm in scaling_data: normalized_norms.append(norm(residual[current_slice]) / initial_norm) normed_residual = max(normalized_norms) elif scaling == ResidualScaling.INITIAL_RESIDUAL_COMPONENT: if scaling_data is None: scaling_data = residual + (residual == 0) normed_residual = np_abs(residual / scaling_data).max() elif scaling == ResidualScaling.SCALED_INITIAL_RESIDUAL_COMPONENT: if scaling_data is None: scaling_data = residual + (residual == 0) normed_residual = float(norm(residual / scaling_data)) normed_residual /= residual.size**0.5 else: # Use the StrEnum casting to raise an explicit error. ResidualScaling(scaling) self._scaling_data = scaling_data return normed_residual def _pre_solve(self) -> bool: # noqa: D103 self._sequence_transformer.clear() self.__n_consecutive_unsuccessful_iterations = 0 return super()._pre_solve() def _solve(self) -> None: while self._iterate_once(): pass @abstractmethod def _iterate_once(self) -> bool: """Perform one MDA iteration. Returns: Whether the iteration should be stopped. """ def _compute_residuals(self, input_data: MutableStrKeyMapping) -> None: """Compute the residual vector. Residuals are computed either as the difference between input and output values of coupling variables, or, for state variables, directly from the associated residual variable. This method should be used only after the disciplines' local data have been modified, otherwise there will be no difference between input and output values. Args: input_data: The input data to compute residual of coupling variables. """ to_array = self.io.output_grammar.data_converter.convert_value_to_array data = self.io.data for residual_name in self._resolved_residual_names: residual = to_array(residual_name, data[residual_name]) if residual_name in self._resolved_variable_names: residual_ = to_array(residual_name, input_data[residual_name]) # At first iteration, # when sampling in a vectorized way, # residual.shape = (d,) != residual_.shape is (d*n,) # where d is the feature dimension and n is the number of samples. if residual.shape == residual_.shape: # No -= assignment to avoid possible casting problems. residual = residual - residual_ self.__current_residuals[residual_name] = residual def __update_bounds_vectors(self) -> None: """Set the bounds of the sequence transformer.""" if self.__lower_bound_vector is None or self.__upper_bound_vector is None: return for name, ( lower_bound, upper_bound, ) in self.__resolved_variable_names_to_bounds.items(): slice_ = self.__resolved_variable_names_to_slices[name] self.__lower_bound_vector[slice_] = ( lower_bound if lower_bound is not None else -inf ) self.__upper_bound_vector[slice_] = ( upper_bound if upper_bound is not None else +inf ) self._sequence_transformer.lower_bound = self.__lower_bound_vector self._sequence_transformer.upper_bound = self.__upper_bound_vector def __update_iteration_metrics(self) -> None: """Update the iteration metrics.""" if self._current_iter == 0: self._starting_indices.append(len(self.residual_history)) if self.reset_history_each_run: self._starting_indices.clear() self.residual_history.clear() old_normalized_residual_norm = copy(self.normed_residual) self.normed_residual = self._compute_normalized_residual_norm() if self.normed_residual >= old_normalized_residual_norm: self.__n_consecutive_unsuccessful_iterations += 1 else: self.__n_consecutive_unsuccessful_iterations = 0 self.residual_history.append(self.normed_residual) self._current_iter += 1 self.io.update_output_data({ self.NORMALIZED_RESIDUAL_NORM: array([self.normed_residual]) }) if self.settings.log_convergence: msg = "{} running... Normalized residual norm = {} (iter. {})" LOGGER.info( msg.format(self.name, f"{self.normed_residual:.2e}", self._current_iter) )
[docs] def add_iteration_callback( self, iteration_callback: Callable[[BaseMDASolver], None] ) -> None: """Add a callback function to be called after each iteration. Args: iteration_callback: The callback function. """ self.__iteration_callbacks.append(iteration_callback)
def _execute_iteration_callbacks(self) -> None: """Execute the iteration callbacks.""" for iteration_callback in self.__iteration_callbacks: iteration_callback(self)
[docs] def clear_iteration_callbacks(self) -> None: """Clear the iteration callbacks.""" self.__iteration_callbacks.clear()