# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Francois Gallard, Remi Lafage
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Abstraction for workflow."""
from __future__ import annotations
from abc import abstractmethod
from typing import TYPE_CHECKING
from uuid import uuid4
from gemseo.core.discipline import MDODiscipline
from gemseo.utils.metaclasses import ABCGoogleDocstringInheritanceMeta
if TYPE_CHECKING:
from collections.abc import Iterable
# Module-private shorthand for ``MDODiscipline.ExecutionStatus``; the members
# referenced in this module are PENDING, RUNNING, DONE and FAILED.
_ExecutionStatus = MDODiscipline.ExecutionStatus
[docs]
class ExecutionSequence(metaclass=ABCGoogleDocstringInheritanceMeta):
    """The common ancestor of all execution sequences.

    An execution sequence mirrors the main workflow that |g| runs implicitly for a
    given scenario/formulation. Thanks to this structure, each single execution of
    a discipline can be told apart, even when the same discipline is run several
    times at different stages of the scenario/formulation.
    """

    START_STR = "["
    END_STR = "]"

    def __init__(self, sequence=None) -> None:
        """
        Args:
            sequence: This argument is not used.
        """  # noqa: D205, D212, D415
        # Bookkeeping state first, identity and mappings afterwards.
        self._parent = None
        self._enabled = False
        self._status = None
        self.uuid = str(uuid4())
        self.uuid_to_disc = {}
        self.disc_to_uuids = {}

    @abstractmethod
    def accept(self, visitor):
        """Let a visitor object process this sequence (Visitor pattern).

        Subclasses must provide the implementation.

        Args:
            visitor: A visitor object.
        """

    @abstractmethod
    def set_observer(self, obs):
        """Register an observer.

        The observer is meant to be notified through its :meth:`update` method
        whenever an underlying discipline changes its status.
        Subclasses must provide the implementation.

        Args:
            obs: The observer to register.
        """

    @property
    def status(self):
        """The current status.

        One of :attr:`.MDODiscipline.ExecutionStatus`.

        Returns:
            The value of the status.
        """
        return self._status

    @status.setter
    def status(self, status) -> None:
        """Store a new status.

        One of :attr:`.MDODiscipline.ExecutionStatus`.

        Args:
            status: The value of the status.
        """
        self._status = status

    @property
    def parent(self):
        """The execution sequence that contains this one.

        Returns:
            The containing execution sequence.
        """
        return self._parent

    @parent.setter
    def parent(self, parent) -> None:
        """Declare the containing execution sequence.

        Args:
            parent: An execution sequence.

        Raises:
            RuntimeError: When the current execution sequence is not listed
                in the ``sequences`` of the given parent.
        """
        if self in parent.sequences:
            self._parent = parent
            return

        msg = f"parent {parent} does not include child {self}"
        raise RuntimeError(msg)

    def enabled(self) -> bool:
        """Tell whether the execution sequence is activated.

        Returns:
            Whether the execution sequence is enabled.
        """
        return self._enabled

    def enable(self) -> None:
        """Activate the execution sequence and reset its status to pending."""
        self.status = _ExecutionStatus.PENDING
        self._enabled = True

    def disable(self) -> None:
        """Deactivate the execution sequence."""
        self._enabled = False

    def _compute_disc_to_uuids(self) -> None:
        """Rebuild ``disc_to_uuids`` by inverting ``uuid_to_disc``.

        Notes:
            Several uuids may share the same value in ``uuid_to_disc``,
            hence each key of ``disc_to_uuids`` is bound to a list of uuids.
        """
        inverted = self.disc_to_uuids = {}
        for identifier, discipline in self.uuid_to_disc.items():
            try:
                inverted[discipline].append(identifier)
            except KeyError:
                inverted[discipline] = [identifier]
[docs]
class AtomicExecSequence(ExecutionSequence):
    """The execution sequence standing for one single run of one discipline."""

    def __init__(self, discipline: MDODiscipline | None = None) -> None:
        """
        Args:
            discipline: A discipline.

        Raises:
            TypeError: When ``discipline`` is not an :class:`.MDODiscipline`.
        """  # noqa: D205, D212, D415
        super().__init__()
        if not isinstance(discipline, MDODiscipline):
            msg = f"Atomic sequence shall be a discipline, got {type(discipline)} instead !"
            raise TypeError(msg)

        self._observer = None
        self.discipline = discipline
        # An atom owns exactly one uuid, bound to exactly one discipline.
        self.uuid_to_disc = {self.uuid: discipline}
        self.disc_to_uuids = {discipline: [self.uuid]}

    def __str__(self) -> str:
        name = self.discipline.name
        return f"{name}({self.status})"

    def __repr__(self) -> str:
        name = self.discipline.name
        return f"{name}({self.status}, {self.uuid})"

    def accept(self, visitor) -> None:
        """Let a visitor object process this atom (Visitor pattern).

        Args:
            visitor: An object implementing the :meth:`visit_atomic` method.
        """
        visitor.visit_atomic(self)

    def set_observer(self, obs) -> None:
        """Store the observer to notify when the status changes.

        Args:
            obs: An object implementing the :meth:`update` method for notification.
        """
        self._observer = obs

    def enable(self) -> None:
        """Activate the atom and register it as status observer of its discipline.

        The discipline is handed this atom through ``add_status_observer``;
        see :meth:`update_status` for the notification entry point.
        """
        super().enable()
        self.discipline.add_status_observer(self)

    def disable(self) -> None:
        """Deactivate the atom and unregister it from its discipline."""
        super().disable()
        self.discipline.remove_status_observer(self)

    def get_statuses(self):
        """Return the statuses as a dictionary keyed by atom uuid.

        Returns:
            A one-item dictionary mapping the uuid of this atom to its status.
        """
        return {self.uuid: self.status}

    def update_status(self, discipline) -> None:
        """Mirror the status of a discipline.

        Nothing happens when the atom is disabled or when the status of the
        discipline equals the current one. Otherwise the status is copied
        (falling back to pending when it is falsy), the atom disables itself on
        a done or failed status, then the parent and the observer, if any,
        are notified in that order.

        Args:
            discipline: The discipline whose status changed.
        """
        if not (self._enabled and self.status != discipline.status):
            return

        self.status = discipline.status or _ExecutionStatus.PENDING

        terminal_statuses = {_ExecutionStatus.DONE, _ExecutionStatus.FAILED}
        if self.status in terminal_statuses:
            self.disable()

        if self._parent:
            self._parent.update_child_status(self)

        if self._observer:
            self._observer.update(self)

    def force_statuses(self, status) -> None:
        """Impose a status on this atom.

        The parent is not notified;
        the observer, if any, is notified only when the status actually changed.

        Args:
            status: The value of the status,
                one of :attr:`.MDODiscipline.ExecutionStatus`.
        """
        previous_status, self._status = self._status, status
        if previous_status != status and self._observer:
            self._observer.update(self)
[docs]
class CompositeExecSequence(ExecutionSequence):
    """The ancestor of execution sequences that contain other execution sequences.

    Meant to be derived, not used directly.
    """

    START_STR = "'"
    END_STR = "'"

    sequences: list[ExecutionSequence]
    """The inner execution sequences."""

    disciplines: list[MDODiscipline]
    """The disciplines."""

    def __init__(self, sequence=None) -> None:  # noqa:D107
        super().__init__()
        self.sequences = []
        self.disciplines = []

    def __str__(self) -> str:
        # Every child is followed by ", ", the last one included.
        inner = "".join(str(item) + ", " for item in self.sequences)
        return self.START_STR + inner + self.END_STR

    def accept(self, visitor) -> None:
        """Let a visitor process this sequence first, then each of its children.

        Args:
            visitor: A visitor object; the method it must implement
                depends on the concrete subclass (see :meth:`_accept`).
        """
        self._accept(visitor)
        for child in self.sequences:
            child.accept(visitor)

    @abstractmethod
    def _accept(self, visitor) -> None:
        """Let a visitor process this sequence only (Visitor pattern).

        Each subclass implements it to call the visitor method
        matching its own type.

        Args:
            visitor: A visitor object.
        """

    def set_observer(self, obs) -> None:
        """Forward the observer to every child sequence.

        Args:
            obs: An object implementing the :meth:`update` method.
        """
        for child in self.sequences:
            child.set_observer(obs)

    def disable(self) -> None:
        """Deactivate this sequence, then each of its children."""
        super().disable()
        for child in self.sequences:
            child.disable()

    def force_statuses(self, status) -> None:
        """Impose a status on this sequence and on all its children.

        Args:
            status: The value of the status,
                one of :attr:`.MDODiscipline.ExecutionStatus`.
        """
        self.status = status
        for child in self.sequences:
            child.force_statuses(status)

    def get_statuses(self):
        """Return the statuses as a dictionary keyed by atom uuid.

        Returns:
            The merge of the dictionaries returned by the children.
        """
        return {
            identifier: status
            for child in self.sequences
            for identifier, status in child.get_statuses().items()
        }

    def update_child_status(self, child) -> None:
        """React to the status change of a child sequence.

        The parent, if any, is notified in turn when the status of this
        sequence has changed as a consequence.

        Args:
            child: The child execution sequence (contained in sequences)
                whose status has changed.
        """
        status_before = self.status
        self._update_child_status(child)
        if self._parent:
            if self.status != status_before:
                self._parent.update_child_status(self)

    @abstractmethod
    def _update_child_status(self, child):
        """Apply the status change of a child to this sequence.

        Subclasses must provide the implementation.

        Args:
            child: The child execution sequence (contained in sequences)
                whose status has changed.
        """
[docs]
class ExtendableExecSequence(CompositeExecSequence):
    """A base class for composite execution sequence that are extendable.

    Intended to be subclassed.
    """

    def __init__(self, sequence=None) -> None:  # noqa:D107
        super().__init__()
        if sequence is not None:
            self.extend(sequence)

    def extend(self, sequence):
        """Extend the execution sequence with another sequence or discipline(s).

        Args:
            sequence: Either another execution sequence, a single discipline
                or a list of disciplines.

        Returns:
            The extended execution sequence, i.e. the current object.
        """
        if isinstance(sequence, list):
            # In this case we are initializing the sequence
            # or extending by a list of disciplines.
            self._extend_with_disciplines(sequence)
        elif isinstance(sequence, MDODiscipline):
            # Extended by a single discipline: a new atom, hence a new uuid,
            # is generated for it.
            self._extend_with_disciplines([sequence])
        elif isinstance(sequence, AtomicExecSequence):
            # Extended by an existing atom: it is appended as is.
            self._extend_with_atomic_sequence(sequence)
        elif sequence.__class__ != self.__class__:
            # Extended by a different kind of execution sequence:
            # the other sequence is appended as a sub-structure.
            self._extend_with_diff_sequence_kind(sequence)
        else:
            # Extended by the same kind of execution sequence:
            # its children are merged into the current ones.
            self._extend_with_same_sequence_kind(sequence)

        self._compute_disc_to_uuids()  # refresh disc_to_uuids

        # (Re)attach every child, including the ones taken from another sequence.
        for child in self.sequences:
            child.parent = self

        return self

    def _extend_with_disciplines(self, disciplines: Iterable[MDODiscipline]) -> None:
        """Extend the sequence with disciplines.

        One new atomic execution sequence is created per discipline.

        Args:
            disciplines: A collection of disciplines.
        """
        atoms = [AtomicExecSequence(discipline) for discipline in disciplines]
        self.sequences.extend(atoms)
        self.uuid_to_disc.update({atom.uuid: atom.discipline for atom in atoms})

    def _extend_with_atomic_sequence(self, sequence: ExecutionSequence) -> None:
        """Extend by a single atomic execution sequence.

        Args:
            sequence: The atomic execution sequence to append.
        """
        self.sequences.append(sequence)
        # Map the uuid to the discipline wrapped by the atom, not to the atom
        # itself, so that this entry has the same kind of value as the ones
        # written by _extend_with_disciplines.
        self.uuid_to_disc[sequence.uuid] = sequence.discipline

    def _extend_with_same_sequence_kind(self, sequence) -> None:
        """Extend by another execution sequence of the same type.

        The children of the other sequence are appended, not the sequence itself.

        Args:
            sequence: An execution sequence of the same type as the current one.
        """
        self.sequences.extend(sequence.sequences)
        self.uuid_to_disc.update(sequence.uuid_to_disc)

    def _extend_with_diff_sequence_kind(self, sequence: ExecutionSequence) -> None:
        """Extend by another execution sequence of a different type.

        The other sequence is appended as a single child.

        Args:
            sequence: An execution sequence of a type different from
                the current one.
        """
        self.sequences.append(sequence)
        self.uuid_to_disc.update(sequence.uuid_to_disc)

    def _update_child_status(self, child) -> None:
        """Manage status change of child execution sequences.

        A failed child makes the current sequence fail,
        a done child is delegated to :meth:`_update_child_done_status`
        and any other status is copied as is.

        Args:
            child: The child execution sequence (contained in sequences)
                whose status has changed.
        """
        if child.status == _ExecutionStatus.FAILED:
            self.status = _ExecutionStatus.FAILED
        elif child.status == _ExecutionStatus.DONE:
            self._update_child_done_status(child)
        else:
            self.status = child.status

    @abstractmethod
    def _update_child_done_status(self, child):
        """Handle done status of child execution sequences.

        To be implemented in subclasses.

        Args:
            child: The child execution sequence (contained in sequences)
                whose status has changed.
        """
[docs]
class SerialExecSequence(ExtendableExecSequence):
    """The description of disciplines executed one after the other."""

    START_STR = "["
    END_STR = "]"

    def __init__(self, sequence=None) -> None:  # noqa:D107
        super().__init__(sequence)
        self.exec_index = None

    def _accept(self, visitor) -> None:
        """Let a visitor process this sequence (Visitor pattern).

        Args:
            visitor: An object implementing the :meth:`visit_serial` method.
        """
        visitor.visit_serial(self)

    def enable(self) -> None:
        """Activate this sequence, then its first child.

        Raises:
            ValueError: When the sequence has no child.
        """
        super().enable()
        self.exec_index = 0
        if not self.sequences:
            msg = "Serial execution is empty"
            raise ValueError(msg)

        self.sequences[self.exec_index].enable()

    def _update_child_done_status(self, child) -> None:
        """Move on to the child following the given one.

        When the given child was the last one,
        the sequence is marked as done and disabled.

        Args:
            child: The child execution sequence in done state.
        """
        if child.status != _ExecutionStatus.DONE:
            return

        child.disable()
        self.exec_index += 1
        has_next_child = self.exec_index < len(self.sequences)
        if has_next_child:
            self.sequences[self.exec_index].enable()
            return

        # The last child is done, so is the whole sequence.
        self.status = _ExecutionStatus.DONE
        self.disable()
[docs]
class ParallelExecSequence(ExtendableExecSequence):
    """The description of disciplines executed side by side."""

    START_STR = "("
    END_STR = ")"

    def _accept(self, visitor) -> None:
        """Let a visitor process this sequence (Visitor pattern).

        Args:
            visitor: An object implementing the :meth:`visit_parallel` method.
        """
        visitor.visit_parallel(self)

    def enable(self) -> None:
        """Activate this sequence, then every child."""
        super().enable()
        for child_sequence in self.sequences:
            child_sequence.enable()

    def _update_child_done_status(self, child) -> None:
        """Mark the sequence as done and disable it once every child is done.

        Args:
            child: The child execution sequence in done state.
        """
        every_child_is_done = all(
            child_sequence.status == _ExecutionStatus.DONE
            for child_sequence in self.sequences
        )
        if not every_child_is_done:
            return

        self.status = _ExecutionStatus.DONE
        self.disable()
[docs]
class LoopExecSequence(CompositeExecSequence):
    """A loop with a controller discipline and an execution_sequence as iterate."""

    START_STR = "{"
    END_STR = "}"

    def __init__(
        self,
        controller: MDODiscipline | AtomicExecSequence,
        sequence: CompositeExecSequence,
    ) -> None:
        """
        Args:
            controller: The controller of the loop, given either as a discipline
                or as an atomic execution sequence wrapping it.
            sequence: The composite execution sequence that is iterated.

        Raises:
            TypeError: When ``controller`` is neither a discipline nor an atomic
                execution sequence, or when ``sequence`` is not a composite
                execution sequence.
        """  # noqa: D205, D212, D415
        if isinstance(controller, AtomicExecSequence):
            control = controller
        elif not isinstance(controller, MDODiscipline):
            msg = (
                "Controller of a loop shall be a discipline, "
                f"got {type(controller)} instead."
            )
            raise TypeError(msg)
        else:
            control = AtomicExecSequence(controller)

        if not isinstance(sequence, CompositeExecSequence):
            msg = (
                "Sequence of a loop shall be a composite execution sequence, "
                f"got {type(sequence)} instead."
            )
            raise TypeError(msg)

        super().__init__()
        # The children must be set before the parents: the parent setter checks
        # that the child belongs to the ``sequences`` of its new parent.
        self.sequences = [control, sequence]
        self.atom_controller = control
        self.atom_controller.parent = self
        self.iteration_sequence = sequence
        self.iteration_sequence.parent = self
        self.uuid_to_disc.update(sequence.uuid_to_disc)
        # Always record the discipline: when the controller is passed as an
        # atomic execution sequence, ``controller`` is the atom, not the
        # discipline it wraps.
        self.uuid_to_disc[control.uuid] = control.discipline
        self._compute_disc_to_uuids()
        self.iteration_count = 0

    def _accept(self, visitor) -> None:
        """Accept a visitor object (see Visitor pattern).

        Args:
            visitor: An object implementing the :meth:`visit_loop` method.
        """
        visitor.visit_loop(self)

    def enable(self) -> None:
        """Activate the loop and its controller, and reset the iteration count."""
        super().enable()
        self.atom_controller.enable()
        self.iteration_count = 0

    def _update_child_status(self, child) -> None:
        """Activate iteration successively regarding controller status.

        Count iterations regarding iteration_sequence status.

        Args:
            child: The child execution sequence whose status has changed.
        """
        # The loop first mirrors its controller, whichever child notified.
        self.status = self.atom_controller.status
        if child == self.atom_controller:
            if self.status == _ExecutionStatus.RUNNING:
                # A running controller starts the iterated sequence,
                # unless it is already enabled.
                if not self.iteration_sequence.enabled():
                    self.iteration_sequence.enable()
            elif self.status == _ExecutionStatus.DONE:
                # A done controller ends the loop: everything is disabled
                # and forced to done.
                self.disable()
                self.force_statuses(_ExecutionStatus.DONE)

        if child == self.iteration_sequence and child.status == _ExecutionStatus.DONE:
            # One pass of the iterated sequence is over: count it and re-enable
            # the sequence for the next pass.
            self.iteration_count += 1
            self.iteration_sequence.enable()

        if child.status == _ExecutionStatus.FAILED:
            self.status = _ExecutionStatus.FAILED
[docs]
class ExecutionSequenceFactory:
    """The entry point to build ExecutionSequence objects.

    It creates AtomicExecutionSequence, SerialExecutionSequence,
    ParallelExecutionSequence and LoopExecutionSequence objects. The main |g|
    workflow is meant to be described with these four kinds of ExecutionSequence.
    """

    @staticmethod
    def atom(discipline) -> AtomicExecSequence:
        """Build the structure standing for the execution of one discipline.

        Meant to be called by MDOFormulation.get_expected_workflow methods.

        Args:
            discipline: A discipline.

        Returns:
            The structure used within XDSM workflow representation.
        """
        atomic_sequence = AtomicExecSequence(discipline)
        return atomic_sequence

    @staticmethod
    def serial(sequence=None) -> SerialExecSequence:
        """Build the structure standing for a serial execution of disciplines.

        Meant to be called by MDOFormulation.get_expected_workflow methods.

        Args:
            sequence: Any number of discipline
                or the return value of a serial, parallel or loop call.

        Returns:
            A serial execution sequence.
        """
        serial_sequence = SerialExecSequence(sequence)
        return serial_sequence

    @staticmethod
    def parallel(sequence=None) -> ParallelExecSequence:
        """Build the structure standing for a parallel execution of disciplines.

        Meant to be called by MDOFormulation.get_expected_workflow methods.

        Args:
            sequence: Any number of discipline or
                the return value of a serial, parallel or loop call.

        Returns:
            A parallel execution sequence.
        """
        parallel_sequence = ParallelExecSequence(sequence)
        return parallel_sequence

    @staticmethod
    def loop(control, composite_sequence) -> LoopExecSequence:
        """Build the structure standing for a loop execution.

        Meant to be called by MDOFormulation.get_expected_workflow methods.

        Args:
            control: The discipline object, controller of the loop.
            composite_sequence: Any number of discipline
                or the return value of a serial, parallel or loop call.

        Returns:
            A loop execution sequence.
        """
        loop_sequence = LoopExecSequence(control, composite_sequence)
        return loop_sequence