# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Charlie Vanaret
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Conversion from a NumPy array to a dictionary of NumPy arrays and vice versa."""
from __future__ import division, unicode_literals
import collections
from copy import deepcopy
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Union
from numpy import array, hstack, ndarray, vstack, zeros
if TYPE_CHECKING:
from gemseo.core.discipline import MDODiscipline
class DataConversion(object):
"""Methods to juggle NumPy arrays and dictionaries of Numpy arrays."""
FLAT_JAC_SEP = "!d$_$d!"
@staticmethod
def dict_to_array(
data_dict, # type: Mapping[str,ndarray]
data_names, # type: Iterable[str]
): # type: (...) -> ndarray
"""Concatenate some values of a mapping associating values to names.
This makes it possible to convert:
.. code-block:: python
{'x': array([1.]), 'y': array([2., 3.])}
to:
.. code-block:: python
array([1., 2., 3.])
Args:
data_dict: The mapping to be converted;
it associates values to names.
data_names: The names to be used for the concatenation.
Returns:
The concatenation of the values for the provided names.
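Examples:
    A minimal usage sketch, reusing the values from the conversion above:

    .. code-block:: python

        from numpy import array

        data = {"x": array([1.]), "y": array([2., 3.])}
        DataConversion.dict_to_array(data, ["x", "y"])
        # array([1., 2., 3.])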
"""
if not data_names:
return array([])
return hstack([data_dict[name] for name in data_names])
@staticmethod
def list_of_dict_to_array(
data_list, # type: Iterable[Mapping[str,Union[ndarray,Mapping[str,ndarray]]]]
data_names, # type: Iterable[str]
group=None, # type: Optional[str]
): # type: (...) -> ndarray
"""Concatenate some values of mappings associating values to names.
The names can be either grouped:
.. code-block:: python
[
{'group1':
{'x': array([3.])},
'group2':
{'y': array([1., 1.])}
},
{'group1':
{'x': array([6.])},
'group2':
{'y': array([2., 2.])}
}
]
or ungrouped:
.. code-block:: python
[
{'x': array([3.]), 'y': array([1., 1.])},
{'x': array([6.]), 'y': array([2., 2.])}
]
For both cases,
if ``data_names=["y", "x"]``,
the returned object will be
.. code-block:: python
array([[1., 1., 3.],
[2., 2., 6.]])
Args:
data_list: The mappings to be converted;
each one associates values to names, possibly classified by groups.
data_names: The names to be used for the concatenation.
group: The name of the group to be considered.
If None, the data is assumed to have no group.
Returns:
The concatenation of the values of the passed names.
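Examples:
    A minimal sketch with the ungrouped data shown above:

    .. code-block:: python

        from numpy import array

        data_list = [
            {"x": array([3.]), "y": array([1., 1.])},
            {"x": array([6.]), "y": array([2., 2.])},
        ]
        DataConversion.list_of_dict_to_array(data_list, ["y", "x"])
        # array([[1., 1., 3.],
        #        [2., 2., 6.]])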
"""
dict_to_array = DataConversion.dict_to_array
if group is None:
return array([dict_to_array(data, data_names) for data in data_list])
return vstack([dict_to_array(data[group], data_names) for data in data_list])
@staticmethod
def array_to_dict(
data_array, # type: ndarray
data_names, # type: Iterable[str]
data_sizes, # type: Mapping[str,int]
): # type: (...) -> Dict[str,ndarray]
"""Convert an NumPy array into a dictionary of NumPy arrays indexed by names.
This allows to convert:
.. code-block:: python
array([1., 2., 3.])
to:
.. code-block:: python
{'x': array([1.]), 'y': array([2., 3.])}
Args:
data_array: The data array to be converted.
data_names: The names to be used as keys of the dictionary.
The data array must contain the values of these names in the same order,
e.g. ``data_array=array([1.,2.])`` and ``data_names=["x","y"]``
implies that ``x=array([1.])`` and ``y=array([2.])``.
data_sizes: The sizes of the variables,
e.g. ``data_array=array([1.,2.,3.])``, ``data_names=["x","y"]``
and ``data_sizes={"x":2,"y":1}`` implies that
``x=array([1.,2.])`` and ``y=array([3.])``.
Returns:
The data mapped to the names.
Raises:
ValueError: If the number of dimensions of the data array is greater than 2.
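Examples:
    A minimal sketch, splitting a vector of size 3 into ``x`` and ``y``:

    .. code-block:: python

        from numpy import array

        DataConversion.array_to_dict(
            array([1., 2., 3.]), ["x", "y"], {"x": 1, "y": 2}
        )
        # {'x': array([1.]), 'y': array([2., 3.])}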
"""
if data_array.ndim > 2:
raise ValueError("Invalid data dimension: expected at most 2, got {}.".format(data_array.ndim))
current_position = 0
array_dict = {}
for data_name in data_names:
array_dict[data_name] = data_array[
..., current_position : current_position + data_sizes[data_name]
]
current_position += data_sizes[data_name]
return array_dict
@staticmethod
def jac_2dmat_to_dict(
flat_jac, # type: ndarray
outputs, # type: Iterable[str]
inputs, # type: Iterable[str]
data_sizes, # type: Mapping[str,int]
): # type: (...) -> Dict[str,Dict[str,ndarray]]
"""Convert a full Jacobian matrix into elementary Jacobian matrices.
The full Jacobian matrix is passed as a two-dimensional NumPy array.
Its first dimension represents the outputs
and its second one represents the inputs.
Args:
flat_jac: The full Jacobian matrix.
inputs: The names of the inputs.
outputs: The names of the outputs.
data_sizes: The sizes of the inputs and outputs.
Returns:
The Jacobian matrices indexed by the names of the inputs and outputs.
Precisely,
``jac[output][input]`` is a two-dimensional NumPy array
representing the Jacobian matrix
for the input ``input`` and output ``output``,
with the output components in the first dimension
and the input components in the second one.
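Examples:
    A minimal sketch with one output ``f`` of size 2
    and two inputs ``x`` and ``y`` of sizes 1 and 2:

    .. code-block:: python

        from numpy import arange

        flat_jac = arange(6.).reshape(2, 3)
        jac = DataConversion.jac_2dmat_to_dict(
            flat_jac, ["f"], ["x", "y"], {"f": 2, "x": 1, "y": 2}
        )
        # jac["f"]["x"] -> array([[0.], [3.]])
        # jac["f"]["y"] -> array([[1., 2.], [4., 5.]])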
"""
output_index = 0
jacobian = {}
for output_name in outputs:
output_jacobian = jacobian[output_name] = {}
output_size = data_sizes[output_name]
input_index = 0
for input_name in inputs:
input_size = data_sizes[input_name]
output_jacobian[input_name] = flat_jac[
output_index : output_index + output_size,
input_index : input_index + input_size,
]
input_index += input_size
output_index += output_size
return jacobian
@staticmethod
def jac_3dmat_to_dict(
jac, # type: ndarray
outputs, # type: Iterable[str]
inputs, # type: Iterable[str]
data_sizes, # type: Mapping[str,int]
): # type: (...) -> Dict[str,Dict[str,ndarray]]
"""Convert several full Jacobian matrices into elementary Jacobian matrices.
The full Jacobian matrices are passed as a three-dimensional NumPy array.
Its first dimension represents the different full Jacobian matrices,
its second dimension represents the outputs
and its third one represents the inputs.
Args:
jac: The full Jacobian matrices.
inputs: The names of the inputs.
outputs: The names of the outputs.
data_sizes: The sizes of the inputs and outputs.
Returns:
The Jacobian matrices indexed by the names of the inputs and outputs.
Precisely,
``jac[output][input]`` is a three-dimensional NumPy array
where ``jac[output][input][i]`` represents the ``i``-th Jacobian matrix
for the input ``input`` and output ``output``,
with the output components in the first dimension
and the input components in the second one.
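Examples:
    A minimal sketch with two stacked Jacobian matrices
    for an output ``f`` of size 2 and inputs ``x`` and ``y`` of sizes 1 and 2:

    .. code-block:: python

        from numpy import arange

        jac_3d = arange(12.).reshape(2, 2, 3)
        jac = DataConversion.jac_3dmat_to_dict(
            jac_3d, ["f"], ["x", "y"], {"f": 2, "x": 1, "y": 2}
        )
        # jac["f"]["x"].shape -> (2, 2, 1)
        # jac["f"]["y"].shape -> (2, 2, 2)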
"""
output_index = 0
jacobian = {}
for output_name in outputs:
output_jacobian = jacobian[output_name] = {}
output_size = data_sizes[output_name]
input_index = 0
for input_name in inputs:
input_size = data_sizes[input_name]
output_jacobian[input_name] = jac[
:,
output_index : output_index + output_size,
input_index : input_index + input_size,
]
input_index += input_size
output_index += output_size
return jacobian
@staticmethod
def dict_jac_to_2dmat(
jac_dict, # type: Mapping[str,Mapping[str,ndarray]]
outputs, # type: Iterable[str]
inputs, # type: Iterable[str]
data_sizes, # type: Mapping[str,int]
): # type: (...) -> ndarray
"""Convert elementary Jacobian matrices into a full Jacobian matrix.
Args:
jac_dict: The elementary Jacobian matrices
indexed by the names of the inputs and outputs.
inputs: The names of the inputs.
outputs: The names of the outputs.
data_sizes: The sizes of the inputs and outputs.
Returns:
The full Jacobian matrix
whose first dimension represents the outputs
and the second one represents the inputs,
both preserving the order of variables passed as arguments.
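Examples:
    A minimal sketch assembling the Jacobian of an output ``f`` of size 2
    with respect to inputs ``x`` and ``y`` of sizes 1 and 2:

    .. code-block:: python

        from numpy import array

        jac_dict = {
            "f": {
                "x": array([[1.], [2.]]),
                "y": array([[3., 4.], [5., 6.]]),
            }
        }
        DataConversion.dict_jac_to_2dmat(
            jac_dict, ["f"], ["x", "y"], {"f": 2, "x": 1, "y": 2}
        )
        # array([[1., 3., 4.],
        #        [2., 5., 6.]])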
"""
n_outputs = sum(data_sizes[output_name] for output_name in outputs)
n_inputs = sum(data_sizes[input_name] for input_name in inputs)
flat_jac = zeros((n_outputs, n_inputs))
output_index = 0
for output_name in outputs:
output_jac_dict = jac_dict[output_name]
output_size = data_sizes[output_name]
input_index = 0
for input_name in inputs:
input_size = data_sizes[input_name]
flat_jac[
output_index : output_index + output_size,
input_index : input_index + input_size,
] = output_jac_dict[input_name]
input_index += input_size
output_index += output_size
return flat_jac
@staticmethod
def dict_jac_to_dict(
jac_dict, # type: Mapping[str,Mapping[str,ndarray]]
): # type: (...) -> Dict[str,ndarray]
"""Reindex a mapping of elementary Jacobian matrices by Jacobian names.
A Jacobian name is built with the method :meth:`.flat_jac_name`
from the input and output names.
Args:
jac_dict: The elementary Jacobian matrices
indexed by input and output names.
Returns:
The elementary Jacobian matrices indexed by Jacobian names.
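Examples:
    A minimal sketch with a single output ``f`` and a single input ``x``:

    .. code-block:: python

        from numpy import array

        DataConversion.dict_jac_to_dict({"f": {"x": array([[1., 2.]])}})
        # {'f!d$_$d!x': array([[1., 2.]])}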
"""
jacobian = {}
for output_name, jac_dict_loc in jac_dict.items():
for input_name, jac_value in jac_dict_loc.items():
jac_name = DataConversion.flat_jac_name(output_name, input_name)
jacobian[jac_name] = jac_value
return jacobian
@staticmethod
def flat_jac_name(
out_name, # type: str
inpt_name, # type: str
): # type: (...) -> str
"""Concatenate the name of the output and input, with a separator.
Args:
out_name: The name of the output.
inpt_name: The name of the input.
Returns:
The name of the output concatenated with the name of the input.
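Examples:
    A minimal sketch using the separator :attr:`.FLAT_JAC_SEP`:

    .. code-block:: python

        DataConversion.flat_jac_name("f", "x")
        # 'f!d$_$d!x'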
"""
return out_name + DataConversion.FLAT_JAC_SEP + inpt_name
@staticmethod
def dict_to_jac_dict(
flat_jac_dict, # type:Mapping[str,ndarray]
): # type: (...) -> Mapping[str,Mapping[str,ndarray]]
"""Reindex a mapping of elementary Jacobian matrices by input and output names.
Args:
flat_jac_dict: The elementary Jacobian matrices indexed by Jacobian names.
A Jacobian name is built with the method :meth:`.flat_jac_name`
from the input and output names.
Returns:
The elementary Jacobian matrices indexed by input and output names.
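Examples:
    A minimal sketch rebuilding the nested mapping from Jacobian names:

    .. code-block:: python

        from numpy import array

        flat = {"f" + DataConversion.FLAT_JAC_SEP + "x": array([[1., 2.]])}
        DataConversion.dict_to_jac_dict(flat)
        # {'f': {'x': array([[1., 2.]])}}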
"""
jac_names = [
jac_name.split(DataConversion.FLAT_JAC_SEP) for jac_name in flat_jac_dict
]
output_names = set(jac_name[0] for jac_name in jac_names)
input_names = set(jac_name[1] for jac_name in jac_names)
jacobian = {}
for output_name in output_names:
output_jacobian = jacobian[output_name] = {}
for input_name in input_names:
jac_name = DataConversion.flat_jac_name(output_name, input_name)
output_jacobian[input_name] = flat_jac_dict[jac_name]
return jacobian
@staticmethod
def update_dict_from_array(
reference_input_data, # type: Mapping[str,ndarray]
data_names, # type: Iterable[str]
values_array, # type: ndarray
): # type: (...) -> Dict[str,ndarray]
"""Update a data mapping from data array and names.
The order of the data in the array follows the order of the data names.
Args:
reference_input_data: The reference data to be updated.
data_names: The names for which to update the data.
values_array: The data with which to update the reference one.
Returns:
The updated data mapping.
Raises:
TypeError: If the data with which to update the reference one
is not a NumPy array.
ValueError:
* If a name for which to update the data is missing
from the reference data.
* If the size of the data with which to update the reference one
is inconsistent with the reference data.
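Examples:
    A minimal sketch updating only ``y`` in a reference mapping:

    .. code-block:: python

        from numpy import array

        reference = {"x": array([0.]), "y": array([0., 0.])}
        DataConversion.update_dict_from_array(reference, ["y"], array([1., 2.]))
        # {'x': array([0.]), 'y': array([1., 2.])}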
"""
if not isinstance(values_array, ndarray):
raise TypeError(
"Values array must be a numpy.ndarray, "
"got instead: {}.".format(type(values_array))
)
data = dict(deepcopy(reference_input_data))
if not data_names:
return data
i_min = i_max = 0
for data_name in data_names:
data_value = reference_input_data.get(data_name)
if data_value is None:
raise ValueError(
"Reference data has no item named: {}.".format(data_name)
)
i_max = i_min + data_value.size
if len(values_array) < i_max:
raise ValueError(
"Inconsistent input array size of values array {} "
"with reference data shape {} "
"for data named: {}.".format(
values_array, data_value.shape, data_name
)
)
data[data_name] = values_array[i_min:i_max].reshape(data_value.shape)
data[data_name] = data[data_name].astype(data_value.dtype)
i_min = i_max
if i_max != values_array.size:
raise ValueError(
"Inconsistent data shapes:\n"
"could not use the whole data array of shape {} "
"(only reached max index = {}),\n"
"while updating data dictionary keys {}\n"
" of shapes : {}.".format(
values_array.shape,
i_max,
data_names,
[
(data_name, reference_input_data[data_name].shape)
for data_name in data_names
],
)
)
return data
@staticmethod
def deepcopy_datadict(
data_dict, # type: Mapping[str,ndarray]
keys=None, # type:Optional[Iterable[str]]
):
"""Perform a deep copy of a data mapping.
This treats the NumPy arrays specially
using ``array.copy()`` instead of ``deepcopy``.
Args:
data_dict: The data mapping to be copied.
keys: The keys of the mapping to be considered.
If None, consider all the mapping keys.
Returns:
A deep copy of the data mapping.
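Examples:
    A minimal sketch copying only the key ``"x"``:

    .. code-block:: python

        from numpy import array

        data = {"x": array([1., 2.]), "name": "foo"}
        copy = DataConversion.deepcopy_datadict(data, keys=["x"])
        # copy == {'x': array([1., 2.])} and copy["x"] is not data["x"]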
"""
deep_copy = {}
selected_keys = data_dict.keys()
if keys is not None:
selected_keys = [key for key in keys if key in data_dict]
for key in selected_keys:
value = data_dict[key]
if isinstance(value, ndarray):
deep_copy[key] = value.copy()
else:
deep_copy[key] = deepcopy(value)
return deep_copy
@staticmethod
def __get_all_disciplines(
disciplines, # type: Iterable[MDODiscipline]
recursive, # type: bool
): # type: (...) -> List[MDODiscipline]
"""Return both disciplines and sub-disciplines.
Args:
disciplines: The disciplines.
recursive: If True,
also return the sub-disciplines of the disciplines
that are scenarios.
Returns:
Both disciplines and sub-disciplines.
"""
all_disciplines = [
discipline for discipline in disciplines if not discipline.is_scenario()
]
if recursive:
scenarios = [
discipline for discipline in disciplines if discipline.is_scenario()
]
sub_disciplines = list(
set.union(*(set(scenario.disciplines) for scenario in scenarios))
)
return sub_disciplines + all_disciplines
return all_disciplines
@staticmethod
def get_all_outputs(
disciplines, # type: Iterable[MDODiscipline]
recursive=False, # type: bool
): # type: (...) -> List[str]
"""Return all the output names of the disciplines.
Args:
disciplines: The disciplines.
recursive: If True,
search for the outputs of the sub-disciplines,
when some disciplines are scenarios.
Returns:
The names of the outputs.
"""
get_disciplines = DataConversion.__get_all_disciplines
return list(
set.union(
*(
set(discipline.get_output_data_names())
for discipline in get_disciplines(disciplines, recursive=recursive)
)
)
)
def flatten_mapping(
mapping, # type: Mapping
parent_key="", # type: str
sep="_", # type: str
): # type: (...) -> Dict
"""Flatten a nested mapping.
Args:
mapping: The mapping to be flattened.
parent_key: The key for which ``mapping`` is the value.
sep: The keys separator, to be used as ``{parent_key}{sep}{child_key}``.
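Returns:
    The flattened mapping.
Examples:
    A minimal sketch with the default separator:

    .. code-block:: python

        flatten_mapping({"a": {"b": 1, "c": {"d": 2}}, "e": 3})
        # {'a_b': 1, 'a_c_d': 2, 'e': 3}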
"""
return dict(_flatten_mapping(mapping, parent_key, sep))
def _flatten_mapping(
mapping, # type: Mapping
parent_key, # type: str
sep, # type: str
): # type: (...) -> Dict
"""Flatten a nested mapping.
Args:
mapping: The mapping to be flattened.
parent_key: The key for which ``mapping`` is the value.
sep: The keys separator, to be used as ``{parent_key}{sep}{child_key}``.
"""
for key, value in mapping.items():
new_key = parent_key + sep + key if parent_key else key
if isinstance(value, collections.Mapping):
for item in flatten_mapping(value, new_key, sep=sep).items():
yield item
else:
yield new_key, value