Source code for gemseo.algos.doe.doe_lib

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                           documentation
#        :author: Damien Guenot
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""
DOE library base class wrapper
******************************
"""

from __future__ import division, unicode_literals

import logging
import traceback
from typing import List, Union

from numpy import ndarray, savetxt
from scipy.spatial import distance

from gemseo.algos.driver_lib import DriverLib
from gemseo.core.parallel_execution import ParallelExecution

LOGGER = logging.getLogger(__name__)

DOELibraryOptionType = Union[str, float, int, bool, List[str], ndarray]


class DOELibrary(DriverLib):
    """Abstract base class used to link DOE (design of experiments) libraries.

    See DriverLib.
    """

    # Option keys that the methods of this class read from, or write to,
    # the options dictionary.
    DIMENSION = "dimension"
    N_SAMPLES = "n_samples"
    EVAL_JAC = "eval_jac"
    N_PROCESSES = "n_processes"
    WAIT_TIME_BETWEEN_SAMPLES = "wait_time_between_samples"

    # Other labels; they are not referenced in the visible part of this file
    # (presumably used by subclasses or callers -- TODO confirm).
    MIN_DIMS = "min_dims"
    DESIGN_ALGO_NAME = "Design algorithm"
    SAMPLES_TAG = "samples"
    PHIP_CRITERIA = "phi^p"
    SEED = "seed"

    def __init__(self):
        """Constructor of the abstract class.

        The library starts without samples and with a seed equal to 0.
        """
        super(DOELibrary, self).__init__()
        self.seed = 0
        # Stays None until samples are generated (see the check made before
        # exporting them).
        self.samples = None
[docs] @staticmethod def compute_phip_criteria(samples): """Compute the phi^p criteria (see Morris & Mitchell, Exploratory designs for computational experiments, 1995) :param samples: design variables list """ distance_array = distance.cdist(samples, samples, "euclidean") phi_p = 0.0 phip_power = 10.0 for index_j in range(samples.shape[0]): for index_i in range(index_j): phi_p += distance_array[index_i, index_j] ** (-phip_power) phi_p = phi_p ** (1.0 / phip_power) LOGGER.info( "Value of Phi^p criteria with p=%s " "(Morris & Mitchell, 1995): %s", str(phip_power), str(phi_p), ) return phi_p
def _pre_run(self, problem, algo_name, **options): """To be overriden by subclasses Specific method to be executed just before _run method call. :param problem: the problem to be solved :param algo_name: name of the algorithm :param options: the options dict for the algorithm, see associated JSON file """ super(DOELibrary, self)._pre_run(problem, algo_name, **options) problem.stop_if_nan = False LOGGER.info("%s", problem) options[self.DIMENSION] = self.problem.dimension self.samples = self._generate_samples(**options) self.init_iter_observer(len(self.samples), "DOE sampling") self.problem.add_callback(self.new_iteration_callback) def _generate_samples(self, **options): """Generates the list of x samples. :param options: the options dict for the algorithm, see associated JSON file """ raise NotImplementedError() def __call__(self, n_samples, dimension, **options): """Generate samples in the unit hypercube. :param int n_samples: number of samples. :param int dimension: parameter space dimension. :param options: options passed to the DOE algorithm. :returns: samples. :rtype: ndarray """ options[self.DIMENSION] = dimension options[self.N_SAMPLES] = n_samples return self._generate_samples(**options) def _run(self, **options): """Runs the algorithm, to be overloaded by subclasses. :param options: the options dict for the algorithm, see associated JSON file """ eval_jac = options.get(self.EVAL_JAC, False) n_processes = options.get(self.N_PROCESSES, 1) wait_time_between_samples = options.get(self.WAIT_TIME_BETWEEN_SAMPLES, 0) self.evaluate_samples(eval_jac, n_processes, wait_time_between_samples) return self.get_optimum_from_database() def _display_fullfact_warning(self, n_samples): """Display the number of samples along each direction for full factorial design of DOE. :param n_samples: number of samples """ dim = self.problem.dimension n_samples_dir = int(n_samples ** (1.0 / dim)) LOGGER.info( "Full factorial design required. 
Number of samples along each" " direction for a design vector of size %s with %s samples: %s", str(dim), str(n_samples), str(n_samples_dir), ) LOGGER.info( "Final number of samples for DOE = %s vs %s requested", str(n_samples_dir ** dim), str(n_samples), )
[docs] def export_samples(self, doe_output_file): """Export samples generated by DOE library to a csv file. :param doe_output_file: export file name :type doe_output_file: string """ if self.samples is None: raise RuntimeError("Samples are None, execute method before export") savetxt(doe_output_file, self.samples, delimiter=",")
[docs] def evaluate_samples( self, eval_jac=False, n_processes=1, wait_time_between_samples=0 ): """Evaluates all functions of optimization problem at the samples. :param eval_jac: if True, the jacobian is also evaluated (Default value = False) """ unnormalize_vect = self.problem.design_space.unnormalize_vect unnormalize_grad = self.problem.design_space.normalize_vect round_vect = self.problem.design_space.round_vect if n_processes > 1: LOGGER.info("Running DOE in parallel on n_processes = %s", str(n_processes)) all_funcs = self.problem.evaluate_functions n_samples = len(self.samples) # Create a list of tasks: execute functions workers = [lambda sample: all_funcs(sample, eval_jac)] * n_samples parallel = ParallelExecution(workers, n_processes=n_processes) parallel.wait_time_between_fork = wait_time_between_samples # Define a callback function to store the samples on the fly # during the parallel execution database = self.problem.database # Initialize the order as it is not necessarily guaranteed # when using parallel execution for sample in self.samples: x_u = unnormalize_vect(sample) x_r = round_vect(x_u) database.store(x_r, {}, add_iter=True) def store_callback(index, outputs): """Store outputs in the database. :param index: sample index :param outputs: outputs of the parallel execution """ out, jac = outputs if jac: for key, val in jac.items(): val = unnormalize_grad(val, minus_lb=False) out["@" + key] = val x_u = unnormalize_vect(self.samples[index]) x_r = round_vect(x_u) database.store(x_r, out) # The list of inputs of the tasks is the list of samples parallel.execute(self.samples, exec_callback=store_callback) # We added empty entries by default to keep order in the database # but when the DOE point is failed, this is not consistent # with the serial exec, so we clean the DB database.remove_empty_entries() else: # Sequential execution if wait_time_between_samples != 0: LOGGER.warning( "Wait time between samples option is ignored" " in sequential run." 
) for x_norm in self.samples: try: self.problem.evaluate_functions(x_norm, eval_jac) except ValueError: LOGGER.error( "Problem with evaluation of sample :" "%s result is not taken into account " "in DOE.", str(x_norm), ) LOGGER.error(traceback.format_exc())
    @staticmethod
    def is_algorithm_suited(algo_dict, problem):
        """Check if the algorithm is suited to the problem according to its algo dict.

        This implementation uses neither of its arguments and accepts every
        problem.

        :param algo_dict: the algorithm characteristics
        :param problem: the opt_problem to be solved
        :returns: always True
        """
        return True
@staticmethod def _rescale_samples(samples): """When the samples are out of the [0,1] bounds, rescales them. :param samples: the samples to rescale :returns: samples normed ndarray """ if (not (samples >= 0.0).all()) or (not (samples <= 1.0).all()): max_s = samples.max() min_s = samples.min() if abs(max_s - min_s) > 1e-14: samples_n = (samples - min_s) / (max_s - min_s) assert samples_n.shape == samples.shape return samples_n return samples return samples