Source code for gemseo.mda.mda

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Francois Gallard, Charlie Vanaret
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""Base class for all Multi-disciplinary Design Analyses (MDA).."""
from __future__ import division, unicode_literals

import logging
from multiprocessing import cpu_count
from typing import Any, Iterable, List, Mapping, Optional, Sequence, Set, Tuple, Union

import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
from numpy import array, concatenate, ndarray
from numpy.linalg import norm

from gemseo.core.coupling_structure import DependencyGraph, MDOCouplingStructure
from gemseo.core.discipline import MDODiscipline
from gemseo.core.execution_sequence import ExecutionSequenceFactory, LoopExecSequence
from gemseo.core.jacobian_assembly import JacobianAssembly
from gemseo.utils.py23_compat import Path

LOGGER = logging.getLogger(__name__)


[docs]class MDA(MDODiscipline): """An MDA analysis.""" FINITE_DIFFERENCES = "finite_differences" N_CPUS = cpu_count() _ATTR_TO_SERIALIZE = MDODiscipline._ATTR_TO_SERIALIZE + ( "disciplines", "warm_start", "_input_couplings", "reset_history_each_run", "norm0", "residual_history", "tolerance", "max_mda_iter", "_log_convergence", "lin_cache_tol_fact", "assembly", "coupling_structure", "max_mda_iter", "normed_residual", "strong_couplings", "matrix_type", "use_lu_fact", "linear_solver_tolerance", ) def __init__( self, disciplines, # type: Sequence[MDODiscipline] max_mda_iter=10, # type: int name=None, # type: Optional[str] grammar_type=MDODiscipline.JSON_GRAMMAR_TYPE, # type: str tolerance=1e-6, # type: float linear_solver_tolerance=1e-12, # type: float warm_start=False, # type: bool use_lu_fact=False, # type: bool coupling_structure=None, # type: Optional[MDOCouplingStructure] log_convergence=False, # type: bool linear_solver="DEFAULT", # type: str linear_solver_options=None, # type: Mapping[str,Any] ): # type: (...) -> None """ Args: disciplines: The disciplines from which to compute the MDA. max_mda_iter: The maximum iterations number for the MDA algorithm. name: The name to be given to the MDA. If None, use the name of the class. grammar_type: The type of the input and output grammars, either :attr:`JSON_GRAMMAR_TYPE` or :attr:`SIMPLE_GRAMMAR_TYPE`. tolerance: The tolerance of the iterative direct coupling solver; the norm of the current residuals divided by initial residuals norm shall be lower than the tolerance to stop iterating. linear_solver_tolerance: The tolerance of the linear solver in the adjoint equation. warm_start: Whether the second iteration and ongoing start from the previous coupling solution. use_lu_fact: Whether to store a LU factorization of the matrix when using adjoint/forward differentiation. to solve faster multiple RHS problem. coupling_structure: The coupling structure to be used by the MDA. If None, it is created from `disciplines`. log_convergence: Whether to log the MDA convergence, expressed in terms of normed residuals. linear_solver: The name of the linear solver. linear_solver_options: The options passed to the linear solver factory. """ super(MDA, self).__init__(name, grammar_type=grammar_type) self.tolerance = tolerance self.linear_solver = linear_solver self.linear_solver_tolerance = linear_solver_tolerance self.linear_solver_options = linear_solver_options or {} self.max_mda_iter = max_mda_iter self.disciplines = disciplines if coupling_structure is None: self.coupling_structure = MDOCouplingStructure(disciplines) else: self.coupling_structure = coupling_structure self.assembly = JacobianAssembly(self.coupling_structure) self.residual_history = [] self.reset_history_each_run = False self.warm_start = warm_start # Don't erase coupling values before calling _compute_jacobian self._linearize_on_last_state = True self.norm0 = None self.normed_residual = 1.0 self.strong_couplings = self.coupling_structure.strong_couplings() self.all_couplings = self.coupling_structure.get_all_couplings() self._input_couplings = [] self.matrix_type = JacobianAssembly.SPARSE self.use_lu_fact = use_lu_fact # By default dont use an approximate cache for linearization self.lin_cache_tol_fact = 0.0 self._initialize_grammars() self._check_consistency() self.__check_linear_solver_options() self._check_couplings_types() self._log_convergence = log_convergence def _initialize_grammars(self): # type: (...) -> None """Define all the inputs and outputs of the MDA. Add all the outputs of all the disciplines to the outputs. """ for discipline in self.disciplines: self.input_grammar.update_from(discipline.input_grammar) self.output_grammar.update_from(discipline.output_grammar) @property def log_convergence(self): # type: (...) -> bool """Whether to log the MDA convergence.""" return self._log_convergence @log_convergence.setter def log_convergence( self, value, # type: bool ): # type: (...) -> None self._log_convergence = value def __check_linear_solver_options(self): # type: (...) -> None """Check the linear solver options. The linear solver tolerance cannot be set using the linear solver option dictionary, as it is set using the linear_solver_tolerance keyword argument. Raises: ValueError: If the 'tol' keyword is in linear_solver_options. """ if "tol" in self.linear_solver_options: msg = ( "The linear solver tolerance shall be set" " using the linear_solver_tolerance argument." ) raise ValueError(msg) def _check_consistency(self): # type: (...) -> None """Check if there are not more than one equation per variable. For instance if a strong coupling is not also a self coupling. Raises: ValueError: * If there are too many coupling constraints. * If outputs are defined multiple times. """ strong_c_disc = self.coupling_structure.strongly_coupled_disciplines( add_self_coupled=False ) also_strong = [ disc for disc in strong_c_disc if self.coupling_structure.is_self_coupled(disc) ] if also_strong: for disc in also_strong: in_outs = set(disc.get_input_data_names()) & set( disc.get_output_data_names() ) LOGGER.warning( "Self coupling variables in discipline %s are: %s.", disc.name, in_outs, ) also_strong_n = [disc.name for disc in also_strong] raise ValueError( "Too many coupling constraints; " "the following disciplines are self coupled " "and also strongly coupled with other disciplines: {}.".format( also_strong_n ) ) all_outs = {} multiple_outs = [] for disc in self.disciplines: for out in disc.get_output_data_names(): if out in all_outs: multiple_outs.append(out) all_outs[out] = disc if multiple_outs: raise ValueError( "Outputs are defined multiple times: {}.".format(multiple_outs) ) def _compute_input_couplings(self): # type: (...) -> None """Compute the strong couplings that are inputs of the MDA.""" input_couplings = set(self.strong_couplings) & set(self.get_input_data_names()) self._input_couplings = list(input_couplings) def _current_input_couplings(self): # type: (...) -> ndarray """Return the current values of the input coupling variables.""" input_couplings = list(iter(self.get_outputs_by_name(self._input_couplings))) if not input_couplings: return array([]) return concatenate(input_couplings) def _current_strong_couplings(self): # type: (...) -> ndarray """Return the current values of the strong coupling variables.""" couplings = list(iter(self.get_outputs_by_name(self.strong_couplings))) if not couplings: return array([]) return concatenate(couplings) def _retrieve_diff_inouts( self, force_all=False, # type: bool ): # type: (...) -> Tuple[Union[Set[str],List[str]],Union[Set[str],List[str]]] """Return the names of the inputs and outputs involved in the differentiation. Args: force_all: Whether to differentiate all outputs with respect to all inputs. If `False`, differentiate the :attr:`_differentiated_outputs` with respect to the :attr:`_differentiated_inputs`. Returns: The inputs according to which to differentiate, the outputs to be differentiated. """ if force_all: strong_cpl = set(self.strong_couplings) inputs = set(self.get_input_data_names()) outputs = self.get_output_data_names() # Don't linearize wrt inputs = inputs - (strong_cpl & inputs) # Don't do this with output couplings because # their derivatives wrt design variables may be needed # outputs = outputs - (strong_cpl & outputs) return inputs, outputs return MDODiscipline._retrieve_diff_inouts(self, False) def _couplings_warm_start(self): # type: (...) -> None """Load the previous couplings values to local data.""" cached_outputs = self.cache.get_last_cached_outputs() if not cached_outputs: return for input_name in self._input_couplings: input_value = cached_outputs.get(input_name) if input_value is not None: self.local_data[input_name] = input_value def _set_default_inputs(self): # type: (...) -> None """Set the default input values of the MDA from the disciplines ones.""" self.default_inputs = {} mda_input_names = self.get_input_data_names() for discipline in self.disciplines: for input_name in discipline.default_inputs: if input_name in mda_input_names: self.default_inputs[input_name] = discipline.default_inputs[ input_name ] def _check_couplings_types(self): # type: (...) -> None """Check that the coupling variables are of type array in the grammars. Raises: ValueError: When at least one of the coupling variables is not an array. """ not_arrays = [] for discipline in self.disciplines: for grammar in (discipline.input_grammar, discipline.output_grammar): for coupling in self.all_couplings: exists = grammar.is_data_name_existing(coupling) if exists and not grammar.is_type_array(coupling): not_arrays.append(coupling) not_arrays = sorted(set(not_arrays)) if not_arrays: raise ValueError( "The coupling variables {} must be of type array.".format(not_arrays) )
[docs] def reset_disciplines_statuses(self): # type: (...) -> None """Reset all the statuses of the disciplines.""" for discipline in self.disciplines: discipline.reset_statuses_for_run()
[docs] def reset_statuses_for_run(self): # type: (...) -> None MDODiscipline.reset_statuses_for_run(self) self.reset_disciplines_statuses()
[docs] def get_expected_workflow(self): # type: (...) ->LoopExecSequence disc_exec_seq = ExecutionSequenceFactory.serial(self.disciplines) return ExecutionSequenceFactory.loop(self, disc_exec_seq)
[docs] def get_expected_dataflow( self, ): # type: (...) -> List[Tuple[MDODiscipline,MDODiscipline,List[str]]] all_disc = [self] all_disc.extend(self.disciplines) graph = DependencyGraph(all_disc) res = graph.get_disciplines_couplings() return res
def _compute_jacobian( self, inputs=None, # type: Optional[Iterable[str]] outputs=None, # type: Optional[Iterable[str]] ): # type: (...) -> None # Do not re execute disciplines if inputs error is beyond self tol # Apply a safety factor on this (mda is a loop, inputs # of first discipline # have changed at convergence, therefore the cache is not exactly # the same as the current value exec_cache_tol = self.lin_cache_tol_fact * self.tolerance force_no_exec = exec_cache_tol != 0.0 self.__check_linear_solver_options() self.jac = self.assembly.total_derivatives( self.local_data, outputs, inputs, self.all_couplings, tol=self.linear_solver_tolerance, mode=self.linearization_mode, matrix_type=self.matrix_type, use_lu_fact=self.use_lu_fact, exec_cache_tol=exec_cache_tol, force_no_exec=force_no_exec, linear_solver=self.linear_solver, **self.linear_solver_options ) # fixed point methods def _compute_residual( self, current_couplings, # type: ndarray new_couplings, # type: ndarray current_iter, # type: int first=False, # type: bool store_it=True, # type: bool log_normed_residual=False, # type: bool ): # type: (...) -> ndarray """Compute the residual on the inputs of the MDA. Args: current_couplings: The values of the couplings before the execution. new_couplings: The values of the couplings after the execution. current_iter: The current iteration of the fixed-point method. first: Whether it is the first residual of the fixed-point method. store_it: Whether to store the normed residual. log_normed_residual: Whether to log the normed residual. Returns: The normed residual. """ if first and self.reset_history_each_run: self.residual_history = [] normed_residual = norm((current_couplings - new_couplings).real) if self.norm0 is None: self.norm0 = normed_residual if self.norm0 == 0: self.norm0 = 1 self.normed_residual = normed_residual / self.norm0 if log_normed_residual: LOGGER.info( "%s running... Normed residual = %s (iter. %s)", self.name, "{:.2e}".format(self.normed_residual), current_iter, ) if store_it: self.residual_history.append((self.normed_residual, current_iter)) return self.normed_residual
[docs] def check_jacobian( self, input_data=None, # type: Optional[Mapping[str,ndarray]] derr_approx=FINITE_DIFFERENCES, # type: str step=1e-7, # type: float threshold=1e-8, # type: float linearization_mode="auto", # type: str inputs=None, # type: Optional[Iterable[str]] outputs=None, # type: Optional[Iterable[str]] parallel=False, # type: bool n_processes=N_CPUS, # type: int use_threading=False, # type: bool wait_time_between_fork=0, # type: int auto_set_step=False, # type: bool plot_result=False, # type: bool file_path="jacobian_errors.pdf", # type: Union[str,Path] show=False, # type: bool figsize_x=10, # type: float figsize_y=10, # type: float reference_jacobian_path=None, save_reference_jacobian=False, indices=None, ): # type: (...) -> bool """Check if the analytical Jacobian is correct with respect to a reference one. If `reference_jacobian_path` is not `None` and `save_reference_jacobian` is `True`, compute the reference Jacobian with the approximation method and save it in `reference_jacobian_path`. If `reference_jacobian_path` is not `None` and `save_reference_jacobian` is `False`, do not compute the reference Jacobian but read it from `reference_jacobian_path`. If `reference_jacobian_path` is `None`, compute the reference Jacobian without saving it. Args: input_data: The input values. If None, use the default input values. derr_approx: The derivative approximation method. threshold: The acceptance threshold for the Jacobian error. linearization_mode: The mode of linearization, either "direct", "adjoint" or "auto" switch depending on dimensions of inputs and outputs. inputs: The names of the inputs with respect to which to differentiate. If None, use the inputs of the MDA. outputs: The outputs to differentiate. If None, use all the outputs of the MDA. step: The step for finite differences or complex step differentiation methods. parallel: Whether to execute the MDA in parallel. n_processes: The maximum number of processors on which to run. use_threading: Whether to use threads instead of processes to parallelize the execution; multiprocessing will copy (serialize) all the disciplines, while threading will share all the memory. This is important to note if you want to execute the same discipline multiple times, you shall use multiprocessing. wait_time_between_fork: The time waited between two forks of the process / thread. auto_set_step: Whether to compute the optimal step for a forward first order finite differences gradient approximation. plot_result: Whether to plot the result of the validation comparing the exact and approximated Jacobians. file_path: The path to the output file if `plot_result` is `True`. show: Whether to open the figure. figsize_x: The *x* size of the figure in inches. figsize_y: The *y* size of the figure in inches. reference_jacobian_path: The path of the reference Jacobian file. save_reference_jacobian: Whether to save the reference Jacobian. indices: The indices of the inputs and outputs for the different sub-Jacobian matrices, formatted as ``{variable_name: variable_components}`` where ``variable_components`` can be either an integer, e.g. `2` a sequence of integers, e.g. `[0, 3]`, a slice, e.g. `slice(0,3)`, the ellipsis symbol (`...`) or `None`, which is the same as ellipsis. If a variable name is missing, consider all its components. If None, consider all the components of all the ``inputs`` and ``outputs``. Return: Whether the passed Jacobian is correct. """ # Strong couplings are not linearized if inputs is None: inputs = self.get_input_data_names() inputs = list(iter(inputs)) for str_cpl in self.all_couplings: if str_cpl in inputs: inputs.remove(str_cpl) if outputs is None: outputs = self.get_output_data_names() outputs = list(iter(outputs)) for str_cpl in self.all_couplings: if str_cpl in outputs: outputs.remove(str_cpl) return super(MDA, self).check_jacobian( input_data=input_data, derr_approx=derr_approx, step=step, threshold=threshold, linearization_mode=linearization_mode, inputs=inputs, outputs=outputs, parallel=parallel, n_processes=n_processes, use_threading=use_threading, wait_time_between_fork=wait_time_between_fork, auto_set_step=auto_set_step, plot_result=plot_result, file_path=file_path, show=show, figsize_x=figsize_x, figsize_y=figsize_y, reference_jacobian_path=reference_jacobian_path, save_reference_jacobian=save_reference_jacobian, indices=indices, )
def _warn_convergence_criteria( self, current_iter, # type: int ): # type: (...) -> Tuple[bool,bool] """Log a warning if max_iter is reached and if max residuals is above tolerance. Args: current_iter: The current iteration of the MDA. Returns: * Whether the normed residual is lower than the tolerance. * Whether the maximum number of iterations is reached. """ residual_is_small = self.normed_residual <= self.tolerance max_iter_is_reached = self.max_mda_iter <= current_iter if max_iter_is_reached and not residual_is_small: msg = ( "%s has reached its maximum number of iterations " "but the normed residual %s is still above the tolerance %s." ) LOGGER.warning(msg, self.name, self.normed_residual, self.tolerance) return residual_is_small, max_iter_is_reached def _termination( self, current_iter, # type: int ): # type: (...) -> bool """Termination criterion. Args: current_iter: The current iteration of the fixed point method. Returns: Whether to stop the MDA algorithm. """ residual_is_small, max_iter_is_reached = self._warn_convergence_criteria( current_iter ) return residual_is_small or max_iter_is_reached def _set_cache_tol( self, cache_tol, # type: float ): # type: (...) -> None """Set to the cache input tolerance. To be overloaded by subclasses. Args: cache_tol: The cache tolerance. """ super(MDA, self)._set_cache_tol(cache_tol) for disc in self.disciplines: disc.cache_tol = cache_tol or 0.0
[docs] def plot_residual_history( self, show=False, # type: bool save=True, # type: bool n_iterations=None, # type: Optional[int] logscale=None, # type: Optional[Tuple[int,int]] filename=None, # type: Optional[str] figsize=(50, 10), # type: Tuple[int,int] ): # type: (...) -> None """Generate a plot of the residual history. All residuals are stored in the history; only the final residual of the converged MDA is plotted at each optimization iteration. Args: show: Whether to display the plot on screen. save: Whether to save the plot as a PDF file. n_iterations: The number of iterations on the *x* axis. If None, use all the iterations. logscale: The limits of the *y* axis. If None, do not change the limits of the *y* axis. filename: The name of the file to save the figure. If None, use "{mda.name}_residual_history.pdf". figsize: The *x* and *y* sizes of the figure in inches. """ fig = plt.figure(figsize=figsize) fig_ax = fig.add_subplot(1, 1, 1) # split list of couples residual = [res for (res, _) in self.residual_history] # red dot for first iteration colors = [ "red" if current_iter == 1 else "black" for (_, current_iter) in self.residual_history ] fig_ax.scatter( list(range(len(residual))), residual, s=20, color=colors, zorder=2 ) fig_ax.plot(residual, linestyle="-", c="k", zorder=1) fig_ax.axhline(y=self.tolerance, c="blue", linewidth=0.5, zorder=0) fig_ax.set_title("{}: residual plot".format(self.name)) if n_iterations is None: n_iterations = len(self.residual_history) plt.yscale("log") plt.xlabel(r"iterations", fontsize=14) plt.xlim([-1, n_iterations]) fig_ax.get_xaxis().set_major_locator(MaxNLocator(integer=True)) plt.ylabel(r"$\log(||residuals||/||y_0||)$", fontsize=14) if logscale is not None: plt.ylim(logscale) if save: if filename is None: filename = "{}_residual_history.pdf".format(self.name) plt.savefig(filename, bbox_inches="tight") if show: plt.show() else: plt.close(fig)