Source code for gemseo.core.execution_status

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
"""Execution status of processes."""

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import ClassVar

from strenum import StrEnum

from gemseo.core.serializable import Serializable

if TYPE_CHECKING:
    from gemseo.core.base_execution_status_observer import BaseExecutionStatusObserver

# TODO: use BaseExecutionStatusObserver.
# TODO: API: add more precise status rules?


[docs] class ExecutionStatus(Serializable): """Execution status of a process. The status aims at monitoring a process and give the user a simplified view on the state of the processes. The possible statuses are defined in :attr:`.Status`. The status rules are: - the initial status is ``DONE``, - the status ``RUNNING`` or ``LINEARIZING`` can only be set when the current one is ``DONE``, - the status ``DONE`` can only be set when the current one is ``RUNNING``. Helper methods should be used to handle the statuses when running or linearizing a process: :meth:`.run` and :meth:`linearize`. Observers can be attached and are notified when the value of the status is changed. The observers are not restored after pickling. """
[docs] class Status(StrEnum): """The statuses.""" RUNNING = "RUNNING" LINEARIZING = "LINEARIZING" FAILED = "FAILED" DONE = "DONE"
_ATTR_NOT_TO_SERIALIZE: ClassVar[set[str]] = {"__observers"} __process_name: str """The name of the process that has an execution status.""" __status: Status """The status.""" __observers: set[BaseExecutionStatusObserver] """The observers.""" def __init__(self, process_name: str) -> None: """ Args: process_name: The name of the process. """ # noqa: D205, D212 self.__process_name = process_name self.__status = self.Status.DONE self._init_shared_memory_attrs_before() def __check_can_be_running(self, status: Status) -> bool: """Return whether the status can be set to ``RUNNING``. Args: status: A status. Returns: Whether the status can be set to ``RUNNING``. """ return status == self.Status.RUNNING and self.value != self.Status.DONE def __check_can_be_linearizing(self, status: Status) -> bool: """Return whether the status can be set to ``LINEARIZING``. Args: status: A status. Returns: Whether the status can be set to ``LINEARIZING``. """ return status == self.Status.LINEARIZING and self.value != self.Status.DONE @property def value(self) -> Status: """The status of the process.""" return self.__status @value.setter def value(self, status: Status) -> None: if self.__check_can_be_running(status) or self.__check_can_be_linearizing( status ): msg = ( f"{self.__process_name} cannot be set to status {status} " f"while in status {self.value}." ) raise ValueError(msg) # Cast the argument so that the enum class raise an explicit error. self.Status(status) self.__status = status self.__notify_observers()
[docs] def handle( self, status: Status, function: Callable[[Any], None], *args: Any, ) -> None: """Handle a status while executing a function. On exception, the status is set to ``FAILED``, otherwise is set to ``DONE``. Args: status: The status to be set before execution. function: The function to be called. *args: The argument to be passed for calling the function. """ self.value = status try: function(*args) except Exception: self.value = self.Status.FAILED raise self.value = self.Status.DONE
[docs] def add_observer(self, observer: BaseExecutionStatusObserver) -> None: """Add an observer. Args: observer: The observer to add. """ self.__observers.add(observer)
[docs] def remove_observer(self, observer: BaseExecutionStatusObserver) -> None: """Remove an observer. Args: observer: The observer to remove. """ self.__observers.discard(observer)
def __notify_observers(self) -> None: """Notify the observers.""" # Iterate on a copy because some ExecutionSequence observers can remove # observers. for observer in tuple(self.__observers): observer.update_status(self) def _init_shared_memory_attrs_before(self) -> None: self.__observers = set()