# Source code for gemseo.problems.scalable.data_driven.diagonal

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or
#                  initial documentation
#        :author:  Matthias De Lozzo
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""Scalable diagonal model.

This module implements the concept of scalable diagonal model,
which is a particular scalable model built from an input-output
dataset relying on a diagonal design of experiments (DOE)
where inputs vary proportionally from their lower bounds
to their upper bounds, following the diagonal of the input space.

So for every output, the dataset catches its evolution
with respect to this proportion, which makes it a mono dimensional behavior.
Then, for a new user-defined problem dimension,
the scalable model extrapolates this mono dimensional behavior
to the different input directions.

The concept of scalable diagonal model is implemented through
the :class:.ScalableDiagonalModel class
which is composed of a :class:.ScalableDiagonalApproximation.
With regard to the diagonal DOE, |g| proposes the
:class:.DiagonalDOE class.
"""

from __future__ import annotations

from numbers import Number
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Callable

import matplotlib.pyplot as plt
from numpy import arange
from numpy import argsort
from numpy import array
from numpy import array_equal
from numpy import atleast_1d
from numpy import hstack
from numpy import max as np_max
from numpy import mean
from numpy import median
from numpy import min as np_min
from numpy import nan_to_num
from numpy import sqrt
from numpy import vstack
from numpy import where
from numpy import zeros
from numpy.random import Generator
from numpy.random import default_rng
from scipy.interpolate import InterpolatedUnivariateSpline

from gemseo.problems.scalable.data_driven.model import ScalableModel
from gemseo.utils.data_conversion import concatenate_dict_of_arrays_to_array
from gemseo.utils.matplotlib_figure import save_show_figure
from gemseo.utils.seeder import SEED

if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Mapping
from collections.abc import Sequence

from numpy._typing import NDArray

from gemseo.datasets.io_dataset import IODataset

[docs]
class ScalableDiagonalModel(ScalableModel):
"""Scalable diagonal model."""

ABBR = "sdm"

__rng: Generator
"""The random number generator."""

def __init__(
self,
data: IODataset,
sizes: Sequence[int] | None = None,
fill_factor: float = -1,
comp_dep: NDArray[float] = None,
inpt_dep: NDArray[float] = None,
force_input_dependency: bool = False,
allow_unused_inputs: bool = True,
seed: int = SEED,
group_dep: Mapping[str, Iterable[str]] | None = None,
) -> None:
"""
Args:
data: The input-output dataset.
sizes: The sizes of the inputs and outputs.
If None, use the original sizes.
fill_factor: The degree of sparsity of the dependency matrix.
comp_dep: The matrix defining the selection
of a single original component for each scalable component.
inpt_dep: The input-output dependency matrix.
force_input_dependency: Whether to force the dependency of each output
with at least one input.
bool allow_unused_inputs: The possibility to have an input
with no dependence with any output.
seed: The seed for reproducible results.
group_dep: The dependency between the inputs and outputs.
"""  # noqa: D205, D212, D415
if isinstance(fill_factor, Number):
fill_factor = dict.fromkeys(
data.get_variable_names(data.OUTPUT_GROUP), fill_factor
)
elif not isinstance(fill_factor, dict):
msg = (
"Fill factor must be either a number between 0 and 1, "
"a number equal to -1 or a dictionary."
)
raise TypeError(msg)
parameters = {
"fill_factor": fill_factor,
"comp_dep": comp_dep,
"inpt_dep": inpt_dep,
"force_input_dependency": force_input_dependency,
"allow_unused_inputs": allow_unused_inputs,
"seed": seed,
"group_dep": group_dep,
}
self.__rng = default_rng(seed)
super().__init__(data, sizes, **parameters)
self.t_scaled, self.f_scaled = self.__build_scalable_functions()

def __build_dependencies(self) -> tuple[NDArray[float], NDArray[float]]:
"""Build dependencies.

Returns:
The matrix defining the selection of a single original component
for each scalable component,
and the dependency matrix between the inputs and the outputs.
"""
comp_dep = self.parameters["comp_dep"]
inpt_dep = self.parameters["inpt_dep"]
if comp_dep is None or inpt_dep is None:
comp_dep, inpt_dep = self.generate_random_dependency()
return comp_dep, inpt_dep

[docs]
def scalable_function(
self, input_value: Mapping[str, NDArray[float]] | None = None
) -> dict[str, NDArray[float]]:
"""Compute the outputs.

Args:
input_value: The input values.

Returns:
The values of the outputs.
"""
input_value = input_value or self.default_inputs
input_value = concatenate_dict_of_arrays_to_array(input_value, self.input_names)
scal_func = self.model.get_scalable_function
return {fname: scal_func(fname)(input_value) for fname in self.output_names}

[docs]
def scalable_derivatives(
self, input_value: Mapping[str, NDArray[float]] | None = None
) -> dict[str, NDArray[float]]:
"""Compute the derivatives.

Args:
input_value: The input values.

Returns:
The values of the derivatives.
"""
input_value = input_value or self.default_inputs
input_value = concatenate_dict_of_arrays_to_array(input_value, self.input_names)
scal_der = self.model.get_scalable_derivative
return {fname: scal_der(fname)(input_value) for fname in self.output_names}

[docs]
def build_model(self) -> ScalableDiagonalApproximation:
"""Build the model with the original sizes for input and output variables.

Returns:
The scalable approximation.
"""
comp_dep, inpt_dep = self.__build_dependencies()
seed = self.parameters["seed"]
return ScalableDiagonalApproximation(self.sizes, comp_dep, inpt_dep, seed)

def __build_scalable_functions(
self,
) -> tuple[dict[str, list[NDArray[float]]], dict[str, NDArray[float]]]:
"""Build all the required functions from the original dataset.

Returns:
The output names bound to the input and output samples scaled in [0, 1].
"""
t_scaled = {}
f_scaled = {}
for function_name in self.output_names:
t_sc, f_sc = self.model.build_scalable_function(
function_name, self.data, self.input_names
)
t_scaled[function_name] = t_sc
f_scaled[function_name] = f_sc
return t_scaled, f_scaled

def __get_variables_locations(self, names: Sequence[str]) -> list[int]:
"""Get the locations of first component of each variable.

Args:
names: The names of the variables

Returns:
The locations of the first component of each variable.
"""
positions = []
current_position = 0
for name in names:
positions.append(current_position)
current_position += self.sizes[name]
return positions

def __convert_dependency_to_array(self, dependency) -> NDArray[float]:
"""Convert a dictionary-like dependency structure into an array-like one.

Args:
dependency: The dictionary-like dependency structure.

Returns:
The array-like dependency structure.
"""
matrix = None
for output_name in self.output_names:
row = hstack([dependency[output_name][inpt] for inpt in self.input_names])
matrix = row if matrix is None else vstack((matrix, row))
return matrix.T

[docs]
def plot_dependency(
self,
save: bool = True,
show: bool = False,
directory: str = ".",
png: bool = False,
) -> str:
"""This method plots the dependency matrix of a discipline in the form of a
chessboard, where rows represent inputs, columns represent output and gray scale
represent the dependency level between inputs and outputs.

Args:
save: Whether to save the figure.
show: Whether to display the figure.
directory: The directory path.
png: Whether to use PNG file format instead of PDF.
"""
inputs_positions = self.__get_variables_locations(self.input_names)
outputs_positions = self.__get_variables_locations(self.output_names)
dependency = self.model.io_dependency
dependency_matrix = self.__convert_dependency_to_array(dependency)
is_binary_matrix = array_equal(
dependency_matrix, dependency_matrix.astype(bool)
)
if not is_binary_matrix:
dp_sum = dependency_matrix.sum(0)
dependency_matrix = dependency_matrix / dp_sum

fig, axes = plt.subplots()
axes.matshow(dependency_matrix, cmap="Greys", vmin=0)
axes.set_yticks(inputs_positions)
axes.set_yticklabels(self.input_names)
axes.set_xticks(outputs_positions)
axes.set_xticklabels(self.output_names, ha="left", rotation="vertical")
axes.tick_params(axis="both", which="both", length=0)
for pos in inputs_positions[1:]:
axes.axhline(y=pos - 0.5, color="r", linewidth=0.5)
for pos in outputs_positions[1:]:
axes.axvline(x=pos - 0.5, color="r", linewidth=0.5)
for i in range(dependency_matrix.shape[0]):
for j in range(dependency_matrix.shape[1]):
val = int(round(dependency_matrix[i, j] * 100))
med = median(dependency_matrix[:, j] * 100)
col = "white" if val > med else "black"
axes.text(j, i, val, ha="center", va="center", color=col)
if save:
extension = "png" if png else "pdf"
file_path = Path(directory) / f"{self.name}_dependency.{extension}"
else:
file_path = None
save_show_figure(fig, show, file_path)
return str(file_path)

[docs]
def plot_1d_interpolations(
self,
save: bool = False,
show: bool = False,
step: float = 0.01,
varnames: Sequence[str] | None = None,
directory: str = ".",
png: bool = False,
) -> list[str]:
r"""Plot the scaled 1D interpolations, a.k.a. the basis functions.

A basis function is a mono dimensional function
interpolating the samples of a given output component
over the input sampling line
:math:t\in[0,1]\mapsto \\underline{x}+t(\overline{x}-\\underline{x}).

There are as many basis functions
as there are output components from the discipline.
Thus, for a discipline with a single output in dimension 1,
there is 1 basis function.
For a discipline with a single output in dimension 2,
there are 2 basis functions.
For a discipline with an output in dimension 2
and an output in dimension 13,
there are 15 basis functions. And so on.
This method allows to plot the basis functions associated
with all outputs or only part of them,
either on screen (show=True), in a file (save=True)
or both.
We can also specify the discretization step
whose default value is 0.01.

Args:
save: Whether to save the figure.
show: Whether to display the figure.
step: The step to evaluate the 1d interpolation function.
varnames: The names of the variable to plot.
If None, all the variables are plotted.
directory: The directory path.
png: Whether to use PNG file format instead of PDF.

Returns:
The names of the files.
"""
function_names = varnames or self.output_names
x_vals = arange(0.0, 1.0 + step, step)
fnames = []
for func in function_names:
components = self.model.interpolation_dict[func]
for index, function in enumerate(components):
plt.figure()
doe_t = self.t_scaled[func]
doe_f = [output[index] for output in self.f_scaled[func]]
y_vals = function(x_vals)
plt.xlim(-0.05, 1.05)
plt.ylim(-0.1, 1.1)
plt.plot(
x_vals,
(y_vals - min(y_vals)) / (max(y_vals) - min(y_vals)),
label=func + str(index),
)
plt.plot(
doe_t,
(doe_f - min(y_vals)) / (max(y_vals) - min(y_vals)),
"or",
)
plt.xlabel("Scaled abscissa", fontsize=18)
plt.ylabel("Interpolation value", fontsize=18)
plt.title("1D interpolation of " + self.name + "." + func)
fig = plt.gcf()
if save:
extension = "png" if png else "pdf"
file_path = (
Path(directory)
/ f"{self.name}_{func}_1D_interpolation_{index}.{extension}"
)
fnames.append(str(file_path))
else:
file_path = None
save_show_figure(fig, show, file_path)
return fnames

[docs]
def generate_random_dependency(
self,
) -> tuple[dict[str, NDArray[int]], dict[str, dict[str, NDArray[float]]]]:
"""Generate a random dependency structure for use in scalable discipline.

Returns:
The dependency structure.
"""
io_dependency = self.parameters["group_dep"] or {}
for function_name in self.output_names:
input_names = io_dependency.get(function_name, self.input_names)
io_dependency[function_name] = input_names

if self.parameters.get("inpt_dep") is None:
io_dep = self.__generate_random_io_dep(io_dependency)

if self.parameters.get("comp_dep") is None:
out_map = self.__generate_random_output_map()

# If an output function does not have any dependency with inputs,
# add a random dependency if independent outputs are forbidden
if self.parameters["force_input_dependency"]:
for function_name in self.output_names:
for function_component in range(self.sizes.get(function_name)):
self.__complete_random_dep(
io_dep, function_name, function_component, io_dependency
)

# If an input parameter does not have any dependency with output
# functions, add a random dependency if unused inputs are not allowed
if not self.parameters["allow_unused_inputs"]:
for input_name in self.input_names:
for input_component in range(self.sizes.get(input_name)):
self.__complete_random_dep(
io_dep, input_name, input_component, io_dependency
)
return out_map, io_dep

def __generate_random_io_dep(
self, io_dependency: Mapping[str, Iterable[str]]
) -> dict[str, dict[str, NDArray[float]]]:
"""Generate the dependency between the new inputs and the new outputs.

Args:
io_dependency: The input-output dependency structure.

Returns:
The dependencies between the inputs and the outputs.
"""
error_msg = (
"Fill factor must be a number, "
"either -1 or a real number between 0 and 1."
)
r_io_dependency = {}
for function_name in self.output_names:
fill_factor = self.parameters["fill_factor"].get(function_name, 0.7)
if fill_factor != -1.0 and (fill_factor < 0.0 or fill_factor > 1):
raise TypeError(error_msg)
function_size = self.sizes.get(function_name)
r_io_dependency[function_name] = {}
for input_name in self.input_names:
input_size = self.sizes.get(input_name)
if input_name in io_dependency[function_name]:
if 0 <= fill_factor <= 1:
rand_dep = self.__rng.choice(
2,
(function_size, input_size),
p=[1.0 - fill_factor, fill_factor],
)
else:
rand_dep = self.__rng.random((function_size, input_size))
r_io_dependency[function_name][input_name] = rand_dep
else:
zeros_dep = zeros((function_size, input_size))
r_io_dependency[function_name][input_name] = zeros_dep
return r_io_dependency

def __generate_random_output_map(self) -> dict[str, NDArray[int]]:
"""Generate the dependency between the original and new output components.

Returns:
The output names bound to the original output components.
"""
out_map = {}
for function_name in self.output_names:
original_function_size = self.original_sizes.get(function_name)
function_size = self.sizes.get(function_name)
out_map[function_name] = self.__rng.integers(
original_function_size, size=function_size
)
return out_map

def __complete_random_dep(
self,
r_io_dep: NDArray[float],
dataname: str,
index: int,
io_dep: Mapping[Iterable[str]] | None,
) -> None:
"""Complete random dependency if row (input name) or column (function name) of
the random dependency matrix is empty.

Args:
r_io_dep: The dependency between inputs and outputs.
dataname: The name of the variable whose component must be check.
index: The index of the component of the variable.
io_dep: The dependency between the inputs and the outputs.
If None,
all output components can depend on all input components.
"""
is_input = dataname in self.input_names
if is_input:
varnames = []
for function_name, inputs in io_dep.items():
if dataname in inputs:
varnames.append(function_name)
inpt_dep_mat = hstack([
r_io_dep[varname][dataname].T for varname in varnames
])
else:
varnames = io_dep[dataname]
inpt_dep_mat = hstack([r_io_dep[dataname][varname] for varname in varnames])

if sum(inpt_dep_mat[index, :]) == 0:
prob = [self.sizes.get(varname, 1) for varname in varnames]
prob = [float(x) / sum(prob) for x in prob]
id_var = self.__rng.choice(len(varnames), p=prob)
id_comp = self.__rng.integers(0, self.sizes.get(varnames[id_var], 1))
if is_input:
varname = varnames[id_var]
r_io_dep[varname][dataname][id_comp, index] = 1
else:
varname = varnames[id_var]
r_io_dep[dataname][varname][index, id_comp] = 1

[docs]
class ScalableDiagonalApproximation:
"""Methodology that captures the trends of a physical problem, and extends it into a
problem that has scalable input and outputs dimensions The original and the
resulting scalable problem have the same interface:

all inputs and outputs have the same names; only their dimensions vary.
"""

# TODO: API: remove the argument "seed" which is not used.
def __init__(
self,
sizes: Mapping[str, int],
output_dependency,
io_dependency,
seed: int = SEED,
) -> None:
"""
Args:
sizes: The sizes of the inputs and outputs.
output_dependency: The dependency between the original and new outputs.
io_dependency: The dependency between the new inputs and outputs.
seed: The seed for reproducible results.
"""  # noqa: D205, D212, D415
super().__init__()
self.sizes = sizes
# dependency matrices
self.output_dependency = output_dependency
self.io_dependency = io_dependency
# dictionaries of interpolations and extrapolations
self.interpolation_dict = {}
self.d_interpolation_dict = {}
self.interpolators_dict = {}
self.scalable_functions = {}
self.scalable_dfunctions = {}

[docs]
def build_scalable_function(
self,
function_name: str,
dataset: IODataset,
input_names: Iterable[str],
degree: int = 3,
) -> tuple[list[NDArray[float]], NDArray[float]]:
"""Create the interpolation functions for a specific output.

Args:
function_name: The name of the output.
dataset: The input-output dataset.
input_names: The names of the inputs.
degree: The degree of interpolation.

Returns:
The input and output samples scaled in [0, 1].
"""
x2_scaled = nan_to_num(
dataset.get_view(group_names=dataset.INPUT_GROUP).to_numpy() ** 2
)
t_scaled = [atleast_1d(sqrt(mean(val.real))) for val in x2_scaled]
f_scaled = dataset.get_view(variable_names=function_name).to_numpy().real

# sort t_scaled and f_scaled following the t_scaled ascending order
indices = argsort([val[0] for val in t_scaled])
t_scaled = [t_scaled[index] for index in indices]
f_scaled = [f_scaled[index] for index in indices]

# scale the output samples: [a1, b1] x ... x [am, bm] -> [0, 1]^m
f_scaled = self.scale_samples(f_scaled)

# interpolate the (t_scaled, f_scaled) data
self._interpolate(function_name, t_scaled, f_scaled, degree)

# compute the total input and output sizes
(input_size, output_size) = self._compute_sizes(function_name, input_names)

# extrapolation
self._extrapolate(function_name, input_names, input_size, output_size)

return t_scaled, f_scaled

[docs]
def get_scalable_function(
self, output_function: str
) -> Callable[[Iterable[float]], NDArray[float]]:
"""Return the function computing an output.

Args:
output_function: The name of the output.

Returns:
The function computing the output.
"""
return self.scalable_functions[output_function]

[docs]
def get_scalable_derivative(
self, output_function: str
) -> Callable[[Iterable[float]], NDArray[float]]:
"""Return the function computing the derivatives of an output.

Args:
output_function: The name of the output.

Returns:
The function computing the derivatives of this output.
"""
return self.scalable_dfunctions[output_function]

[docs]
@staticmethod
def scale_samples(samples: Iterable[NDArray[float]]) -> NDArray[float]:
"""Scale array samples into [0, 1].

Args:
samples: The samples.

Returns:
The samples with components scaled in [0, 1].
"""
samples = array(samples)
col_min = np_min(samples, 0)
col_max = np_max(samples, 0)
scaled_samples = samples - col_min
range_col = col_max - col_min
scaling = where(abs(range_col) > 1e-6, range_col, 1)
scaled_samples /= scaling
return scaled_samples

def _interpolate(
self,
function_name: str,
t_scaled: Iterable[float],
f_scaled: Iterable[float],
degree: int = 3,
) -> None:
"""Interpolate a set of samples (t, y(t)) with a polynomial spline.

Args:
function_name: The name of the output.
t_scaled: The input values.
f_scaled: The output values.
degree: The degree of the polynomial interpolation.
"""
nb_components = f_scaled[0].size
list_interpolations = []
list_derivatives = []
for component in range(nb_components):
f_scaled_component = [output[component] for output in f_scaled]
# compute interpolation
interpolation = InterpolatedUnivariateSpline(
t_scaled, f_scaled_component, k=degree
)
# store spline and derivative
list_interpolations.append(interpolation)
list_derivatives.append(interpolation.derivative())
# store interpolation and derivatives
self.interpolation_dict[function_name] = list_interpolations
self.d_interpolation_dict[function_name] = list_derivatives

def _compute_sizes(
self, function_name: str, input_names: Sequence[str]
) -> tuple[int, int]:
"""Compute the sizes of the input and output vectors.

Args:
function_name: The name of the output.
input_names: The names of the inputs.

Returns:
The sizes of the input and output vectors.
"""
input_size = 0
for input_name in input_names:
input_size += self.sizes.get(input_name, 1)
output_size = self.sizes.get(function_name, 1)  # default 1
return input_size, output_size

def _extrapolate(
self,
function_name: str,
input_names: Iterable[str],
input_size: int,
output_size: int,
) -> None:
"""Extrapolate a 1D function to arbitrary input and output dimensions.

Generate a function
that produces an output with a given size
from an input with a given size,
as well as its derivative.

Args:
function_name: The name of the output.
input_names: The names of the inputs.
input_size: The size of the input vector.
output_size: The size of the output vector.
"""
# crop the matrices to the correct sizes and convert to array
io_dependency = {
input_name: dep_mat[0:output_size, 0 : self.sizes[input_name]]
for (input_name, dep_mat) in self.io_dependency[function_name].items()
}
io_dependency = concatenate_dict_of_arrays_to_array(io_dependency, input_names)

# Convert the input-output dependency matrix to a list
# where the i-th element is a list whose j-th element corresponds to
# the degree of dependence between the i-th output component and the
# i-th input.
io_dependency = [list(row) for row in io_dependency]

# Get the 1D interpolation functions and their derivatives
interpolated_fun_1d = self.interpolation_dict[function_name]
interpolated_dfun_1d = self.d_interpolation_dict[function_name]

# Get the indices of the 1D interpolation functions
# associated with the components of the new output.
outputs_to_original_ones = self.output_dependency[function_name]

def scalable_function(input_data: Iterable[float]) -> NDArray[float]:
"""Compute the output vector.

Args:
input_data: The input vector.

Returns:
The output vector.
"""
result = zeros(output_size)
for output_index in range(output_size):
func = interpolated_fun_1d[outputs_to_original_ones[output_index]]
coefficients = io_dependency[output_index]
result[output_index] = sum(
coefficient * func(input_value)
for coefficient, input_value in zip(coefficients, input_data)
) / sum(coefficients)

return result

def scalable_derivative(input_data: Iterable[float]) -> NDArray[float]:
"""Compute the Jacobian matrix.

Args:
input_data: The input vector.

Returns:
The Jacobian matrix.
"""
result = zeros([output_size, input_size])
for output_index in range(output_size):
func = interpolated_dfun_1d[outputs_to_original_ones[output_index]]
coefficients = io_dependency[output_index]
result[output_index, :] = array([
coefficient * func(input_value)
for coefficient, input_value in zip(coefficients, input_data)
]) / sum(coefficients)

return result

self.scalable_functions[function_name] = scalable_function
self.scalable_dfunctions[function_name] = scalable_derivative