Source code for gemseo.algos.opt.core.pseven_problem_adapter

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                        documentation
#        :author: Benoit Pauwels
#    OTHER AUTHORS   - MACROSCOPIC CHANGES

"""An adapter from `.OptimizationProblem` to a pSeven ProblemGeneric."""

from enum import Enum
from typing import Dict, List, Mapping, Optional, Tuple, Union

from da import p7core
from numpy import array, atleast_1d, concatenate, full, full_like, ndarray

from gemseo.algos.design_space import DesignSpace
from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.algos.stop_criteria import TerminationCriterion
from gemseo.core.mdofunctions.mdo_function import (
    MDOFunction,
    MDOLinearFunction,
    MDOQuadraticFunction,
)


[docs]class CostType(Enum): """The evaluation cost type of pSeven functions.""" CHEAP = "Cheap" EXPENSIVE = "Expensive"
[docs]class PSevenProblem(p7core.gtopt.ProblemGeneric): """Adapter of OptimizationProblem to da.p7core.gtopt.ProblemGeneric. The methods prepare_problem() and evaluate() are defined according to pSeven's requirements. Refer to the API documentation of pSeven Core for more information. """ def __init__( self, problem, # type: OptimizationProblem evaluation_cost_type=None, # type: Optional[Mapping[str, CostType]] expensive_evaluations=None, # type: Optional[Mapping[str, int]] lower_bounds=None, # type: Optional[ndarray] upper_bounds=None, # type: Optional[ndarray] initial_point=None, # type: Optional[ndarray] ): # type: (...) -> None # noqa:D205,D212,D415 """ Args: problem: The optimization problem to be adapted to pSeven. evaluation_cost_type: The evaluation cost type of each function of the problem. If None, the evaluation cost types default to "Cheap". expensive_evaluations: The maximal number of expensive evaluations for each function of the problem. If None, this number is set automatically by pSeven. lower_bounds: The lower bounds on the design variables. If None, the lower bounds are read from the design space. upper_bounds: The upper bounds on the design variables. If None, the upper bounds are read from the design space. initial_point: The initial values of the design variables. If None, the initial values are read from the design space. """ self.__problem = problem if evaluation_cost_type is None: self.__evaluation_cost_type = dict() else: self.__evaluation_cost_type = evaluation_cost_type if expensive_evaluations is None: self.__expensive_evaluations = dict() else: self.__expensive_evaluations = expensive_evaluations # Set the design variables bounds and initial values design_space = problem.design_space if lower_bounds is None: self.__lower_bounds = design_space.get_lower_bounds() else: self.__lower_bounds = lower_bounds if upper_bounds is None: self.__upper_bounds = design_space.get_upper_bounds() else: self.__upper_bounds = upper_bounds if initial_point is not None: self.__initial_point = initial_point elif design_space.has_current_x(): self.__initial_point = design_space.get_current_x() else: self.__initial_point = full(design_space.dimension, None)
[docs] def prepare_problem(self): # type: (...) -> None """Initialize the problem for pSeven.""" self.__add_variables() self.__add_objectives() self.__add_constraints()
def __add_variables(self): # type: (...) -> None """Add the design variables to the pSeven problem.""" design_space = self.__problem.design_space for var_name in design_space.variables_names: var_indexes = design_space.get_variables_indexes([var_name]) lower_bound = self.__lower_bounds[var_indexes] upper_bound = self.__upper_bounds[var_indexes] current_x = self.__initial_point[var_indexes] indexed_names = design_space.get_indexed_var_name(var_name) for index in range(len(indexed_names)): bounds = (lower_bound[index], upper_bound[index]) initial_guess = current_x[index] pseven_type = self.__get_p7_variable_type(var_name, index) hints = {"@GT/VariableType": pseven_type} self.add_variable(bounds, initial_guess, indexed_names[index], hints) def __add_objectives(self): # type: (...) -> None """Add the objectives to the pSeven problem.""" objective = self.__problem.objective hints = self.__get_p7_function_hints(objective) dimension = self.__get_function_dimension(objective) if dimension > 1: for index in range(dimension): name = self.__get_component_name(objective, index) self.add_objective(name, hints) else: self.add_objective(objective.name, hints) # Add the objectives gradients if objective.has_jac(): self.enable_objectives_gradient() def __add_constraints(self): # type: (...) -> None """Add the constraints to the pSeven problem.""" problem = self.__problem for constraint in problem.constraints: bounds = self.__get_p7_constraint_bounds(constraint) hints = self.__get_p7_function_hints(constraint) dimension = self.__get_function_dimension(constraint) if dimension > 1: for index in range(dimension): name = self.__get_component_name(constraint, index) self.add_constraint(bounds, name, hints) else: self.add_constraint(bounds, constraint.name, hints) # Add the constraints gradients differentiable = all(constraint.has_jac() for constraint in problem.constraints) if problem.has_constraints() and differentiable: self.enable_constraints_gradient() def __get_p7_variable_type( self, variable_name, # type: str index, # type: int ): # type: (...) -> str """Return the pSeven variable type associated with a design variable component. Args: variable_name: The name of the design variable. index: The index of the variable component. Returns: The pSeven variable type. Raises: TypeError: If the type of the design variable is not supported by pSeven. """ var_type = self.__problem.design_space.get_type(variable_name)[index] if var_type == DesignSpace.FLOAT.value: return "Continuous" if var_type == DesignSpace.INTEGER.value: return "Integer" raise TypeError("Unsupported design variable type: {}".format(var_type)) # TODO: For future reference, pSeven also supports discrete and categorical # variables. def __get_p7_function_hints( self, function, # type: MDOFunction ): # type: (...) -> Dict[str, Union[str, int]] """Return the pSeven hints associated with a function. Args: function: The function. Returns: The pSeven hints. """ linearity_type = PSevenProblem.__get_p7_linearity_type(function) hints = {"@GTOpt/LinearityType": linearity_type} name = function.name if name in self.__evaluation_cost_type: hints["@GTOpt/EvaluationCostType"] = self.__evaluation_cost_type[name].value if name in self.__expensive_evaluations: hints["@GTOpt/ExpensiveEvaluations"] = self.__expensive_evaluations[name] return hints @staticmethod def __get_p7_linearity_type( function, # type: MDOFunction ): # type: (...) -> str """Return the pSeven linearity type of a function. Args: function: The function. Returns: The pSeven linearity type. """ if isinstance(function, MDOLinearFunction): return "Linear" if isinstance(function, MDOQuadraticFunction): return "Quadratic" return "Generic" def __get_function_dimension( self, function, # type: MDOFunction ): # type: (...) -> int """Return the dimension of a function. Args: function: The function. Returns: The dimension of the function. Raises: RuntimeError: If the function dimension is unavailable. """ design_space = self.__problem.design_space if function.has_dim(): return function.dim elif design_space.has_current_x(): current_x = design_space.get_current_x() return atleast_1d(function(current_x)).size else: raise RuntimeError("The function dimension is not available.") @staticmethod def __get_component_name( function, # type: MDOFunction index, # type: int ): # type: (...) -> str """Return the name of function component. Args: function: The function. index: The index of the function component. Returns: The name of the function component. """ return "{}{}{}".format(function.name, DesignSpace.SEP, index) @staticmethod def __get_p7_constraint_bounds( constraint, # type: MDOFunction ): # type: (...) -> Tuple[Optional[float], float] """Return the pSeven bounds associated with a constraint. Args: constraint: The constraint. Returns: The lower bound, the upper bound. Raises: ValueError: If the constraint type is invalid. """ if constraint.f_type == MDOFunction.TYPE_EQ: return 0.0, 0.0 if constraint.f_type == MDOFunction.TYPE_INEQ: return None, 0.0 raise ValueError("Invalid constraint type.") # TODO: For future reference, pSeven support constraints bounded from both # sides.
[docs] def evaluate( self, queryx, # type: ndarray querymask, # type: ndarray ): # type: (...) -> Tuple[List[List[float]], List[ndarray]] """Compute the values of the objectives and the constraints for pSeven. Args: queryx: The points to evaluate. (2D-array where each row is a point.) querymask: The evaluation request mask. Returns: The evaluation result. (2D-array-like with one row per point.), The evaluation masks. (Idem.) Raises: p7core.UserTerminated: If a termination criterion is reached. """ functions_batch = list() output_masks_batch = list() for x, mask in zip(queryx, querymask): # Evaluate the functions, unless a stopping criterion is satisfied functions = list() try: # Compute the objectives values objectives, obj_mask = self.__compute_objectives(x, mask) functions.extend(objectives) # Compute the constraints values constraints, constr_mask = self.__compute_constraints( x, mask[len(functions) :] ) functions.extend(constraints) # Compute the objectives gradients obj_grads, obj_grads_mask = self.__compute_objectives_gradients( x, mask[len(functions) :] ) functions.extend(obj_grads) # Compute the constraints gradients constr_grads, constr_grads_mask = self.__compute_constraints_gradients( x, mask[len(functions) :] ) except TerminationCriterion: # Interrupt pSeven raise p7core.UserTerminated("Gemseo stopping criterion satisfied") else: functions.extend(constr_grads) functions_batch.append(functions) output_mask = concatenate( [obj_mask, constr_mask, obj_grads_mask, constr_grads_mask] ) output_masks_batch.append(output_mask) return functions_batch, output_masks_batch
def __compute_objectives( self, x_vec, # type: ndarray mask, # type: ndarray ): # type: (...) -> Tuple[List[float], ndarray] obj_dim = self.__get_function_dimension(self.__problem.objective) if True in mask[:obj_dim]: objectives = atleast_1d(self.__problem.objective(x_vec)).tolist() output_mask = full_like(mask[:obj_dim], True) else: objectives = [None] * obj_dim output_mask = full_like(mask[:obj_dim], False) return objectives, output_mask def __compute_constraints( self, x_vec, # type: ndarray mask, # type: ndarray ): # type: (...) -> Tuple[List[float], ndarray] constraints = list() output_mask = list() n_inds = 0 for constraint in self.__problem.constraints: constr_dim = self.__get_function_dimension(constraint) if True in mask[n_inds : n_inds + constr_dim]: constraints.extend(atleast_1d(constraint(x_vec)).tolist()) output_mask.extend([True] * constr_dim) else: constraints.extend([None] * constr_dim) output_mask.extend([False] * constr_dim) n_inds += constr_dim return constraints, array(output_mask) def __compute_objectives_gradients( self, x_vec, # type: ndarray mask, # type: ndarray ): # type: (...) -> Tuple[List[float], ndarray] pb_dim = self.__problem.dimension obj_dim = self.__get_function_dimension(self.__problem.objective) n_values = obj_dim * pb_dim if self.__problem.objective.has_jac() and True in mask[:n_values]: obj_grads = self.__problem.objective.jac(x_vec).flatten().tolist() output_mask = full_like(mask[:n_values], True) elif self.__problem.objective.has_jac(): obj_grads = [None] * n_values output_mask = full_like(mask[:n_values], False) else: obj_grads = [] output_mask = array([]) return obj_grads, output_mask def __compute_constraints_gradients( self, x_vec, # type: ndarray mask, # type: ndarray ): # type: (...) -> Tuple[List[float], ndarray] constr_grads = list() output_mask = list() pb_dim = self.__problem.dimension n_inds = 0 if all(constraint.has_jac() for constraint in self.__problem.constraints): for constraint in self.__problem.constraints: constr_dim = self.__get_function_dimension(constraint) n_values = constr_dim * pb_dim if True in mask[n_inds : n_inds + n_values]: constr_grads.extend(constraint.jac(x_vec).flatten().tolist()) output_mask.extend([True] * n_values) else: constr_grads.extend([None] * n_values) output_mask.extend([False] * n_values) n_inds += n_values return constr_grads, array(output_mask)