# Source code for gemseo.core.parallel_execution.disc_parallel_linearization
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Parallel execution of linearized disciplines."""
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import Callable
from typing import NamedTuple
from gemseo.core.execution_statistics import ExecutionStatistics
from gemseo.core.parallel_execution.callable_parallel_execution import (
CallableParallelExecution,
)
from gemseo.core.parallel_execution.callable_parallel_execution import CallbackType
from gemseo.typing import StrKeyMapping
from gemseo.utils.constants import N_CPUS
if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Sequence
from gemseo.core.discipline import Discipline
from gemseo.core.discipline.discipline_data import DisciplineData
from gemseo.typing import JacobianData
class _WorkerData(NamedTuple):
    """The data computed by a worker, i.e. by one linearized discipline.

    Instances are built by :class:`._Functor` after a linearization call and
    carry what has to be copied back onto the discipline held by the caller.
    """

    # The ``io.data`` of the discipline, read right after the linearization.
    io_data: DisciplineData
    # The Jacobian returned by the ``linearize`` call of the discipline.
    jacobian: JacobianData
class _Functor:
    """A callable wrapping the linearization of a single discipline.

    Calling an instance linearizes the wrapped discipline and hands back both
    its :attr:`.Discipline.io.data` and the resulting Jacobian.
    """

    def __init__(self, discipline: Discipline, execute: bool = True) -> None:
        """
        Args:
            discipline: The discipline whose linearization is wrapped.
            execute: Whether to execute the discipline first
                with the input data at which the Jacobian is requested;
                this guarantees that the discipline was run
                with the right input data;
                the cost is almost null when the matching output data
                are already stored in the :attr:`.cache`.
        """  # noqa:D205 D212 D415
        self.__execute = execute
        self.__disc = discipline

    def __call__(self, inputs: StrKeyMapping) -> _WorkerData:
        """
        Args:
            inputs: The input data at which to linearize the discipline.

        Returns:
            The :attr:`.Discipline.io.data` of the discipline and its Jacobian.
        """  # noqa:D205 D212 D415
        discipline = self.__disc
        # The linearization must run before ``io.data`` is read, so that the
        # returned data are the ones resulting from this very call.
        computed_jacobian = discipline.linearize(inputs, execute=self.__execute)
        return _WorkerData(io_data=discipline.io.data, jacobian=computed_jacobian)
[docs]
class DiscParallelLinearization(CallableParallelExecution[StrKeyMapping, _WorkerData]):
    """Linearize disciplines in parallel.

    Each discipline is wrapped into a :class:`._Functor` used as a worker;
    after the parallel run, the data and Jacobians computed by the workers are
    copied back onto the disciplines passed at instantiation.
    """

    _disciplines: Sequence[Discipline]
    """The disciplines to linearize."""

    def __init__(
        self,
        disciplines: Sequence[Discipline],
        n_processes: int = N_CPUS,
        use_threading: bool = False,
        wait_time_between_fork: float = 0.0,
        exceptions_to_re_raise: Sequence[type[Exception]] = (),
        execute: bool = True,
    ) -> None:
        """
        Args:
            disciplines: The disciplines to linearize.
            execute: Whether to start by executing the discipline
                with the input data for which to compute the Jacobian;
                this allows to ensure that the discipline was executed
                with the right input data;
                it can be almost free if the corresponding output data
                have been stored in the :attr:`.cache`.
        """  # noqa:D205 D212 D415
        super().__init__(
            workers=[_Functor(d, execute=execute) for d in disciplines],
            n_processes=n_processes,
            use_threading=use_threading,
            wait_time_between_fork=wait_time_between_fork,
            exceptions_to_re_raise=exceptions_to_re_raise,
        )
        # Because accessing a method of an object provides a new callable object for
        # every access, we shall check uniqueness on the disciplines.
        self._check_unicity(disciplines)
        self._disciplines = disciplines

    # TODO: API: fix return type or return None and use the disc attributes updated?
    def execute(  # type: ignore[override] # noqa: D102
        self,
        inputs: Sequence[StrKeyMapping],
        exec_callback: CallbackType | Iterable[CallbackType] = (),
        task_submitted_callback: Callable[[], None] | None = None,
    ) -> list[JacobianData | None]:
        # Run the workers through the parent class, copy the computed data back
        # onto the disciplines, then return one entry per worker output:
        # the Jacobian, or ``None`` when the worker output is ``None``.
        ordered_outputs = super().execute(
            inputs,
            exec_callback=exec_callback,
            task_submitted_callback=task_submitted_callback,
        )

        n_disciplines = len(self._disciplines)
        if n_disciplines == 1 or n_disciplines != len(inputs):
            # Only the first output is considered in this branch.
            # Guard against an empty list of outputs (e.g. no inputs).
            output_0 = ordered_outputs[0] if ordered_outputs else None
            if output_0 is not None:
                disc_0 = self._disciplines[0]
                if n_disciplines == 1:
                    disc_0.io.data = output_0.io_data
                    disc_0.jac = output_0.jacobian

                # NOTE(review): presumably the counters updated in spawned
                # processes are not propagated back to this process, hence the
                # manual update below -- TODO confirm.
                if (
                    not self.use_threading
                    and self.MULTI_PROCESSING_START_METHOD
                    == self.MultiProcessingStartMethod.SPAWN
                    and ExecutionStatistics.is_enabled
                    and output_0.io_data
                ):
                    # Only increase the number of calls if the Jacobian was computed.
                    disc_0.execution_statistics.n_calls += len(inputs)  # type: ignore[operator] # checked with activate_counter
                    disc_0.execution_statistics.n_calls_linearize += len(inputs)  # type: ignore[operator] # checked with activate_counter
        else:
            for disc, output in zip(self._disciplines, ordered_outputs):
                # When the discipline in the worker failed, output is None.
                # We do not update the data such that the issue is caught by the
                # output grammar.
                if output is not None:
                    disc.io.data = output.io_data
                    disc.jac = output.jacobian

        # Keep one entry per output so that the result stays aligned with the
        # inputs: a failed worker yields ``None`` instead of being dropped.
        return [None if out is None else out.jacobian for out in ordered_outputs]