# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Charlie Vanaret, Benoit Pauwels, Francois Gallard
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
Design space
============
A design space is used to represent the optimization's unknowns,
a.k.a. design variables. A :class:`.DesignSpace` describes
this design space at a given state, in terms of names, sizes, types, bounds
and current values of the design variables. Variables can easily be added to
the :class:`.DesignSpace` using the :meth:`.DesignSpace.add_variable` method
or removed using the :meth:`.DesignSpace.remove_variable` method. We can also
filter the design variables using the :meth:`.DesignSpace.filter` method.
Getters and setters are also available to get or set the value of a given
variable property. Lastly, an instance of :class:`.DesignSpace` can be stored
in a txt or HDF file.
"""
from __future__ import absolute_import, division, unicode_literals
from copy import deepcopy
from os.path import exists
import h5py
from future import standard_library
from numpy import abs as np_abs
from numpy import (
array,
atleast_1d,
complex128,
concatenate,
empty,
equal,
finfo,
float64,
genfromtxt,
inf,
int32,
isnan,
logical_or,
mod,
ndarray,
ones,
ones_like,
)
from numpy import round_ as np_round
from numpy import string_, vectorize, where, zeros_like
from six import string_types
from gemseo.algos.opt_result import OptimizationResult
from gemseo.third_party.prettytable import PrettyTable
from gemseo.utils.py23_compat import string_array, to_unicode_list
standard_library.install_aliases()
from gemseo import LOGGER
[docs]class DesignSpace(object):
"""
Class that describes the design space at a given state:
the names/sizes/types/bounds of the variables and the
initial solution of the optimization problem
"""
FLOAT = "float"
INTEGER = "integer"
AVAILABLE_TYPES = [FLOAT, INTEGER]
MINIMAL_FIELDS = ["name", "lower_bound", "upper_bound"]
TABLE_NAMES = ["name", "lower_bound", "value", "upper_bound", "type"]
DESIGN_SPACE_GROUP = "design_space"
NAMES_GROUP = "names"
LB_GROUP = "l_b"
UB_GROUP = "u_b"
VAR_TYPE_GROUP = "var_type"
VALUE_GROUP = "value"
SIZE_GROUP = "size"
# separator that denotes a vector's components
SEP = "!"
def __init__(self, hdf_file=None):
"""
Constructor
"""
self.variables_names = []
self.dimension = 0
self.variables_sizes = {}
self.variables_types = {}
self.normalize = {}
self._lower_bounds = {}
self._upper_bounds = {}
# These attributes are stored for faster computation of normalization
# and unnormalization
self._norm_factor = None
self.__lower_bounds_array = None
self.__upper_bounds_array = None
self.__int_vars_indices = None
self.__norm_data_is_computed = False
self.__norm_inds = None
self.__to_zero = None
self.__bound_tol = 100.0 * finfo(float64).eps
self._current_x = {}
if hdf_file is not None:
self.import_hdf(hdf_file)
[docs] def remove_variable(self, name):
"""Remove a variable (and bounds and types) from the design space
:param name: name of the variable to remove
"""
self.__norm_data_is_computed = False
size = self.variables_sizes[name]
self.dimension -= size
self.variables_names.remove(name)
del self.variables_sizes[name]
del self.variables_types[name]
del self.normalize[name]
if name in self._lower_bounds:
del self._lower_bounds[name]
if name in self._upper_bounds:
del self._upper_bounds[name]
if name in self._current_x:
del self._current_x[name]
[docs] def filter(self, keep_variables):
"""Filters the design space to keep a sublist of variables
:param keep_variables: the list of variables to keep
"""
if isinstance(keep_variables, string_types):
keep_variables = [keep_variables]
for name in deepcopy(self.variables_names):
if name not in keep_variables:
self.remove_variable(name)
for name in keep_variables:
if name not in self.variables_names:
raise ValueError('Variable "' + str(name) + '" is not known')
return self
[docs] def filter_dim(self, variable, keep_dimensions):
"""Filters the design space to keep a sublist of dimensions
for a given variable
:param variable: the variable
:param keep_dimensions: the list of dimension to keep
"""
self.__norm_data_is_computed = False
removed_dimensions = list(
set(range(self.variables_sizes[variable])) - set(keep_dimensions)
)
bad_dimensions = list(
set(keep_dimensions) - set(range(self.variables_sizes[variable]))
)
size = len(removed_dimensions)
self.dimension -= size
self.variables_sizes[variable] -= size
types = []
for dimension in keep_dimensions:
if dimension in bad_dimensions:
self.remove_variable(variable)
raise ValueError(
"Dimension "
+ str(dimension)
+ ' of variable "'
+ str(variable)
+ '" is not known'
)
types.append(self.variables_types[variable][dimension])
self.variables_types[variable] = array(types)
idx = keep_dimensions
self.normalize[variable] = self.normalize[variable][idx]
if variable in self._lower_bounds:
self._lower_bounds[variable] = self._lower_bounds[variable][idx]
if variable in self._upper_bounds:
self._upper_bounds[variable] = self._upper_bounds[variable][idx]
if variable in self._current_x:
self._current_x[variable] = self._current_x[variable][idx]
return self
[docs] def add_variable(
self, name, size=1, var_type=FLOAT, l_b=None, u_b=None, value=None
):
"""Add a variable to the design space
:param name: param size: (Default value = 1)
:param var_type: Default value = FLOAT)
:param l_b: Default value = None)
:param u_b: Default value = None)
:param value: Default value = None)
:param size: (Default value = 1)
"""
if name in self.variables_names:
raise ValueError("Variable " + name + " already exists")
if size <= 0 or int(size) != size:
raise ValueError("The size of " + name + " should be a positive integer.")
# name and size
self.variables_names.append(name)
self.dimension += size
self.variables_sizes[name] = size
# type
self._add_type(name, size, var_type)
# bounds
self._add_bound(name, size, l_b, is_lower=True)
self._add_bound(name, size, u_b, is_lower=False)
self._check_variable_bounds(name)
# normalization policy
self._add_norm_policy(name)
if value is not None:
array_value = atleast_1d(value)
self._check_value(array_value, name)
if len(array_value) == 1 and size > 1:
array_value = array_value * ones(size)
self._current_x[name] = array_value
self._check_current_x_value(name)
def _add_type(self, name, size, var_type=None):
"""Add a type to a variable
:param name: name of the variable
:param size: size of the variable
:param var_type: type in self.AVAILABLE_TYPES,
or None, which is then FLOAT by default
"""
if isinstance(var_type, bytes):
var_type = var_type.decode()
if var_type is None:
var_type = self.FLOAT
if hasattr(var_type, "__iter__") and not isinstance(var_type, string_types):
if len(var_type) != size:
raise ValueError(
"The list of types for variable "
+ name
+ " should be of size "
+ str(size)
)
# a type for each component
var_type = [
v_type.decode() if isinstance(v_type, bytes) else v_type
for v_type in var_type
]
for v_type in var_type:
if v_type not in self.AVAILABLE_TYPES:
msg = 'The type "{0}" of {1} is not known'.format(v_type, name)
raise ValueError(msg)
self.variables_types[name] = array(var_type)
else:
# same type for all components
if var_type not in self.AVAILABLE_TYPES:
raise ValueError('Type "' + str(var_type) + '" is not known')
var_type_array = array([var_type] * size)
self.variables_types[name] = var_type_array
self.__norm_data_is_computed = False
def _add_norm_policy(self, name):
"""Adds a normalization policy to a variable.
Unbounded variables are not normalized.
Bounded variables (both from above and from below) are normalized.
:param name: variable name
"""
# Check that the variable is in the design space:
if name not in self.variables_names:
raise ValueError("Variable " + name + " is not in the design " + "space.")
# Check that the variable size is set:
size = self.get_size(name)
if size is None:
raise ValueError("The size of variable " + name + " is not set.")
# Check that the variables types are set:
variables_types = self.variables_types.get(name, None)
if variables_types is None:
raise ValueError(
"The components types of variable " + name + " are not set."
)
# Set the normalization policy:
normalize = empty(size)
for i in range(size):
var_type = variables_types[i]
if var_type in (DesignSpace.INTEGER, DesignSpace.FLOAT):
if (
self._lower_bounds[name][i] == -inf
or self._upper_bounds[name][i] == inf
):
# Unbounded variables are not normalized:
normalize[i] = False
elif self._lower_bounds[name][i] == self._upper_bounds[name][i]:
# Constant variables are not normalized:
normalize[i] = False
else:
normalize[i] = True
else:
msg = "The normalization policy for type {0}" + " is not implemented."
raise ValueError(msg.format(var_type))
self.normalize[name] = normalize
@staticmethod
def __is_integer(value):
""" Checks that all values are integers """
are_none = equal(value, None)
are_int = equal(mod(value.astype("f"), 1), 0)
return logical_or(are_none, are_int)
@staticmethod
def __is_numeric(value):
""" Checks that a value is numeric """
res = (value is None) or hasattr(value, "real")
try:
if not res:
float(value)
return True
except TypeError:
return False
@staticmethod
def __isnot_nan(value):
""" Checks that a value is not nan"""
return (value is None) or ~isnan(value)
def _check_value(self, value, name):
"""
Checks that a variable value is valid
:param value: a numpy array
"""
all_indices = set(range(len(value)))
# OK if the variable value is one-dimensional
if len(value.shape) > 1:
raise ValueError(
"Value "
+ str(value)
+ " of variable "
+ str(name)
+ " has dimension greater than 1"
+ "while a float or a 1d iterable object "
+ "(array, list, tuple, ...) "
+ "while a scalar was expected."
)
# OK if all components are None
if all(equal(value, None)):
return True
test = vectorize(self.__is_numeric)(value)
indices = all_indices - set(list(where(test)[0]))
for idx in indices:
raise ValueError(
"Value "
+ str(value[idx])
+ " of variable "
+ str(name)
+ " is not numerizable."
)
test = vectorize(self.__isnot_nan)(value)
indices = all_indices - set(list(where(test)[0]))
for idx in indices:
raise ValueError(
"Value " + str(value[idx]) + " of variable " + str(name) + " is nan."
)
# Check if some components of an integer variable are not integer.
if self.variables_types[name][0] == self.INTEGER:
indices = all_indices - set(list(where(self.__is_integer(value))[0]))
for idx in indices:
raise ValueError(
"Component value "
+ str(value[idx])
+ " of variable "
+ str(name)
+ " is not an integer "
+ "while variable is of type integer"
+ "(index: "
+ str(idx)
+ ")."
)
def _add_bound(self, name, size, bound, is_lower=True):
"""Add a lower or upper bound to a variable
:param name: name of the variable
:param bound: lower or upper bound (array)
:param size: size of the variable
:param is_lower: if True, bound is a lower bound
"""
self.__norm_data_is_computed = False
if is_lower:
bound_dict = self._lower_bounds
else:
bound_dict = self._upper_bounds
infinity = inf * ones(size)
if bound is None:
if is_lower:
bound_array = -infinity
else:
bound_array = infinity
bound_dict.update({name: bound_array})
return
bound_array = atleast_1d(bound)
self._check_value(bound_array, name)
if hasattr(bound, "__iter__"):
# iterable structure
if len(bound_array) != size:
bound_str = "lower" if is_lower else "upper"
raise ValueError(
"The "
+ bound_str
+ " bounds of "
+ name
+ " should be of size "
+ str(size)
)
if is_lower:
bound_array = where(equal(bound_array, None), -infinity, bound_array)
else:
bound_array = where(equal(bound_array, None), infinity, bound_array)
bound_dict.update({name: bound_array})
else:
# scalar: same lower bound for all components
bound_array = bound * ones(size)
bound_dict.update({name: bound_array})
return
def _check_variable_bounds(self, name):
"""Check that the bounds are compatible and are the same size
:param name: name of the variable
"""
l_b = self._lower_bounds.get(name, None)
u_b = self._upper_bounds.get(name, None)
inds = where(u_b < l_b)[0]
if inds.size != 0:
raise ValueError(
"The bounds of variable "
+ name
+ str(inds)
+ " are not valid : "
+ str(l_b[inds])
+ "!<"
+ str(u_b[inds])
)
def _check_current_x_value(self, name):
"""Check that the current x values are between bounds
:param name: name of the variable
"""
l_b = self._lower_bounds.get(name, None)
u_b = self._upper_bounds.get(name, None)
c_x = self._current_x.get(name, None)
not_none = ~equal(c_x, None)
inds = where(
logical_or(
c_x[not_none] < l_b[not_none] - 1e-14,
c_x[not_none] > u_b[not_none] + 1e-14,
)
)[0]
for idx in inds:
raise ValueError(
"The current value of variable "
+ name
+ "!"
+ str(c_x[idx])
+ " is not between "
+ "the lower bound "
+ str(l_b[idx])
+ " and "
+ "the upper bound "
+ str(u_b[idx])
)
[docs] def has_current_x(self):
"""
Tests if current_x is defined
:returns: True if current_x is defined
"""
if self._current_x is None or len(self._current_x) != len(self.variables_names):
return False
for val in self._current_x.values():
if val is not None:
return True
return False
[docs] def check(self):
"""Check the state of the design space"""
if not self.variables_names:
raise ValueError("Design space is empty !")
for name in self.variables_names:
self._check_variable_bounds(name)
if self.has_current_x():
self._check_current_x()
[docs] def check_membership(self, x_vect, variables_names=None):
"""Checks whether the input variables satisfy the design space
requirements.
:param x_vect: design variables
:type x_vect: dict or array
:param variables_names: names of the variables to be checked
"""
# Convert the input vector into a dictionary if necessary:
if isinstance(x_vect, dict):
x_dict = x_vect
elif isinstance(x_vect, ndarray):
if x_vect.size != self.dimension:
raise ValueError(
"The dimension of the input array ("
+ str(x_vect.size)
+ ") should be "
+ str(self.dimension)
+ "."
)
x_dict = self.array_to_dict(x_vect)
else:
raise TypeError(
"The input vector should be an array or a "
+ "dictionary. Got "
+ str(type(x_vect))
+ " instead."
)
# Check the membership of the input vector to the design space:
variables_names = variables_names or self.variables_names
for name in variables_names:
if x_dict[name] is None:
continue
size = self.variables_sizes[name]
l_b = self._lower_bounds.get(name, None)
u_b = self._upper_bounds.get(name, None)
if atleast_1d(x_dict[name]).size != size:
raise ValueError(
"The component "
+ name
+ " of the given"
+ " array should have size "
+ str(size)
+ "."
)
for i in range(size):
x_real = atleast_1d(x_dict[name])[i].real
if l_b is not None and x_real < l_b[i] - self.__bound_tol:
msg = "The component " + name + self.SEP + str(i)
msg += " of the given array (" + str(x_real) + ") is "
msg += "lower than the lower bound (" + str(l_b[i]) + ")"
msg += " by {:.1e}.".format(l_b[i] - x_real)
raise ValueError(msg)
if u_b is not None and u_b[i] + self.__bound_tol < x_real:
msg = "The component " + name + self.SEP + str(i)
msg += " of the given array (" + str(x_real) + ") is "
msg += "greater than the upper bound ("
msg += str(u_b[i]) + ")"
msg += " by {:.1e}.".format(x_real - u_b[i])
raise ValueError(msg)
if (
self.variables_types[name][0] == self.INTEGER
) and not self.__is_integer(x_real):
msg = "The component " + name + self.SEP + str(i)
msg += " of the given array is not an integer "
msg += " while variable is of type integer !"
msg += " value = " + str(x_real)
raise ValueError(msg)
[docs] def get_active_bounds(self, x_vec=None, tol=1e-8):
"""Determine which bound constraints of the current point are active
:param x_vec: the point at which we check the bounds
:param tol: tolerance of comparison of a scalar with a bound
(Default value = 1e-8)
"""
if x_vec is None:
x_dict = self._current_x
self.check_membership(self.get_current_x())
elif isinstance(x_vec, ndarray):
x_dict = self.array_to_dict(x_vec)
elif isinstance(x_vec, dict):
x_dict = x_vec
else:
raise TypeError(
"Expected dict or array for x_vec argument,"
+ " got "
+ str(type(x_vec))
)
active_l_b = {}
active_u_b = {}
for name in self.variables_names:
l_b = self._lower_bounds.get(name)
l_b = where(equal(l_b, None), -inf, l_b)
u_b = self._upper_bounds.get(name)
u_b = where(equal(u_b, None), inf, u_b)
x_vec_i = x_dict[name]
# lower bound saturated
albi = where(np_abs(x_vec_i - l_b) <= tol, True, False)
active_l_b[name] = albi
# upper bound saturated
aubi = where(np_abs(x_vec_i - u_b) <= tol, True, False)
active_u_b[name] = aubi
return active_l_b, active_u_b
def _check_current_x(self, variables_names=None):
"""Checks the current point.
:param variables_names: Default value = None)
"""
if sorted(set(self.variables_names)) != sorted(set(self._current_x.keys())):
raise ValueError(
"Expected current_x variables :"
+ str(self.variables_names)
+ ", got "
+ str(list(self._current_x.keys()))
)
self.check_membership(self._current_x, variables_names)
[docs] def get_current_x(self, variables_names=None):
"""Gets the current point in the design space.
:param variables_names: names of the required variables, optional
:type variables_names: list(str)
:returns: the x vector as array
:rtype: ndarray
"""
try:
x_arr = self.dict_to_array(self._current_x, all_var_list=variables_names)
return x_arr
except KeyError as err:
raise KeyError("DesignSpace has no current_x for " + err.args[0])
[docs] def get_indexed_var_name(self, variable_name):
"""
Retuns a list of the variables names with their indices
such as [x!0,x!1,y,z!0,z!1]
"""
size = self.variables_sizes[variable_name]
if size == 1:
return variable_name
return [variable_name + self.SEP + str(i) for i in range(size)]
[docs] def get_indexed_variables_names(self):
"""
Retuns a list of the variables names with their indices
such as [x!0,x!1,y,z!0,z!1]
"""
var_ind_names = []
for var in self.variables_names:
vnames = self.get_indexed_var_name(var)
if isinstance(vnames, string_types):
var_ind_names.append(vnames)
else:
var_ind_names += vnames
return var_ind_names
def __update_normalization_vars(self):
"""
Computes inner attributes used to compute
the normalization/unnormalization
"""
self.__lower_bounds_array = self.get_lower_bounds()
self.__upper_bounds_array = self.get_upper_bounds()
self._norm_factor = self.__upper_bounds_array - self.__lower_bounds_array
norm_array = self.dict_to_array(self.normalize)
self.__norm_inds = where(norm_array)[0]
# In case lb=ub
self.__to_zero = where(self._norm_factor == 0.0)[0]
var_ind_list = []
for var in self.variables_names:
# Store the mask of int variables
to_one = self.variables_types[var] == self.INTEGER
var_ind_list.append(to_one)
self.__int_vars_indices = concatenate(var_ind_list)
self.__norm_data_is_computed = True
[docs] def normalize_vect(self, x_vect, minus_lb=True):
"""Normalizes a vector of the design space.
Unbounded variables are not normalized.
:param x_vect: design variables
:type x_vect: ndarray
:param minus_lb: if True, remove lower bounds at normalization
:return: normalized vector
"""
if not self.__norm_data_is_computed:
self.__update_normalization_vars()
# Normalize the relevant components:
norm_vect = array(x_vect, copy=True)
# In case lb=ub
norm_inds = self.__norm_inds
if len(x_vect.shape) == 1:
if minus_lb:
norm_vect[norm_inds] -= self.__lower_bounds_array[norm_inds]
norm_vect[norm_inds] /= self._norm_factor[norm_inds]
# In case lb=ub put value to 0
elif len(x_vect.shape) == 2:
if minus_lb:
norm_vect[:, norm_inds] -= self.__lower_bounds_array[norm_inds]
norm_vect[:, norm_inds] /= self._norm_factor[norm_inds]
else:
raise ValueError("The array to be normalized must be 1d or 2d.")
to_zero = self.__to_zero
if to_zero.size > 0:
norm_vect[to_zero] = 0.0
return norm_vect
[docs] def unnormalize_vect(self, x_vect, minus_lb=True, no_check=False):
"""Unnormalizes a normalized vector of the design space.
:param x_vect: design variables
:type x_vect: ndarray
:param minus_lb: if True, remove lower bounds at normalization
:param no_check: if True, dont check that values are in [0,1]
:return: normalized vector
"""
if not self.__norm_data_is_computed:
self.__update_normalization_vars()
norm_inds = self.__norm_inds
l_bounds = self.__lower_bounds_array
if not no_check:
# Get the indexes of the components to be unnormalized:
# Check whether the input components are between 0 and 1:
if (x_vect < -1e-12).any() or (x_vect > 1 + 1e-12).any():
msg = "All components of the "
msg += "normalized vector should be between 0 and 1."
lb_viol = x_vect[x_vect < -1e-12]
if lb_viol.size != 0:
msg += " lower bounds violated : " + str(lb_viol)
ub_viol = x_vect[x_vect > 1 + 1e-12]
if ub_viol.size != 0:
msg += " upper bounds violated : " + str(ub_viol)
LOGGER.warning(msg)
# Unnormalize the relevant components:
unnorm_vect = array(x_vect, copy=True)
n_dims = len(x_vect.shape)
if n_dims == 1:
unnorm_vect[norm_inds] *= self._norm_factor[norm_inds]
if minus_lb:
unnorm_vect[norm_inds] += l_bounds[norm_inds]
elif n_dims == 2:
unnorm_vect[:, norm_inds] *= self._norm_factor[norm_inds]
if minus_lb:
unnorm_vect[:, norm_inds] += l_bounds[norm_inds]
else:
raise ValueError(
"The array to be unnormalized must be 1d or 2d"
+ ", got "
+ str(n_dims)
+ "d !"
)
inds_fixed = self.__to_zero
if inds_fixed.size > 0:
if n_dims == 1:
unnorm_vect[inds_fixed] = l_bounds[inds_fixed]
else:
unnorm_vect[:, inds_fixed] = l_bounds[inds_fixed]
r_xvec = self.round_vect(unnorm_vect)
return r_xvec
[docs] def round_vect(self, x_vect):
"""
Rounds the vector where variables are of integer type
:param x_vect: design variables to round
"""
if not self.__norm_data_is_computed:
self.__update_normalization_vars()
int_vars = self.__int_vars_indices
if not int_vars.any():
return x_vect
n_dims = len(x_vect.shape)
rounded_x_vect = x_vect.copy()
if n_dims == 1:
rounded_x_vect[int_vars] = np_round(rounded_x_vect[int_vars])
elif n_dims == 2:
rounded_x_vect[:, int_vars] = np_round(rounded_x_vect[:, int_vars])
else:
raise ValueError(
"The array to be unnormalized must be 1d or 2d"
+ ", got "
+ str(n_dims)
+ "d !"
)
return rounded_x_vect
[docs] def get_current_x_normalized(self):
"""Returns the current point normalized.
:returns: the x vector as array normalized by the bounds
"""
try:
current_x = self.get_current_x()
except KeyError as err:
raise KeyError("Cannot compute normalized current x since " + err.args[0])
return self.normalize_vect(current_x)
[docs] def get_current_x_dict(self):
"""Get the current point in the design space
:returns: the x vector as a dict, keys are the variable names
values are the variable vales as np array
"""
return self._current_x
[docs] def set_current_x(self, current_x):
"""Set the current point
:param current_x: the current design vector
"""
if isinstance(current_x, dict):
self._current_x = current_x
elif isinstance(current_x, ndarray):
if current_x.size != self.dimension:
raise ValueError(
"Invalid current_x, dimension mismatch "
+ str(self.dimension)
+ " != "
+ str(current_x.size)
)
self._current_x = self.array_to_dict(current_x)
elif isinstance(current_x, OptimizationResult):
if current_x.x_opt.size != self.dimension:
raise ValueError(
"Invalid x_opt, dimension mismatch "
+ str(self.dimension)
+ " != "
+ str(current_x.x_opt.size)
)
x_array = self.array_to_dict(current_x.x_opt)
self._current_x = x_array
else:
raise TypeError(
"The current should be an array or a dict. "
+ "Got "
+ str(type(current_x))
+ " instead."
)
self._check_current_x()
[docs] def set_current_variable(self, name, current_value):
"""Set the current value of a single variable
:param name: name of the variable
:param current_value: current value of the variable
"""
if name in self.variables_names:
self._current_x[name] = current_value
else:
raise ValueError("Variable " + str(name) + " is not in the design space !")
[docs] def get_size(self, name):
"""Get the size of a variable
Return None if the variable is not known
:param name: name of the variable
"""
return self.variables_sizes.get(name, None)
[docs] def get_type(self, name):
"""Get the type of a variable
Return None if the variable is not known
:param name: name of the variable
"""
return self.variables_types.get(name, None)
[docs] def get_lower_bound(self, name):
"""Gets the lower bound of a variable.
:param name: variable name
:returns: variable lower bound (possibly infinite)
"""
return self._lower_bounds.get(name)
[docs] def get_upper_bound(self, name):
"""Gets the upper bound of a variable.
:param name: variable name
:returns: variable upper bound (possibly infinite)
"""
return self._upper_bounds.get(name)
[docs] def get_lower_bounds(self, variables_names=None):
"""Generates an array of the variables' lower bounds.
:param variables_names: names of the variables of which the lower
bounds are required
"""
if self.__norm_data_is_computed and variables_names is None:
return self.__lower_bounds_array
return self.dict_to_array(self._lower_bounds, all_var_list=variables_names)
[docs] def get_upper_bounds(self, variables_names=None):
"""Generates an array of the variables' upper bounds.
:param variables_names: names of the variables of which the upper
bounds are required
"""
if self.__norm_data_is_computed and variables_names is None:
return self.__upper_bounds_array
return self.dict_to_array(self._upper_bounds, all_var_list=variables_names)
[docs] def set_lower_bound(self, name, lower_bound):
"""Set a new lower bound for variable name
:param name: name of the variable
:param lower_bound: lower bound
"""
if name not in self.variables_names:
raise ValueError("Variable " + name + " does not exist")
self._add_bound(name, self.variables_sizes[name], lower_bound, is_lower=True)
self._add_norm_policy(name)
[docs] def set_upper_bound(self, name, upper_bound):
"""Set a new upper bound for variable name
:param name: name of the variable
:param upper_bound: upper bound
"""
if name not in self.variables_names:
raise ValueError("Variable " + name + " does not exist")
self._add_bound(name, self.variables_sizes[name], upper_bound, is_lower=False)
self._add_norm_policy(name)
[docs] def array_to_dict(self, x_array):
"""Split the current point into a dictionary with variables names
:param x_array: x array to be converted to a dict of array
"""
x_dict = {}
current_index = 0
# order given by self.variables_names
for name in self.variables_names:
size = self.variables_sizes[name]
x_dict[name] = x_array[current_index : current_index + size]
current_index += size
return x_dict
@staticmethod
def __get_common_dtype(x_dict):
"""
If dict has a value complex array, returns numpy.complex128
if dict has real values and mixed floats/int, returns numpy.float64
if dict has only int values, returns numpy.int32
:param x_dict : dictionary of variables
"""
has_float = False
has_int = False
for val_arr in x_dict.values():
if not isinstance(val_arr, ndarray):
raise TypeError("x_dict values must be ndarray")
if val_arr.dtype == complex128:
return complex128
if val_arr.dtype == int32:
has_int = True
if val_arr.dtype == float64:
has_float = True
if has_float:
return float64
if has_int:
return int32
return float64
[docs] def dict_to_array(self, x_dict, all_vars=True, all_var_list=None):
"""Aggregate a point as dictionary into array
:param x_dict: point as dictionary
:param all_vars: if True, all variables shall be in x_dict
:param all_var_list: list of whole set of variables,
if None, use self.variables_names
"""
dtype = self.__get_common_dtype(x_dict)
if all_var_list is None:
all_var_list = self.variables_names
if all_vars:
array_list = [array(x_dict[name], dtype=dtype) for name in all_var_list]
else:
array_list = [
array(x_dict[name], dtype=dtype)
for name in all_var_list
if name in x_dict
]
return concatenate(array_list)
[docs] def get_pretty_table(self, fields=None):
"""Builds a PrettyTable object from the design space data
:param fields: list of fields to export, by default all
:returns: the pretty table object
"""
if fields is None:
fields = self.TABLE_NAMES
table = PrettyTable(fields)
table.float_format = "%.16g"
for name in self.variables_names:
size = self.variables_sizes[name]
l_b = self._lower_bounds.get(name)
u_b = self._upper_bounds.get(name)
var_type = self.variables_types[name]
curr = self._current_x.get(name)
for i in range(size):
data = {
"name": name,
"value": None,
"lower_bound": float("-inf"),
"upper_bound": float("inf"),
"type": var_type[i],
}
if l_b is not None and l_b[i] is not None:
data["lower_bound"] = l_b[i]
if u_b is not None and u_b[i] is not None:
data["upper_bound"] = u_b[i]
if curr is not None:
data["value"] = curr[i]
table.add_row([data[key] for key in fields])
table.align["name"] = "l"
table.align["type"] = "l"
return table
[docs] def export_hdf(self, file_path, append=False):
"""Export to hdf file.
:param file_path: path to file to write
:param append: if True, appends the data in the file
"""
if append:
mode = "a"
else:
mode = "w"
h5file = h5py.File(file_path, mode)
design_vars_grp = h5file.require_group(self.DESIGN_SPACE_GROUP)
design_vars_grp.create_dataset(
self.NAMES_GROUP, data=array(self.variables_names, dtype=string_)
)
for name in self.variables_names:
var_grp = design_vars_grp.require_group(name)
var_grp.create_dataset(self.SIZE_GROUP, data=self.variables_sizes[name])
l_b = self._lower_bounds.get(name)
if l_b is not None:
var_grp.create_dataset(self.LB_GROUP, data=l_b)
u_b = self._upper_bounds.get(name)
if u_b is not None:
var_grp.create_dataset(self.UB_GROUP, data=u_b)
var_type = self.variables_types[name]
if var_type is not None:
data_array = string_array(var_type)
dtype = data_array.dtype
var_grp.create_dataset(
self.VAR_TYPE_GROUP, data=data_array, dtype=dtype
)
value = self._current_x.get(name)
if value is not None:
var_grp.create_dataset(self.VALUE_GROUP, data=self.__to_real(value))
h5file.close()
[docs] def import_hdf(self, file_path):
"""Imports design space from hdf file
:param file_path:
"""
if not exists(file_path):
raise ValueError("Input hdf file does not exist ! " + str(file_path))
h5file = h5py.File(file_path, "r")
try:
design_vars_grp = h5file[self.DESIGN_SPACE_GROUP]
variables_names = design_vars_grp[
self.NAMES_GROUP
].value # pylint: disable=E1101
for name in variables_names:
name = name.decode()
var_group = design_vars_grp[name]
l_b = self.__read_opt_attr_array(var_group, self.LB_GROUP)
u_b = self.__read_opt_attr_array(var_group, self.UB_GROUP)
var_type = self.__read_opt_attr_array(var_group, self.VAR_TYPE_GROUP)
value = self.__read_opt_attr_array(var_group, self.VALUE_GROUP)
size = var_group[self.SIZE_GROUP].value
self.add_variable(name, size, var_type, l_b, u_b, value)
except KeyError as err:
h5file.close()
raise KeyError(
"Invalid design space hdf5 file "
+ str(file_path)
+ " missing dataset. "
+ str(err.args[0])
)
h5file.close()
self.check()
@staticmethod
def __read_opt_attr_array(var_group, dataset_name):
"""
Reads an array in a group, can be optional
If data does not exists, returns None
:param var_group : the variable group
:param dataset_name : name of the data
:returns: the data as np array, or None
"""
inval = var_group.get(dataset_name)
if inval is not None:
inval = array(inval)
return inval
@staticmethod
def __to_real(data):
""" Convert complex to real numpy array """
return array(array(data, copy=False).real, dtype=float64)
[docs] def to_complex(self):
""" Casts the current value to complex """
for name, val in self._current_x.items():
self._current_x[name] = array(val, dtype=complex128)
[docs] def export_to_txt(self, output_file, fields=None, header_char="", **table_options):
"""Exports the design space to a text file
:param output_file: output file path
:param fields: list of fields to export, by default all
"""
table = self.get_pretty_table(fields)
table.border = False
for option, val in table_options.items():
table.__setattr__(option, val)
with open(output_file, "w") as outf:
table_str = header_char + table.get_string()
outf.write(table_str)
[docs] @staticmethod
def read_from_txt(input_file, header=None):
"""Parses a csv file to read the DesignSpace
:param input_file: returns: s: the design space
:param header: fields list, or by default, read in the file
:returns: the design space
"""
float_data = genfromtxt(input_file, dtype="float")
str_data = genfromtxt(input_file, dtype="str")
if header is None:
header = to_unicode_list(str_data[0, :].tolist())
start_read = 1
else:
start_read = 0
if not set(DesignSpace.MINIMAL_FIELDS).issubset(set(header)):
raise ValueError(
"Malformed DesignSpace input file "
+ str(input_file)
+ " does not contain minimal "
+ "variables in header :"
+ str(DesignSpace.MINIMAL_FIELDS)
+ ", got instead : "
+ str(header)
)
col_map = {field: i for i, field in enumerate(header)}
var_names = to_unicode_list(str_data[start_read:, 0].tolist())
unique_names = []
prev_name = None
for name in var_names: # set([]) does not preserve order !
if name not in unique_names:
unique_names.append(name)
prev_name = name
elif prev_name != name:
raise ValueError(
"Malformed DesignSpace input file "
+ str(input_file)
+ " contains some variables ("
+ name
+ ") in a non-consecutive order "
)
k = start_read
design_space = DesignSpace()
lower_bounds_field = DesignSpace.MINIMAL_FIELDS[1]
upper_bounds_field = DesignSpace.MINIMAL_FIELDS[2]
value_field = DesignSpace.TABLE_NAMES[2]
var_type_field = DesignSpace.TABLE_NAMES[-1]
for name in unique_names:
size = var_names.count(name)
l_b = float_data[k : k + size, col_map[lower_bounds_field]]
u_b = float_data[k : k + size, col_map[upper_bounds_field]]
if value_field in col_map:
value = float_data[k : k + size, col_map[value_field]]
else:
value = None
if var_type_field in col_map:
var_type = str_data[k : k + size, col_map[var_type_field]].tolist()
else:
var_type = None
design_space.add_variable(name, size, var_type, l_b, u_b, value)
k += size
design_space.check()
return design_space
[docs] def log_me(self):
"""Logs a representation of the design_space characteristics
as a table
"""
msg = str(self)
for line in msg.split("\n"):
LOGGER.info(line)
def __str__(self, *args, **kwargs):
desc = "Design Space: "
desc += "\n" + str(self.get_pretty_table().get_string())
return desc
[docs] def project_into_bounds(self, x_c, normalized=False):
"""
Projects x_c onto the bounds, using a simple
coordinate wise approach
:param x_c: x vector (np array)
:returns: projected x_c
"""
if not self.__norm_data_is_computed:
self.__update_normalization_vars()
if not normalized:
l_b = self.__lower_bounds_array
u_b = self.__upper_bounds_array
else:
l_b = zeros_like(x_c)
u_b = ones_like(x_c)
x_p = array(x_c)
l_inds = where(x_c < l_b)
x_p[l_inds] = l_b[l_inds]
u_inds = where(x_c > u_b)
x_p[u_inds] = u_b[u_inds]
return x_p