"""A database of function calls and design variables."""
from __future__ import annotations
import sys
from ast import literal_eval
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Mapping
from itertools import chain
from itertools import islice
from typing import TYPE_CHECKING
from typing import Callable
from typing import ClassVar
from typing import Union
from xml.etree.ElementTree import parse as parse_element
from numpy import array
from numpy import atleast_1d
from numpy import atleast_2d
from numpy import hstack
from numpy import ndarray
from numpy.linalg import norm
from gemseo.algos._hdf_database import HDFDatabase
from gemseo.algos.hashable_ndarray import HashableNdarray
from gemseo.utils.ggobi_export import save_data_arrays_to_xml
from gemseo.utils.string_tools import pretty_repr
from gemseo.utils.string_tools import repr_variable
from numbers import Number
from pathlib import Path
from numpy.typing import NDArray
# A key is either a raw NumPy array or an array already wrapped to be hashable.
DatabaseKeyType = Union[ndarray, HashableNdarray]
"""The type of a :class:`.Database` key."""
# An output value is a scalar, a NumPy array or a list of integers.
FunctionOutputValueType = Union[float, ndarray, list[int]]
"""The type of a function output value stored in a :class:`.Database`."""
# A value maps strings (the output names) to output values.
DatabaseValueType = Mapping[str, FunctionOutputValueType]
"""The type of a :class:`.Database` value."""
# A listener takes a single database key as argument and returns nothing.
ListenerType = Callable[[DatabaseKeyType], None]
"""The type of a listener attached to an :class:`.Database`."""
class Database(Mapping):
    """Storage of :class:`.MDOFunction` evaluations.

    A :class:`.Database` is typically attached to an :class:`.OptimizationProblem`
    to store the evaluations of its objective, constraints and observables.

    A :class:`.Database` can be an optimization history
    or a collection of samples in the case of a DOE.

    It is useful when simulations are costly
    because it avoids re-evaluating functions
    at points where they have already been evaluated.

    .. seealso:: :class:`.NormDBFunction`

    It can also be post-processed by an :class:`.OptPostProcessor`
    to visualize its content,
    e.g. :class:`.OptHistoryView` generating a series of graphs
    to visualize the histories of the objective, constraints and design variables.

    A :class:`.Database` can be saved to an HDF file
    for portability and cold post-processing
    with its method :meth:`.to_hdf`.
    A database can also be initialized from an HDF file
    as ``database = Database.from_hdf(file_path)``.

    .. note::

        Saving an :class:`.OptimizationProblem` to an HDF file
        using its method :meth:`~.OptimizationProblem.to_hdf`
        also saves its :class:`.Database`.

    The database is based on a two-level dictionary-like mapping such as
    ``{x: {output_name: output_value, ...}, ...}`` with:

    * ``x``: the input value as an :class:`.HashableNdarray`
      wrapping a NumPy array that can be accessed as ``x.wrapped_array``;
      if the types of the input variables are different,
      then they are promoted to the unique type that can represent all of them,
      for instance integer would be promoted to float;
    * ``output_name``: either the name of the function
      that has been evaluated at ``x``,
      the name of its gradient
      (the gradient of a function called ``"f"`` is typically denoted as ``"@f"``)
      and any additional information related to the methods which use the database;
    * ``output_value``: the output value,
      typically a float or a 1D-array for a function output,
      a 1D- or 2D-array for a gradient
      or a list for the iteration.
    """

    name: str
    """The name of the database."""

    MISSING_VALUE_TAG: ClassVar[str] = "NA"
    """The tag for a missing value."""

    GRAD_TAG: ClassVar[str] = "@"
    """The tag prefixing a function name to make it a gradient name.

    E.g. ``"@f"`` is the name of the gradient of ``"f"`` when ``GRAD_TAG == "@"``.
    """

    __data: dict[HashableNdarray, DatabaseValueType]
    """The input values bound to the output values."""

    __store_listeners: list[ListenerType]
    """The functions to be called when an item is stored to the database."""

    __new_iter_listeners: list[ListenerType]
    """The functions to be called when a new iteration is stored to the database."""

    __hdf_database: HDFDatabase
    """The handler to export the database to a HDF file."""

    def __init__(
        self,
        name: str = "",
    ) -> None:
        """
        Args:
            name: The name to be given to the database.
                If empty, use the class name.
        """  # noqa: D205, D212, D415
        self.name = name or self.__class__.__name__
        self.__data = {}
        self.__store_listeners = []
        self.__new_iter_listeners = []
        self.__hdf_database = HDFDatabase()

    @property
    def last_item(self) -> DatabaseValueType:
        """The last item of the database.

        Accessing it on an empty database raises a ``StopIteration``.
        """
        return next(reversed(self.__data.values()))

    @staticmethod
    def get_hashable_ndarray(
        original_array: DatabaseKeyType,
        copy: bool = False,
    ) -> HashableNdarray:
        """Convert an array to a hashable array.

        This hashable array basically represents a key of the database.

        Args:
            original_array: An array.
            copy: Whether to copy the original array.

        Returns:
            A hashable array wrapping the original array.

        Raises:
            KeyError: If the original array is
                neither an array nor a :class:`.HashableNdarray`.
        """
        if isinstance(original_array, ndarray):
            return HashableNdarray(original_array, copy=copy)

        if isinstance(original_array, HashableNdarray):
            if copy:
                # Detach the key from the array owned by the caller,
                # so that a later in-place modification cannot corrupt the key.
                original_array.copy_wrapped_array()
            return original_array

        msg = (
            "A database key must be either a NumPy array of a HashableNdarray; "
            f"got {type(original_array)} instead."
        )
        raise KeyError(msg)

    def __getitem__(self, x_vect: DatabaseKeyType) -> DatabaseValueType:
        return self.__data[self.get_hashable_ndarray(x_vect)]

    def __iter__(self) -> Iterator[HashableNdarray]:
        return iter(self.__data)

    def __len__(self) -> int:
        return len(self.__data)

    def __delitem__(self, x_vect: DatabaseKeyType) -> None:
        del self.__data[self.get_hashable_ndarray(x_vect)]

    @property
    def n_iterations(self) -> int:
        """The number of iterations.

        This is the number of entries in the database.
        """
        return len(self)

    def clear(self) -> None:
        """Clear the database."""
        self.__data.clear()

    def clear_from_iteration(self, iteration: int) -> None:
        """Delete the items after a given iteration.

        Args:
            iteration: An iteration between 1 and the number of iterations;
                it can also be a negative integer if counting from the last iteration
                (e.g. -2 for the penultimate iteration).
        """
        iteration_index = self.__get_index(iteration)
        # Freeze the keys to delete before mutating the dictionary.
        for x in tuple(islice(self.__data, iteration_index + 1, None)):
            del self.__data[x]

    def remove_empty_entries(self) -> None:
        """Remove the entries that do not have output values."""
        for x, outputs in tuple(self.items()):
            if not outputs:
                del self.__data[x]

    def filter(self, output_names: Iterable[str]) -> None:  # noqa: A003
        """Keep only some outputs and remove the other ones.

        Args:
            output_names: The names of the outputs that must be kept.
        """
        output_names = set(output_names)
        for output_names_to_values in self.values():
            # The set difference is a new object: deleting while looping is safe.
            for function_name in output_names_to_values.keys() - output_names:
                del output_names_to_values[function_name]

    def get_last_n_x_vect(self, n: int) -> list[ndarray]:
        """Return the last ``n`` input values.

        Args:
            n: The number of last iterations to be considered.

        Returns:
            The last ``n`` input values.

        Raises:
            ValueError: If the number ``n`` is higher than the number of iterations.
        """
        n_iterations = len(self)
        if n > n_iterations:
            msg = (
                f"The number of last iterations ({n}) is greater "
                f"than the number of iterations ({n_iterations})."
            )
            raise ValueError(msg)

        return [
            x.wrapped_array
            for x in islice(self.__data.keys(), n_iterations - n, n_iterations)
        ]

    def get_x_vect_history(self) -> list[ndarray]:
        """Return the history of the input vector.

        Returns:
            The history of the input vector.
        """
        return [x.wrapped_array for x in self.__data]

    def check_output_history_is_empty(self, output_name: str) -> bool:
        """Check if the history of an output is empty.

        Args:
            output_name: The name of the output.

        Returns:
            Whether the history of the output is empty.
        """
        return all(output_name not in outputs for outputs in self.values())

    def get_function_history(
        self,
        function_name: str,
        with_x_vect: bool = False,
    ) -> ndarray | tuple[ndarray, ndarray]:
        """Return the history of a function output.

        Args:
            function_name: The name of the function.
            with_x_vect: Whether to return also the input history.

        Returns:
            The history of the function output, and possibly the input history.
        """
        output_history = []
        input_history = []
        for x, outputs in self.items():
            function_value = outputs.get(function_name)
            if function_value is None:
                # The function has not been evaluated at this input value.
                continue

            if isinstance(function_value, ndarray) and function_value.size == 1:
                function_value = function_value[0]

            output_history.append(function_value)
            if with_x_vect:
                input_history.append(x.wrapped_array)

        try:
            output_history = array(output_history)
        except ValueError:
            # For Numpy > 1.24 that no longer automatically handle containers that
            # cannot produce an array with a consistent shape.
            output_history = array(output_history, dtype=object)

        if with_x_vect:
            return output_history, array(input_history)

        return output_history

    def get_gradient_history(
        self,
        function_name: str,
        with_x_vect: bool = False,
    ) -> ndarray | tuple[ndarray, ndarray]:
        """Return the history of the gradient of a function.

        Args:
            function_name: The name of the function
                for which we want the gradient history.
            with_x_vect: Whether the input history should be returned as well.

        Returns:
            The history of the gradient of the function output,
            and possibly the input history.
        """
        return self.get_function_history(
            function_name=self.get_gradient_name(function_name),
            with_x_vect=with_x_vect,
        )

    def get_iteration(self, x_vect: ndarray) -> int:
        """Return the iteration of an input value in the database.

        Args:
            x_vect: The input value.

        Returns:
            The iteration of the input values in the database.

        Raises:
            KeyError: If the required input value is not found.
        """
        # Use the common key conversion so that wrapped arrays are accepted too.
        hashed_input_value = self.get_hashable_ndarray(x_vect)
        for index, key in enumerate(self.__data.keys()):
            if key == hashed_input_value:
                return index + 1

        raise KeyError(x_vect)

    def get_x_vect(self, iteration: int) -> ndarray:
        """Return the input value at a specified iteration.

        Args:
            iteration: An iteration between 1 and the number of iterations;
                it can also be a negative integer if counting from the last iteration
                (e.g. -2 for the penultimate iteration).

        Returns:
            The input value at this iteration.
        """
        iteration_index = self.__get_index(iteration)
        # The database dictionary uses the input design variables as keys for the
        # function values. Here we convert it to an iterator that returns the
        # key located at the required iteration using the islice method from
        # itertools.
        x = next(islice(iter(self.__data), iteration_index, iteration_index + 1))
        return x.wrapped_array

    def __get_output(
        self,
        x_vect_or_iteration: DatabaseKeyType | int,
        tolerance: float = 0.0,
    ) -> DatabaseValueType | None:
        r"""Return the output value corresponding to a given input value.

        Args:
            x_vect_or_iteration: An input value
                or an iteration between 1 and the number of iterations;
                it can also be a negative integer if counting from the last iteration
                (e.g. -2 for the penultimate iteration).
            tolerance: The relative tolerance :math:`\epsilon`
                such that the input value :math:`x` is considered as equal
                to the input value :math:`x_{\text{database}}` stored in the database
                if :math:`\|x-x_{\text{database}}\|/\|x_{\text{database}}\|\leq\epsilon`.

        Returns:
            The output value at the given input value if any, otherwise ``None``.
        """
        if isinstance(x_vect_or_iteration, int):
            # The input value of an iteration is an exact key of the database,
            # so the tolerance is not needed.
            return self.__get_output(self.get_x_vect(x_vect_or_iteration))

        x = x_vect_or_iteration
        if abs(tolerance) < sys.float_info.epsilon:
            # Exact search: a simple and cheap dictionary lookup.
            return self.get(x)

        if isinstance(x, HashableNdarray):
            x = x.wrapped_array

        # Approximate search: return the first stored input value close enough.
        for db_input_value, db_output_names_to_values in self.items():
            _db_in_value = db_input_value.wrapped_array
            if norm(_db_in_value - x) <= tolerance * norm(_db_in_value):
                return db_output_names_to_values

        return None

    def get_function_value(
        self,
        function_name: str,
        x_vect_or_iteration: DatabaseKeyType | int,
        tolerance: float = 0.0,
    ) -> FunctionOutputValueType | None:
        r"""Return the output value of a function corresponding to a given input value.

        Args:
            function_name: The name of the required output function.
            x_vect_or_iteration: An input value
                or an iteration between 1 and the number of iterations;
                it can also be a negative integer if counting from the last iteration
                (e.g. -2 for the penultimate iteration).
            tolerance: The relative tolerance :math:`\epsilon`
                such that the input value :math:`x` is considered as equal
                to the input value :math:`x_{\text{database}}` stored in the database
                if :math:`\|x-x_{\text{database}}\|/\|x_{\text{database}}\|\leq\epsilon`.

        Returns:
            The output value of the function at the given input value if any,
            otherwise ``None``.
        """
        outputs = self.__get_output(x_vect_or_iteration, tolerance)
        if outputs:
            return outputs.get(function_name)

        return None

    def store(
        self,
        x_vect: ndarray,
        outputs: DatabaseValueType,
    ) -> None:
        """Store the output values associated to the input values.

        Args:
            x_vect: The input value.
            outputs: The output value corresponding to the input value.
        """
        hashed_input_value = self.get_hashable_ndarray(x_vect, True)
        stored_outputs = self.get(hashed_input_value)
        current_outputs_is_empty = not stored_outputs
        if stored_outputs is None:
            self.__data[hashed_input_value] = outputs
        else:
            # No new keys = already computed = new iteration
            # otherwise just calls to other functions
            stored_outputs.update(outputs)

        if self.__store_listeners:
            self.notify_store_listeners(x_vect)

        # Notify the new iteration after storing x
        # because callbacks may need an updated x
        if self.__new_iter_listeners and outputs and current_outputs_is_empty:
            self.notify_new_iter_listeners(x_vect)

    def add_store_listener(self, function: ListenerType) -> bool:
        """Add a function to be called when an item is stored to the database.

        Args:
            function: The function to be called.

        Returns:
            Whether the function has been added;
            otherwise, it was already attached to the database.
        """
        return self.__add_listener(function, self.__store_listeners)

    def add_new_iter_listener(self, function: ListenerType) -> bool:
        """Add a function to be called when a new iteration is stored to the database.

        Args:
            function: The function to be called, it must have one argument that is
                the current input value.

        Returns:
            Whether the function has been added;
            otherwise, it was already attached to the database.
        """
        return self.__add_listener(function, self.__new_iter_listeners)

    @staticmethod
    def __add_listener(function: ListenerType, listeners: list[ListenerType]) -> bool:
        """Add a function as listener.

        Args:
            function: The function.
            listeners: The listeners to which to add the function.

        Returns:
            Whether the function has been added;
            otherwise, it was already attached to the database.
        """
        if function in listeners:
            return False

        listeners.append(function)
        return True

    def clear_listeners(
        self,
        new_iter_listeners: Iterable[ListenerType] | None = (),
        store_listeners: Iterable[ListenerType] | None = (),
    ) -> tuple[Iterable[ListenerType], Iterable[ListenerType]]:
        """Clear all the listeners.

        Args:
            new_iter_listeners: The functions to be removed
                that were notified of a new iteration.
                If empty, remove all such functions.
                If ``None``, keep all these functions.
            store_listeners: The functions to be removed
                that were notified of a new entry in the database.
                If empty, remove all such functions.
                If ``None``, keep all these functions.

        Returns:
            The listeners that were notified of a new iteration
            and the listeners that were notified of a new entry in the database.

        Raises:
            ValueError: If a function to be removed is not attached to the database.
        """
        if store_listeners is None:
            store_listeners = ()
        else:
            # Materialize the iterable: it is both looped over and returned.
            store_listeners = tuple(store_listeners)
            if store_listeners:
                for listener in store_listeners:
                    self.__store_listeners.remove(listener)
            else:
                store_listeners = self.__store_listeners
                self.__store_listeners = []

        if new_iter_listeners is None:
            return set(), set(store_listeners)

        new_iter_listeners = tuple(new_iter_listeners)
        if new_iter_listeners:
            for listener in new_iter_listeners:
                self.__new_iter_listeners.remove(listener)
        else:
            new_iter_listeners = self.__new_iter_listeners
            self.__new_iter_listeners = []

        return set(new_iter_listeners), set(store_listeners)

    def notify_store_listeners(self, x_vect: DatabaseKeyType | None = None) -> None:
        """Notify the listeners that a new entry was stored in the database.

        Args:
            x_vect: The input value.
                If ``None``, use the input value of the last iteration.
        """
        self.__notify_listeners(self.__store_listeners, x_vect)

    def notify_new_iter_listeners(self, x_vect: DatabaseKeyType | None = None) -> None:
        """Notify the listeners that a new iteration is ongoing.

        Args:
            x_vect: The input value.
                If ``None``, use the input value of the last iteration.
        """
        self.__notify_listeners(self.__new_iter_listeners, x_vect)

    def __notify_listeners(
        self,
        listeners: list[ListenerType],
        x_vect: DatabaseKeyType | None,
    ) -> None:
        """Notify the listeners.

        Args:
            listeners: The listeners.
            x_vect: The input value.
                If ``None``, use the input value of the last iteration.
        """
        if not listeners:
            # Nothing to notify; in particular, do not query the last iteration
            # which would fail on an empty database.
            return

        if isinstance(x_vect, HashableNdarray):
            x_vect = x_vect.wrapped_array
        elif x_vect is None:
            x_vect = self.get_x_vect(-1)

        for function in listeners:
            function(x_vect)

    def get_function_names(self, skip_grad: bool = True) -> list[str]:
        """Return the names of the outputs contained in the database.

        Args:
            skip_grad: Whether to skip the names of gradient functions.

        Returns:
            The names of the outputs in alphabetical order.
        """
        output_names = set()
        for output_names_to_values in self.__data.values():
            for output_name in output_names_to_values:
                if skip_grad and output_name.startswith(self.GRAD_TAG):
                    continue

                output_names.add(output_name)

        return sorted(output_names)

    def get_history(
        self,
        function_names: Iterable[str] = (),
        add_missing_tag: bool = False,
        missing_tag: str | float = MISSING_VALUE_TAG,
    ) -> tuple[list[list[float | ndarray]], list[ndarray]]:
        """Return the history of the inputs and outputs.

        This includes the inputs, functions and gradients.

        Args:
            function_names: The names of functions.
                If empty, use all the functions but the gradients.
            add_missing_tag: Whether to add the tag ``missing_tag``
                to the iterations where data are missing.
            missing_tag: The tag to represent missing data.

        Returns:
            The history of the output values,
            then the history of the input values.

        Raises:
            ValueError: When a function has no values in the database.
        """
        # The names are used several times: a one-shot iterable must be frozen.
        function_names = list(function_names)
        if not function_names:
            function_names = self.get_function_names()
        else:
            all_function_names = set(self.get_function_names(skip_grad=False))
            not_function_names = set(function_names) - all_function_names
            if not_function_names:
                suffix = (
                    "is not an output name"
                    if len(not_function_names) == 1
                    else "are not output names"
                )
                msg = (
                    f"{pretty_repr(not_function_names, use_and=True)} {suffix}; "
                    f"available ones are "
                    f"{pretty_repr(all_function_names, use_and=True)}."
                )
                raise ValueError(msg)

        output_history = []
        input_history = []
        for x, output_names_to_values in self.items():
            output_values = []
            for function_name in function_names:
                if function_name in output_names_to_values:
                    output_values.append(output_names_to_values[function_name])
                elif add_missing_tag:
                    output_values.append(missing_tag)

            # Skip the input values for which no output of interest is available.
            if output_values:
                output_history.append(output_values)
                input_history.append(x.wrapped_array)

        return output_history, input_history

    def to_hdf(
        self,
        file_path: str | Path = "optimization_history.h5",
        append: bool = False,
        hdf_node_path: str = "",
    ) -> None:
        """Export the optimization database to an HDF file.

        Args:
            file_path: The path of the HDF file.
            append: Whether to append the data to the file.
            hdf_node_path: The path of the HDF node in which
                the database should be exported.
                If empty, the root node is considered.
        """
        self.__hdf_database.to_hdf(
            self, file_path, append, hdf_node_path=hdf_node_path
        )

    @classmethod
    def from_hdf(
        cls,
        file_path: str | Path = "optimization_history.h5",
        name: str = "",
        hdf_node_path: str = "",
    ) -> Database:
        """Create a database from an HDF file.

        Args:
            file_path: The path of the HDF file.
            name: The name of the database.
            hdf_node_path: The path of the HDF node from which
                the database should be imported.
                If empty, the root node is considered.

        Returns:
            The database defined in the file.
        """
        database = cls(name)
        database.update_from_hdf(file_path, hdf_node_path=hdf_node_path)
        return database

    def update_from_hdf(
        self,
        file_path: str | Path = "optimization_history.h5",
        hdf_node_path: str = "",
    ) -> None:
        """Update the current database from an HDF file.

        Args:
            file_path: The path of the HDF file.
            hdf_node_path: The path of the HDF node from which
                the database should be imported.
                If empty, the root node is considered.
        """
        self.__hdf_database.update_from_hdf(
            self, file_path, hdf_node_path=hdf_node_path
        )

    def get_history_array(
        self,
        function_names: Iterable[str] = (),
        add_missing_tag: bool = False,
        missing_tag: str | float = MISSING_VALUE_TAG,
        input_names: str | Iterable[str] = (),
        with_x_vect: bool = True,
    ) -> tuple[NDArray[Number | str], list[str], Iterable[str]]:
        """Return the database as a 2D array shaped as ``(n_iterations, n_features)``.

        The features are the outputs of interest and possibly the input variables.

        Args:
            function_names: The names of the functions
                whose output values must be returned.
                If empty, use all the functions.
            add_missing_tag: If ``True``,
                add the tag specified in ``missing_tag``
                for data that are not available.
            missing_tag: The tag that is added for data that are not available.
            input_names: The names of the components of the input vector.
                If empty, use ``"x_1"``, ``"x_2"``, ...
            with_x_vect: If ``True``,
                the input variables are returned in the history
                as ``np.hstack((get_output_history, x_vect_history))``.

        Returns:
            The history as an 2D array
            whose rows are observations and columns are the variables,
            the names of these columns
            and the names of the functions.
        """
        # The names are used several times: a one-shot iterable must be frozen.
        f_names = list(function_names) or self.get_function_names()
        f_history, x_history = self.get_history(f_names, add_missing_tag, missing_tag)
        f_flat_names, f_flat_values = self.__split_history(f_history, f_names)
        variables_flat_names = f_flat_names
        try:
            f_history = array(f_flat_values).real
        except ValueError:
            # For Numpy > 1.24 that no longer automatically handle containers that
            # cannot produce an array with a consistent shape.
            f_history = array(f_flat_values, dtype=object).real

        if with_x_vect:
            if not input_names:
                # One default name per component of the input vector;
                # sizing them by the number of iterations would drop input
                # columns when there are fewer iterations than components.
                n_components = max(
                    (atleast_1d(x).size for x in x_history), default=0
                )
                x_names = [f"x_{i + 1}" for i in range(n_components)]
            elif isinstance(input_names, str):
                x_names = [input_names]
            else:
                x_names = input_names

            x_flat_names, x_flat_values = self.__split_history(x_history, x_names)
            variables_flat_names = f_flat_names + x_flat_names
            x_history = array(x_flat_values).real
            variables_history = hstack((f_history, x_history))
        else:
            variables_history = f_history

        return atleast_2d(variables_history), variables_flat_names, f_names

    @staticmethod
    def __split_history(
        history: list[list[float | ndarray]] | list[ndarray],
        names: Iterable[str],
    ) -> tuple[list[str], list[list[float]]]:
        """Split a history.

        Args:
            history: A history of values.
            names: The names of the variables.

        Returns:
            The names of the columns of the history,
            the history whose lines are observations.
        """
        # The names are zipped with each observation: freeze a one-shot iterable.
        names = tuple(names)
        flat_values = []
        names_to_flat_names = {}
        for values in history:
            flat_value = []
            for value, name in zip(values, names):
                value = atleast_1d(value)
                size = value.size
                # Flatten so that there are as many values as flat names.
                flat_value.extend(value.ravel())
                names_to_flat_names[name] = [
                    repr_variable(name, i, size) for i in range(size)
                ]

            flat_values.append(flat_value)

        return list(chain(*names_to_flat_names.values())), flat_values

    def to_ggobi(
        self,
        function_names: Iterable[str] = (),
        file_path: str | Path = "opt_hist.xml",
        input_names: str | Iterable[str] = (),
    ) -> None:
        """Export the database to an XML file for ggobi tool.

        Args:
            function_names: The names of functions.
                If empty, use all the functions.
            file_path: The path to the XML file.
            input_names: The names of the components of the input vector.
                If empty, use ``"x_1"``, ``"x_2"``, ...
        """
        values_array, variable_names, function_names = self.get_history_array(
            function_names=function_names, add_missing_tag=True, input_names=input_names
        )
        save_data_arrays_to_xml(
            variable_names=variable_names,
            values_array=values_array,
            file_path=file_path,
        )

    def update_from_opendace(self, database_file: str | Path) -> None:
        """Update the current database from an opendace XML database.

        Args:
            database_file: The path to an opendace database.
        """
        tree = parse_element(database_file)
        for link in tree.getroot().iter("link"):
            data = {}
            for information in link:
                for x_ydyddy in information:
                    # literal_eval only accepts Python literals,
                    # so the content of the file cannot execute code.
                    data[x_ydyddy.tag] = literal_eval(x_ydyddy.text)

            data_reformat = data["y"]
            for key, value in data["dy"].items():
                # The first character of the key is dropped
                # before building the gradient name.
                data_reformat[self.get_gradient_name(key[1:])] = array(value)

            self.store(array(data.pop("x")), data_reformat)

    @classmethod
    def get_gradient_name(cls, name: str) -> str:
        """Return the name of the gradient related to a function.

        This name is the concatenation of a GRAD_TAG, e.g. '@',
        and the name of the function, e.g. 'f'.
        With this example, the name of the gradient is '@f'.

        Args:
            name: The name of a function.

        Returns:
            The name of the gradient based on the name of the function.
        """
        return f"{cls.GRAD_TAG}{name}"

    def __str__(self) -> str:
        return str(self.__data)

    def __get_index(self, iteration: int) -> int:
        """Return the index from an iteration.

        Args:
            iteration: The iteration.

        Returns:
            The index.

        Raises:
            ValueError: If the iteration is out of the possible range of iterations.
        """
        len_self = len(self)
        if iteration == 0 or not (-len_self <= iteration <= len_self):
            msg = (
                "The iteration must be within {-N, ..., -1, 1, ..., N} "
                f"where N={len_self} is the number of iterations."
            )
            raise ValueError(msg)

        if iteration > 0:
            return iteration - 1

        return len_self + iteration