Source code for gemseo.core.monitoring
# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - API and implementation and/or documentation
# :author: Remi Lafage
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
Monitoring mechanism to track |g| execution (update events)
***********************************************************
"""
from __future__ import division, unicode_literals
from future.utils import with_metaclass
from gemseo.utils.singleton import SingleInstancePerAttributeId
class Monitoring(with_metaclass(SingleInstancePerAttributeId, object)):
    """This class implements the observer pattern.

    It is called by |g| core classes like MDODiscipline whenever an event of
    interest like a status change occurs. Client objects register with
    :meth:`add_observer` and have their ``update`` method called each time
    :meth:`update` is invoked on this object.

    Instance creation is controlled by the ``SingleInstancePerAttributeId``
    metaclass; presumably a single instance is shared per monitored scenario
    (to be confirmed against the metaclass implementation).
    """

    def __init__(self, scenario):
        """Initializes the monitoring.

        The expected workflow of the scenario is retrieved, this object is
        set as the workflow observer and the workflow is enabled.

        :param scenario: the scenario to be monitored.
        :type scenario: MDOScenario
        """
        self._observers = []
        self.workflow = scenario.get_expected_workflow()
        self.workflow.set_observer(self)
        self.workflow.enable()

    def add_observer(self, observer):
        """Registers an observer object interested in observable update events.

        Registering an already registered observer has no effect, so an
        observer is never notified twice for the same event.

        :param observer: object to be notified; it must provide an
            ``update(atom)`` method
        """
        if observer not in self._observers:
            self._observers.append(observer)

    def remove_observer(self, observer):
        """Unsubscribes the given observer.

        Nothing happens if the observer is not registered.

        :param observer: observer to be removed
        """
        if observer in self._observers:
            self._observers.remove(observer)

    def remove_all_observers(self):
        """Unsubscribes all observers."""
        self._observers = []

    def update(self, atom):
        """Notifies observers that the corresponding observable object is updated.

        Observers have to know what to retrieve from the observable object.

        The observers registered when the notification starts are all
        notified, in registration order, even if some of them subscribe or
        unsubscribe observers from within their ``update`` callback.

        :param atom: updated object
        """
        # Iterate over a snapshot: mutating the list while iterating over it
        # (e.g. an observer unsubscribing itself in its callback) would make
        # the loop silently skip the next observer.
        for observer in tuple(self._observers):
            observer.update(atom)

    def get_statuses(self):
        """Gets the statuses of all disciplines.

        The result is the state dictionary of the monitored workflow.

        :returns: a dictionary of all statuses, keys are the atom ids
        :rtype: dict
        """
        return self.workflow.get_state_dict()

    def __str__(self):
        """Returns the string representation of the workflow.

        :returns: string representation of the workflow
        :rtype: str
        """
        return str(self.workflow)