# Source code for gemseo.core.scenario

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                        documentation
#        :author: Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""The base class for the scenarios."""
from __future__ import annotations

import logging
import timeit
from datetime import timedelta
from os import remove
from os.path import basename
from os.path import exists
from pathlib import Path
from typing import Any
from typing import Mapping
from typing import Sequence
from typing import Union

from gemseo.algos.design_space import DesignSpace
from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.algos.opt_result import OptimizationResult
from gemseo.core.dataset import Dataset
from gemseo.core.discipline import MDODiscipline
from gemseo.core.execution_sequence import ExecutionSequenceFactory
from gemseo.core.execution_sequence import LoopExecSequence
from gemseo.core.formulation import MDOFormulation
from gemseo.core.mdofunctions.mdo_function import MDOFunction
from gemseo.formulations.formulations_factory import MDOFormulationsFactory
from gemseo.post.opt_post_processor import OptPostProcessor
from gemseo.post.opt_post_processor import OptPostProcessorOptionType
from gemseo.post.post_factory import PostFactory
from gemseo.utils.string_tools import MultiLineString
from gemseo.utils.string_tools import pretty_repr

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Type alias for scenario input data: a mapping from string keys to either
# a string, an integer, or a nested mapping from strings to integers or floats.
ScenarioInputDataType = Mapping[str, Union[str, int, Mapping[str, Union[int, float]]]]


class Scenario(MDODiscipline):
    """Base class for the scenarios.

    The instantiation of a :class:`.Scenario`
    creates an :class:`.OptimizationProblem`,
    by linking :class:`.MDODiscipline` objects with an :class:`.MDOFormulation`
    and defining both the objective to minimize or maximize
    and the :class:`.DesignSpace` on which to solve the problem.
    Constraints can also be added to the :class:`.OptimizationProblem`
    with the :meth:`.Scenario.add_constraint` method,
    as well as observables with the :meth:`.Scenario.add_observable` method.

    Then,
    the :meth:`.Scenario.execute` method takes
    a driver (see :class:`.DriverLib`) with options as input data
    and uses it to solve the optimization problem.
    This driver is in charge of executing the multidisciplinary process.

    To view the results,
    use the :meth:`.Scenario.post_process` method after execution
    with one of the available post-processors that can be listed
    by :attr:`.Scenario.posts`.
    """

    disciplines: list[MDODiscipline]
    """The disciplines."""

    formulation: MDOFormulation
    """The MDO formulation."""

    formulation_name: str
    """The name of the MDO formulation."""

    optimization_result: OptimizationResult
    """The optimization result."""

    post_factory: PostFactory | None
    """The factory for post-processors if any."""

    # Constants for input variables in json schema
    X_0 = "x_0"
    U_BOUNDS = "u_bounds"
    L_BOUNDS = "l_bounds"
    ALGO = "algo"
    ALGO_OPTIONS = "algo_options"

    activate_input_data_check = True
    activate_output_data_check = True

    _ATTR_TO_SERIALIZE = MDODiscipline._ATTR_TO_SERIALIZE + (
        "_algo_name",
        "_algo_factory",
        "clear_history_before_run",
        "disciplines",
        "formulation",
    )

    def __init__(
        self,
        disciplines: Sequence[MDODiscipline],
        formulation: str,
        objective_name: str | Sequence[str],
        design_space: DesignSpace,
        name: str | None = None,
        grammar_type: str = MDODiscipline.JSON_GRAMMAR_TYPE,
        **formulation_options: Any,
    ) -> None:
        """
        Args:
            disciplines: The disciplines
                used to compute the objective, constraints and observables
                from the design variables.
            formulation: The name of the MDO formulation,
                also the name of a class inheriting from :class:`.MDOFormulation`.
            objective_name: The name of the objective.
                If a sequence is passed, a vector objective function is created.
            design_space: The design space.
            name: The name to be given to this scenario.
                If None, use the name of the class.
            grammar_type: The type of grammar to use for IO declaration
                either JSON_GRAMMAR_TYPE or SIMPLE_GRAMMAR_TYPE.
            **formulation_options: The options
                to be passed to the :class:`.MDOFormulation`.
        """
        self.formulation = None
        self.formulation_name = None
        self.disciplines = disciplines
        self.optimization_result = None
        self._algo_factory = None
        self._opt_hist_backup_path = None
        self._gen_opt_backup_plot = False
        self._algo_name = None
        self._lib = None
        # The checks and the factories only rely on the attributes set above,
        # so they are run before the initialization of the parent discipline.
        self._check_disciplines()
        self._init_algo_factory()
        self._form_factory = self._formulation_factory
        super().__init__(
            name=name, grammar_type=grammar_type, auto_detect_grammar_files=True
        )
        self._init_formulation(
            formulation,
            objective_name,
            design_space,
            grammar_type=grammar_type,
            **formulation_options,
        )
        self.formulation.opt_problem.database.name = self.name
        # Created lazily by the ``post_factory`` property.
        self.__post_factory = None
        self._update_input_grammar()
        self.clear_history_before_run = False

    @property
    def use_standardized_objective(self) -> bool:
        """Whether to use the standardized :attr:`.OptimizationProblem.objective`
        for logging and post-processing."""
        return self.formulation.opt_problem.use_standardized_objective

    @use_standardized_objective.setter
    def use_standardized_objective(self, value: bool) -> None:
        self.formulation.opt_problem.use_standardized_objective = value

    @property
    def post_factory(self) -> PostFactory | None:
        """The factory of post-processors.

        The factory is instantiated at first access and then reused.
        """
        if self.__post_factory is None:
            self.__post_factory = PostFactory()
        return self.__post_factory

    @property
    def _formulation_factory(self) -> MDOFormulationsFactory:
        """The factory of MDO formulations."""
        return MDOFormulationsFactory()

    def _check_disciplines(self) -> None:
        """Check that two disciplines do not compute the same output.

        Raises:
            ValueError: If two disciplines compute the same output.
        """
        all_outs = set()
        for disc in self.disciplines:
            outs = set(disc.get_output_data_names())
            common = outs & all_outs
            if common:
                raise ValueError(
                    f"Two disciplines, among which {disc.name}, "
                    f"compute the same output: {common}"
                )
            all_outs |= outs

    @property
    def design_space(self) -> DesignSpace:
        """The design space on which the scenario is performed."""
        return self.formulation.design_space

    def set_differentiation_method(
        self,
        method: str | None = "user",
        step: float = 1e-6,
    ) -> None:
        """Set the differentiation method for the process.

        Args:
            method: The method to use to differentiate the process,
                either "user", "finite_differences", "complex_step"
                or "no_derivatives", which is equivalent to None.
            step: The finite difference step.
        """
        if method is None:
            method = "no_derivatives"
        opt_problem = self.formulation.opt_problem
        opt_problem.differentiation_method = method
        opt_problem.fd_step = step

    def add_constraint(
        self,
        output_name: str | Sequence[str],
        constraint_type: str = MDOFunction.TYPE_EQ,
        constraint_name: str | None = None,
        value: float | None = None,
        positive: bool = False,
        **kwargs: Any,
    ) -> None:
        """Add a design constraint.

        This constraint is in addition to those created by the formulation,
        e.g. consistency constraints in IDF.

        The strategy of repartition of the constraints is defined by the formulation.

        Args:
            output_name: The names of the outputs to be used as constraints.
                For instance, if `"g_1"` is given and `constraint_type="eq"`,
                `g_1=0` will be added as constraint to the optimizer.
                If several names are given,
                a single discipline must provide all outputs.
            constraint_type: The type of constraint,
                `"eq"` for equality constraint and
                `"ineq"` for inequality constraint.
            constraint_name: The name of the constraint to be stored.
                If None,
                the name of the constraint is generated from the output name.
            value: The value for which the constraint is active.
                If None, this value is 0.
            positive: If True, the inequality constraint is positive.
            **kwargs: The other keyword arguments,
                forwarded as is to the ``add_constraint`` method of the formulation.

        Raises:
            ValueError: If the constraint type is neither 'eq' or 'ineq'.
        """
        if constraint_type not in [MDOFunction.TYPE_EQ, MDOFunction.TYPE_INEQ]:
            raise ValueError(
                "Constraint type must be either 'eq' or 'ineq'; "
                f"got '{constraint_type}' instead."
            )
        self.formulation.add_constraint(
            output_name,
            constraint_type,
            constraint_name=constraint_name,
            value=value,
            positive=positive,
            **kwargs,
        )

    def add_observable(
        self,
        output_names: Sequence[str],
        observable_name: Sequence[str] | None = None,
        discipline: MDODiscipline | None = None,
    ) -> None:
        """Add an observable to the optimization problem.

        The repartition strategy of the observable is defined in the formulation class.
        When more than one output name is provided,
        the observable function returns a concatenated array of the output values.

        Args:
            output_names: The names of the outputs to observe.
            observable_name: The name to be given to the observable.
                If None, the output name is used by default.
            discipline: The discipline used to build the observable function.
                If None, detect the discipline from the inner disciplines.
        """
        self.formulation.add_observable(output_names, observable_name, discipline)

    def _init_formulation(
        self,
        formulation: str,
        objective_name: str | Sequence[str],
        design_space: DesignSpace,
        **formulation_options: Any,
    ) -> None:
        """Initialize the MDO formulation.

        Args:
            formulation: The name of the MDO formulation,
                also the name of a class inheriting from :class:`.MDOFormulation`.
            objective_name: The name of the objective,
                or a sequence of names as accepted by the constructor.
            design_space: The design space.
            **formulation_options: The options
                to be passed to the :class:`.MDOFormulation`.

        Raises:
            TypeError: If the formulation is not specified by its name.
        """
        if not isinstance(formulation, str):
            raise TypeError(
                "Formulation must be specified by its name; "
                "please use GEMSEO_PATH to specify custom formulations."
            )
        form_inst = self._form_factory.create(
            formulation,
            disciplines=self.disciplines,
            objective_name=objective_name,
            design_space=design_space,
            **formulation_options,
        )
        self.formulation_name = formulation
        self.formulation = form_inst

    def get_optim_variables_names(self) -> list[str]:
        """A convenience function to access the optimization variables.

        Returns:
            The optimization variables of the scenario.
        """
        return self.formulation.get_optim_variables_names()

    def get_optimum(self) -> OptimizationResult | None:
        """Return the optimization results.

        Returns:
            The optimal solution found by the scenario if executed,
            None otherwise.
        """
        return self.optimization_result

    def save_optimization_history(
        self,
        file_path: str,
        file_format: str = OptimizationProblem.HDF5_FORMAT,
        append: bool = False,
    ) -> None:
        """Save the optimization history of the scenario to a file.

        Args:
            file_path: The path to the file to save the history.
            file_format: The format of the file, either "hdf5" or "ggobi".
            append: If True, the history is appended to the file if not empty.
                This option is only passed to the HDF5 export.

        Raises:
            ValueError: If the file format is not correct.
        """
        opt_pb = self.formulation.opt_problem
        if file_format == OptimizationProblem.HDF5_FORMAT:
            opt_pb.export_hdf(file_path=file_path, append=append)
        elif file_format == OptimizationProblem.GGOBI_FORMAT:
            opt_pb.database.export_to_ggobi(file_path=file_path)
        else:
            raise ValueError(
                "Cannot export optimization history "
                f"to file format: {file_format}."
            )

    def set_optimization_history_backup(
        self,
        file_path: str,
        each_new_iter: bool = False,
        each_store: bool = True,
        erase: bool = False,
        pre_load: bool = False,
        generate_opt_plot: bool = False,
    ) -> None:
        """Set the backup file for the optimization history during the run.

        Args:
            file_path: The path to the file to save the history.
            each_new_iter: If True, callback at every iteration.
            each_store: If True, callback at every call to store() in the database.
            erase: If True, the backup file is erased before the run.
            pre_load: If True, the backup file is loaded before run,
                useful after a crash.
            generate_opt_plot: If True, generate the optimization history view
                at backup.

        Raises:
            ValueError: If both erase and pre_load are True
                while the backup file already exists.
        """
        opt_pb = self.formulation.opt_problem
        self._opt_hist_backup_path = file_path
        self._gen_opt_backup_plot = generate_opt_plot

        # The erase and pre-load options only matter for an existing file.
        if exists(self._opt_hist_backup_path):
            if erase and pre_load:
                raise ValueError(
                    "Conflicting options for history backup, "
                    "cannot pre load optimization history and erase it!"
                )
            if erase:
                LOGGER.warning(
                    "Erasing optimization history in %s",
                    str(self._opt_hist_backup_path),
                )
                remove(self._opt_hist_backup_path)
            elif pre_load:
                opt_pb.database.import_hdf(self._opt_hist_backup_path)

        opt_pb.add_callback(
            self._execute_backup_callback,
            each_new_iter=each_new_iter,
            each_store=each_store,
        )

    def _execute_backup_callback(self, option: Any = None) -> None:
        """A callback function to backup optimization history.

        Args:
            option: Any input value which is not used within the current function,
                but need to be defined for the generic mechanism of the callback
                functions usage in :class:`.OptimizationProblem`.
        """
        self.save_optimization_history(self._opt_hist_backup_path, append=True)
        if self._gen_opt_backup_plot and self.formulation.opt_problem.database:
            # The plot is named after the backup file name,
            # without its directory and truncated at its first dot.
            basepath = basename(self._opt_hist_backup_path).split(".")[0]
            self.post_process(
                "OptHistoryView", save=True, show=False, file_path=basepath
            )

    @property
    def posts(self) -> list[str]:
        """The available post-processors."""
        return self.post_factory.posts

    def post_process(
        self,
        post_name: str,
        **options: OptPostProcessorOptionType | Path,
    ) -> OptPostProcessor:
        """Post-process the optimization history.

        Args:
            post_name: The name of the post-processor,
                i.e. the name of a class inheriting from :class:`.OptPostProcessor`.
            **options: The options for the post-processor.

        Returns:
            The value returned by the ``execute`` method of the post-processing
            factory for the optimization problem of the scenario.
        """
        return self.post_factory.execute(
            self.formulation.opt_problem, post_name, **options
        )

    def _run(self) -> None:
        """Run the driver algorithm and log the start, end and duration of the run."""
        t_0 = timeit.default_timer()
        LOGGER.info(" ")
        LOGGER.info("*** Start %s execution ***", self.name)
        LOGGER.info("%s", repr(self))
        # Clear the database when multiple runs are performed, see MDOScenarioAdapter.
        if self.clear_history_before_run:
            self.formulation.opt_problem.database.clear()
        self._run_algorithm()
        LOGGER.info(
            "*** End %s execution (time: %s) ***",
            self.name,
            timedelta(seconds=timeit.default_timer() - t_0),
        )

    def _run_algorithm(self) -> OptimizationResult:
        """Run the driver algorithm.

        Raises:
            NotImplementedError: Always; the subclasses must override this method.
        """
        raise NotImplementedError()

    def __repr__(self) -> str:
        """Return the name, the disciplines and the formulation class name."""
        msg = MultiLineString()
        msg.add(self.name)
        msg.indent()
        msg.add("Disciplines: {}", pretty_repr(self.disciplines, delimiter=" "))
        msg.add("MDO formulation: {}", self.formulation.__class__.__name__)
        return str(msg)

    def get_disciplines_statuses(self) -> dict[str, str]:
        """Retrieve the statuses of the disciplines.

        The statuses are indexed by the class names of the disciplines,
        so disciplines sharing the same class share a single entry,
        holding the status of the last one.

        Returns:
            The statuses of the disciplines.
        """
        return {disc.__class__.__name__: disc.status for disc in self.disciplines}

    def __get_execution_metrics(self) -> MultiLineString:
        """Return the execution metrics of the scenarios."""
        n_lin = 0
        n_calls = 0
        msg = MultiLineString()
        msg.add("Scenario Execution Statistics")
        msg.indent()
        for disc in self.disciplines:
            msg.add("Discipline: {}", disc.name)
            msg.indent()
            msg.add("Executions number: {}", disc.n_calls)
            msg.add("Execution time: {} s", disc.exec_time)
            msg.add("Linearizations number: {}", disc.n_calls_linearize)
            msg.dedent()
            n_calls += disc.n_calls
            n_lin += disc.n_calls_linearize
        msg.add("Total number of executions calls: {}", n_calls)
        msg.add("Total number of linearizations: {}", n_lin)
        return msg

    def print_execution_metrics(self) -> None:
        """Print the total number of executions and cumulated runtime by discipline."""
        if MDODiscipline.activate_counters:
            LOGGER.info("%s", self.__get_execution_metrics())
        else:
            LOGGER.info("The discipline counters are disabled.")

    def xdsmize(
        self,
        monitor: bool = False,
        outdir: str | None = ".",
        print_statuses: bool = False,
        outfilename: str = "xdsm.html",
        latex_output: bool = False,
        open_browser: bool = False,
        html_output: bool = True,
        json_output: bool = False,
    ) -> None:
        """Create a JSON file defining the XDSM related to the current scenario.

        Args:
            monitor: If True, update the generated file
                at each discipline status change.
            outdir: The directory where the JSON file is generated.
                If None, the current working directory is used.
            print_statuses: If True, print the statuses in the console at each update.
                This option implies ``monitor``.
            outfilename: The name of the file of the output.
                The basename is used and the extension is adapted
                for the HTML / JSON / PDF outputs.
            latex_output: If True, build TEX, TIKZ and PDF files.
            open_browser: If True, open the web browser and display the XDSM.
            html_output: If True, output a self contained HTML file.
            json_output: If True, output a JSON file for XDSMjs.
        """
        # Imported locally; presumably to avoid a circular import (to be confirmed).
        from gemseo.utils.xdsmizer import XDSMizer

        if print_statuses:
            monitor = True

        if monitor:
            XDSMizer(self).monitor(outdir=outdir, print_statuses=print_statuses)
        else:
            XDSMizer(self).run(
                output_directory_path=outdir,
                latex_output=latex_output,
                open_browser=open_browser,
                html_output=html_output,
                json_output=json_output,
                outfilename=outfilename,
            )

    def get_expected_dataflow(
        self,
    ) -> list[tuple[MDODiscipline, MDODiscipline, list[str]]]:
        """Return the expected data flow, as provided by the MDO formulation."""
        return self.formulation.get_expected_dataflow()

    def get_expected_workflow(self) -> LoopExecSequence:
        """Return the expected workflow.

        Returns:
            The loop execution sequence built from this scenario
            and the expected workflow of the MDO formulation.
        """
        exp_wf = self.formulation.get_expected_workflow()
        return ExecutionSequenceFactory.loop(self, exp_wf)

    def _init_algo_factory(self) -> None:
        """Initialize the factory of algorithms.

        Raises:
            NotImplementedError: Always; the subclasses must override this method.
        """
        raise NotImplementedError()

    def get_available_driver_names(self) -> list[str]:
        """The available drivers."""
        return self._algo_factory.algorithms

    def _update_input_grammar(self) -> None:
        """Update the input grammar from the names of available drivers."""
        if self.grammar_type == MDODiscipline.JSON_GRAMMAR_TYPE:
            self.input_grammar.update(
                {"algo": {"type": "string", "enum": self.get_available_driver_names()}}
            )
        else:
            self._update_grammar_input()

    def _update_grammar_input(self) -> None:
        """Update the inputs of a Grammar.

        Raises:
            NotImplementedError: Always; the subclasses must override this method.
        """
        raise NotImplementedError()

    @staticmethod
    def is_scenario() -> bool:
        """Indicate if the current object is a :class:`.Scenario`.

        Returns:
            True if the current object is a :class:`.Scenario`.
        """
        return True

    def export_to_dataset(
        self,
        name: str | None = None,
        by_group: bool = True,
        categorize: bool = True,
        opt_naming: bool = True,
        export_gradients: bool = False,
    ) -> Dataset:
        """Export the database of the optimization problem to a :class:`.Dataset`.

        The variables can be classified into groups:
        :attr:`.Dataset.DESIGN_GROUP` or :attr:`.Dataset.INPUT_GROUP`
        for the design variables
        and :attr:`.Dataset.FUNCTION_GROUP` or :attr:`.Dataset.OUTPUT_GROUP`
        for the functions (objective, constraints and observables).

        Args:
            name: The name to be given to the dataset.
                If ``None``,
                use the name of the :attr:`.OptimizationProblem.database`.
            by_group: Whether to store the data by group in :attr:`.Dataset.data`,
                in the sense of one unique NumPy array per group.
                If ``categorize`` is ``False``,
                there is a unique group: :attr:`.Dataset.PARAMETER_GROUP`.
                If ``categorize`` is ``True``,
                the groups can be either
                :attr:`.Dataset.DESIGN_GROUP` and :attr:`.Dataset.FUNCTION_GROUP`
                if ``opt_naming`` is ``True``,
                or :attr:`.Dataset.INPUT_GROUP` and :attr:`.Dataset.OUTPUT_GROUP`.
                If ``by_group`` is ``False``, store the data by variable names.
            categorize: Whether to distinguish
                between the different groups of variables.
                Otherwise, group all the variables
                in :attr:`.Dataset.PARAMETER_GROUP`.
            opt_naming: Whether to use
                :attr:`.Dataset.DESIGN_GROUP` and :attr:`.Dataset.FUNCTION_GROUP`
                as groups.
                Otherwise,
                use :attr:`.Dataset.INPUT_GROUP` and :attr:`.Dataset.OUTPUT_GROUP`.
            export_gradients: Whether to export the gradients of the functions
                (objective function, constraints and observables)
                if the latter are available in the database
                of the optimization problem.

        Returns:
            A dataset built from the database of the optimization problem.
        """
        return self.formulation.opt_problem.export_to_dataset(
            name=name,
            by_group=by_group,
            categorize=categorize,
            opt_naming=opt_naming,
            export_gradients=export_gradients,
        )