# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Francois Gallard, Matthias De Lozzo
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
Caching module to avoid multiple evaluations of a discipline
************************************************************
"""
from __future__ import division, unicode_literals
import logging
from os.path import exists
from typing import Union
import h5py
from numpy import append, array, bytes_, unicode_
from six import with_metaclass
from gemseo.core.cache import AbstractCache, AbstractFullCache, hash_data_dict, to_real
from gemseo.utils.data_conversion import DataConversion
from gemseo.utils.locks import synchronized
from gemseo.utils.multi_processing import RLock
from gemseo.utils.py23_compat import PY2, Path, long, string_array, string_dtype
from gemseo.utils.singleton import SingleInstancePerFileAttribute
LOGGER = logging.getLogger(__name__)
class HDF5Cache(AbstractFullCache):
"""Cache using an HDF5 file on disk to store the data."""
def __init__(self, hdf_file_path, hdf_node_path, tolerance=0.0, name=None):
"""Initialize a singleton to access a HDF file. This singleton is used for
multithreaded/multiprocessing access with a Lock.
Initialize cache tolerance.
By default, don't use approximate cache.
It is up to the user to choose to optimize CPU time with this or not
could be something like 2 * finfo(float).eps
Parameters
----------
hdf_file_path : str
Path of the HDF file.
hdf_node_path : str
Node of the HDF file.
tolerance : float
Tolerance below which two input vectors are considered equal,
in which case the cached data are returned.
If 0, no approximation is made. Default: 0.
name : str
Name of the cache.
Examples
--------
>>> from gemseo.caches.hdf5_cache import HDF5Cache
>>> cache = HDF5Cache('my_cache.h5', 'my_node')
"""
if not name:
name = hdf_node_path
self.__file_path = hdf_file_path
self._hdf_file = HDF5FileSingleton(hdf_file_path)
self.__hdf_node_path = hdf_node_path
super(HDF5Cache, self).__init__(tolerance, name)
self._read_hashes()
def __str__(self):
msg = super(HDF5Cache, self).__str__()
msg += "\n" + "HDF file path " + str(self.__file_path) + "\n"
msg += "HDF node path " + str(self.__hdf_node_path)
return msg
def _duplicate_from_scratch(self):
return HDF5Cache(
"inc_" + self.__file_path, self.__hdf_node_path, self.tolerance, self.name
)
def _set_lock(self):
"""Set a lock for multithreading, either from an external object or internally
by using RLock()."""
return self._hdf_file.lock
@synchronized
def _read_hashes(self):
"""Read the hashes dict in the HDF file."""
max_grp = self._hdf_file.read_hashes(self._hashes, self.__hdf_node_path)
self._last_accessed_group.value = max_grp
self._max_group.value = max_grp
n_hash = len(self._hashes)
if n_hash > 0:
msg = "Found %s entries in the cache file : %s node : %s"
LOGGER.info(msg, n_hash, self.__file_path, self.__hdf_node_path)
def _has_group(self, sample_id, var_group):
"""Check if the dataset has the particular variables group filled in.
:param int sample_id: sample ID.
:param str var_group: name of the variables group.
:return: True if the variables group is filled in.
:rtype: bool
"""
return self._hdf_file.has_group(sample_id, var_group, self.__hdf_node_path)
@synchronized
def clear(self):
"""Clear the cache.
Examples
--------
>>> from gemseo.caches.hdf5_cache import HDF5Cache
>>> from numpy import array
>>> cache = HDF5Cache('my_cache.h5', 'my_node')
>>> for index in range(5):
...     data = {'x': array([1.])*index, 'y': array([.2])*index}
...     cache.cache_outputs(data, ['x'], data, ['y'])
>>> cache.get_length()
5
>>> cache.clear()
>>> cache.get_length()
0
"""
super(HDF5Cache, self).clear()
self._hdf_file.clear(self.__hdf_node_path)
def _read_data(self, group_number, group_name, h5_open_file=None):
"""Read data from an HDF file.
:param str group_name: name of the group where data is written.
:param int group_number: number of the group.
:param h5_open_file: eventually the already opened file.
this improves performance but is incompatible with
multiprocess/treading
:returns: data dict and jacobian
"""
result = self._hdf_file.read_data(
group_number, group_name, self.__hdf_node_path, h5_open_file=h5_open_file
)[0]
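# A note on the conversion below: the Jacobian is stored flat like the
# other groups, and DataConversion.dict_to_jac_dict rebuilds the nested
# Jacobian mapping expected by the callers.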
if group_name == self.JACOBIAN_GROUP and result is not None:
result = DataConversion.dict_to_jac_dict(result)
return result
def _write_data(self, values, names, var_group, sample_id):
"""Write data associated with a variables group and a sample ID into the
dataset.
:param dict values: data dictionary where keys are variable names
and values are variable values (NumPy arrays).
:param list(str) names: list of the data names to write.
:param str var_group: name of the variables group,
either AbstractCache.INPUTS_GROUP, AbstractCache.OUTPUTS_GROUP or
AbstractCache.JACOBIAN_GROUP.
:param int sample_id: sample ID.
"""
self._hdf_file.write_data(
values,
names,
var_group,
group_num=sample_id,
hdf_node_path=self.__hdf_node_path,
)
@synchronized
def get_data(self, index, **options):
"""Gets the data associated to a sample ID.
:param str index: sample ID.
:param options: options passed to the _read_data() method.
:return: input data, output data and jacobian.
:rtype: dict
"""
with h5py.File(self._hdf_file.hdf_file_path, "a") as h5file:
datum = super(HDF5Cache, self).get_data(index, h5_open_file=h5file)
return datum
@synchronized
def _get_all_data(self):
"""Same as _all_data() but first open a file, then pass the opened file to the
generator and lastly, close the file."""
with h5py.File(self._hdf_file.hdf_file_path, "a") as h5file:
for data in self._all_data(h5_open_file=h5file):
yield data
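# A minimal usage sketch of the persistence behaviour (hypothetical
# paths; assumes the current directory is writable). Since the entries
# live in the HDF5 file, a new cache object built on the same file and
# node sees the previously cached samples:
#
#     from numpy import array
#     cache = HDF5Cache("my_cache.h5", "my_node")
#     data = {"x": array([1.0]), "y": array([2.0])}
#     cache.cache_outputs(data, ["x"], data, ["y"])
#     same_cache = HDF5Cache("my_cache.h5", "my_node")
#     assert same_cache.get_length() == cache.get_length()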
class HDF5FileSingleton(with_metaclass(SingleInstancePerFileAttribute, object)):
"""Singleton to access an HDF file.
Used for multithreaded/multiprocessing access with a lock."""
# We create a single instance of cache per HDF5 file
# to allow the multiprocessing lock to work,
# i.e. we want a single lock even if we instantiate
# multiple caches.
HASH_TAG = "hash"
INPUTS_GROUP = AbstractCache.INPUTS_GROUP
OUTPUTS_GROUP = AbstractCache.OUTPUTS_GROUP
JACOBIAN_GROUP = AbstractCache.JACOBIAN_GROUP
FILE_FORMAT_VERSION = 1
def __init__(self, hdf_file_path):
"""Constructor.
:param hdf_file_path: path to the HDF5 file
"""
self.hdf_file_path = hdf_file_path
self.__check_file_format_version()
# Attach the lock to the file and NOT the Cache because it is a singleton.
self.lock = RLock()
def write_data(
self, data, data_names, group_name, group_num, hdf_node_path, h5_open_file=None
):
"""Cache input data to avoid re evaluation.
:param data: the data to cache
:param data_names: list of data names
:param group_name: inputs or outputs or jacobian group
:param hdf_node_path: name of the main HDF group
:param h5_open_file: eventually the already opened file.
this improves performance but is incompatible with
multiprocess/treading
"""
if h5_open_file is None:
h5_file = h5py.File(self.hdf_file_path, "a")
else:
h5_file = h5_open_file
if not len(h5_file):
self.__set_file_format_version(h5_file)
node_group = h5_file.require_group(hdf_node_path)
num_group = node_group.require_group(str(group_num))
io_group = num_group.require_group(group_name)
try:
# Write hash if needed
if num_group.get(self.HASH_TAG) is None:
data_hash = string_array([hash_data_dict(data, data_names)])
num_group.create_dataset(self.HASH_TAG, data=data_hash)
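# Unicode arrays are cast to the h5py-compatible string dtype and
# all other values to real numbers before writing.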
for data_name in data_names:
val = data.get(data_name)
if val is not None:
if val.dtype.type is unicode_:
io_group.create_dataset(
data_name, data=val.astype(string_dtype)
)
else:
io_group.create_dataset(data_name, data=to_real(val))
# IOError and RuntimeError are caught for Python 2.7 compatibility.
except (RuntimeError, IOError, ValueError):
h5_file.close()
raise RuntimeError(
"Failed to cache dataset {}.{}.{} in file: {}".format(
hdf_node_path, group_num, group_name, self.hdf_file_path
)
)
if h5_open_file is None:
h5_file.close()
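# For reference, one cached sample occupies the following layout in the
# HDF5 file (a sketch; the angle-bracketed names are placeholders):
#
#     <hdf_node_path>/
#         <group_num>/
#             hash                      # hash of the sample inputs
#             <INPUTS_GROUP>/<name>     # one dataset per input variable
#             <OUTPUTS_GROUP>/<name>    # one dataset per output variable
#             <JACOBIAN_GROUP>/<name>   # one dataset per Jacobian block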
def read_data(self, group_number, group_name, hdf_node_path, h5_open_file=None):
"""Read a data dict in the hdf.
:param group_name: name of the group where data is written
:param group_number: number of the group
:param hdf_node_path: name of the main HDF group
:returns: data dict and jacobian
:param h5_open_file: eventually the already opened file.
this improves performance but is incompatible with
multiprocess/treading
"""
if h5_open_file is None:
h5_file = h5py.File(self.hdf_file_path, "r")
else:
h5_file = h5_open_file
node = h5_file[hdf_node_path]
if not self._has_group(group_number, group_name, hdf_node_path, h5_file):
return None, None
number_dataset = node[str(group_number)]
values_group = number_dataset[group_name]
data = {key: array(val) for key, val in values_group.items()}
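# Only the inputs group carries the hash dataset used for cache lookups.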
if group_name == self.INPUTS_GROUP:
read_hash = number_dataset[self.HASH_TAG]
data_hash = long(array(read_hash)[0])
else:
data_hash = None
if h5_open_file is None:
h5_file.close()
# Under Python 2, encode the keys to ASCII bytes so that the cache
# behaves consistently across Python versions.
if PY2:
data = {key.encode("ascii"): val for key, val in data.items()}
for key, val in data.items():
if val.dtype.type is bytes_:
data[key] = val.astype(unicode_)
return data, data_hash
@staticmethod
def _has_group(group_number, group_name, hdf_node_path, h5file):
"""Check if a group is present in the HDF file.
:param group_number: number of the group.
:param group_name: name of the group where data is written.
:param hdf_node_path: name of the main HDF group.
:param h5file: the open HDF file object.
:returns: True if the group exists.
"""
node = h5file[hdf_node_path]
number_dataset = node.get(str(group_number))
if number_dataset is None:
return False
values_group = number_dataset.get(group_name)
if values_group is None:
return False
return True
def has_group(self, group_number, group_name, hdf_node_path):
"""Check if a group is present in the HDF file.
:param group_number: number of the group.
:param group_name: name of the group where data is written.
:param hdf_node_path: name of the main HDF group.
:returns: True if the group exists
"""
with h5py.File(self.hdf_file_path, "r") as h5file:
has_grp = self._has_group(group_number, group_name, hdf_node_path, h5file)
return has_grp
def read_hashes(self, hashes_dict, hdf_node_path):
"""Read the hashes in the HDF file.
:param hashes_dict: dictionary of hashes to fill.
:param hdf_node_path: name of the main HDF group.
:return: the maximum group number stored under the node.
"""
if not exists(self.hdf_file_path):
return 0
# We must lock so that no data is added to the cache meanwhile
with h5py.File(self.hdf_file_path, "r") as h5file:
node_group = h5file.get(hdf_node_path)
if node_group is None:
return 0
max_group = 0
for group_num, group in node_group.items():
group_num = int(group_num)
read_hash = group[self.HASH_TAG]
hash_value = long(array(read_hash)[0])
get_hash = hashes_dict.get(hash_value)
if get_hash is None:
hashes_dict[hash_value] = array([group_num])
else:
hashes_dict[hash_value] = append(get_hash, array([group_num]))
max_group = max(max_group, group_num)
return max_group
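# After this call, hashes_dict maps each input hash to the array of the
# group numbers sharing it, e.g. {12345: array([1, 7])} (values here
# are purely illustrative).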
def clear(self, hdf_node_path):
"""Clear the data in the cache.
:param hdf_node_path: node path to clear
"""
with h5py.File(self.hdf_file_path, "a") as h5file:
del h5file[hdf_node_path]
def __check_file_format_version(self): # type: (...) -> None
"""Make sure the file can be handled.
Raises
ValueError: If the version of the file format is missing or
greater than the current one.
"""
if not Path(self.hdf_file_path).exists():
return
h5_file = h5py.File(self.hdf_file_path, "r")
if not len(h5_file):
h5_file.close()
return
version = h5_file.attrs.get("version")
h5_file.close()
if version is None:
raise ValueError(
"The file {} cannot be used because it has no file format version: "
"see HDFCache.update_file_format to convert it.".format(
self.hdf_file_path
)
)
if version > self.FILE_FORMAT_VERSION:
raise ValueError(
"The file {} cannot be used because its file format version is {} "
"while the expected version is {}: "
"see HDFCache.update_file_format to convert it.".format(
self.hdf_file_path, version, self.FILE_FORMAT_VERSION
)
)
@classmethod
def __set_file_format_version(
cls,
h5_file, # type: h5py.File
): # type: (...) -> None
"""Change the version of an HDF5 file to :attr:`.FILE_FORMAT_VERSION`.
Args:
h5_file: An HDF5 file object.
"""
h5_file.attrs["version"] = cls.FILE_FORMAT_VERSION