# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Damien Guenot
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
DOE library base class wrapper
******************************
"""
from __future__ import division, unicode_literals
import logging
import traceback
from typing import List, Union
from numpy import ndarray, savetxt
from scipy.spatial import distance
from gemseo.algos.driver_lib import DriverLib
from gemseo.core.parallel_execution import ParallelExecution
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Type alias; presumably the type of the values a DOE library option may take
# (it is not referenced in the visible part of this module).
DOELibraryOptionType = Union[str, float, int, bool, List[str], ndarray]
class DOELibrary(DriverLib):
    """Abstract base class for the wrappers of DOE libraries.

    Subclasses implement :meth:`_generate_samples`; this class takes care of
    evaluating the functions of the problem at the generated samples, either
    sequentially or in parallel. See DriverLib for the inherited machinery.
    """

    # Names of the options and tags used by the DOE wrappers.
    MIN_DIMS = "min_dims"
    DESIGN_ALGO_NAME = "Design algorithm"
    SAMPLES_TAG = "samples"
    PHIP_CRITERIA = "phi^p"
    N_SAMPLES = "n_samples"
    EVAL_JAC = "eval_jac"
    N_PROCESSES = "n_processes"
    WAIT_TIME_BETWEEN_SAMPLES = "wait_time_between_samples"
    DIMENSION = "dimension"
    SEED = "seed"

    def __init__(self):
        """Initialize the library with no samples and a seed equal to 0."""
        super(DOELibrary, self).__init__()
        self.samples = None
        self.seed = 0

    @staticmethod
    def compute_phip_criteria(samples, power=10.0):
        """Compute the phi^p criterion of a set of samples.

        The criterion is ``(sum_{i<j} d_ij ** (-p)) ** (1 / p)`` where ``d_ij``
        is the Euclidean distance between the samples ``i`` and ``j``
        (see Morris & Mitchell, Exploratory designs for computational
        experiments, 1995).

        :param samples: the samples, one per row (2D array-like)
        :param power: the exponent p of the criterion (Default value = 10.0)
        :returns: the value of the phi^p criterion
        :rtype: float
        """
        # pdist returns each pairwise distance (i < j) exactly once, so neither
        # the full square distance matrix nor a Python double loop is needed.
        pairwise_distances = distance.pdist(samples, "euclidean")
        phi_p = float((pairwise_distances ** (-power)).sum() ** (1.0 / power))
        LOGGER.info(
            "Value of Phi^p criteria with p=%s (Morris & Mitchell, 1995): %s",
            power,
            phi_p,
        )
        return phi_p

    def _pre_run(self, problem, algo_name, **options):
        """Generate the samples and set up the iteration observer before _run.

        May be overridden by subclasses.

        :param problem: the problem to be solved
        :param algo_name: name of the algorithm
        :param options: the options dict for the algorithm,
            see associated JSON file
        """
        # NOTE(review): ``self.problem`` is used below without being set here;
        # presumably the parent ``_pre_run`` binds it to ``problem`` - confirm.
        super(DOELibrary, self)._pre_run(problem, algo_name, **options)
        problem.stop_if_nan = False
        LOGGER.info("%s", problem)
        options[self.DIMENSION] = self.problem.dimension
        self.samples = self._generate_samples(**options)
        self.init_iter_observer(len(self.samples), "DOE sampling")
        self.problem.add_callback(self.new_iteration_callback)

    def _generate_samples(self, **options):
        """Generate the samples; must be implemented by subclasses.

        :param options: the options dict for the algorithm,
            see associated JSON file
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def __call__(self, n_samples, dimension, **options):
        """Generate samples in the unit hypercube.

        The dimension and the number of samples are added to the options
        before delegating to :meth:`_generate_samples`.

        :param int n_samples: number of samples.
        :param int dimension: parameter space dimension.
        :param options: options passed to the DOE algorithm.
        :returns: samples.
        :rtype: ndarray
        """
        options[self.DIMENSION] = dimension
        options[self.N_SAMPLES] = n_samples
        return self._generate_samples(**options)

    def _run(self, **options):
        """Evaluate the samples and return the optimum found in the database.

        May be overloaded by subclasses.

        :param options: the options dict for the algorithm,
            see associated JSON file; the jacobian evaluation flag, the number
            of processes and the wait time between samples are read from it,
            with defaults False, 1 and 0 respectively
        :returns: the result of ``get_optimum_from_database``
        """
        eval_jac = options.get(self.EVAL_JAC, False)
        n_processes = options.get(self.N_PROCESSES, 1)
        wait_time_between_samples = options.get(self.WAIT_TIME_BETWEEN_SAMPLES, 0)
        self.evaluate_samples(eval_jac, n_processes, wait_time_between_samples)
        return self.get_optimum_from_database()

    def _display_fullfact_warning(self, n_samples):
        """Log the number of samples per direction of a full factorial DOE.

        :param n_samples: number of samples requested
        """
        dim = self.problem.dimension
        # NOTE(review): the floating-point root is truncated, so an exact root
        # may be rounded down (e.g. 64 samples in dimension 3 can give 3);
        # kept as is, presumably to mirror the sample generators - confirm.
        n_samples_dir = int(n_samples ** (1.0 / dim))
        LOGGER.info(
            "Full factorial design required. Number of samples along each"
            " direction for a design vector of size %s with %s samples: %s",
            dim,
            n_samples,
            n_samples_dir,
        )
        LOGGER.info(
            "Final number of samples for DOE = %s vs %s requested",
            n_samples_dir ** dim,
            n_samples,
        )

    def export_samples(self, doe_output_file):
        """Export the samples generated by the DOE library to a csv file.

        :param doe_output_file: export file name
        :type doe_output_file: string
        :raises RuntimeError: if no samples have been generated yet
        """
        if self.samples is None:
            raise RuntimeError("Samples are None, execute method before export")
        savetxt(doe_output_file, self.samples, delimiter=",")

    def evaluate_samples(
        self, eval_jac=False, n_processes=1, wait_time_between_samples=0
    ):
        """Evaluate all the functions of the problem at the samples.

        :param eval_jac: if True, the jacobian is also evaluated
            (Default value = False)
        :param n_processes: number of processes; the samples are evaluated
            in parallel when it is greater than 1 (Default value = 1)
        :param wait_time_between_samples: wait time between two forks of the
            parallel execution; ignored, with a warning, in sequential mode
            (Default value = 0)
        """
        design_space = self.problem.design_space
        unnormalize_vect = design_space.unnormalize_vect
        # NOTE(review): the jacobian un-scaling is deliberately bound to
        # ``normalize_vect`` and called with ``minus_lb=False``; presumably
        # this only divides by the bounds range, which is what a derivative
        # w.r.t. normalized variables needs - confirm against the design space.
        unnormalize_grad = design_space.normalize_vect
        round_vect = design_space.round_vect

        if n_processes > 1:
            LOGGER.info("Running DOE in parallel on n_processes = %s", n_processes)
            all_funcs = self.problem.evaluate_functions
            n_samples = len(self.samples)
            # One task per sample: evaluate the functions at that sample.
            workers = [lambda sample: all_funcs(sample, eval_jac)] * n_samples
            parallel = ParallelExecution(workers, n_processes=n_processes)
            parallel.wait_time_between_fork = wait_time_between_samples
            database = self.problem.database

            # Pre-register every sample with an empty entry so that the
            # database order follows the samples order, which the parallel
            # execution does not necessarily guarantee.
            for sample in self.samples:
                x_u = unnormalize_vect(sample)
                database.store(round_vect(x_u), {}, add_iter=True)

            def store_callback(index, outputs):
                """Store the outputs of a sample in the database on the fly.

                :param index: sample index
                :param outputs: outputs of the parallel execution,
                    as a (functions outputs, jacobian) pair
                """
                out, jac = outputs
                if jac:
                    # Jacobian entries are stored under the "@"-prefixed name.
                    for key, val in jac.items():
                        out["@" + key] = unnormalize_grad(val, minus_lb=False)
                x_u = unnormalize_vect(self.samples[index])
                database.store(round_vect(x_u), out)

            # The list of inputs of the tasks is the list of samples.
            parallel.execute(self.samples, exec_callback=store_callback)
            # Empty entries were added to keep the order, but they remain empty
            # for failed points, which is not consistent with the sequential
            # execution: clean the database.
            database.remove_empty_entries()
        else:  # Sequential execution
            if wait_time_between_samples != 0:
                LOGGER.warning(
                    "Wait time between samples option is ignored in sequential run."
                )
            for x_norm in self.samples:
                try:
                    self.problem.evaluate_functions(x_norm, eval_jac)
                except ValueError:
                    # A failing sample is reported and skipped; the DOE goes on.
                    LOGGER.error(
                        "Problem with evaluation of sample :"
                        "%s result is not taken into account "
                        "in DOE.",
                        x_norm,
                    )
                    LOGGER.error(traceback.format_exc())

    @staticmethod
    def is_algorithm_suited(algo_dict, problem):
        """Check if the algorithm is suited to the problem.

        This implementation accepts every problem.

        :param algo_dict: the algorithm characteristics
        :param problem: the opt_problem to be solved
        :returns: always True
        """
        return True

    @staticmethod
    def _rescale_samples(samples):
        """Rescale the samples to [0, 1] when some of them are out of bounds.

        The rescaling uses the global minimum and maximum over all the
        components of all the samples.

        :param samples: the samples to rescale (ndarray)
        :returns: the rescaled samples, or the samples themselves when they
            already lie in [0, 1] or when their values are all (numerically)
            equal
        """
        if (samples >= 0.0).all() and (samples <= 1.0).all():
            return samples
        max_s = samples.max()
        min_s = samples.min()
        span = max_s - min_s
        if abs(span) > 1e-14:
            return (samples - min_s) / span
        # Degenerate range: nothing meaningful to rescale with.
        return samples