Source code for gemseo.algos.design_space

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                           documentation
#        :author: Charlie Vanaret, Benoit Pauwels, Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""Design space.

A design space is used to represent the optimization's unknowns,
a.k.a. the design variables.

A :class:`.DesignSpace` describes this design space at a given state, in terms of names,
sizes, types, bounds and current values of the design variables.

Variables can easily be added to the :class:`.DesignSpace` using the
:meth:`.DesignSpace.add_variable` method or removed using the
:meth:`.DesignSpace.remove_variable` method.

We can also filter the design variables using the :meth:`.DesignSpace.filter` method.

Getters and setters are also available to get or set the value of a given variable
property.

Lastly, an instance of :class:`.DesignSpace` can be stored in a txt or HDF file.
"""

from __future__ import annotations

import collections
import logging
import re
from copy import deepcopy
from numbers import Number
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import Final
from typing import Literal
from typing import NamedTuple
from typing import overload

import h5py
from numpy import abs as np_abs
from numpy import array
from numpy import atleast_1d
from numpy import bytes_
from numpy import complex128
from numpy import concatenate
from numpy import dtype
from numpy import empty
from numpy import equal
from numpy import finfo
from numpy import float64
from numpy import full
from numpy import genfromtxt
from numpy import hstack
from numpy import in1d
from numpy import inf
from numpy import int32
from numpy import isinf
from numpy import isnan
from numpy import logical_or
from numpy import mod
from numpy import ndarray
from numpy import ones_like
from numpy import round
from numpy import vectorize
from numpy import where
from numpy import zeros_like
from strenum import StrEnum

from gemseo.algos.opt_result import OptimizationResult
from gemseo.core.cache import hash_data_dict
from gemseo.third_party.prettytable import PrettyTable
from gemseo.utils.compatibility.scipy import ArrayType
from gemseo.utils.compatibility.scipy import sparse_classes
from gemseo.utils.data_conversion import flatten_nested_dict
from gemseo.utils.data_conversion import split_array_to_dict_of_arrays
from gemseo.utils.hdf5 import get_hdf5_group
from gemseo.utils.repr_html import REPR_HTML_WRAPPER
from gemseo.utils.string_tools import pretty_str

if TYPE_CHECKING:
    from collections.abc import Iterable
    from collections.abc import Mapping
    from collections.abc import Sequence

    from numpy.typing import NDArray

LOGGER = logging.getLogger(__name__)


class _DesignVariableType(StrEnum):
    """The type of design variable."""

    FLOAT = "float"
    INTEGER = "integer"


[docs] class DesignSpace(collections.abc.MutableMapping): """Description of a design space. It defines a set of variables from their names, sizes, types and bounds. In addition, it provides the current values of these variables that can be used as the initial solution of an :class:`.OptimizationProblem`. A :class:`.DesignSpace` has the same API as a dictionary, e.g. ``variable = design_space["x"]``, ``other_design_space["x"] = design_space["x"]``, ``del design_space["x"]``, ``for name, value in design_space["x"].items()``, ... """ name: str | None """The name of the space.""" dimension: int """The total dimension of the space, corresponding to the sum of the sizes of the variables.""" variable_names: list[str] """The names of the variables.""" variable_sizes: dict[str, int] """The sizes of the variables.""" variable_types: dict[str, ndarray] """The types of the variables components, which can be any :attr:`.DesignSpace.DesignVariableType`.""" normalize: dict[str, ndarray] """The normalization policies of the variables components indexed by the variables names; if `True`, the component can be normalized.""" DesignVariableType = _DesignVariableType
[docs] class DesignVariable(NamedTuple): """A design variable.""" size: int | None = 1 var_type: NDArray[_DesignVariableType] | _DesignVariableType | None = ( _DesignVariableType.FLOAT ) l_b: ndarray | None = None u_b: ndarray | None = None value: ndarray | None = None
__VARIABLE_TYPES_TO_DTYPES: ClassVar[dict[str, str]] = { DesignVariableType.FLOAT: "float64", DesignVariableType.INTEGER: "int32", } MINIMAL_FIELDS: ClassVar[list[str]] = ["name", "lower_bound", "upper_bound"] TABLE_NAMES: ClassVar[list[str]] = [ "name", "lower_bound", "value", "upper_bound", "type", ] DESIGN_SPACE_GROUP = "design_space" NAME_GROUP = "name" NAMES_GROUP = "names" LB_GROUP = "l_b" UB_GROUP = "u_b" VAR_TYPE_GROUP = "var_type" VALUE_GROUP = "value" SIZE_GROUP = "size" # separator that denotes a vector's components SEP = "!" __INT_DTYPE = dtype("int32") __FLOAT_DTYPE = dtype("float64") __COMPLEX_DTYPE = dtype("complex128") __DEFAULT_COMMON_DTYPE = __FLOAT_DTYPE """The default NumPy data type of the variables.""" __CAMEL_CASE_REGEX: Final[re.Pattern] = re.compile("[A-Z][^A-Z]*") """A regular expression to decompose a CamelCase string.""" __current_value_array: ndarray """The current value stored as a concatenated array.""" __norm_current_value: dict[str, ndarray] """The norm of the current value.""" __norm_current_value_array: ndarray """The norm of the current value stored as a concatenated array.""" __names_to_indices: dict[str, range] """The names bound to the indices in a design vector.""" def __init__(self, name: str = "") -> None: """ Args: name: The name to be given to the design space. If empty, the design space is unnamed. """ # noqa: D205, D212, D415 self.name = name self.variable_names = [] self.dimension = 0 self.variable_sizes = {} self.variable_types = {} self.normalize = {} self._lower_bounds = {} self._upper_bounds = {} # These attributes are stored for faster computation of normalization # and unnormalization self._norm_factor = None self._norm_factor_inv = None self.__lower_bounds_array = None self.__upper_bounds_array = None self.__integer_components = None self.__no_integer = True self.__norm_data_is_computed = False self.__norm_inds = None self.__to_zero = None self.__bound_tol = 100.0 * finfo(float64).eps self.__current_value = {} self.__has_current_value = False self.__common_dtype = self.__DEFAULT_COMMON_DTYPE self.__clear_dependent_data() self.__names_to_indices = {} @property def _current_value(self) -> dict[str, ndarray]: """The current design value.""" return self.__current_value def __update_current_metadata(self) -> None: """Update information about the current design value for quick access.""" self.__update_current_status() self.__update_common_dtype() if self.__has_current_value: self.__clear_dependent_data() def __clear_dependent_data(self) -> None: """Reset the data that depends on the current value.""" self.__current_value_array = array([]) self.__norm_current_value = {} self.__norm_current_value_array = array([]) def __update_common_dtype(self) -> None: """Update the common data type of the variables.""" if self.__has_current_value: self.__common_dtype = self.__get_common_dtype(self.__current_value) else: self.__common_dtype = self.__DEFAULT_COMMON_DTYPE def __update_current_status(self) -> None: """Update the availability of current design values for all the variables.""" if not self.__current_value or self.__current_value.keys() != set( self.variable_names ): self.__has_current_value = False return for value in self.__current_value.values(): if value is None: self.__has_current_value = False return self.__has_current_value = True def __delitem__( self, name: str, ) -> None: """Remove a variable from the design space. Args: name: The name of the variable to be removed. """ self.remove_variable(name)
[docs] def remove_variable( self, name: str, ) -> None: """Remove a variable from the design space. Args: name: The name of the variable to be removed. """ self.__norm_data_is_computed = False size = self.variable_sizes.pop(name) self.dimension -= size del self.__names_to_indices[name] for variable_name in reversed(self.variable_names): if variable_name == name: break indices = self.__names_to_indices[variable_name] # N.B. the steps of the ranges of indices are assumed equal to 1 self.__names_to_indices[variable_name] = range( indices.start - size, indices.stop - size ) self.variable_names.remove(name) del self.variable_types[name] del self.normalize[name] if name in self._lower_bounds: del self._lower_bounds[name] if name in self._upper_bounds: del self._upper_bounds[name] if name in self.__current_value: del self.__current_value[name] self.__update_current_metadata()
[docs] def filter( # noqa: A003 self, keep_variables: str | Iterable[str], copy: bool = False, ) -> DesignSpace: """Filter the design space to keep a subset of variables. Args: keep_variables: The names of the variables to be kept. copy: If ``True``, then a copy of the design space is filtered, otherwise the design space itself is filtered. Returns: Either the filtered original design space or a copy. Raises: ValueError: If the variable is not in the design space. """ if isinstance(keep_variables, str): keep_variables = [keep_variables] design_space = deepcopy(self) if copy else self for name in deepcopy(self.variable_names): if name not in keep_variables: design_space.remove_variable(name) for name in keep_variables: if name not in self.variable_names: msg = f"Variable '{name}' is not known." raise ValueError(msg) return design_space
[docs] def filter_dim( self, variable: str, keep_dimensions: Iterable[int], ) -> DesignSpace: """Filter the design space to keep a subset of dimensions for a variable. Args: variable: The name of the variable. keep_dimensions: The dimensions of the variable to be kept, between :math:`0` and :math:`d-1` where :math:`d` is the number of dimensions of the variable. Returns: The filtered design space. Raises: ValueError: If a dimension is unknown. """ self.__norm_data_is_computed = False removed_dimensions = list( set(range(self.variable_sizes[variable])) - set(keep_dimensions) ) bad_dimensions = list( set(keep_dimensions) - set(range(self.variable_sizes[variable])) ) size = len(removed_dimensions) self.dimension -= size self.variable_sizes[variable] -= size types = [] for dimension in keep_dimensions: if dimension in bad_dimensions: self.remove_variable(variable) msg = f"Dimension {dimension} of variable '{variable}' is not known." raise ValueError(msg) types.append(self.variable_types[variable][dimension]) self.variable_types[variable] = array(types) idx = keep_dimensions self.normalize[variable] = self.normalize[variable][idx] if variable in self._lower_bounds: self._lower_bounds[variable] = self._lower_bounds[variable][idx] if variable in self._upper_bounds: self._upper_bounds[variable] = self._upper_bounds[variable][idx] if variable in self.__current_value: self.__current_value[variable] = self.__current_value[variable][idx] self.__update_current_metadata() return self
[docs] def add_variable( self, name: str, size: int = 1, var_type: DesignVariableType | Sequence[DesignVariableType] = DesignVariableType.FLOAT, l_b: float | ndarray | None = None, u_b: float | ndarray | None = None, value: float | ndarray | None = None, ) -> None: r"""Add a variable to the design space. Args: name: The name of the variable. size: The size of the variable. var_type: Either the type of the variable or the types of its components. l_b: The lower bound of the variable. If ``None``, use :math:`-\infty`. u_b: The upper bound of the variable. If ``None``, use :math:`+\infty`. value: The default value of the variable. If ``None``, do not use a default value. Raises: ValueError: Either if the variable already exists or if the size is not a positive integer. """ self._check_variable_name(name) if size <= 0 or int(size) != size: msg = f"The size of '{name}' should be a positive integer." raise ValueError(msg) # name and size current_index = self.dimension self.__names_to_indices[name] = range(current_index, current_index + size) self.variable_names.append(name) self.dimension += size self.variable_sizes[name] = size # type self._add_type(name, size, var_type) # bounds self._add_bound(name, size, l_b) self._add_bound(name, size, u_b, is_lower=False) self._check_variable_bounds(name) # normalization policy self._add_norm_policy(name) if value is not None: array_value = atleast_1d(value) self._check_value(array_value, name) if len(array_value) == 1 and size > 1: array_value = full(size, value) self.__current_value[name] = array_value.astype( self.__VARIABLE_TYPES_TO_DTYPES[self.variable_types[name][0]], copy=False, ) try: self._check_current_value(name) except ValueError: # If a ValueError is raised, # we must remove the variable from the design space. # When using a python script, this has no interest. # When using a notebook, a cell can raise a ValueError, # but we can continue to the next cell, # and use a design space which contains a variables that leads to error. self.remove_variable(name) raise self.__update_current_metadata()
def _check_variable_name(self, name: str) -> None: """Check if the space contains a variable. Args: name: The name of the variable. Raises: ValueError: When the variable already exists. """ if name in self.variable_names: msg = f"The variable '{name}' already exists." raise ValueError(msg) @property def names_to_indices(self) -> dict[str, range]: """The names bound to the indices.""" return self.__names_to_indices def _add_type( self, name: str, size: int, var_type: DesignVariableType | Sequence[DesignVariableType] = DesignVariableType.FLOAT, ) -> None: """Add a type to a variable. Args: name: The name of the variable. size: The size of the variable. var_type: Either the type of the variable (see :attr:`.DesignSpace.DesignVariablesType`) or the types of its components. Raises: ValueError: Either if the number of component types is different from the variable size or if a variable type is unknown. """ if isinstance(var_type, (str, self.DesignVariableType, bytes)): var_type = [var_type] * size if len(var_type) != size: msg = f"The list of types for variable '{name}' should be of size {size}." raise ValueError(msg) var_types = [] for v_type in var_type: if isinstance(v_type, bytes): v_type = v_type.decode() if v_type not in set(self.DesignVariableType): msg = f'The type "{v_type}" of {name} is not known.' raise ValueError(msg) var_types += [v_type] self.variable_types[name] = array(var_types) self.__norm_data_is_computed = False def _add_norm_policy( self, name: str, ) -> None: """Add a normalization policy to a variable. Unbounded variables are not normalized. Bounded variables (both from above and from below) are normalized. Args: name: The name of a variable. Raises: ValueError: Either if the variable is not in the design space, if its size is not set, if the types of its components are not set or if there is no implemented normalization policy for the type of this variable. """ # Check that the variable is in the design space: if name not in self.variable_names: msg = f"Variable '{name}' is not known." raise ValueError(msg) # Check that the variable size is set: size = self.get_size(name) if size is None: msg = f"The size of variable '{name}' is not set." raise ValueError(msg) # Check that the variables types are set: variable_types = self.variable_types.get(name, None) if variable_types is None: msg = f"The components types of variable '{name}' are not set." raise ValueError(msg) # Set the normalization policy: normalize = empty(size) for i in range(size): var_type = variable_types[i] if var_type in self.__VARIABLE_TYPES_TO_DTYPES: if ( self._lower_bounds[name][i] == -inf or self._upper_bounds[name][i] == inf ): # Unbounded variables are not normalized: normalize[i] = False elif self._lower_bounds[name][i] == self._upper_bounds[name][i]: # Constant variables are not normalized: normalize[i] = False else: normalize[i] = True else: msg = "The normalization policy for type {0} is not implemented." raise ValueError(msg.format(var_type)) self.normalize[name] = normalize @staticmethod def __is_integer( values: ndarray | Number, ) -> ndarray: """Check if each value is an integer. Args: values: The array or number to be checked. Returns: Whether each of the given values is an integer. """ values = atleast_1d(values) return array([isinf(x) or x is None or not mod(x, 1) for x in values]) @staticmethod def __is_numeric( value: Any, ) -> bool: """Check that a value is numeric. Args: value: The value to be checked. Returns: Whether the value is numeric. """ res = (value is None) or hasattr(value, "real") if not res: try: float(value) except TypeError: return False return True @staticmethod def __is_not_nan( value: ndarray, ) -> bool: """Check that a value is not a nan. Args: value: The value to be checked. Returns: Whether the value is not a nan. """ return (value is None) or ~isnan(value) def _check_value( self, value: ndarray, name: str, ) -> bool: """Check that the value of a variable is valid. Args: value: The value to be checked. name: The name of the variable. Returns: Whether the value of the variable is valid. Raises: ValueError: Either if the array is not one-dimensional, if the value is not numerizable, if the value is nan or if there is a component value which is not an integer while the variable type is integer and ``allow_inf_int_bound`` is set to ``False``. """ all_indices = set(range(len(value))) # OK if the variable value is one-dimensional if len(value.shape) > 1: msg = ( f"Value {value} of variable '{name}' has dimension greater than 1 " "while a float or a 1d iterable object " "(array, list, tuple, ...) " "while a scalar was expected." ) raise ValueError(msg) # OK if all components are None if all(equal(value, None)): return True test = vectorize(self.__is_numeric)(value) indices = all_indices - set(test.nonzero()[0]) for idx in indices: msg = f"Value {value[idx]} of variable '{name}' is not numerizable." raise ValueError(msg) test = vectorize(self.__is_not_nan)(value) indices = all_indices - set(test.nonzero()[0]) for idx in indices: msg = f"Value {value[idx]} of variable '{name}' is NaN." raise ValueError(msg) # Check if some components of an integer variable are not integer. if self.variable_types[name][0] == self.DesignVariableType.INTEGER: indices = all_indices - set(self.__is_integer(value).nonzero()[0]) for idx in indices: msg = ( f"Component value {value[idx]} of variable '{name}'" " is not an integer " "while variable is of type integer " f"(index: {idx})." ) raise ValueError(msg) return True def _add_bound( self, name: str, size: int, bound: ndarray | Number, is_lower: bool = True, ) -> None: """Add a lower or upper bound to a variable. Args: name: The name of the variable. size: The size of the variable. bound: The bound of the variable. is_lower: If ``True``, the bound is a lower bound. Otherwise, it is an upper bound. Raises: ValueError: If the size of the bound is different from the size of the variable. """ self.__norm_data_is_computed = False bounds = self._lower_bounds if is_lower else self._upper_bounds if bound is None: infinity = full(size, inf) bound_to_update = -infinity if is_lower else infinity bounds.update({name: bound_to_update}) return infinity = -inf if is_lower else inf bound_to_update = atleast_1d(bound) if None in bound_to_update: bound_to_update = where( equal(bound_to_update, None), infinity, bound_to_update ).astype(self.__FLOAT_DTYPE) self._check_value(bound_to_update, name) if isinstance(bound, Number): # scalar: same lower bound for all components bound_to_update = full(size, bound) elif len(bound_to_update) != size: bound_prefix = "lower" if is_lower else "upper" msg = f"The {bound_prefix} bounds of '{name}' should be of size {size}." raise ValueError(msg) bounds.update({name: bound_to_update}) def _check_variable_bounds( self, name: str, ) -> None: """Check that the bounds of a variable are compatible and have the same size. Args: name: The name of the variable. Raises: ValueError: If the bounds of the variable are not valid. """ l_b = self._lower_bounds[name] u_b = self._upper_bounds[name] inds = (u_b < l_b).nonzero()[0] if inds.size != 0: msg = ( f"The bounds of variable '{name}'{inds} are not valid: " f"{l_b[inds]}!<{u_b[inds]}." ) raise ValueError(msg) def _check_current_value( self, name: str, ) -> None: """Check that the current value of a variable is between its bounds. Args: name: The name of the variable. Raises: ValueError: If the current value of the variable is outside its bounds. """ l_b = self._lower_bounds.get(name, None) u_b = self._upper_bounds.get(name, None) current_value = self.__current_value.get(name, None) not_none = ~equal(current_value, None) indices = ( logical_or( current_value[not_none] < l_b[not_none] - self.__bound_tol, current_value[not_none] > u_b[not_none] + self.__bound_tol, ) ).nonzero()[0] for index in indices: msg = ( f"The current value of variable '{name}' ({current_value[index]}) is " f"not between the lower bound {l_b[index]} and the upper bound " f"{u_b[index]}." ) raise ValueError(msg)
[docs] def has_current_value(self) -> bool: """Check if each variable has a current value. Returns: Whether the current design value is defined for all variables. """ return self.__has_current_value
[docs] def has_integer_variables(self) -> bool: """Check if the design space has at least one integer variable. Returns: Whether the design space has at least one integer variable. """ return self.DesignVariableType.INTEGER in [ self.variable_types[variable_name][0] for variable_name in self.variable_names ]
[docs] def check(self) -> None: """Check the state of the design space. Raises: ValueError: If the design space is empty. """ if not self.variable_names: msg = "The design space is empty." raise ValueError(msg) for name in self.variable_names: self._check_variable_bounds(name) if self.has_current_value(): self._check_current_names()
[docs] def check_membership( self, x_vect: Mapping[str, ndarray] | ndarray, variable_names: Sequence[str] | None = None, ) -> None: """Check whether the variables satisfy the design space requirements. Args: x_vect: The values of the variables. variable_names: The names of the variables. If ``None``, use the names of the variables of the design space. Raises: ValueError: Either if the dimension of the values vector is wrong, if the values are not specified as an array or a dictionary, if the values are outside the bounds of the variables or if the component of an integer variable is not an integer. """ if isinstance(x_vect, dict): self.__check_membership(x_vect, variable_names) elif isinstance(x_vect, ndarray): if x_vect.size != self.dimension: msg = ( f"The array should be of size {self.dimension}; got {x_vect.size}." ) raise ValueError(msg) if variable_names is None: if self.__lower_bounds_array is None: self.__lower_bounds_array = self.get_lower_bounds() if self.__upper_bounds_array is None: self.__upper_bounds_array = self.get_upper_bounds() self.__check_membership_x_vect(x_vect) else: self.__check_membership( split_array_to_dict_of_arrays( x_vect, self.variable_sizes, variable_names ), variable_names, ) else: msg = ( "The input vector should be an array or a dictionary; " f"got a {type(x_vect)} instead." ) raise TypeError(msg)
def __check_membership_x_vect(self, x_vect: ndarray) -> None: """Check whether a vector is comprised between the lower and upper bounds. Args: x_vect: The vector. Raises: ValueError: When the values are outside the bounds of the variables. """ l_b = self.__lower_bounds_array u_b = self.__upper_bounds_array indices = (x_vect < l_b - self.__bound_tol).nonzero()[0] if len(indices): value = x_vect[indices] lower_bound = l_b[indices] msg = ( f"The components {indices} of the given array ({value}) " f"are lower than the lower bound ({lower_bound}) " f"by {lower_bound - value}." ) raise ValueError(msg) indices = (x_vect > u_b + self.__bound_tol).nonzero()[0] if len(indices): value = x_vect[indices] upper_bound = u_b[indices] msg = ( f"The components {indices} of the given array ({value}) " f"are greater than the upper bound ({upper_bound}) " f"by {value - upper_bound}." ) raise ValueError(msg) def __check_membership( self, x_dict: Mapping[str, ndarray], variable_names: Iterable[str] | None, ) -> None: """Check whether the variables satisfy the design space requirements. Args: x_dict: The values of the variables. variable_names: The names of the variables. If ``None``, use the names of the variables of the design space. Raises: ValueError: Either if the dimension of an array is wrong, if the values are outside the bounds of the variables or if the component of an integer variable is not an integer. """ variable_names = variable_names or self.variable_names for name in variable_names: value = x_dict[name] if value is None: continue size = self.variable_sizes[name] l_b = self._lower_bounds.get(name, None) u_b = self._upper_bounds.get(name, None) if value.size != size: msg = ( f"The variable {name} of size {size} " f"cannot be set with an array of size {value.size}." ) raise ValueError(msg) for i in range(size): x_real = value[i].real if l_b is not None and x_real < l_b[i] - self.__bound_tol: msg = ( f"The component {name}[{i}] of the given array ({x_real}) " f"is lower than the lower bound ({l_b[i]}) " f"by {l_b[i] - x_real:.1e}." ) raise ValueError(msg) if u_b is not None and u_b[i] + self.__bound_tol < x_real: msg = ( f"The component {name}[{i}] of the given array ({x_real}) " f"is greater than the upper bound ({l_b[i]}) " f"by {x_real - u_b[i]:.1e}." ) raise ValueError(msg) if ( self.variable_types[name][0] == self.DesignVariableType.INTEGER ) and not self.__is_integer(x_real): msg = ( f"The variable {name} is of type integer; " f"got {name}[{i}] = {x_real}." ) raise ValueError(msg)
[docs] def get_active_bounds( self, x_vec: ndarray | None = None, tol: float = 1e-8, ) -> tuple[dict[str, ndarray], dict[str, ndarray]]: """Determine which bound constraints of a design value are active. Args: x_vec: The design value at which to check the bounds. If ``None``, use the current design value. tol: The tolerance of comparison of a scalar with a bound. Returns: Whether the components of the lower and upper bound constraints are active, the first returned value representing the lower bounds and the second one the upper bounds, e.g. .. code-block:: python ( { "x": array(are_x_lower_bounds_active), "y": array(are_y_lower_bounds_active), }, { "x": array(are_x_upper_bounds_active), "y": array(are_y_upper_bounds_active), }, ) where: .. code-block:: python are_x_lower_bounds_active = [True, False] are_x_upper_bounds_active = [False, False] are_y_lower_bounds_active = [False] are_y_upper_bounds_active = [True] """ if x_vec is None: current_x = self.__current_value self.check_membership(self.get_current_value()) elif isinstance(x_vec, ndarray): current_x = self.array_to_dict(x_vec) elif isinstance(x_vec, dict): current_x = x_vec else: msg = f"Expected dict or array for x_vec argument; got {type(x_vec)}." raise TypeError(msg) active_l_b = {} active_u_b = {} for name in self.variable_names: l_b = self._lower_bounds.get(name) l_b = where(equal(l_b, None), -inf, l_b) u_b = self._upper_bounds.get(name) u_b = where(equal(u_b, None), inf, u_b) x_vec_i = current_x[name] # lower bound saturated active_l_b[name] = np_abs(x_vec_i - l_b) <= tol # upper bound saturated active_u_b[name] = np_abs(x_vec_i - u_b) <= tol return active_l_b, active_u_b
def _check_current_names( self, variable_names: Iterable[str] | None = None, ) -> None: """Check the names of the current design value. Args: variable_names: The names of the variables. If ``None``, use the names of the variables of the design space. Raises: ValueError: If the names of the variables of the current design value and the names of the variables of the design space are different. """ if sorted(set(self.variable_names)) != sorted(self.__current_value.keys()): msg = ( f"Expected current_x variables: {self.variable_names}; " f"got {list(self.__current_value.keys())}." ) raise ValueError(msg) self.check_membership(self.__current_value, variable_names)
[docs] def get_current_value( self, variable_names: Sequence[str] | None = None, complex_to_real: bool = False, as_dict: bool = False, normalize: bool = False, ) -> ndarray | dict[str, ndarray]: """Return the current design value. If the names of the variables are empty then an empty data is returned. Args: variable_names: The names of the design variables. If ``None``, use all the design variables. complex_to_real: Whether to cast complex numbers to real ones. as_dict: Whether to return the current design value as a dictionary of the form ``{variable_name: variable_value}``. normalize: Whether to normalize the design values in :math:`[0,1]` with the bounds of the variables. Returns: The current design value. Raises: ValueError: If names in ``variable_names`` are not in the design space. Warnings: For performance purposes, :meth:`.get_current_value` does not return a copy of the current value. This means that modifying the returned object will make the :class:`.DesignSpace` inconsistent (the current design value stored as a NumPy array and the current design value stored as a dictionary of NumPy arrays will be different). To modify the returned object without impacting the :class:`.DesignSpace`, you shall copy this object and modify the copy. See Also: To modify the current value, please use :meth:`.set_current_value` or :meth:`.set_current_variable`. """ if variable_names is not None: if not variable_names: return {} if as_dict else array([]) not_variable_names = set(variable_names) - set(self.variable_names) if not_variable_names: msg = ( "There are no such variables named: " f"{pretty_str(not_variable_names)}." ) raise ValueError(msg) if self.__has_current_value and not len(self.__current_value_array): self.__current_value_array = self.dict_to_array(self.__current_value) if normalize: if self.__has_current_value and not len(self.__norm_current_value_array): self.__norm_current_value_array = self.normalize_vect( self.__current_value_array ) self.__norm_current_value = self.array_to_dict( self.__norm_current_value_array ) current_x_array = self.__norm_current_value_array current_x_dict = self.__norm_current_value else: current_x_array = self.__current_value_array current_x_dict = self.__current_value if variable_names is None or set(variable_names) == set(self.variable_names): if as_dict: if complex_to_real: return {k: v.real for k, v in current_x_dict.items()} return current_x_dict if not self.__has_current_value: variables = set(self.variable_names) - current_x_dict.keys() msg = ( "There is no current value for the design variables: " f"{pretty_str(variables)}." ) raise KeyError(msg) if variable_names is None or list(variable_names) == self.variable_names: if complex_to_real: return current_x_array.real return current_x_array if as_dict: current_value = {name: current_x_dict[name] for name in variable_names} if complex_to_real: return {k: v.real for k, v in current_value.items()} return current_value current_x_array = self.dict_to_array( current_x_dict, variable_names=variable_names ) if complex_to_real: return current_x_array.real return current_x_array
[docs] def get_indexed_var_name( self, variable_name: str, ) -> str | list[str]: """Create the names of the components of a variable. If the size of the variable is equal to 1, this method returns the name of the variable. Otherwise, it concatenates the name of the variable, the separator :attr:`.DesignSpace.SEP` and the index of the component. Args: variable_name: The name of the variable. Returns: The names of the components of the variable. """ size = self.variable_sizes[variable_name] if size == 1: return variable_name return [variable_name + self.SEP + str(i) for i in range(size)]
[docs] def get_indexed_variable_names(self) -> list[str]: """Create the names of the components of all the variables. If the size of the variable is equal to 1, this method uses its name. Otherwise, it concatenates the name of the variable, the separator :attr:`.DesignSpace.SEP` and the index of the component. Returns: The name of the components of all the variables. """ var_ind_names = [] for var in self.variable_names: vnames = self.get_indexed_var_name(var) if isinstance(vnames, str): var_ind_names.append(vnames) else: var_ind_names += vnames return var_ind_names
[docs] def get_variables_indexes( self, variable_names: Iterable[str], use_design_space_order: bool = True, ) -> NDArray[int]: """Return the indexes of a design array corresponding to variables names. Args: variable_names: The names of the variables. use_design_space_order: Whether to order the indexes according to the order of the variables names in the design space. Otherwise the indexes will be ordered in the same order as the variables names were required. Returns: The indexes of a design array corresponding to the variables names. """ if use_design_space_order: names = [name for name in self.variable_names if name in variable_names] else: names = variable_names return concatenate([self.__names_to_indices[name] for name in names])
def __update_normalization_vars(self) -> None: """Compute the inner attributes used for normalization and unnormalization.""" self.__lower_bounds_array = self.get_lower_bounds() self.__upper_bounds_array = self.get_upper_bounds() self._norm_factor = self.__upper_bounds_array - self.__lower_bounds_array self.__norm_inds = self.dict_to_array(self.normalize).nonzero()[0] # In case lb=ub norm_factor_is_zero = self._norm_factor == 0.0 self.__to_zero = norm_factor_is_zero.nonzero()[0] self._norm_factor_inv = 1 / where(norm_factor_is_zero, 1, self._norm_factor) self.__integer_components = concatenate([ self.variable_types[variable_name] == self.DesignVariableType.INTEGER for variable_name in self.variable_names ]) self.__no_integer = not self.__integer_components.any() self.__norm_data_is_computed = True
[docs] def normalize_vect( self, x_vect: ArrayType, minus_lb: bool = True, out: ndarray | None = None, ) -> ArrayType: r"""Normalize a vector of the design space. If `minus_lb` is True: .. math:: x_u = \frac{x-l_b}{u_b-l_b} where :math:`l_b` and :math:`u_b` are the lower and upper bounds of :math:`x`. Otherwise: .. math:: x_u = \frac{x}{u_b-l_b} Unbounded variables are not normalized. Args: x_vect: The values of the design variables. minus_lb: If ``True``, remove the lower bounds at normalization. out: The array to store the normalized vector. If ``None``, create a new array. Returns: The normalized vector. """ if not self.__norm_data_is_computed: self.__update_normalization_vars() if out is None: use_out = False out = x_vect.copy() else: use_out = True out[...] = x_vect # Normalize the relevant components: current_x_dtype = self.__common_dtype # Normalization will not work with integers. if current_x_dtype.kind == "i": current_x_dtype = self.__FLOAT_DTYPE if out.dtype != current_x_dtype: if use_out: out[...] = out.astype(current_x_dtype, copy=False) else: out = out.astype(current_x_dtype, copy=False) norm_inds = self.__norm_inds if minus_lb: out[..., norm_inds] -= self.__lower_bounds_array[norm_inds] if isinstance(out, sparse_classes): # Construct a mask to only scale the required columns column_mask = in1d(out.indices, norm_inds) # Scale the corresponding coefficients out.data[column_mask] *= self._norm_factor_inv[out.indices][column_mask] else: out[..., norm_inds] *= self._norm_factor_inv[norm_inds] # In case lb=ub, put value to 0. to_zero = self.__to_zero if to_zero.size > 0: out[..., to_zero] = 0.0 return out
[docs] def normalize_grad( self, g_vect: ArrayType, ) -> ArrayType: r"""Normalize an unnormalized gradient. This method is based on the chain rule: .. math:: \frac{df(x)}{dx} = \frac{df(x)}{dx_u}\frac{dx_u}{dx} = \frac{df(x)}{dx_u}\frac{1}{u_b-l_b} where :math:`x_u = \frac{x-l_b}{u_b-l_b}` is the normalized input vector, :math:`x` is the unnormalized input vector and :math:`l_b` and :math:`u_b` are the lower and upper bounds of :math:`x`. Then, the normalized gradient reads: .. math:: \frac{df(x)}{dx_u} = (u_b-l_b)\frac{df(x)}{dx} where :math:`\frac{df(x)}{dx}` is the unnormalized one. Args: g_vect: The gradient to be normalized. Returns: The normalized gradient. """ return self.unnormalize_vect(g_vect, minus_lb=False, no_check=True)
[docs] def unnormalize_grad( self, g_vect: ArrayType, ) -> ArrayType: r"""Unnormalize a normalized gradient. This method is based on the chain rule: .. math:: \frac{df(x)}{dx} = \frac{df(x)}{dx_u}\frac{dx_u}{dx} = \frac{df(x)}{dx_u}\frac{1}{u_b-l_b} where :math:`x_u = \frac{x-l_b}{u_b-l_b}` is the normalized input vector, :math:`x` is the unnormalized input vector, :math:`\frac{df(x)}{dx_u}` is the unnormalized gradient :math:`\frac{df(x)}{dx}` is the normalized one, and :math:`l_b` and :math:`u_b` are the lower and upper bounds of :math:`x`. Args: g_vect: The gradient to be unnormalized. Returns: The unnormalized gradient. """ return self.normalize_vect(g_vect, minus_lb=False)
[docs] def unnormalize_vect( self, x_vect: ArrayType, minus_lb: bool = True, no_check: bool = False, out: ndarray | None = None, ) -> ArrayType: """Unnormalize a normalized vector of the design space. If `minus_lb` is True: .. math:: x = x_u(u_b-l_b) + l_b where :math:`x_u` is the normalized input vector, :math:`x` is the unnormalized input vector and :math:`l_b` and :math:`u_b` are the lower and upper bounds of :math:`x`. Otherwise: .. math:: x = x_u(u_b-l_b) Args: x_vect: The values of the design variables. minus_lb: Whether to remove the lower bounds at normalization. no_check: Whether to check if the components are in :math:`[0,1]`. out: The array to store the unnormalized vector. If ``None``, create a new array. Returns: The unnormalized vector. """ if not self.__norm_data_is_computed: self.__update_normalization_vars() norm_inds = self.__norm_inds lower_bounds = self.__lower_bounds_array if not no_check: value = x_vect[..., norm_inds] lower_bounds_violated = value < -self.__bound_tol upper_bounds_violated = value > 1 + self.__bound_tol any_lower_bound_violated = lower_bounds_violated.any() any_upper_bound_violated = upper_bounds_violated.any() msg = "All components of the normalized vector should be between 0 and 1; " if any_lower_bound_violated: msg += f"lower bounds violated: {value[lower_bounds_violated]}; " if any_upper_bound_violated: msg += f"upper bounds violated: {value[upper_bounds_violated]}; " if any_lower_bound_violated or any_upper_bound_violated: msg = msg[:-2] + "." LOGGER.warning(msg) if out is None: out = x_vect.copy() else: out *= 0 out = x_vect # Unnormalize the relevant components: recast_to_int = False current_x_dtype = self.__common_dtype # Normalization will not work with integers. if current_x_dtype.kind == "i": current_x_dtype = self.__FLOAT_DTYPE recast_to_int = True if out.dtype != current_x_dtype: out = out.astype(current_x_dtype, copy=False) if isinstance(out, sparse_classes): # Construct a mask to only scale the required columns column_mask = in1d(out.indices, norm_inds) # Scale the corresponding coefficients out.data[column_mask] *= self._norm_factor[out.indices][column_mask] else: out[..., norm_inds] *= self._norm_factor[norm_inds] if minus_lb: out[..., norm_inds] += lower_bounds[norm_inds] # In case lb=ub, put value to lower bound. to_lower_bounds = self.__to_zero if to_lower_bounds.size > 0: out[..., to_lower_bounds] = lower_bounds[to_lower_bounds] if not self.__no_integer: self.round_vect(out, copy=False) if recast_to_int: out = out.astype(int32) return out
[docs] def transform_vect( self, vector: ndarray, out: ndarray | None = None, ) -> ndarray: """Map a point of the design space to a vector with components in :math:`[0,1]`. Args: vector: A point of the design space. out: The array to store the transformed vector. If ``None``, create a new array. Returns: A vector with components in :math:`[0,1]`. """ return self.normalize_vect(vector, out=out)
[docs] def untransform_vect( self, vector: ndarray, no_check: bool = False, out: ndarray | None = None, ) -> ndarray: """Map a vector with components in :math:`[0,1]` to the design space. Args: vector: A vector with components in :math:`[0,1]`. no_check: Whether to check if the components are in :math:`[0,1]`. out: The array to store the untransformed vector. If ``None``, create a new array. Returns: A point of the variables space. """ return self.unnormalize_vect(vector, no_check=no_check, out=out)
[docs] def round_vect( self, x_vect: ndarray, copy: bool = True, ) -> ndarray: """Round the vector where variables are of integer type. Args: x_vect: The values to be rounded. copy: Whether to round a copy of ``x_vect``. Returns: The rounded values. """ if not self.__norm_data_is_computed: self.__update_normalization_vars() if self.__no_integer: return x_vect rounded_x_vect = x_vect.copy() if copy else x_vect are_integers = self.__integer_components rounded_x_vect[..., are_integers] = round(x_vect[..., are_integers]) return rounded_x_vect
[docs] def set_current_value( self, value: ndarray | Mapping[str, ndarray] | OptimizationResult, ) -> None: """Set the current design value. Args: value: The value of the current design. Raises: ValueError: If the value has a wrong dimension. TypeError: If the value is neither a mapping of NumPy arrays, a NumPy array nor an :class:`.OptimizationResult`. """ if isinstance(value, dict): self.__current_value = value elif isinstance(value, ndarray): if value.size != self.dimension: msg = ( "Invalid current_x, " f"dimension mismatch: {self.dimension} != {value.size}." ) raise ValueError(msg) self.__current_value = self.array_to_dict(value) elif isinstance(value, OptimizationResult): if value.x_opt.size != self.dimension: msg = ( "Invalid x_opt, " f"dimension mismatch: {self.dimension} != {value.x_opt.size}." ) raise ValueError(msg) self.__current_value = self.array_to_dict(value.x_opt) else: msg = ( "The current design value should be either an array, " "a dictionary of arrays " "or an optimization result; " f"got {type(value)} instead." ) raise TypeError(msg) for name, value in self.__current_value.items(): if value is not None: variable_type = self.variable_types[name] if isinstance(variable_type, ndarray): variable_type = variable_type[0] if variable_type == self.DesignVariableType.INTEGER: value = value.astype(self.__VARIABLE_TYPES_TO_DTYPES[variable_type]) self.__current_value[name] = value self.__update_current_metadata() if self.__current_value: self._check_current_names()
[docs] def set_current_variable( self, name: str, current_value: ndarray, ) -> None: """Set the current value of a single variable. Args: name: The name of the variable. current_value: The current value of the variable. """ if name in self.variable_names: self.__current_value[name] = current_value self.__update_current_metadata() else: msg = f"Variable '{name}' is not known." raise ValueError(msg)
[docs] def get_size( self, name: str, ) -> int | None: """Get the size of a variable. Args: name: The name of the variable. Returns: The size of the variable, None if it is not known. """ return self.variable_sizes.get(name, None)
[docs] def get_type( self, name: str, ) -> str | None: """Return the type of a variable. Args: name: The name of the variable. Returns: The type of the variable, None if it is not known. """ return self.variable_types.get(name, None)
[docs] def get_lower_bound( self, name: str, ) -> ndarray | None: """Return the lower bound of a variable. Args: name: The name of the variable. Returns: The lower bound of the variable (possibly infinite). """ return self._lower_bounds.get(name)
[docs] def get_upper_bound( self, name: str, ) -> ndarray | None: """Return the upper bound of a variable. Args: name: The name of the variable. Returns: The upper bound of the variable (possibly infinite). """ return self._upper_bounds.get(name)
@overload def get_lower_bounds( self, variable_names: Sequence[str] | None = None, as_dict: Literal[False] = False, ) -> ndarray: ... @overload def get_lower_bounds( self, variable_names: Sequence[str] | None = None, as_dict: Literal[True] = False, ) -> dict[str, ndarray]: ...
[docs] def get_lower_bounds( self, variable_names: Sequence[str] | None = None, as_dict: bool = False, ) -> ndarray | dict[str, ndarray]: """Return the lower bounds of design variables. Args: variable_names: The names of the design variables. If ``None``, the lower bounds of all the design variables are returned. as_dict: Whether to return the lower bounds as a dictionary of the form ``{variable_name: variable_lower_bound}``. Returns: The lower bounds of the design variables. """ return self.__get_values( variable_names, as_dict, self._lower_bounds, self.__lower_bounds_array )
@overload def get_upper_bounds( self, variable_names: Sequence[str] | None = None, as_dict: Literal[False] = False, ) -> ndarray: ... @overload def get_upper_bounds( self, variable_names: Sequence[str] | None = None, as_dict: Literal[True] = False, ) -> dict[str, ndarray]: ...
[docs] def get_upper_bounds( self, variable_names: Sequence[str] | None = None, as_dict: bool = False, ) -> ndarray | dict[str, ndarray]: """Return the upper bounds of design variables. Args: variable_names: The names of the design variables. If ``None``, the upper bounds of all the design variables are returned. as_dict: Whether to return the upper bounds as a dictionary of the form ``{variable_name: variable_upper_bound}``. Returns: The upper bounds of the design variables. """ return self.__get_values( variable_names, as_dict, self._upper_bounds, self.__upper_bounds_array )
@overload def __get_values( self, variable_names: Sequence[str] | None, as_dict: Literal[False], value_as_dict: dict[str, ndarray], value_as_array: ndarray, ) -> ndarray: ... @overload def __get_values( self, variable_names: Sequence[str] | None, as_dict: Literal[True], value_as_dict: dict[str, ndarray], value_as_array: ndarray, ) -> dict[str, ndarray]: ... def __get_values( self, variable_names: Sequence[str] | None, as_dict: bool, value_as_dict: dict[str, ndarray], value_as_array: ndarray, ) -> ndarray | dict[str, ndarray]: """Return the (lower or upper) bounds of design variables. Args: variable_names: The names of the design variables. If ``None``, then the values of all the design variables are returned. as_dict: Whether to return the value as a dictionary of the form ``{variable_name: variable_value}``. value_as_dict: A dictionary of the values of all the design variables. value_as_array: The NumPy array of the values of all the design variables Returns: The bounds of the design variables. """ if self.__norm_data_is_computed and variable_names is None and not as_dict: # The array of all the bounds is up to date return value_as_array if not as_dict: return self.dict_to_array(value_as_dict, variable_names=variable_names) if variable_names is None: return value_as_dict return {name: value_as_dict[name] for name in variable_names}
[docs] def set_lower_bound( self, name: str, lower_bound: ndarray | None, ) -> None: """Set the lower bound of a variable. Args: name: The name of the variable. lower_bound: The value of the lower bound. Raises: ValueError: If the variable does not exist. """ if name not in self.variable_names: msg = f"Variable '{name}' is not known." raise ValueError(msg) self._add_bound(name, self.variable_sizes[name], lower_bound) self._add_norm_policy(name)
[docs] def set_upper_bound( self, name: str, upper_bound: ndarray | None, ) -> None: """Set the upper bound of a variable. Args: name: The name of the variable. upper_bound: The value of the upper bound. Raises: ValueError: If the variable does not exist. """ if name not in self.variable_names: msg = f"Variable '{name}' is not known." raise ValueError(msg) self._add_bound(name, self.variable_sizes[name], upper_bound, is_lower=False) self._add_norm_policy(name)
[docs] def array_to_dict( self, x_array: ndarray, ) -> dict[str, ndarray]: """Convert a design array into a dictionary indexed by the variables names. Args: x_array: A design value expressed as a NumPy array. Returns: The design value expressed as a dictionary of NumPy arrays. """ return split_array_to_dict_of_arrays( x_array, self.variable_sizes, self.variable_names, )
@classmethod def __get_common_dtype( cls, x_dict: Mapping[str, ndarray], ) -> dtype: """Return the common data type. Use the following rules by parsing the values `x_dict`: - there is a complex value: returns `numpy.complex128`, - there are real and mixed float/int values: returns `numpy.float64`, - there are only integer values: returns `numpy.int32`. Args: x_dict: The values to be parsed. Raises: TypeError: If the values of the data dictionary are not NumPy arrays. """ has_float = False has_int = False for val_arr in x_dict.values(): if not isinstance(val_arr, ndarray): msg = "x_dict values must be ndarray." raise TypeError(msg) if val_arr.dtype.kind == "c": return cls.__COMPLEX_DTYPE if val_arr.dtype.kind == "i": has_int = True if val_arr.dtype.kind == "f": has_float = True if has_float: return cls.__FLOAT_DTYPE if has_int: return cls.__INT_DTYPE return cls.__FLOAT_DTYPE
[docs] def dict_to_array( self, design_values: Mapping[str, ndarray], variable_names: Iterable[str] | None = None, ) -> ndarray: """Convert a mapping of design values into a NumPy array. Args: design_values: The mapping of design values. variable_names: The design variables to be considered. If ``None``, consider all the design variables. Returns: The design values as a NumPy array. Notes: The data type of the returned NumPy array is the most general data type of the values of the mapping ``design_values`` corresponding to the keys iterable from ``variables_names``. """ if variable_names is None: variable_names = self.variable_names data = {name: design_values[name] for name in variable_names} return hstack(list(data.values())).astype(self.__get_common_dtype(data))
[docs] def get_pretty_table( self, fields: Sequence[str] | None = None, with_index: bool = False, capitalize: bool = False, simplify: bool = False, ) -> PrettyTable: """Build a tabular view of the design space. Args: fields: The name of the fields to be exported. If ``None``, export all the fields. with_index: Whether to show index of names for arrays. This is ignored for scalars. capitalize: Whether to capitalize the field names and replace ``"_"`` by ``" "``. simplify: Whether to return a simplified tabular view. Returns: A tabular view of the design space. """ if fields is None: fields = self.TABLE_NAMES if capitalize: field_names = [field.capitalize().replace("_", " ") for field in fields] else: field_names = fields table = PrettyTable(field_names) table.float_format = "%.16g" for name in self.variable_names: size = self.variable_sizes[name] l_b = self._lower_bounds.get(name) u_b = self._upper_bounds.get(name) var_type = self.variable_types[name] curr = self.__current_value.get(name) name_template = f"{name}" if with_index and size > 1: name_template += "[{index}]" for i in range(size): data = { "name": name_template.format(name=name, index=i), "value": None, "lower_bound": float("-inf"), "upper_bound": float("inf"), "type": var_type[i], } if l_b is not None and l_b[i] is not None: data["lower_bound"] = l_b[i] if u_b is not None and u_b[i] is not None: data["upper_bound"] = u_b[i] if curr is not None: value = curr[i] # The current value of a float variable can be a complex array # when approximating gradients with complex step. if var_type[i] == "float": value = value.real data["value"] = value table.add_row([data[key] for key in fields]) for name in ("Name", "Type") if capitalize else ("name", "type"): table.align[name] = "l" return table
[docs] def to_hdf( self, file_path: str | Path, append: bool = False, hdf_node_path: str = "", ) -> None: """Export the design space to an HDF file. Args: file_path: The path to the file to export the design space. append: If ``True``, appends the data in the file. hdf_node_path: The path of the HDF node in which the design space should be exported. If empty, the root node is considered. """ mode = "a" if append else "w" with h5py.File(file_path, mode) as h5file: if hdf_node_path: h5file = h5file.require_group(hdf_node_path) design_vars_grp = h5file.require_group(self.DESIGN_SPACE_GROUP) design_vars_grp.create_dataset( self.NAMES_GROUP, data=array(self.variable_names, dtype=bytes_) ) for name in self.variable_names: var_grp = design_vars_grp.require_group(name) var_grp.create_dataset(self.SIZE_GROUP, data=self.variable_sizes[name]) l_b = self._lower_bounds.get(name) if l_b is not None: var_grp.create_dataset(self.LB_GROUP, data=l_b) u_b = self._upper_bounds.get(name) if u_b is not None: var_grp.create_dataset(self.UB_GROUP, data=u_b) var_type = self.variable_types[name] if var_type is not None: data_array = array(var_type, dtype="bytes") var_grp.create_dataset( self.VAR_TYPE_GROUP, data=data_array, dtype=data_array.dtype, ) value = self.__current_value.get(name) if value is not None: var_grp.create_dataset(self.VALUE_GROUP, data=self.__to_real(value))
[docs] @classmethod def from_hdf(cls, file_path: str | Path, hdf_node_path: str = "") -> DesignSpace: """Create a design space from an HDF file. Args: file_path: The path to the HDF file. hdf_node_path: The path of the HDF node from which the database should be imported. If empty, the root node is considered. Returns: The design space defined in the file. """ design_space = cls() with h5py.File(file_path) as h5file: h5file = get_hdf5_group(h5file, hdf_node_path) design_vars_grp = get_hdf5_group(h5file, design_space.DESIGN_SPACE_GROUP) variable_names = get_hdf5_group(design_vars_grp, design_space.NAMES_GROUP) for name in variable_names: name = name.decode() var_group = get_hdf5_group(design_vars_grp, name) l_b = design_space.__read_opt_attr_array( var_group, design_space.LB_GROUP ) u_b = design_space.__read_opt_attr_array( var_group, design_space.UB_GROUP ) var_type = design_space.__read_opt_attr_array( var_group, design_space.VAR_TYPE_GROUP ) value = design_space.__read_opt_attr_array( var_group, design_space.VALUE_GROUP ) size = get_hdf5_group(var_group, design_space.SIZE_GROUP)[()] design_space.add_variable(name, size, var_type, l_b, u_b, value) design_space.check() return design_space
@staticmethod def __read_opt_attr_array( var_group: h5py.Group, dataset_name: str, ) -> ndarray | None: """Read data in a group. Args: var_group: The variable group. dataset_name: The name of the dataset. Returns: The data found in the group, if it exists. Otherwise, None. """ data = var_group.get(dataset_name) if data is not None: data = array(data) return data @staticmethod def __to_real( data: ndarray, ) -> ndarray: """Convert complex to real NumPy array. Args: data: A complex NumPy array. Returns: A real NumPy array. """ return array(array(data, copy=False).real, dtype=float64)
[docs] def to_complex(self) -> None: """Cast the current value to complex.""" for name, val in self.__current_value.items(): self.__current_value[name] = array(val, dtype=complex128) self.__update_common_dtype()
[docs] @classmethod def from_file( cls, file_path: str | Path, hdf_node_path: str = "", **options: Any ) -> DesignSpace: """Create a design space from a file. Args: file_path: The path to the file. If the extension starts with `"hdf"`, the file will be considered as an HDF file. hdf_node_path: The path of the HDF node from which the database should be imported. If empty, the root node is considered. **options: The keyword reading options. Returns: The design space defined in the file. """ if h5py.is_hdf5(file_path): return cls.from_hdf(file_path, hdf_node_path) return cls.from_csv(file_path, **options)
[docs] def to_file(self, file_path: str | Path, **options) -> None: """Save the design space. Args: file_path: The file path to save the design space. If the extension starts with `"hdf"`, the design space will be saved in an HDF file. **options: The keyword reading options. """ file_path = Path(file_path) if file_path.suffix.startswith((".hdf", ".h5")): self.to_hdf(file_path, append=options.get("append", False)) else: self.to_csv(file_path, **options)
[docs] def to_csv( self, output_file: str | Path, fields: Sequence[str] | None = None, header_char: str = "", **table_options: Any, ) -> None: """Export the design space to a CSV file. Args: output_file: The path to the file. fields: The fields to be exported. If ``None``, export all fields. header_char: The header character. **table_options: The names and values of additional attributes for the :class:`.PrettyTable` view generated by :meth:`.DesignSpace.get_pretty_table`. """ output_file = Path(output_file) table = self.get_pretty_table(fields=fields) table.border = False for option, val in table_options.items(): table.__setattr__(option, val) with output_file.open("w") as outf: table_str = header_char + table.get_string() outf.write(table_str)
[docs] @classmethod def from_csv( cls, file_path: str | Path, header: Iterable[str] | None = None ) -> DesignSpace: """Create a design space from a CSV file. Args: file_path: The path to the CSV file. header: The names of the fields saved in the file. If ``None``, read them in the file. Returns: The design space defined in the file. Raises: ValueError: If the file does not contain the minimal variables in its header. """ design_space = cls() float_data = genfromtxt(file_path, dtype="float") str_data = genfromtxt(file_path, dtype="str") if header is None: header = str_data[0, :].tolist() start_read = 1 else: start_read = 0 if not set(cls.MINIMAL_FIELDS).issubset(set(header)): msg = ( f"Malformed DesignSpace input file {file_path} does not contain " "minimal variables in header:" f"{cls.MINIMAL_FIELDS}; got instead: {header}." ) raise ValueError(msg) col_map = {field: i for i, field in enumerate(header)} var_names = str_data[start_read:, 0].tolist() unique_names = [] prev_name = None for name in var_names: # set([]) does not preserve order ! if name not in unique_names: unique_names.append(name) prev_name = name elif prev_name != name: msg = ( f"Malformed DesignSpace input file {file_path} contains some " f"variables ({file_path}) in a non-consecutive order." ) raise ValueError(msg) k = start_read lower_bounds_field = cls.MINIMAL_FIELDS[1] upper_bounds_field = cls.MINIMAL_FIELDS[2] value_field = cls.TABLE_NAMES[2] var_type_field = cls.TABLE_NAMES[-1] for name in unique_names: size = var_names.count(name) l_b = float_data[k : k + size, col_map[lower_bounds_field]] u_b = float_data[k : k + size, col_map[upper_bounds_field]] if value_field in col_map: value = float_data[k : k + size, col_map[value_field]] if "None" in str_data[k : k + size, col_map[value_field]]: value = None else: value = None if var_type_field in col_map: var_type = str_data[k : k + size, col_map[var_type_field]].tolist() else: var_type = cls.DesignVariableType.FLOAT design_space.add_variable(name, size, var_type, l_b, u_b, value) k += size design_space.check() return design_space
def _get_string_representation( self, use_html: bool, title: str = "", simplify: bool = False ) -> str: """Return the string representation of the design space. Args: use_html: Whether the string representation is HTML code. title: The title of the table. If empty, use the name of the class. simplify: Whether to return a simplified string representation. Returns: The string representation of the design space. """ if not title: title = " ".join( self.__CAMEL_CASE_REGEX.findall(self.__class__.__name__) ).lower() title = title.capitalize() post_title = ": " if self.name else ":" new_line = "<br/>" if use_html else "\n" pretty_table = self.get_pretty_table( with_index=True, capitalize=True, simplify=simplify ) pretty_table_method = "get_html_string" if use_html else "get_string" table = getattr(pretty_table, pretty_table_method)() return f"{title}{post_title}{self.name}{new_line}{table}" def __repr__(self) -> str: return self._get_string_representation(False) def __str__(self) -> str: return self._get_string_representation(False, simplify=True) def _repr_html_(self) -> str: return REPR_HTML_WRAPPER.format(self._get_string_representation(True))
[docs] def project_into_bounds( self, x_c: ndarray, normalized: bool = False, ) -> ndarray: """Project a vector onto the bounds, using a simple coordinate wise approach. Args: normalized: If ``True``, then the vector is assumed to be normalized. x_c: The vector to be projected onto the bounds. Returns: The projected vector. """ if not self.__norm_data_is_computed: self.__update_normalization_vars() if not normalized: l_b = self.__lower_bounds_array u_b = self.__upper_bounds_array else: l_b = zeros_like(x_c) u_b = ones_like(x_c) x_p = array(x_c) l_inds = (x_c < l_b).nonzero() x_p[l_inds] = l_b[l_inds] u_inds = (x_c > u_b).nonzero() x_p[u_inds] = u_b[u_inds] return x_p
def __contains__( self, variable: str, ) -> bool: return variable in self.variable_names def __len__(self) -> int: return len(self.variable_names) def __iter__(self) -> Iterable[str]: return iter(self.variable_names) def __setitem__( self, name: str, item: DesignVariable, ) -> None: self.add_variable( name, size=item.size, var_type=item.var_type, l_b=item.l_b, u_b=item.u_b, value=item.value, ) def __eq__( self, other: DesignSpace, ) -> bool: if not isinstance(other, self.__class__): return False if len(other) != len(self): return False for key, val in self.items(): if key not in other: return False hash1 = hash_data_dict(flatten_nested_dict(val._asdict())) hash2 = hash_data_dict(flatten_nested_dict(other[key]._asdict())) if hash1 != hash2: return False return True def __getitem__( self, name: str, ) -> DesignVariable: """Return the data associated with a given variable. These data are: type, size, lower bound, upper bound and current value. Args: name: The name of the variable. Returns: The data associated with the variable. Raises: ValueError: If the variable name does not exist. """ if name not in self.variable_names: msg = f"Variable '{name}' is not known." raise KeyError(msg) try: value = self.get_current_value([name]) except KeyError: value = None return self.DesignVariable( size=self.get_size(name), var_type=self.get_type(name), l_b=self.get_lower_bound(name), u_b=self.get_upper_bound(name), value=value, )
[docs] def extend( self, other: DesignSpace, ) -> None: """Extend the design space with another design space. Args: other: The design space to be appended to the current one. """ for name, variable in other.items(): self.add_variable( name, variable.size, variable.var_type, variable.l_b, variable.u_b, variable.value, )
@staticmethod def __cast_array_to_list( value: str | int | ndarray, ) -> str | int | list[str | int]: """Convert a value to a ``List`` if it is a NumPy array. Args: value: The value to be cast. Returns: Either the original value or the NumPy array converted to a ``List``. """ return value if not isinstance(value, ndarray) else value.tolist() @classmethod def __cast_mapping( cls, mapping: Mapping[str, str | int | ndarray], ) -> dict[str, str | int | list[str | int]]: """Convert the NumPy arrays of a mapping to ``List``. Args: mapping: The value to be cast. Returns: The original mapping with NumPy values converted to a ``List``. """ return { key: { sub_key: cls.__cast_array_to_list(sub_val) for sub_key, sub_val in val.items() } for key, val in mapping.items() }
[docs] def rename_variable( self, current_name: str, new_name: str, ) -> None: """Rename a variable. Args: current_name: The name of the variable to rename. new_name: The new name of the variable. """ if current_name not in self.variable_names: msg = f"The variable {current_name} is not in the design space." raise ValueError(msg) self.variable_names[self.variable_names.index(current_name)] = new_name for dictionary in [ self.variable_sizes, self.variable_types, self.normalize, self._lower_bounds, self._upper_bounds, ]: dictionary[new_name] = dictionary.pop(current_name) current_value = self._current_value.pop(current_name, None) if current_value is not None: self._current_value[new_name] = current_value
[docs] def initialize_missing_current_values(self) -> None: """Initialize the current values of the design variables when missing. Use: - the center of the design space when the lower and upper bounds are finite, - the lower bounds when the upper bounds are infinite, - the upper bounds when the lower bounds are infinite, - zero when the lower and upper bounds are infinite. """ for name, value in self.items(): if value.value is not None: continue current_value = [] for l_b_i, u_b_i in zip(value.l_b, value.u_b): if l_b_i == -inf: current_value_i = 0 if u_b_i == inf else u_b_i else: current_value_i = l_b_i if u_b_i == inf else (l_b_i + u_b_i) / 2 current_value.append(current_value_i) if self.DesignVariableType.FLOAT in value.var_type: var_type = self.DesignVariableType.FLOAT else: var_type = self.DesignVariableType.INTEGER self.set_current_variable( name, array( current_value, dtype=self.__VARIABLE_TYPES_TO_DTYPES[var_type], ), )