Source code for gemseo.core.monitoring

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Remi Lafage
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""Monitoring mechanism to track |g| execution (update events)."""

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Any
from typing import Protocol

from gemseo.utils.singleton import SingleInstancePerAttributeId

if TYPE_CHECKING:
    from gemseo.core._process_flow.execution_sequences.base import BaseExecutionSequence
    from gemseo.core.execution_status import ExecutionStatus
    from gemseo.scenarios.base_scenario import BaseScenario

    class Observer(Protocol):
        """API of an observer."""

        def update(self, obj: Any) -> None:
            """Update an observer.

            Args:
                obj: The object to update from.
            """


[docs] class Monitoring(metaclass=SingleInstancePerAttributeId): """This class implements the observer pattern. It is a singleton, it is called by |g| core classes like MDODicipline whenever an event of interest like a status change occurs. Client objects register with add_observer and are notified whenever a discipline status change occurs. """ _observers: list[Observer] """The observers.""" workflow: BaseExecutionSequence """The execution sequence.""" # TODO: API: pass the workflow instead of the scenario since this is only what # matters. # TODO: API: make attr private. def __init__(self, scenario: BaseScenario) -> None: """ Args: scenario: The scenario to be monitored. """ # noqa: D205, D212, D415 self._observers = [] self.workflow = scenario.get_process_flow().get_execution_flow() self.workflow.set_observer(self) self.workflow.enable()
[docs] def add_observer(self, observer: Observer) -> None: """Register an observer object interested in observable update events. Args: observer: The object to be notified. """ if observer not in self._observers: self._observers.append(observer)
[docs] def remove_observer(self, observer: Observer) -> None: """Unsubscribe the given observer. Args: observer: The observer to be removed. """ if observer in self._observers: self._observers.remove(observer)
[docs] def remove_all_observers(self) -> None: """Unsubscribe all observers.""" self._observers.clear()
[docs] def update(self, atom: Any) -> None: """Notify the observers that the corresponding observable object is updated. Observers have to know what to retrieve from the observable object. Args: atom: The updated object. """ for obs in self._observers: obs.update(atom)
[docs] def get_statuses(self) -> dict[str, ExecutionStatus.Status]: """Get the statuses of all disciplines. Returns: These statuses associated with the atom ids. """ return self.workflow.get_statuses()
def __str__(self) -> str: return str(self.workflow)