Source code for gemseo.core.mdo_scenario

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                        documentation
#        :author: Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
#        :author: Pierre-Jean Barjhoux, Benoit Pauwels - MDOScenarioAdapter
#                                                        Jacobian computation
"""
A Scenario which driver is an optimization algorithm
****************************************************
"""
from __future__ import division, unicode_literals

import logging
from copy import copy, deepcopy
from datetime import timedelta
from timeit import default_timer as timer

from numpy import atleast_1d, zeros
from numpy.linalg import norm

from gemseo.algos.lagrange_multipliers import LagrangeMultipliers
from gemseo.algos.opt.opt_factory import OptimizersFactory
from gemseo.algos.post_optimal_analysis import PostOptimalAnalysis
from gemseo.core.discipline import MDODiscipline
from gemseo.core.json_grammar import JSONGrammar
from gemseo.core.parallel_execution import DiscParallelLinearization
from gemseo.core.scenario import Scenario

# The detection of formulations requires to import them,
# before calling get_formulation_from_name


LOGGER = logging.getLogger(__name__)


[docs]class MDOScenario(Scenario): """Multidisciplinary Design Optimization Scenario, main user interface Creates an optimization problem and solves it with an optimizer. The main differences between Scenario and MDOScenario are the allowed inputs in the MDOScenario.json, which differs from DOEScenario.json, at least on the driver names MDO Problem description: links the disciplines and the formulation to create an optimization problem. Use the class by instantiation. Create your disciplines beforehand. Specify the formulation by giving the class name such as the string "MDF" The reference_input_data is the typical input data dict that is provided to the run method of the disciplines Specify the objective function name, which must be an output of a discipline of the scenario, with the "objective_name" attribute If you want to add additional design constraints, use the add_user_defined_constraint method To view the results, use the "post_process" method after execution. You can view: - the design variables history, the objective value, the constraints, by using: scenario.post_process("OptHistoryView", show=False, save=True) - Quadratic approximations of the functions close to the optimum, when using gradient based algorithms, by using: scenario.post_process("QuadApprox", method="SR1", show=False, save=True, function="my_objective_name", file_path="appl_dir") - Self Organizing Maps of the design space, by using: scenario.post_process("SOM", save=True, file_path="appl_dir") To list post-processing on your setup, use the method scenario.posts For more detains on their options, go to the "gemseo.post" package """ # Constants for input variables in json schema MAX_ITER = "max_iter" X_OPT = "x_opt" def __init__( self, disciplines, formulation, objective_name, design_space, name=None, **formulation_options ): """Constructor, initializes the MDO scenario Objects instantiation and checks are made before run intentionally. :param disciplines: the disciplines of the scenario :param formulation: the formulation name, the class name of the formulation in gemseo.formulations :param objective_name: the objective function name :param design_space: the design space :param name: scenario name :param formulation_options: options for creation of the formulation """ # This loads the right json grammars from class name super(MDOScenario, self).__init__( disciplines, formulation, objective_name, design_space, name, **formulation_options ) self.clear_history_before_run = False def _run_algorithm(self): """Runs the optimization algo.""" problem = self.formulation.opt_problem # Clears the database when multiple runs are performed (bi level) if self.clear_history_before_run: problem.database.clear() algo_name = self.local_data[self.ALGO] max_iter = self.local_data[self.MAX_ITER] options = self.local_data.get(self.ALGO_OPTIONS) if options is None: options = {} if self.MAX_ITER in options: LOGGER.warning( "Double definition of algorithm option " "max_iter, keeping value: %s", max_iter, ) options.pop(self.MAX_ITER) lib = self._algo_factory.create(algo_name) self.optimization_result = lib.execute( problem, algo_name=algo_name, max_iter=max_iter, **options ) return self.optimization_result def _run(self): """Execute the scenario and run the optimization problems.""" t_0 = timer() LOGGER.info(" ") LOGGER.info("*** Start MDO Scenario execution ***") LOGGER.info("%s", repr(self)) self._run_algorithm() # MDODiscipline.execute is not finished therefore self.exec_time is not # computed yet, need to recompute it, besides exec_time is the total # execution time, while this is for a single execution delta_t = timer() - t_0 LOGGER.info( "*** MDO Scenario run terminated in %s ***", timedelta(seconds=delta_t) ) def _init_algo_factory(self): """Initalizes the algorithms factory.""" self._algo_factory = OptimizersFactory()
[docs]class MDOScenarioAdapter(MDODiscipline): """An adapter class for MDO Scenario: input variables are specified they update default_data in the top level discipline they output data from the top level discipline outputs. """ LOWER_BND_SUFFIX = "_lower_bnd" UPPER_BND_SUFFIX = "_upper_bnd" MULTIPLIER_SUFFIX = "_multiplier" def __init__( self, scenario, inputs_list, outputs_list, reset_x0_before_opt=False, set_x0_before_opt=False, set_bounds_before_opt=False, cache_type=MDODiscipline.SIMPLE_CACHE, output_multipliers=False, ): """Initialize the scenario adapter. :param scenario: the scenario to adapt :type scenario: MDOScenario :param inputs_list: list of inputs to overload at sub scenario execution :type inputs_list: list(str) :param outputs_list: list of outputs to get from scenario execution :type outputs_list: list(str) :param reset_x0_before_opt: before running the sub optimization, reset the initial guess :type reset_x0_before_opt: bool :param set_x0_before_opt: if True, sets the initial point of the sub scenario, useful for multi-start :type set_x0_before_opt: bool :param set_bounds_before_opt: if True, sets the bounds of the design space, useful for trust regions :type set_bounds_before_opt: bool :param cache_type: type of cache policy, SIMPLE_CACHE or HDF5_CACHE :type cache_type: str :param output_multipliers: if True then the Lagrange multipliers of the scenario optimal solution are computed and added to the outputs :type output_multipliers: bool """ if reset_x0_before_opt and set_x0_before_opt: raise ValueError("Inconsistent options for ScenarioAdapter !") self.scenario = scenario self._set_x0_before_opt = set_x0_before_opt self._set_bounds_before_opt = set_bounds_before_opt self._inputs_list = inputs_list self._outputs_list = outputs_list self._reset_x0_before_opt = reset_x0_before_opt self._output_multipliers = output_multipliers name = "{}_adapter".format(scenario.name) super(MDOScenarioAdapter, self).__init__(name, cache_type=cache_type) self._update_grammars() self._dv_in_names = None if set_x0_before_opt: dv_names = set(self.scenario.formulation.design_space.variables_names) self._dv_in_names = list(dv_names & set(self._inputs_list)) # Set the initial bounds as default bounds self._bounds_names = [] if set_bounds_before_opt: dspace = scenario.design_space lower_bounds = dspace.array_to_dict(dspace.get_lower_bounds()) lower_suffix = MDOScenarioAdapter.LOWER_BND_SUFFIX upper_bounds = dspace.array_to_dict(dspace.get_upper_bounds()) upper_suffix = MDOScenarioAdapter.UPPER_BND_SUFFIX for bounds, suffix in [ (lower_bounds, lower_suffix), (upper_bounds, upper_suffix), ]: bounds = {name + suffix: val for name, val in bounds.items()} self.default_inputs.update(bounds) self._bounds_names.extend(bounds.keys()) # Optimization functions are redefined at each run # since default inputs of top # level discipline change # History must be erased otherwise the wrong values are retrieved # between two runs scenario.clear_history_before_run = True self._x_dict_0 = deepcopy(scenario.design_space.get_current_x_dict()) self.post_optimal_analysis = None def _update_grammars(self): """Update the input and output grammars.""" formulation = self.scenario.formulation opt_problem = formulation.opt_problem top_leveld = formulation.get_top_level_disc() for disc in top_leveld: self.input_grammar.update_from(disc.input_grammar) self.output_grammar.update_from(disc.output_grammar) # The output may also be the optimum value of the design # variables, so the output grammar may contain inputs # of the disciplines. All grammars are filtered just after # this loop self.output_grammar.update_from(disc.input_grammar) self.default_inputs.update(disc.default_inputs) self.input_grammar.restrict_to(self._inputs_list) self.output_grammar.restrict_to(self._outputs_list) # If a DV is not an input of the top level disciplines: output_names = self.output_grammar.get_data_names() missing_out = set(self._outputs_list) - set(output_names) if missing_out: dv_names = opt_problem.design_space.variables_names miss_dvs = set(dv_names) & set(missing_out) if miss_dvs: dv_gram = JSONGrammar("dvs") dv_gram.initialize_from_data_names(miss_dvs) self.output_grammar.update_from(dv_gram) output_names = self.output_grammar.get_data_names() missing_out = set(self._outputs_list) - set(output_names) # Add the design variables bounds to the input grammar if self._set_bounds_before_opt: current_x = self.scenario.design_space.get_current_x_dict() typical_data_dict = dict() for suffix in [ MDOScenarioAdapter.LOWER_BND_SUFFIX, MDOScenarioAdapter.UPPER_BND_SUFFIX, ]: bnds = {name + suffix: val for name, val in current_x.items()} typical_data_dict.update(bnds) bnds_gram = JSONGrammar("bnds") bnds_gram.initialize_from_base_dict(typical_data_dict) self.input_grammar.update_from(bnds_gram) if missing_out: raise ValueError( "Can't compute outputs from scenarios: " + str(missing_out) ) missing_inpt = set(self._inputs_list) - set(self.input_grammar.get_data_names()) if missing_inpt: raise ValueError( "Can't compute inputs from scenarios: " + str(missing_inpt) ) # Add the Lagrange multipliers to the output grammar if self._output_multipliers: self._add_output_multipliers() def _add_output_multipliers(self): """Add the Lagrange multipliers of the scenario optimal solution as outputs of the adapter.""" # Fill a dictionary with data of typical shapes base_dict = dict() problem = self.scenario.formulation.opt_problem # bound-constraints multipliers current_x = problem.design_space.get_current_x_dict() base_dict.update( { self.get_bnd_mult_name(var_name, False): val for var_name, val in current_x.items() } ) base_dict.update( { self.get_bnd_mult_name(var_name, True): val for var_name, val in current_x.items() } ) # equality- and inequality-constraints multipliers base_dict.update( { self.get_cstr_mult_name(cstr_name): zeros(1) for cstr_name in problem.get_constraints_names() } ) # Update the output grammar multipliers_grammar = JSONGrammar("multipliers") multipliers_grammar.initialize_from_base_dict(base_dict) self.output_grammar.update_from(multipliers_grammar)
[docs] @staticmethod def get_bnd_mult_name(variable_name, is_upper): """Return the name of the lower bound-constraint multiplier of a variable. :param variable_name: name of the variable :type variable_name: str :param is_upper: if True then return the upper bound-constraint multiplier name, otherwise return the lower bound-constraint multiplier :type is_upper: bool :return: name of the lower bound-constraint multiplier :rtype: str """ mult_name = variable_name mult_name += "_upp-bnd" if is_upper else "_low-bnd" mult_name += MDOScenarioAdapter.MULTIPLIER_SUFFIX return mult_name
[docs] @staticmethod def get_cstr_mult_name(constraint_name): """Return the name of the multiplier of a constraint. :param constraint_name: name of the constraint :type constraint_name: str :return: name of the multiplier :rtype: str """ return constraint_name + MDOScenarioAdapter.MULTIPLIER_SUFFIX
def _run(self): """Runs the scenario.""" self._pre_run() self.scenario.execute() self._post_run() def _pre_run(self): """Pre-processes the scenario.""" formulation = self.scenario.formulation opt_problem = formulation.opt_problem design_space = opt_problem.design_space top_leveld = formulation.get_top_level_disc() # Update the top level discipline default inputs with adapter inputs # This is the key role of the adapter for indata in self._inputs_list: for disc in top_leveld: if disc.is_input_existing(indata): disc.default_inputs[indata] = self.local_data[indata] # Default inputs have changed, therefore caches shall be cleared self.scenario.cache.clear() self.scenario.reset_statuses_for_run() for func in opt_problem.get_all_functions(): # Avoids max_iter reached func.n_calls = 0 if self._reset_x0_before_opt: design_space.set_current_x(self._x_dict_0) # Set the starting point of the sub scenario with current dv names if self._set_x0_before_opt: dv_values = {dv_n: self.local_data[dv_n] for dv_n in self._dv_in_names} self.scenario.formulation.design_space.set_current_x(dv_values) # Set the bounds of the sub-scenario if self._set_bounds_before_opt: for name in design_space.variables_names: # Set the lower bound lower_suffix = MDOScenarioAdapter.LOWER_BND_SUFFIX lower_bound = self.local_data[name + lower_suffix] design_space.set_lower_bound(name, lower_bound) # Set the upper bound upper_suffix = MDOScenarioAdapter.UPPER_BND_SUFFIX upper_bound = self.local_data[name + upper_suffix] design_space.set_upper_bound(name, upper_bound) def _post_run(self): """Post-process the scenario.""" formulation = self.scenario.formulation opt_problem = formulation.opt_problem design_space = opt_problem.design_space # Test if the last evaluation is the optimum x_opt = design_space.get_current_x() last_x = opt_problem.database.get_x_by_iter(-1) last_eval_not_opt = norm(x_opt - last_x) / (1.0 + norm(last_x)) > 1e-14 if last_eval_not_opt: # Revaluate all functions at optimum # To re execute all disciplines and get the right data opt_problem.evaluate_functions( x_opt, eval_jac=False, eval_obj=True, normalize=False, # Force call without database no_db_no_norm=True, ) # Retrieves top-level discipline outputs self._retrieve_top_level_outputs() # Compute the Lagrange multipliers and store them in the local data if self._output_multipliers: self._compute_lagrange_multipliers() def _retrieve_top_level_outputs(self): """Overwrites the adapter outputs with the top-level discipline outputs and optimal design parameters.""" formulation = self.scenario.formulation opt_problem = formulation.opt_problem top_leveld = formulation.get_top_level_disc() x_dict = opt_problem.design_space.get_current_x_dict() for outdata in self._outputs_list: for disc in top_leveld: if disc.is_output_existing(outdata) and outdata not in x_dict: self.local_data[outdata] = disc.local_data[outdata] out_ds = x_dict.get(outdata) if out_ds is not None: self.local_data[outdata] = out_ds def _compute_lagrange_multipliers(self): """Compute the Lagrange multipliers for the optimal solution of the scenario and store them in the local data.""" # Compute the Lagrange multipliers problem = self.scenario.formulation.opt_problem x_opt = problem.solution.x_opt lagrange = LagrangeMultipliers(problem) lagrange.compute(x_opt, problem.ineq_tolerance) # Store the Lagrange multipliers in the local data multipliers = lagrange.get_multipliers_arrays() self.local_data.update( { self.get_bnd_mult_name(name, False): mult for name, mult in multipliers[lagrange.LOWER_BOUNDS].items() } ) self.local_data.update( { self.get_bnd_mult_name(name, True): mult for name, mult in multipliers[lagrange.UPPER_BOUNDS].items() } ) self.local_data.update( { self.get_cstr_mult_name(name): mult for name, mult in multipliers[lagrange.EQUALITY].items() } ) self.local_data.update( { self.get_cstr_mult_name(name): mult for name, mult in multipliers[lagrange.INEQUALITY].items() } )
[docs] def get_expected_workflow(self): return self.scenario.get_expected_workflow()
[docs] def get_expected_dataflow(self): return self.scenario.get_expected_dataflow()
def _compute_jacobian(self, inputs=None, outputs=None): """ Computes the Jacobian of the adapted scenario outputs with respect to its inputs. The Jacobian is stored as a dict of ndarray dict: jac = {name: { input_name: (output_dim, input_dim) ndarray } } The bound-constraints on the scenario optimization variables are assumed independent of the other scenario inputs. :param inputs: linearization should be performed with respect to inputs list. If None, linearization should be performed wrt all inputs (Default value = None) :param outputs: linearization should be performed on outputs list. If None, linearization should be performed on all outputs (Default value = None) """ opt_problem = self.scenario.formulation.opt_problem ineq_tol = opt_problem.ineq_tolerance outvars = opt_problem.objective.outvars if len(outvars) != 1: raise ValueError("The objective must be single-valued.") # Check the required inputs if inputs is None and not self._set_bounds_before_opt: inputs = self._inputs_list elif inputs is None and self._set_bounds_before_opt: # Bounds are inputs of the adapter inputs = [ name for name in self._bounds_names if name not in self._inputs_list ] inputs = self._inputs_list + inputs elif set(inputs) - set(self._inputs_list) - set(self._bounds_names): not_inputs = set(inputs) - set(self._inputs_list) - set(self._bounds_names) raise ValueError( "The following are not inputs of the adapter: " + ", ".join(not_inputs) + "." ) # N.B the adapter is assumed constant w.r.t. bounds bound_inputs = set(inputs) & set(self._bounds_names) # Check the required outputs if outputs is None: outputs = outvars elif set(outputs) - set(self._outputs_list): raise ValueError( "The following are not outputs of the adapter: " + str(set(outputs) - set(self._outputs_list)) + "." ) nondifferentiable_outputs = set(outputs) - set(outvars) if nondifferentiable_outputs: raise ValueError( "Post-optimal Jacobians of " + ", ".join(nondifferentiable_outputs) + " cannot be computed." ) # Initialize the Jacobian diff_inputs = [name for name in inputs if name not in bound_inputs] # N.B. there may be only bound inputs self._init_jacobian(diff_inputs, outputs) # Compute the Jacobians of the optimization functions jacobians = self._compute_auxiliary_jacobians(diff_inputs, use_threading=True) # Perform the post-optimal analysis self.post_optimal_analysis = PostOptimalAnalysis(opt_problem, ineq_tol) post_opt_jac = self.post_optimal_analysis.execute( outputs, diff_inputs, jacobians ) self.jac.update(post_opt_jac) # Fill the Jacobian blocks w.r.t. bounds with zeros for out_jac in self.jac.values(): for in_name in bound_inputs: in_dim = self.default_inputs[in_name].size out_jac[in_name] = zeros((1, in_dim)) def _compute_auxiliary_jacobians(self, inputs, func_names=None, use_threading=True): """Computes the Jacobians of the optimization functions. :param inputs: names list of the inputs w.r.t. which differentiate :type inputs: list(str) :param func_names: names list of the functions to differentiate If None then all the optimizations functions are differentiated :type func_names: list(str) :param use_threading : if True, use Threads instead of processes to parallelize the execution :type use_threading: bool """ # Gather the names of the functions to differentiate opt_problem = self.scenario.formulation.opt_problem if func_names is None: func_names = ( opt_problem.objective.outvars + opt_problem.get_constraints_names() ) # Identify the disciplines that compute the functions disc_dict = dict() for name in func_names: for disc in self.scenario.formulation.get_top_level_disc(): if disc.is_all_outputs_existing([name]): disc_dict[name] = disc break # Linearize the required disciplines for disc in set(disc_dict.values()): inputs_set = set(disc.get_input_data_names()) & set(inputs) outputs_set = set(disc.get_output_data_names()) & set(func_names) if inputs_set and outputs_set: disc.add_differentiated_inputs(list(inputs_set)) disc.add_differentiated_outputs(list(outputs_set)) disc_list = list(set(disc_dict.values())) paralell_lin = DiscParallelLinearization(disc_list, use_threading=use_threading) # Update the local data with the optimal design parameters # [The adapted scenario is assumed to have been run beforehand.] x_opt_dict = opt_problem.design_space.get_current_x_dict() post_opt_data = copy(self.local_data) post_opt_data.update(x_opt_dict) paralell_lin.execute([post_opt_data] * len(disc_list)) # Store the Jacobians jacobians = dict() for name in func_names: jacobians[name] = dict() for input_name in inputs: jac_block = disc_dict[name].jac[name].get(input_name) if jac_block is None: output_value = self.get_outputs_by_name(name) input_value = self.get_inputs_by_name(input_name) jac_block = zeros((len(output_value, len(input_value)))) jacobians[name][input_name] = jac_block return jacobians
[docs] def add_outputs(self, outputs_names): """Add outputs to the scenario adapter. :param outputs_names: names of the outputs to be added :type outputs_names: list(str) """ names_to_add = [ name for name in outputs_names if name not in self._outputs_list ] self._outputs_list.extend(names_to_add) self._update_grammars()
[docs]class MDOObjScenarioAdapter(MDOScenarioAdapter): """A scenario adapter that overwrites the local data with the optimal objective function value.""" def _retrieve_top_level_outputs(self): """Overwrites the adapter outputs with the top-level discipline outputs and optimal design parameters.""" formulation = self.scenario.formulation opt_problem = formulation.opt_problem top_leveld = formulation.get_top_level_disc() # Get the optimal outputs optim_data = opt_problem.design_space.get_current_x_dict() f_opt = opt_problem.get_optimum()[0] if not opt_problem.minimize_objective: f_opt = -f_opt outvars = opt_problem.objective.outvars if not len(outvars) == 1: raise ValueError("The objective function must be single-valued.") optim_data[outvars[0]] = atleast_1d(f_opt) # Overwrite the adapter local data for outdata in self._outputs_list: for disc in top_leveld: if disc.is_output_existing(outdata) and outdata not in optim_data: self.local_data[outdata] = disc.local_data[outdata] out_ds = optim_data.get(outdata) if out_ds is not None: self.local_data[outdata] = out_ds def _compute_jacobian(self, inputs=None, outputs=None): """ Computes the Jacobian of the adapted scenario outputs with respect to its inputs. The Jacobian is stored as a dict of ndarray dict: jac = {name: { input_name: (output_dim, input_dim) ndarray } } The bound-constraints on the scenario optimization variables are assumed independent of the other scenario inputs. :param inputs: linearization should be performed with respect to inputs list. If None, linearization should be performed wrt all inputs (Default value = None) :param outputs: linearization should be performed on outputs list. If None, linearization should be performed on all outputs (Default value = None) """ MDOScenarioAdapter._compute_jacobian(self, inputs, outputs) # The gradient of the objective function cannot be computed by the # disciplines, but the gradients of the constraints can. # The objective function is assumed independent of non-optimization # variables. obj_name = self.scenario.formulation.opt_problem.objective.outvars[0] mult_cstr_jac_key = PostOptimalAnalysis.MULT_DOT_CONSTR_JAC self.jac[obj_name] = dict(self.jac[mult_cstr_jac_key])