# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - API and implementation and/or documentation
# :author: Francois Gallard
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Chains of disciplines.
Can be both sequential or parallel execution processes.
"""
from __future__ import annotations
import logging
from copy import deepcopy
from typing import Iterable
from typing import Sequence
from numpy import dot
from numpy import ndarray
from numpy import zeros
from gemseo.core.coupling_structure import DependencyGraph
from gemseo.core.discipline import MDODiscipline
from gemseo.core.execution_sequence import ExecutionSequenceFactory
from gemseo.core.jacobian_assembly import JacobianAssembly
from gemseo.core.parallel_execution import DiscParallelExecution
from gemseo.core.parallel_execution import DiscParallelLinearization
LOGGER = logging.getLogger(__name__)
[docs]class MDOChain(MDODiscipline):
"""Chain of disciplines that is based on a predefined order of execution."""
disciplines: Sequence[MDODiscipline]
"""The chained disciplines."""
AVAILABLE_MODES = [
JacobianAssembly.DIRECT_MODE,
JacobianAssembly.REVERSE_MODE,
JacobianAssembly.AUTO_MODE,
]
_ATTR_TO_SERIALIZE = MDODiscipline._ATTR_TO_SERIALIZE + ("disciplines",)
def __init__(
self,
disciplines: Sequence[MDODiscipline],
name: str | None = None,
grammar_type: str = MDODiscipline.JSON_GRAMMAR_TYPE,
) -> None:
# noqa: D205 D212 D415
"""
Args:
disciplines: The disciplines.
name: The name of the discipline.
If None, use the class name.
grammar_type: The type of grammar to use for inputs and outputs declaration,
e.g. :attr:`.JSON_GRAMMAR_TYPE` or :attr:`.SIMPLE_GRAMMAR_TYPE`.
"""
super().__init__(name, grammar_type=grammar_type)
self.disciplines = disciplines
self.initialize_grammars()
self.default_inputs = {}
self._update_default_inputs()
[docs] def set_disciplines_statuses(
self,
status: str,
) -> None:
"""Set the sub-disciplines statuses.
Args:
status: The status to be set.
"""
for discipline in self.disciplines:
discipline.status = status
discipline.set_disciplines_statuses(status)
[docs] def initialize_grammars(self) -> None:
"""Define the input and output grammars from the disciplines' ones."""
self.input_grammar.clear()
self.output_grammar.clear()
for discipline in self.disciplines:
self.input_grammar.update(
discipline.input_grammar, exclude_names=self.output_grammar.keys()
)
self.output_grammar.update(discipline.output_grammar)
def _update_default_inputs(self) -> None:
"""Compute the default inputs from the disciplines' ones."""
self_inputs = self.get_input_data_names()
for discipline in self.disciplines:
for input_name, input_value in discipline.default_inputs.items():
if input_name in self_inputs:
self.default_inputs[input_name] = input_value
def _run(self) -> None:
for discipline in self.disciplines:
self.local_data.update(discipline.execute(self.local_data))
[docs] def reverse_chain_rule(
self,
chain_outputs: Iterable[str],
discipline: MDODiscipline,
) -> None:
"""Chain the derivatives with a new discipline in the chain in reverse mode.
Perform chain ruling:
(notation: D is total derivative, d is partial derivative)
D out d out dinpt_1 d output dinpt_2
----- = -------- . ------- + -------- . --------
D new_in d inpt_1 d new_in d inpt_2 d new_in
D out d out d out dinpt_2
----- = -------- + -------- . --------
D z d z d inpt_2 d z
D out d out [dinpt_1 d out d inpt_1 dinpt_2 ]
----- = -------- . [------- + -------- . -------- . --------]
D z d inpt_1 [d z d inpt_1 d inpt_2 d z ]
Args:
discipline: The new discipline to compose in the chain.
chain_outputs: The outputs to lineariza.
"""
# TODO : only linearize wrt needed inputs/inputs
# use coupling_structure graph path for that
last_cached = discipline.get_input_data()
discipline.linearize(last_cached, force_no_exec=True, force_all=True)
for output_name in chain_outputs:
if output_name in self.jac:
# This output has already been taken from previous disciplines
# Derivatives must be composed using the chain rule
# Make a copy of the keys because the dict is changed in the
# loop
existing_inputs = self.jac[output_name].keys()
common_inputs = set(existing_inputs) & set(discipline.jac)
for input_name in common_inputs:
# Store reference to the current Jacobian
curr_jac = self.jac[output_name][input_name]
for new_in, new_jac in discipline.jac[input_name].items():
# Chain rule the derivatives
# TODO: sum BEFORE dot
loc_dot = dot(curr_jac, new_jac)
# when input_name==new_in, we are in the case of an
# input being also an output
# in this case we must only compose the derivatives
if new_in in self.jac[output_name] and input_name != new_in:
# The output is already linearized wrt this
# input_name. We are in the case:
# d o d o d o di_2
# ---- = ---- + ----- . -----
# d z d z d i_2 d z
self.jac[output_name][new_in] += loc_dot
else:
# The output is not yet linearized wrt this
# input_name. We are in the case:
# d o d o di_1 d o di_2
# ----- = ------ . ---- + ---- . ----
# d x d i_1 d x d i_2 d x
self.jac[output_name][new_in] = loc_dot
elif output_name in discipline.jac:
# Output of the chain not yet filled in jac,
# Take the jacobian dict of the current discipline to
# Initialize. Make a copy !
self.jac[output_name] = MDOChain.copy_jacs(discipline.jac[output_name])
def _compute_jacobian(
self,
inputs: Iterable[str] | None = None,
outputs: Iterable[str] | None = None,
) -> None:
# Initializes self jac with copy of last discipline (reverse mode)
last_discipline = self.disciplines[-1]
# TODO : only linearize wrt needed inputs/inputs
# use coupling_structure graph path for that
last_cached = last_discipline.get_input_data()
last_discipline.linearize(last_cached, force_no_exec=True, force_all=True)
self.jac = self.copy_jacs(last_discipline.jac)
# reverse mode of remaining disciplines
remaining_disciplines = self.disciplines[:-1]
for discipline in remaining_disciplines[::-1]:
self.reverse_chain_rule(outputs, discipline)
# Remove differentiations that should not be there,
# because inputs are not inputs of the chain
for output_jacobian in self.jac.values():
# Copy keys because the dict in changed in the loop
input_names_before_loop = list(output_jacobian.keys())
for input_name in input_names_before_loop:
if input_name not in inputs:
del output_jacobian[input_name]
# Add differentiations that should be there,
# because inputs inputs of the chain but not
# of all disciplines
for output_name, output_jacobian in self.jac.items():
output_size = len(self.get_outputs_by_name(output_name))
for input_name in inputs:
if input_name not in output_jacobian:
input_size = len(self.get_inputs_by_name(input_name))
output_jacobian[input_name] = zeros((output_size, input_size))
[docs] @staticmethod
def copy_jacs(
jacobian: dict[str, dict[str, ndarray]],
) -> dict[str, dict[str, ndarray]]:
"""Deepcopy a Jacobian dictionary.
Args:
jacobian: The Jacobian dictionary,
which is a nested dictionary as ``{'out': {'in': derivatives}}``.
Returns:
The deepcopy of the Jacobian dictionary.
"""
jacobian_copy = {}
for output_name, output_jacobian in jacobian.items():
if isinstance(output_jacobian, dict):
output_jacobian_copy = {}
jacobian_copy[output_name] = output_jacobian_copy
for input_name, derivatives in output_jacobian.items():
output_jacobian_copy[input_name] = derivatives.copy()
elif isinstance(output_jacobian, ndarray):
jacobian_copy[output_name] = output_jacobian.copy()
return jacobian_copy
[docs] def reset_statuses_for_run(self) -> None: # noqa: D102
super().reset_statuses_for_run()
for discipline in self.disciplines:
discipline.reset_statuses_for_run()
[docs] def get_expected_workflow(self) -> None: # noqa: D102
sequence = ExecutionSequenceFactory.serial()
for discipline in self.disciplines:
sequence.extend(discipline.get_expected_workflow())
return sequence
[docs] def get_expected_dataflow(self) -> None: # noqa: D102
disciplines = self.get_disciplines_in_dataflow_chain()
graph = DependencyGraph(disciplines)
disciplines_couplings = graph.get_disciplines_couplings()
# Add discipline inner couplings (ex. MDA case)
for discipline in disciplines:
disciplines_couplings.extend(discipline.get_expected_dataflow())
return disciplines_couplings
[docs] def get_disciplines_in_dataflow_chain(self) -> list[MDODiscipline]:
dataflow = []
for disc in self.disciplines:
dataflow.extend(disc.get_disciplines_in_dataflow_chain())
return dataflow
def _set_cache_tol(
self,
cache_tol: float,
) -> None:
super()._set_cache_tol(cache_tol)
for discipline in self.disciplines:
discipline.cache_tol = cache_tol or 0.0
[docs]class MDOParallelChain(MDODiscipline):
"""Chain of processes that executes disciplines in parallel."""
def __init__(
self,
disciplines: Sequence[MDODiscipline],
name: str | None = None,
grammar_type: str = MDODiscipline.JSON_GRAMMAR_TYPE,
use_threading: bool = True,
n_processes: int | None = None,
) -> None:
# noqa: D205 D212 D415
"""
Args:
disciplines: The disciplines.
name: The name of the discipline.
If None, use the class name.
grammar_type: The type of grammar to use for inputs and outputs declaration,
e.g. :attr:`.JSON_GRAMMAR_TYPE` or :attr:`.SIMPLE_GRAMMAR_TYPE`.
use_threading: Whether to use threads instead of processes
to parallelize the execution;
multiprocessing will copy (serialize) all the disciplines,
while threading will share all the memory.
This is important to note
if you want to execute the same discipline multiple times,
you shall use multiprocessing.
n_processes: The maximum simultaneous number of threads,
if ``use_threading`` is True, or processes otherwise,
used to parallelize the execution.
If None, uses the number of disciplines.
Note:
The actual number of processes could be lower than ``n_processes``
if there are less than ``n_processes`` disciplines.
``n_processes`` can be lower than the total number of CPUs on the machine.
Each discipline may itself run on several CPUs.
"""
super().__init__(name, grammar_type=grammar_type)
self.disciplines = disciplines
self.initialize_grammars()
self.default_inputs = {}
self._update_default_inputs()
if n_processes is None:
n_processes = len(self.disciplines)
parallel_execution = DiscParallelExecution(
self.disciplines, n_processes, use_threading=use_threading
)
self.parallel_execution = parallel_execution
parallel_linearization = DiscParallelLinearization(
self.disciplines, n_processes, use_threading=use_threading
)
self.parallel_lin = parallel_linearization
[docs] def initialize_grammars(self) -> None:
"""Define the input and output grammars from the disciplines' ones."""
self.input_grammar.clear()
self.output_grammar.clear()
for discipline in self.disciplines:
self.input_grammar.update(discipline.input_grammar)
self.output_grammar.update(discipline.output_grammar)
def _update_default_inputs(self) -> None:
"""Compute the default inputs from the disciplines' ones."""
input_names = self.get_input_data_names()
for disc in self.disciplines:
for disc_input_name, disc_input_value in disc.default_inputs.items():
if disc_input_name in input_names:
self.default_inputs[disc_input_name] = disc_input_value
def _get_input_data_copies(self) -> list[dict[str, ndarray]]:
"""Return copies of the input data, one per discipline.
Returns:
One copy of the input data per discipline.
"""
# Avoid overlaps with dicts in // by doing a deepcopy
# The outputs of a discipline may be a coupling, and shall therefore
# not be passed as input of another since the execution are assumed
# to be independent here
return [deepcopy(self.local_data) for _ in range(len(self.disciplines))]
def _run(self) -> None:
self.parallel_execution.execute(self._get_input_data_copies())
# Update data according to input order of priority
for discipline in self.disciplines:
self.local_data.update(
{
output_name: discipline.local_data[output_name]
for output_name in discipline.get_output_data_names()
}
)
def _compute_jacobian(
self,
inputs: Iterable[str] | None = None,
outputs: Iterable[str] | None = None,
) -> None:
self._set_disciplines_diff_outputs(outputs)
self._set_disciplines_diff_inputs(inputs)
jacobians = self.parallel_lin.execute(self._get_input_data_copies())
self.jac = {}
# Update jacobians according to input order of priority
for discipline_jacobian in jacobians:
for output_name, output_jacobian in discipline_jacobian.items():
chain_jacobian = self.jac.get(output_name)
if chain_jacobian is None:
chain_jacobian = {}
self.jac[output_name] = chain_jacobian
chain_jacobian.update(output_jacobian)
self._init_jacobian(inputs, outputs, with_zeros=True, fill_missing_keys=True)
def _set_disciplines_diff_inputs(
self,
inputs: Iterable[str],
) -> None:
"""Add the inputs to the right sub discipline's differentiated inputs.
Args:
inputs: The names of the inputs to be added.
"""
diff_inpts = set(inputs)
for discipline in self.disciplines:
inputs_set = set(discipline.get_input_data_names()) & diff_inpts
if inputs_set:
discipline.add_differentiated_inputs(list(inputs_set))
[docs] def add_differentiated_outputs(
self,
outputs: Iterable[str] | None = None,
) -> None:
# noqa: D102
MDODiscipline.add_differentiated_outputs(self, outputs)
self._set_disciplines_diff_outputs(outputs)
def _set_disciplines_diff_outputs(self, outputs: Iterable[str]) -> None:
"""Add the outputs to the right-sub discipline's differentiated outputs.
Args:
outputs: The outputs to be added.
"""
diff_outpts = set(outputs)
for discipline in self.disciplines:
outputs_set = set(discipline.get_output_data_names()) & diff_outpts
if outputs_set:
discipline.add_differentiated_outputs(list(outputs_set))
[docs] def reset_statuses_for_run(self) -> None: # noqa: D102
super().reset_statuses_for_run()
for discipline in self.disciplines:
discipline.reset_statuses_for_run()
[docs] def get_expected_workflow(self) -> None: # noqa: D102
sequence = ExecutionSequenceFactory.parallel()
for discipline in self.disciplines:
sequence.extend(discipline.get_expected_workflow())
return sequence
[docs] def get_expected_dataflow(self) -> None: # noqa: D102
return []
def _set_cache_tol(
self,
cache_tol: float,
) -> None:
super()._set_cache_tol(cache_tol)
for discipline in self.disciplines:
discipline.cache_tol = cache_tol or 0.0
[docs]class MDOAdditiveChain(MDOParallelChain):
"""Execute disciplines in parallel and sum specified outputs across disciplines."""
def __init__(
self,
disciplines: Iterable[MDODiscipline],
outputs_to_sum: Iterable[str],
name: str | None = None,
grammar_type: str = MDODiscipline.JSON_GRAMMAR_TYPE,
use_threading: bool = True,
n_processes: int | None = None,
) -> None:
# noqa: D205 D212 D415
"""
Args:
disciplines: The disciplines.
outputs_to_sum: The names of the outputs to sum.
name: The name of the discipline.
If None, use the class name.
grammar_type: The type of grammar to use for inputs and outputs declaration,
e.g. :attr:`.JSON_GRAMMAR_TYPE` or :attr:`.SIMPLE_GRAMMAR_TYPE`.
use_threading: Whether to use threads instead of processes
to parallelize the execution;
multiprocessing will copy (serialize) all the disciplines,
while threading will share all the memory.
This is important to note
if you want to execute the same discipline multiple times,
you shall use multiprocessing.
n_processes: The maximum simultaneous number of threads,
if ``use_threading`` is True, or processes otherwise,
used to parallelize the execution.
If None, uses the number of disciplines.
Note:
The actual number of processes could be lower than ``n_processes``
if there are less than ``n_processes`` disciplines.
``n_processes`` can be lower than the total number of CPUs on the machine.
Each discipline may itself run on several CPUs.
"""
super().__init__(disciplines, name, grammar_type, use_threading, n_processes)
self._outputs_to_sum = outputs_to_sum
def _run(self) -> None:
# Run the disciplines in parallel
MDOParallelChain._run(self)
# Sum the required outputs across disciplines
for output_name in self._outputs_to_sum:
disciplinary_outputs = [
discipline.local_data[output_name]
for discipline in self.disciplines
if output_name in discipline.local_data
]
self.local_data[output_name] = (
sum(disciplinary_outputs) if disciplinary_outputs else None
)
def _compute_jacobian(
self,
inputs: Iterable[str] | None = None,
outputs: Iterable[str] | None = None,
) -> None:
# Differentiate the disciplines in parallel
MDOParallelChain._compute_jacobian(self, inputs, outputs)
# Sum the Jacobians of the required outputs across disciplines
for output_name in self._outputs_to_sum:
self.jac[output_name] = dict()
for input_name in inputs:
disciplinary_jacobians = [
discipline.jac[output_name][input_name]
for discipline in self.disciplines
if input_name in discipline.jac[output_name]
]
assert disciplinary_jacobians
self.jac[output_name][input_name] = sum(disciplinary_jacobians)