Source code for gemseo.mda.base_parallel_mda_solver
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - API and implementation and/or documentation
# :author: Charlie Vanaret, Francois Gallard
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Base module for MDA solvers that can be run in parallel."""
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from gemseo.core.parallel_execution.disc_parallel_execution import DiscParallelExecution
from gemseo.core.parallel_execution.disc_parallel_linearization import (
DiscParallelLinearization,
)
from gemseo.mda.base_mda_solver import BaseMDASolver
from gemseo.mda.base_parallel_mda_settings import BaseParallelMDASettings
from gemseo.utils.constants import READ_ONLY_EMPTY_DICT
if TYPE_CHECKING:
from collections.abc import Sequence
from gemseo.core.discipline import Discipline
from gemseo.typing import StrKeyMapping
[docs]
class BaseParallelMDASolver(BaseMDASolver):
    """Abstract class for MDA solvers that can be run in parallel.

    At instantiation, the ``n_processes`` setting selects how the disciplines
    are executed and linearized: one after the other when it is not greater
    than 1, through parallel executors otherwise.
    """

    Settings: ClassVar[type[BaseParallelMDASettings]] = BaseParallelMDASettings
    """The pydantic model for the settings."""

    settings: BaseParallelMDASettings
    """The settings of the MDA."""

    __parallel_execution: DiscParallelExecution | None
    """The parallel executor of the disciplines, or ``None`` in serial mode."""

    __parallel_linearization: DiscParallelLinearization | None
    """The parallel linearizer of the disciplines, or ``None`` in serial mode."""

    def __init__(  # noqa: D107
        self,
        disciplines: Sequence[Discipline],
        settings_model: BaseParallelMDASettings | None = None,
        **settings: Any,
    ) -> None:
        super().__init__(disciplines, settings_model=settings_model, **settings)

        options = self.settings
        n_processes = options.n_processes

        if not n_processes > 1:
            # Serial mode: bind the sequential strategies, no executor is needed.
            self._execute_disciplines = self._execute_disciplines_sequentially
            self._linearize_disciplines = self._linearize_disciplines_sequentially
            self.__parallel_execution = None
            self.__parallel_linearization = None
        else:
            # Parallel mode: bind the parallel strategies and build the executors
            # they delegate to.
            self._execute_disciplines = self._execute_disciplines_in_parallel
            self._linearize_disciplines = self._linearize_disciplines_in_parallel
            use_threading = options.use_threading
            self.__parallel_execution = DiscParallelExecution(
                disciplines,
                n_processes,
                use_threading=use_threading,
                exceptions_to_re_raise=(ValueError,),
            )
            self.__parallel_linearization = DiscParallelLinearization(
                disciplines,
                n_processes,
                use_threading=use_threading,
                execute=options.execute_before_linearizing,
            )

        self._compute_input_coupling_names()

    @property
    def parallel_execution(self) -> DiscParallelExecution | None:
        """The parallel executor of the disciplines, ``None`` in serial mode."""
        return self.__parallel_execution

    def _execute_disciplines_sequentially(self, input_data: StrKeyMapping) -> None:
        """Execute the disciplines one after the other.

        Args:
            input_data: The input data passed to each discipline.
        """
        for disc in self._disciplines:
            disc.execute(input_data)

    def _execute_disciplines_in_parallel(self, input_data: StrKeyMapping) -> None:
        """Execute the disciplines with the parallel executor.

        Args:
            input_data: The input data passed to each discipline.
        """
        # The executor takes one input item per discipline;
        # every discipline receives the same mapping.
        inputs = [input_data for _ in self._disciplines]
        self.__parallel_execution.execute(inputs)

    def _linearize_disciplines_sequentially(self, input_data: StrKeyMapping) -> None:
        """Linearize the disciplines one after the other.

        Args:
            input_data: The input data defining the point
                around which the disciplines are linearized.
        """
        execute = self.settings.execute_before_linearizing
        for disc in self._disciplines:
            disc.linearize(input_data, execute=execute)

    def _linearize_disciplines_in_parallel(self, input_data: StrKeyMapping) -> None:
        """Linearize the disciplines with the parallel linearizer.

        Args:
            input_data: The input data defining the point
                around which the disciplines are linearized.
        """
        # The linearizer takes one input item per discipline;
        # every discipline receives the same mapping.
        inputs = [input_data for _ in self._disciplines]
        self.__parallel_linearization.execute(inputs)

    def _execute_disciplines_and_update_local_data(
        self, input_data: StrKeyMapping = READ_ONLY_EMPTY_DICT
    ) -> None:
        """Execute the disciplines and store their output data in the MDA data.

        Args:
            input_data: The input data to execute the disciplines on.
                If empty, the current data of the MDA (``self.io.data``) are used.
        """
        data = input_data if input_data else self.io.data
        self._execute_disciplines(data)

        # Gather the outputs of each discipline, in the order of the disciplines.
        for disc in self._disciplines:
            outputs = disc.get_output_data()
            self.io.data.update(outputs)

        self._compute_names_to_slices()