Source code for gemseo.core.execution_sequence

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                         documentation
#        :author: Francois Gallard, Remi Lafage
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""
Abstraction for workflow
************************
"""
from __future__ import division, unicode_literals

import logging
from uuid import uuid4

from gemseo.core.discipline import MDODiscipline
from gemseo.utils.py23_compat import OrderedDict  # automatically dict from py36

LOGGER = logging.getLogger(__name__)

STATUS_FAILED = MDODiscipline.STATUS_FAILED
STATUS_DONE = MDODiscipline.STATUS_DONE
STATUS_PENDING = MDODiscipline.STATUS_PENDING
STATUS_RUNNING = MDODiscipline.STATUS_RUNNING


[docs]class ExecutionSequence(object): """A base class for execution sequences. The execution sequence structure is introduced to reflect the main workflow implicitly executed by |g| regarding the given scenario/formulation executed. That structure allows to identify single executions of a same discipline that may be run several times at various stages in the given scenario/formulation. """ START_STR = "[" END_STR = "]" def __init__(self, sequence=None): # pylint: disable=unused-argument # use an OrderedDict to get disc_to_uuids lists ordered regarding # a discipline repetitive appearance: useful for testing and debug self.uuid = str(uuid4()) self.uuid_to_disc = OrderedDict() self.disc_to_uuids = {} self._status = None self._enabled = False self._parent = None
[docs] def accept(self, visitor): """Accept a visitor object (see Visitor pattern). Have to be implemented by subclasses. :param visitor: a visitor object """ raise NotImplementedError()
[docs] def set_observer(self, obs): """Register the given observer object which is intended to be notified via its update() method each time an underlying discipline changes its status. To be implemented in subclasses. :returns: the disciplines list. """ raise NotImplementedError()
@property def status(self): """Get status value. :returns: the status value (MDODiscipline.STATUS_XXX values). """ return self._status @status.setter def status(self, status): """Set status value. :param status: (MDODiscipline.STATUS_XXX values). """ self._status = status @property def parent(self): """Get the containing execution sequence. :returns: a composite execution sequence. """ return self._parent @parent.setter def parent(self, parent): """Set the containing execution sequence as parent. self should be included in parent.sequence_list. :returns: the status value. """ if self not in parent.sequence_list: raise RuntimeError( "parent " + str(parent) + " do not include child " + str(self) ) self._parent = parent
[docs] def enabled(self): """Get activation state. :returns: boolean True if enabled. """ return self._enabled
[docs] def enable(self): """Set the execution sequence as activated (enabled).""" self.status = STATUS_PENDING self._enabled = True
[docs] def disable(self): """Set the execution sequence as deactivated (disabled).""" self._enabled = False
def _compute_disc_to_uuids(self): """ Update discipline to uuids mapping from uuids to discipline mapping Note: a discipline might correspond to several AtomicExecutionSeuqence hence might correspond to several uuids. """ self.disc_to_uuids = {} for key, value in self.uuid_to_disc.items(): self.disc_to_uuids.setdefault(value, []).append(key)
[docs]class AtomicExecSequence(ExecutionSequence): """An execution sequence to represent the single execution of a given discipline.""" def __init__(self, discipline=None): super(AtomicExecSequence, self).__init__(discipline) if not isinstance(discipline, MDODiscipline): raise Exception( "Atomic sequence shall be a discipline" + ", got " + str(type(discipline)) + " instead !" ) self.discipline = discipline self.uuid_to_disc = {self.uuid: discipline} self.disc_to_uuids = {discipline: [self.uuid]} self._observer = None def __str__(self): return self.discipline.name + "(" + str(self.status) + ")" def __repr__(self): return ( self.discipline.name + "(" + str(self.status) + ", " + str(self.uuid) + ")" )
[docs] def accept(self, visitor): """Accept a visitor object (see Visitor pattern) :param visitor: a visitor object implementing visit_atomic() method """ visitor.visit_atomic(self)
[docs] def set_observer(self, obs): """Register given observer obs to be notified (obs.update()) when discipline status changes. :param obs: the observe object implementing update() method """ self._observer = obs
[docs] def enable(self): """Subscribe to status changes of the discipline (notified via update_status())""" super(AtomicExecSequence, self).enable() self.discipline.add_status_observer(self)
[docs] def disable(self): """Unsubscribe from receiving status changes of the discipline.""" super(AtomicExecSequence, self).disable() self.discipline.remove_status_observer(self)
[docs] def get_state_dict(self): """Get the dictionary of statuses mapping atom uuid to status. :returns: the status """ return {self.uuid: self.status}
[docs] def update_status(self, discipline): """ Update status from given discipline. Reflect the status then notifies the parent and the observer if any. Note: update_status if discipline status change actually compared to current, otherwise do nothing. :param discipline: the discipline whose status changed """ if self._enabled and self.status != discipline.status: self.status = discipline.status or STATUS_PENDING if self.status == STATUS_DONE or self.status == STATUS_FAILED: self.disable() if self._parent: self._parent.update_child_status(self) if self._observer: self._observer.update(self)
[docs] def force_statuses(self, status): """Force the self status and the status of subsequences without notifying the parent (as the force_status is called by a parent), but notify the observer is status changed. :param: status value (see MDODiscipline.STATUS_XXX values) """ old_status = self._status self._status = status if old_status != status and self._observer: self._observer.update(self)
[docs]class CompositeExecSequence(ExecutionSequence): """A base class for execution sequence made of other execution sequences. Intented to be subclassed. """ START_STR = "'" END_STR = "'" def __init__(self, sequence=None): super(CompositeExecSequence, self).__init__(sequence) self.sequence_list = [] self.disciplines = [] def __str__(self): str_out = self.START_STR for seq in self.sequence_list: str_out += str(seq) + ", " str_out += self.END_STR return str_out
[docs] def accept(self, visitor): """Accept a visitor object (see Visitor pattern) and then make its children accept it too. :param visitor: a visitor object implementing visit_serial() method """ self._accept(visitor) for seq in self.sequence_list: seq.accept(visitor)
def _accept(self, visitor): """Accept a visitor object (see Visitor pattern). To be specifically implemented by subclasses to call relevant visitor method depending the subclass type. :param visitor: a visitor object implementing visit_serial() method """ raise NotImplementedError()
[docs] def set_observer(self, obs): """Set observer obs to subsequences. Override super.set_observer() :param obs: observer object implementing update() method """ for seq in self.sequence_list: seq.set_observer(obs)
[docs] def disable(self): """Unsubscribe subsequences from receiving status changes of disciplines.""" super(CompositeExecSequence, self).disable() for seq in self.sequence_list: seq.disable()
[docs] def force_statuses(self, status): """Force the self status and the status of subsequences. params: status value (see MDODiscipline.STATUS_XXX values) """ self.status = status for seq in self.sequence_list: seq.force_statuses(status)
[docs] def get_state_dict(self): """Get the dictionary of statuses mapping atom uuid to status. :returns: the status """ state_dict = {} for seq in self.sequence_list: state_dict.update(seq.get_state_dict()) return state_dict
[docs] def update_child_status(self, child): """Manage status change of child execution sequences. Propagates status change to the parent (containing execution sequence) :param child: the child execution sequence (contained in sequence_list) whose status has changed """ old_status = self.status self._update_child_status(child) if self._parent and self.status != old_status: self._parent.update_child_status(self)
def _update_child_status(self, child): """Handle child execution change. To be implemented in subclasses. :param child: the child execution sequence (contained in sequence_list) whose status has changed """ raise NotImplementedError()
[docs]class ExtendableExecSequence(CompositeExecSequence): """A base class for composite execution sequence that are extendable. Intented to be subclassed. """ def __init__(self, sequence=None): super(ExtendableExecSequence, self).__init__(sequence) if sequence is not None: self.extend(sequence)
[docs] def extend(self, sequence): """Extend the execution sequence with another ExecutionSequence or a discipline. :param sequence: another execution sequence or """ seq_class = sequence.__class__ self_class = self.__class__ if isinstance(sequence, list): # In this case we are initializing the sequence # or extending by a list of disciplines self._extend_with_disc_list(sequence) elif isinstance(sequence, MDODiscipline): # Sequence is extended by a single discipline: generate a new # uuid self._extend_with_disc_list([sequence]) elif isinstance(sequence, AtomicExecSequence): # Sequence is extended by an AtomicSequence: # we extend self._extend_with_atomic_sequence(sequence) elif seq_class != self_class: # We extend by a different type of ExecSequence # So we append the other sequence as a sub structure self._extend_with_diff_sequence_kind(sequence) else: # We extend by a same type of ExecSequence # So we just extend the sequence self._extend_with_same_sequence_kind(sequence) self._compute_disc_to_uuids() # refresh disc_to_uuids for seq in self.sequence_list: seq.parent = self return self
def _extend_with_disc_list(self, sequence): """Extend by a list of disciplines. :param sequence: a list of MDODiscipline objects """ seq_list = [AtomicExecSequence(disc) for disc in sequence] self.sequence_list.extend(seq_list) uuids_dict = {atom.uuid: atom.discipline for atom in seq_list} self.uuid_to_disc.update(uuids_dict) def _extend_with_atomic_sequence(self, sequence): """Extend by a list of AtomicExecutionSequence. :param sequence: a list of MDODiscipline objects """ self.sequence_list.append(sequence) self.uuid_to_disc[sequence.uuid] = sequence def _extend_with_same_sequence_kind(self, sequence): """Extend by another ExecutionSequence of same type. :param sequence: an ExecutionSequence of same type as self """ self.sequence_list.extend(sequence.sequence_list) self.uuid_to_disc.update(sequence.uuid_to_disc) def _extend_with_diff_sequence_kind(self, sequence): """Extend by another ExecutionSequence of different type. :param sequence: an ExecutionSequence of type different from self's one """ self.sequence_list.append(sequence) self.uuid_to_disc.update(sequence.uuid_to_disc) def _update_child_status(self, child): """Manage status change of child execution sequences. Done status management is handled in subclasses. :param child: the child execution sequence (contained in sequence_list) whose status has changed """ if child.status == STATUS_FAILED: self.status = STATUS_FAILED elif child.status == STATUS_DONE: self._update_child_done_status(child) else: self.status = child.status def _update_child_done_status(self, child): """Handle done status of child execution sequences. To be implemented in subclasses. :param child: the child execution sequence (contained in sequence_list) whose status has changed """ raise NotImplementedError()
[docs]class SerialExecSequence(ExtendableExecSequence): """A class to describe a serial execution of disciplines.""" START_STR = "[" END_STR = "]" def __init__(self, sequence=None): super(SerialExecSequence, self).__init__(sequence) self.exec_index = None def _accept(self, visitor): """Accept a visitor object (see Visitor pattern) :param visitor: a visitor object implementing visit_serial() method """ visitor.visit_serial(self)
[docs] def enable(self): """Activate first child execution sequence.""" super(SerialExecSequence, self).enable() self.exec_index = 0 if self.sequence_list: self.sequence_list[self.exec_index].enable() else: raise Exception("Serial execution is empty")
def _update_child_done_status(self, child): """Activate next child to given child execution sequence. Disable itself when all children done. :param child: the child execution sequence in done state. """ if child.status == STATUS_DONE: child.disable() self.exec_index += 1 if self.exec_index < len(self.sequence_list): self.sequence_list[self.exec_index].enable() else: # last seq done self.status = STATUS_DONE self.disable()
[docs]class ParallelExecSequence(ExtendableExecSequence): """A class to describe a parallel execution of disciplines.""" START_STR = "(" END_STR = ")" def _accept(self, visitor): """Accept a visitor object (see Visitor pattern) :param visitor: a visitor object implementing visit_parallel() method """ visitor.visit_parallel(self)
[docs] def enable(self): """Activate all child execution sequences.""" super(ParallelExecSequence, self).enable() for seq in self.sequence_list: seq.enable()
def _update_child_done_status(self, child): # pylint: disable=unused-argument """Disable itself when all children done. :param child: the child execution sequence in done state. """ all_done = True for seq in self.sequence_list: all_done = all_done and (seq.status == STATUS_DONE) if all_done: self.status = STATUS_DONE self.disable()
[docs]class LoopExecSequence(CompositeExecSequence): """A class to describe a loop with a controller discipline and an execution_sequence as iterate.""" START_STR = "{" END_STR = "}" def __init__(self, controller, sequence): if isinstance(controller, AtomicExecSequence): control = controller elif not isinstance(controller, MDODiscipline): raise Exception( "Controller of a loop shall be a discipline" + ", got " + str(type(controller)) + " instead !" ) else: control = AtomicExecSequence(controller) if not isinstance(sequence, CompositeExecSequence): raise Exception( "Sequence of a loop shall be a composite execution sequence" + ", got " + str(type(sequence)) + " instead !" ) super(LoopExecSequence, self).__init__() self.sequence_list = [control, sequence] self.atom_controller = control self.atom_controller.parent = self self.iteration_sequence = sequence self.iteration_sequence.parent = self self.uuid_to_disc.update(sequence.uuid_to_disc) self.uuid_to_disc[self.atom_controller.uuid] = controller self._compute_disc_to_uuids() self.iteration_count = 0 def _accept(self, visitor): """Accept a visitor object (see Visitor pattern) :param visitor: a visitor object implementing visit_loop() method """ visitor.visit_loop(self)
[docs] def enable(self): """Active controller execution sequence.""" super(LoopExecSequence, self).enable() self.atom_controller.enable() self.iteration_count = 0
def _update_child_status(self, child): """Activate iteration successively regarding controller status. Count iterations regarding iteration_sequence status. :param child: the child execution sequence in done state. """ self.status = self.atom_controller.status if child == self.atom_controller: if self.status == STATUS_RUNNING: if not self.iteration_sequence.enabled(): self.iteration_sequence.enable() elif self.status == STATUS_DONE: self.disable() self.force_statuses(STATUS_DONE) if child == self.iteration_sequence: if child.status == STATUS_DONE: self.iteration_count += 1 self.iteration_sequence.enable() if child.status == STATUS_FAILED: self.status = STATUS_FAILED
[docs]class ExecutionSequenceFactory(object): """A factory class for ExecutionSequence objects. Allow to create AtomicExecutionSequence, SerialExecutionSequence, ParallelExecutionSequence and LoopExecutionSequence. Main |g| workflow is intended to be expressed with those four ExecutionSequence types """
[docs] @staticmethod def atom(discipline): """Returns a structure representing the execution of a discipline. This function is intended to be called by MDOFormulation.get_expected_workflow methods. :param discipline: a discipline :returns: the structure used within XDSM workflow representation """ return AtomicExecSequence(discipline)
[docs] @staticmethod def serial(sequence=None): """Returns a structure representing the serial execution of the given disciplines. This function is intended to be called by MDOFormulation.get_expected_workflow methods. :param sequence: any number of discipline or the return value of a serial, parallel or loop call :returns: a serial execution sequence """ return SerialExecSequence(sequence)
[docs] @staticmethod def parallel(sequence=None): """Returns a structure representing the parallel execution of the given disciplines. This function is intended to be called by MDOFormulation.get_expected_workflow methods. :param sequence: any number of discipline or the return value of a serial, parallel or loop call :returns: a parallel execution sequence """ return ParallelExecSequence(sequence)
[docs] @staticmethod def loop(control, composite_sequence): """Returns a structure representing a loop execution of a This function is intended to be called by MDOFormulation.get_expected_workflow methods. :param control: the discipline object, controller of the loop :param composite_sequence: any number of discipline or the return value of a serial, parallel or loop call :returns: a loop execution sequence """ return LoopExecSequence(control, composite_sequence)