# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Job schedulers interface."""
from __future__ import annotations
import pickle
from logging import getLogger
from pathlib import Path
from string import Template
from subprocess import CompletedProcess
from subprocess import run as subprocess_run
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from gemseo import from_pickle
from gemseo.core.discipline import Discipline
from gemseo.utils.directory_creator import DirectoryCreator
if TYPE_CHECKING:
from collections.abc import Iterable
from gemseo.core.grammars.base_grammar import BaseGrammar
from gemseo.typing import JacobianData
from gemseo.typing import StrKeyMapping
LOGGER = getLogger(__name__)
[docs]
class JobSchedulerDisciplineWrapper(Discipline):
    """A discipline that wraps the execution with job schedulers.

    The discipline is serialized to the disk, its inputs too, then a job file is created
    from a template to execute it with the provided options. The submission command is
    launched, it will set up the environment, deserialize the discipline and its inputs,
    execute it and serialize the outputs. Finally, the deserialized outputs are stored
    in the data of the wrapper.

    Each execution of the wrapped discipline is done in a unique directory within the
    ``workdir_path`` directory passed when instantiating this class.

    .. warning::
       See :ref:`platform-paths` to handle paths for cross-platforms.
    """

    DISC_PICKLE_FILE_NAME: ClassVar[str] = "discipline.pckl"
    """The name of the pickle file for the discipline."""

    DISC_INPUT_FILE_NAME: ClassVar[str] = "input_data.pckl"
    """The name of the pickle file for the discipline inputs."""

    DISC_OUTPUT_FILE_NAME: ClassVar[str] = "output_data.pckl"
    """The name of the pickle file for the discipline outputs."""

    # TODO: API: rename to JOB_TEMPLATES_DIR_PATH
    TEMPLATES_DIR_PATH: ClassVar[Path] = Path(__file__).parent / "templates"
    """The path to the directory with the job templates."""

    _discipline: Discipline
    """The discipline to wrap in the job scheduler."""

    _job_template_path: Path
    """The path to the template to be used to make a submission to the job scheduler
    command."""

    _scheduler_run_command: str
    """The command to call the job scheduler and submit the generated script."""

    _job_out_filename: str
    """The output job file name."""

    _options: dict[str, Any]
    """The job scheduler specific options."""

    _setup_cmd: str
    """The environment command to be used before running."""

    _use_template: bool
    """Whether to use the template based interface to the job scheduler."""

    _execute_at_linearize: bool
    """Whether to execute the discipline when linearizing."""

    __directory_creator: DirectoryCreator
    """The temporary directory creator."""

    def __init__(
        self,
        discipline: Discipline,
        workdir_path: Path,
        scheduler_run_command: str = "sbatch --wait",
        job_out_filename: str = "batch.srun",
        job_template_path: Path | str = "",
        use_template: bool = True,
        setup_cmd: str = "",
        **options,
    ) -> None:
        """
        Args:
            discipline: The discipline to wrap in the job scheduler.
            workdir_path: The path to the workdir where the files will be generated.
            scheduler_run_command: The command to call the job scheduler and submit
                the generated script.
            job_out_filename: The output job file name.
            job_template_path: The path to the template to be used to make a
                submission to the job scheduler command.
                If empty, use the file named after the class
                in :attr:`.TEMPLATES_DIR_PATH`.
            use_template: Whether to use the template based interface
                to the job scheduler.
            setup_cmd: The command used before running the executable.
            **options: The job scheduler specific options to be used in the template.

        Raises:
            FileNotFoundError: If ``use_template`` is ``True``
                and ``job_template_path`` does not exist.
        """  # noqa:D205 D212 D415
        super().__init__(discipline.name)
        self._discipline = discipline
        self._use_template = use_template
        if isinstance(job_template_path, Path):
            self._job_template_path = job_template_path
        elif job_template_path:
            self._job_template_path = Path(job_template_path)
        else:
            # No template given: fall back to the one named after the class.
            self._job_template_path = (
                self.TEMPLATES_DIR_PATH / self.__class__.__name__
            )
        self._scheduler_run_command = scheduler_run_command
        self._job_out_filename = job_out_filename
        self._setup_cmd = setup_cmd
        self._options = options
        # We must copy the grammars otherwise adding namespaces to this discipline
        # will affect the wrapped discipline.
        self.io.input_grammar = self._discipline.io.input_grammar.copy()
        self.io.output_grammar = self._discipline.io.output_grammar.copy()
        self.__directory_creator = DirectoryCreator(
            workdir_path, DirectoryCreator.Naming.UUID
        )
        # The discipline is pickled once and the bytes are written at each execution.
        self.pickled_discipline = pickle.dumps(self._discipline)
        self.job_file_template = None
        self._execute_at_linearize = True
        if use_template:
            self._parse_template()

    def _parse_template(self) -> None:
        """Parse the template.

        The template is read from the path passed by the user or, if none was passed,
        from the templates' directory, where the file name must be the class name.

        Raises:
            FileNotFoundError: If the job template path does not exist.
        """
        if not self._job_template_path.exists():
            msg = (
                f"Job scheduler template file {self._job_template_path} does not exist."
            )
            raise FileNotFoundError(msg)

        self.job_file_template = Template(self._job_template_path.read_text())

    def _generate_job_file_from_template(
        self,
        current_workdir: Path,
        discipline_path: Path,
        inputs_path: Path,
        outputs_path: Path,
        log_path: Path,
        linearize: bool,
    ) -> Path:
        """Generate the job file from the template.

        Args:
            current_workdir: The path to the workdir where the files will be generated.
            discipline_path: The path to the serialized discipline.
            inputs_path: The path to the serialized input data.
            outputs_path: The path to the serialized output data.
            log_path: The path to the log file generated by the job scheduler.
            linearize: Whether to linearize the discipline.

        Returns:
            The destination job file path.

        Raises:
            KeyError: If the template uses a key for which no value is passed.
        """
        linearize_option = "--linearize" if linearize else ""
        if linearize and self._execute_at_linearize:
            execute_at_linearize_option = "--execute-at-linearize"
        else:
            execute_at_linearize_option = ""

        try:
            job_file_content = self.job_file_template.substitute(
                discipline_name=self._discipline.name,
                log_path=log_path,
                setup_cmd=self._setup_cmd,
                workdir_path=str(current_workdir),
                discipline_path=str(discipline_path),
                inputs_path=str(inputs_path),
                outputs_path=str(outputs_path),
                linearize=linearize_option,
                execute_at_linearize=execute_at_linearize_option,
                **self._options,
            )
        except KeyError as err:
            msg = f"Value not passed to template for key: {err}"
            raise KeyError(msg) from err

        dest_job_file_path = current_workdir / self._job_out_filename
        dest_job_file_path.write_text(job_file_content, encoding="utf8")
        return dest_job_file_path

    def _create_run_command(
        self,
        current_workdir: Path,
        dest_job_file_path: Path | str,
    ) -> str:
        """Create the scheduler submission command.

        Args:
            current_workdir: The current working directory
                (not used by this implementation).
            dest_job_file_path: The destination job scheduler input file path.

        Returns:
            The command.
        """
        return f"{self._scheduler_run_command} {dest_job_file_path}"

    def _run_command(
        self,
        current_workdir: Path,
        dest_job_file_path: Path | str,
    ) -> CompletedProcess:
        """Run the scheduler submission command.

        Args:
            current_workdir: The current working directory.
            dest_job_file_path: The destination job scheduler input file path.

        Returns:
            The completed process of the command.

        Raises:
            CalledProcessError: When the command failed.
        """
        cmd = self._create_run_command(current_workdir, dest_job_file_path)
        LOGGER.debug("Submitting the job command: %s", cmd)
        # The command is split on whitespace and run without a shell.
        completed = subprocess_run(
            cmd.split(),
            capture_output=True,
            cwd=current_workdir,
        )
        if completed.returncode != 0:
            LOGGER.error(
                "Failed to submit the job command %s, for discipline %s "
                "in the working directory %s",
                cmd,
                self._discipline.name,
                current_workdir,
            )
            # The output is captured, so log it before raising:
            # otherwise the scheduler diagnostics would not appear in the logs.
            if completed.stderr:
                LOGGER.error(
                    "Standard error of the job command: %s",
                    completed.stderr.decode(errors="replace"),
                )
            completed.check_returncode()

        LOGGER.debug("Job execution ended in %s", current_workdir)
        return completed

    def _handle_outputs(
        self,
        current_workdir: Path,
        outputs_path: Path,
    ) -> None:
        """Read the serialized outputs and store them in the wrapper.

        If an exception is contained inside, raises it.
        Otherwise, the output data are passed to ``self.io.update_output_data``
        and, if a Jacobian is contained inside, it is stored in ``self.jac``.

        Args:
            current_workdir: The current working directory.
            outputs_path: The path to the serialized output data.

        Raises:
            FileNotFoundError: When the serialized outputs file does not exist.
            BaseException: The exception contained in the serialized outputs, if any.
        """
        if not outputs_path.exists():
            msg = (
                "The serialized outputs file of the discipline does not exist: "
                f"{outputs_path}."
            )
            raise FileNotFoundError(msg)

        # NOTE(review): the outputs are unpickled, so the work directory must be
        # trusted.
        output = from_pickle(outputs_path)
        if isinstance(output[0], BaseException):
            # On failure, the file contains the exception and its trace.
            error, trace = output
            LOGGER.error(
                "Discipline %s execution failed in %s",
                self._discipline.name,
                current_workdir,
            )
            LOGGER.error(trace)
            raise error

        LOGGER.debug(
            "Discipline %s execution succeeded in %s",
            self._discipline.name,
            current_workdir,
        )
        # On success, the file contains the output data and the Jacobian.
        if output[1]:
            self.jac = output[1]
            self._has_jacobian = True
            self._handle_ns_in_jacobian()

        self.io.update_output_data(output[0])

    def _write_inputs_to_disk(
        self,
        current_workdir: Path,
        differentiated_inputs: Iterable[str],
        differentiated_outputs: Iterable[str],
    ) -> tuple[Path, Path]:
        """Write the serialized discipline and input data to the working directory.

        Args:
            current_workdir: The current working directory.
            differentiated_inputs: If the linearization is performed, the
                inputs that define the rows of the jacobian.
            differentiated_outputs: If the linearization is performed, the
                outputs that define the columns of the jacobian.

        Returns:
            The path to the serialized discipline and the path to the serialized inputs.
        """
        discipline_path = current_workdir / self.DISC_PICKLE_FILE_NAME
        discipline_path.write_bytes(self.pickled_discipline)
        inputs_path = current_workdir / self.DISC_INPUT_FILE_NAME
        serialized_data = pickle.dumps((
            self.io.data,
            differentiated_inputs,
            differentiated_outputs,
        ))
        inputs_path.write_bytes(serialized_data)
        return discipline_path, inputs_path

    def _wait_job(self, current_workdir: Path) -> None:
        """Wait for the end of the job.

        By default, does nothing and expect the run command to be blocking.

        Args:
            current_workdir: The path to the workdir where the files will be generated.
        """

    def _run_or_compute_jacobian(
        self,
        linearize: bool,
        differentiated_inputs: Iterable[str] = (),
        differentiated_outputs: Iterable[str] = (),
    ) -> None:
        """Execute or linearize the discipline through the job scheduler.

        The results are stored in the wrapper by :meth:`._handle_outputs`.

        Args:
            linearize: Whether to linearize the discipline.
            differentiated_inputs: If the linearization is performed, the
                inputs that define the rows of the jacobian.
            differentiated_outputs: If the linearization is performed, the
                outputs that define the columns of the jacobian.
        """
        current_workdir = self.__directory_creator.create()
        outputs_path = current_workdir / self.DISC_OUTPUT_FILE_NAME
        log_path = current_workdir / f"{self._discipline.name}.log"
        discipline_path, inputs_path = self._write_inputs_to_disk(
            current_workdir, differentiated_inputs, differentiated_outputs
        )
        if self._use_template:
            dest_job_file_path = self._generate_job_file_from_template(
                current_workdir,
                discipline_path,
                inputs_path,
                outputs_path,
                log_path,
                linearize,
            )
        else:
            dest_job_file_path = ""

        self._run_command(current_workdir, dest_job_file_path)
        self._wait_job(current_workdir)
        self._handle_outputs(current_workdir, outputs_path)

    def _run(self, input_data: StrKeyMapping) -> None:
        # The output data are stored by ``_handle_outputs``, nothing is returned.
        self._run_or_compute_jacobian(False)

    def linearize(  # noqa: D102
        self,
        input_data: StrKeyMapping | None = None,
        compute_all_jacobians: bool = False,
        execute: bool = True,
    ) -> JacobianData:
        # The execution is not done by the wrapper itself: the ``execute`` flag is
        # stored and forwarded to the job through the generated job file.
        self._execute_at_linearize = execute
        return super().linearize(
            input_data=input_data,
            compute_all_jacobians=compute_all_jacobians,
            execute=False,
        )

    def _compute_jacobian(
        self,
        input_names: Iterable[str] = (),
        output_names: Iterable[str] = (),
    ) -> None:
        # The wrapped discipline does not know the namespaces of the wrapper.
        input_names = self._clean_namespaces_data_names(input_names, self.input_grammar)
        output_names = self._clean_namespaces_data_names(
            output_names, self.output_grammar
        )
        self._run_or_compute_jacobian(
            True, differentiated_inputs=input_names, differentiated_outputs=output_names
        )

    def _clean_namespaces_data_names(
        self, io_names: Iterable[str], grammar: BaseGrammar
    ) -> Iterable[str]:
        """Clean the data names to be differentiated to remove the namespace prefixes.

        Args:
            io_names: The data names to differentiate.
            grammar: The grammar whose ``to_namespaced`` mapping
                defines the namespaced names.

        Returns:
            The cleaned data names, in the order of ``io_names``.
        """
        to_namespaced = grammar.to_namespaced
        if not to_namespaced:
            return io_names

        # Only the names that are actually requested are renamed:
        # ``io_names`` may be a subset of the names of the grammar, or be empty.
        from_namespaced = {
            namespaced_name: name for name, namespaced_name in to_namespaced.items()
        }
        return [from_namespaced.get(name, name) for name in io_names]

    def _handle_ns_in_jacobian(self) -> None:
        """Rename the Jacobian input and output names to handle the namespaces."""
        jac = self.jac
        for name, namespaced_name in self.io.output_grammar.to_namespaced.items():
            # The Jacobian may only contain a subset of the outputs.
            if name in jac:
                jac[namespaced_name] = jac.pop(name)

        to_namespaced = self.input_grammar.to_namespaced
        if to_namespaced:
            for local_jac in jac.values():
                for input_name, input_namespace in to_namespaced.items():
                    # The Jacobian may only contain a subset of the inputs.
                    if input_name in local_jac:
                        local_jac[input_namespace] = local_jac.pop(input_name)