Source code for gemseo.disciplines.wrappers.retry_discipline

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
"""The retry discipline."""

from __future__ import annotations

import math
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from ctypes import c_long
from ctypes import py_object
from ctypes import pythonapi
from logging import getLogger
from typing import TYPE_CHECKING

import psutil

from gemseo.core.discipline import Discipline
from gemseo.core.execution_status import ExecutionStatus

if TYPE_CHECKING:
    from collections.abc import Iterable

    from gemseo.typing import StrKeyMapping

LOGGER = getLogger(__name__)


[docs] class RetryDiscipline(Discipline): """A discipline to be executed with retry and timeout options. This :class:`.Discipline` wraps another discipline so it can be executed multiple times (up to a specified number of trials) if the previous attempts fail to produce any result. A timeout in seconds can be specified to prevent executions from becoming stuck. The timeout can be handled either via a thread or a subprocess. By default, it is handled by a thread, this can be changed by setting the ``timeout_with_process`` argument to ``True``. Use a thread when the discipline does not create other processes, otherwise those processes will keep running after the timeout duration. Beware that using a process is slower, especially under Windows. Users can also provide a tuple of :class:`.Exception` that, if one of them is raised, it does not retry the execution. Please note that the ``TimeoutError`` exception is also caught if the wrapped discipline raises such an exception (i.e. aside from ``RetryDiscipline`` itself). So it could lead to 2 surprising cases, but in fact normal cases: - a ``TimeoutError`` exception even though the user didn't provide any timeout value. - a ``TimeoutError`` raised sooner than the ``timeout`` value set by the user. """ __n_executions: int """The number of performed executions of the discipline.""" n_trials: int """The number of trials to execute the discipline.""" wait_time: float """The time to wait between 2 trials (in seconds).""" timeout: float """The maximum duration, in seconds, that the discipline is allowed to run.""" fatal_exceptions: Iterable[type[Exception]] """The exceptions for which the code raises an exception and exit immediately without retrying a run.""" def __init__( self, discipline: Discipline, n_trials: int = 5, wait_time: float = 0.0, timeout: float = math.inf, fatal_exceptions: Iterable[type[Exception]] = (), timeout_with_process: bool = False, ) -> None: """ Args: discipline: The discipline to wrap in the retry loop. n_trials: The number of trials of the discipline. wait_time: The time to wait between 2 trials (in seconds). timeout: The maximum duration, in seconds, that the discipline is allowed to run. If this time limit is exceeded, the execution is terminated. If ``math.inf``, the discipline is executed without timeout limit. fatal_exceptions: The exceptions for which the code raises an exception and exit immediately without retrying a run. timeout_with_process: Whether to use a process or a thread when using the timeout feature. """ # noqa:D205 D212 D415 super().__init__(discipline.name) self._discipline = discipline self.__n_executions = 0 self.io.input_grammar = discipline.io.input_grammar self.io.output_grammar = discipline.io.output_grammar self.n_trials = n_trials self.wait_time = wait_time self.timeout = timeout self.fatal_exceptions = fatal_exceptions self.timeout_with_process = timeout_with_process @property def n_executions(self) -> int: """The number of times the discipline has been retried during execution.""" return self.__n_executions def _run(self, input_data: StrKeyMapping) -> StrKeyMapping | None: self.__n_executions = 0 for n_trial in range(1, self.n_trials + 1): self.__n_executions += 1 LOGGER.debug( "Trying to execute the discipline: attempt %d/%d", n_trial, self.n_trials, ) try: if math.isinf(self.timeout): return self._discipline.execute(input_data) return self._run_discipline_with_timeout(input_data) except FutureTimeoutError: msg = ( "Timeout reached during the execution of " f"discipline {self._discipline.name}" ) LOGGER.debug(msg) current_error = TimeoutError(msg) except Exception as error: # noqa: BLE001 if isinstance(error, tuple(self.fatal_exceptions)): LOGGER.info( "Failed to execute discipline %s, " "aborting retry because of the exception type %s.", self._discipline.name, type(error), ) raise current_error = error self._discipline.execution_status.value = ExecutionStatus.Status.DONE time.sleep(self.wait_time) self._run_before_next_trial() plural_suffix = "s" if self.n_trials > 1 else "" LOGGER.error( "Failed to execute discipline %s after %d attempt%s.", self._discipline.name, self.n_trials, plural_suffix, ) raise current_error def _run_before_next_trial(self) -> None: """Run before the next trial. This method is called whenever a trial has just ended, without success. It can be used to perform any necessary cleanup or preparation for the next trial. """ def _run_discipline_with_timeout(self, input_data: StrKeyMapping) -> StrKeyMapping: """Run the discipline with a timeout. Args: input_data: The input data passed to the discipline. Returns: The output returned by the discipline. Raises: FutureTimeoutError: If the execution runs longer than the specified timeout. """ executor_class = ( ProcessPoolExecutor if self.timeout_with_process else ThreadPoolExecutor ) with executor_class(max_workers=1) as executor: run_discipline = executor.submit( self._discipline.execute, input_data, ) try: return run_discipline.result(timeout=self.timeout) except FutureTimeoutError: if self.timeout_with_process: # Terminate the unique process and its subprocesses. process_id = next(iter(executor._processes.keys())) process = psutil.Process(process_id) for child_process in process.children(): child_process.terminate() process.terminate() else: # The unique thread is still alive, so we need to kill it. thread = next(iter(executor._threads)) # The following is inspired from https://tomerfiliba.com/recipes/Thread2 thread_id = c_long(thread.ident) res = pythonapi.PyThreadState_SetAsyncExc( thread_id, py_object(SystemExit) ) if res == 0: # pragma: no cover LOGGER.debug("Invalid thread id %s", thread_id) if res != 1: # pragma: no cover # If it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL # to revert the effect. pythonapi.PyThreadState_SetAsyncExc(thread_id, None) raise