# Source code for gemseo.problems.scalable.diagonal

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or
#                  initial documentation
#        :author:  Matthias De Lozzo
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""
Scalable diagonal model
=======================

This module implements the concept of scalable diagonal model,
which is a particular scalable model built from an input-output
dataset relying on a diagonal design of experiments (DOE)
where inputs vary proportionally from their lower bounds
to their upper bounds, following the diagonal of the input space.

So for every output, the dataset catches its evolution
with respect to this proportion, which makes it a monodimensional behavior.
Then, for a new user-defined problem dimension,
the scalable model extrapolates this monodimensional behavior
to the different input directions.

The concept of scalable diagonal model is implemented through
the :class:.ScalableDiagonalModel class
which is composed of a :class:.ScalableDiagonalApproximation.
With regard to the diagonal DOE, |g| proposes the
:class:.DiagonalDOE class.
"""
from __future__ import absolute_import, division, unicode_literals

from numbers import Number
from os.path import join

import matplotlib.pyplot as plt
import numpy.random as npr
from future import standard_library
from numpy import arange, argsort, array, array_equal, atleast_1d, hstack
from numpy import max as np_max
from numpy import mean, median
from numpy import min as np_min
from numpy import nan_to_num, sqrt, vstack, where, zeros
from past.utils import old_div
from scipy.interpolate import InterpolatedUnivariateSpline

from gemseo.problems.scalable.model import ScalableModel
from gemseo.utils.data_conversion import DataConversion

standard_library.install_aliases()

from gemseo import LOGGER

[docs]class ScalableDiagonalModel(ScalableModel):
""" Scalable diagonal model. """

ABBR = "sdm"

def __init__(
self,
data,
sizes=None,
fill_factor=-1,
comp_dep=None,
inpt_dep=None,
force_input_dependency=False,
allow_unused_inputs=True,
seed=1,
group_dep=None,
):
"""Constructor.

:param AbstractFullCache data: learning dataset.
:param dict sizes: sizes of input and output variables.
If None, use the original sizes.
Default: None.
:param fill_factor: degree of sparsity of the dependency matrix.
Default: -1.
:param comp_dep: matrix that establishes the selection
of a single original component for each scalable component
:param inpt_dep: dependency matrix that establishes the
dependency of outputs wrt inputs
:param bool force_input_dependency: for any output, force dependency
with at least on input.
:param bool allow_unused_input: possibility to have an input
with no dependence with any output
:param int seed: seed
:param dict(list(str)) group_dep: dependency
between inputs and outputs
"""
if isinstance(fill_factor, Number):
fill_factor = {
function_name: fill_factor for function_name in data.outputs_names
}
elif not isinstance(fill_factor, dict):
raise TypeError(
"Fill factor must be "
"either a number between 0 and 1, "
"a number equal to -1 or a dictionary."
)
parameters = {
"fill_factor": fill_factor,
"comp_dep": comp_dep,
"inpt_dep": inpt_dep,
"force_input_dependency": force_input_dependency,
"allow_unused_inputs": allow_unused_inputs,
"seed": seed,
"group_dep": group_dep,
}
super(ScalableDiagonalModel, self).__init__(data, sizes, **parameters)
self.t_scaled, self.f_scaled = self.__build_scalable_functions()

def __build_dependencies(self):
"""Build dependencies.

:return: matrix that establishes the selection of a single original
component for each scalable component, dependency matrix that
establishes the dependency of outputs wrt inputs.
:rtype: ndarray, ndarray
"""
comp_dep = self.parameters["comp_dep"]
inpt_dep = self.parameters["inpt_dep"]
if comp_dep is None or inpt_dep is None:
comp_dep, inpt_dep = self.generate_random_dependency()
return comp_dep, inpt_dep

[docs]    def scalable_function(self, input_value=None):
"""Evaluate the scalable functions.

:param dict input_value: input values.
If None, use default inputs.
:return: evaluation of the scalable functions.
:rtype: dict
"""
dict_to_array = DataConversion.dict_to_array
input_value = input_value or self.default_inputs
input_value = dict_to_array(input_value, self.inputs_names)
scal_func = self.model.get_scalable_function
return {fname: scal_func(fname)(input_value) for fname in self.outputs_names}

[docs]    def scalable_derivatives(self, input_value=None):
"""Evaluate the scalable derivatives.

:param dict input_value: input values.
If None, use default inputs.
:return: evaluation of the scalable derivatives.
:rtype: dict
"""
dict_to_array = DataConversion.dict_to_array
input_value = input_value or self.default_inputs
input_value = dict_to_array(input_value, self.inputs_names)
scal_der = self.model.get_scalable_derivative
return {fname: scal_der(fname)(input_value) for fname in self.outputs_names}

[docs]    def build_model(self):
"""Build model with original sizes for input and output variables.

:return: scalable approximation.
:rtype: ScalableDiagonalApproximation
"""
comp_dep, inpt_dep = self.__build_dependencies()
seed = self.parameters["seed"]
approx = ScalableDiagonalApproximation
scalable_approximation = approx(
self.sizes, self.lower_bounds, self.upper_bounds, comp_dep, inpt_dep, seed
)
return scalable_approximation

def __build_scalable_functions(self):
""" Builds all the required functions from the original cache. """
t_scaled = {}
f_scaled = {}
for function_name in self.outputs_names:
t_sc, f_sc = self.model.build_scalable_function(
function_name, self.data, self.inputs_names
)
t_scaled[function_name] = t_sc
f_scaled[function_name] = f_sc
return t_scaled, f_scaled

def __get_variables_locations(self, names):
"""Get the locations of first component of each variables.

:param names: list of variables names.
:type names: list(str)
:return: list of locations.
:rtype: list(int)
"""
positions = []
current_position = 0
for name in names:
positions.append(current_position)
current_position += self.sizes[name]
return positions

def __convert_dependency_to_array(self, dependency):
"""Convert a dependency object of type dictionary into a dependency
object of type array.

:param dependency: input-output dependency structure.
:type dependency: dict
:return: dependency matrix.
:rtype: array
"""
matrix = None
for output_name in self.outputs_names:
row = hstack([dependency[output_name][inpt] for inpt in self.inputs_names])
matrix = row if matrix is None else vstack((matrix, row))
return matrix.T

[docs]    def plot_dependency(
self, add_levels=True, save=True, show=False, directory=".", png=False
):
"""This method plots the dependency matrix of a discipline
in the form of a chessboard,
where rows represent inputs, columns represent output and
gray scale represent the dependency level between inputs and outputs.

Default: True.
:param bool save: if True, export the plot into a file.
Default: True.
:param bool show: if True, display the plot.
Default: False.
:param str directory: directory path. Default: '.'.
:param bool png: if True, the file format is PNG. Otherwise, use PDF.
Default: False.
"""
inputs_positions = self.__get_variables_locations(self.inputs_names)
outputs_positions = self.__get_variables_locations(self.outputs_names)
dependency = self.model.io_dependency
dependency_matrix = self.__convert_dependency_to_array(dependency)
is_binary_matrix = array_equal(
dependency_matrix, dependency_matrix.astype(bool)
)
if not is_binary_matrix:
dp_sum = dependency_matrix.sum(0)
dependency_matrix = old_div(dependency_matrix, dp_sum)

fig, axes = plt.subplots()
axes.matshow(dependency_matrix, cmap="Greys", vmin=0)
axes.set_yticks(inputs_positions)
axes.set_yticklabels(self.inputs_names)
axes.set_xticks(outputs_positions)
axes.set_xticklabels(self.outputs_names, ha="left", rotation="vertical")
axes.tick_params(axis="both", which="both", length=0)
for pos in inputs_positions[1:]:
axes.axhline(y=pos - 0.5, color="w", linewidth=3.0)
for pos in outputs_positions[1:]:
axes.axvline(x=pos - 0.5, color="w", linewidth=3.0)
for i in range(dependency_matrix.shape[0]):
for j in range(dependency_matrix.shape[1]):
val = int(round(dependency_matrix[i, j] * 100))
med = median(dependency_matrix[:, j] * 100)
col = "white" if val > med else "black"
axes.text(j, i, val, ha="center", va="center", color=col)
fname = None
if save:
extension = ".png" if png else ".pdf"
fname = join(directory, self.name + "_dependency" + extension)
plt.savefig(fname)
if show:
plt.show()
else:
plt.close(fig)
return fname

[docs]    def plot_1d_interpolations(
self, save=False, show=False, step=0.01, varnames=None, directory=".", png=False
):
r"""This methods plots the scaled 1D interpolations,
a.k.a. basis functions.

A basis function is a monodimensional function
interpolating the samples of a given output component
over the input sampling line
:math:t\in[0,1]\mapsto \\underline{x}+t(\overline{x}-\\underline{x}).

There are as many basis functions
as there are output components from the discipline.
Thus, for a discipline with a single output in dimension 1,
there is 1 basis function.
For a discipline with a single output in dimension 2,
there are 2 basis functions.
For a discipline with an output in dimension 2
and an output in dimension 13,
there are 15 basis functions. And so on.
This method allows to plot the basis functions associated
with all outputs or only part of them,
either on screen (:code:show=True), in a file (:code:save=True)
or both.
We can also specify the discretization :code:step
whose default value is :code:0.01.

:param bool save: if True, export the plot as a PDF file
(Default value = False)
:param bool show: if True, display the plot (Default value = False)
:param bool step: Step to evaluate the 1d interpolation function
(Default value = 0.01)
:param list(str) varnames: names of the variable to plot;
if None, all variables are plotted (Default value = None)
:param str directory: directory path. Default: '.'.
:param bool png: if True, the file format is PNG. Otherwise, use PDF.
Default: False.
"""
function_names = varnames or self.outputs_names
x_vals = arange(0.0, 1.0 + step, step)
fnames = []
for func in function_names:
components = self.model.interpolation_dict[func]
for index, function in enumerate(components):
plt.figure()
doe_t = self.t_scaled[func]
doe_f = [output[index] for output in self.f_scaled[func]]
y_vals = function(x_vals)
plt.xlim(-0.05, 1.05)
plt.ylim(-0.1, 1.1)
plt.plot(
x_vals,
old_div((y_vals - min(y_vals)), (max(y_vals) - min(y_vals))),
label=func + str(index),
)
plt.plot(
doe_t,
old_div((doe_f - min(y_vals)), (max(y_vals) - min(y_vals))),
"or",
)
plt.xlabel("Scaled abscissa", fontsize=18)
plt.ylabel("Interpolation value", fontsize=18)
plt.title("1D interpolation of " + self.name + "." + func)
if save:
extension = ".png" if png else ".pdf"
fname = self.name + "_" + func + "_1D_interpolation_"
fname += str(index)
fname = join(directory, fname + extension)
plt.savefig(fname)
fnames.append(fname)
if show:
plt.show()
else:
plt.close()
return fnames

[docs]    def generate_random_dependency(self):
"""
Generates a random dependency structure for use in scalable discipline

:param bool force_input_dependency: force input dependency for a given
output. Default: True.
:param bool allow_unused_inputs: allow unused inputs, that is to say
input dimensions without any interaction with output functions.
Default: False.
:param dict(list(str)) io_dependency: input-output dependency
structure.
If None, all output components can depend on all input components.
Default: None.
:return: output component dependency and input-output dependency
:rtype: dict(int), dict(dict(array))
"""
npr.seed(self.parameters["seed"])
io_dependency = self.parameters["group_dep"] or {}
for function_name in self.outputs_names:
input_names = io_dependency.get(function_name, self.inputs_names)
io_dependency[function_name] = input_names

if self.parameters.get("inpt_dep") is None:
io_dep = self.__generate_random_io_dep(io_dependency)

if self.parameters.get("comp_dep") is None:
out_map = self.__generate_random_output_map()

# If an output function does not have any dependency with inputs,
# add a random dependency if independent outputs are forbidden
if self.parameters["force_input_dependency"]:
for function_name in self.outputs_names:
for function_component in range(self.sizes.get(function_name)):
self.__complete_random_dep(
io_dep, function_name, function_component, io_dependency
)

# If an input parameter does not have any dependency with output
# functions, add a random dependency if unused inputs are not allowed
if not self.parameters["allow_unused_inputs"]:
for input_name in self.inputs_names:
for input_component in range(self.sizes.get(input_name)):
self.__complete_random_dep(
io_dep, input_name, input_component, io_dependency
)
return out_map, io_dep

def __generate_random_io_dep(self, io_dependency):
"""Generate the dependency between the new inputs and the new outputs.

:param io_dependency: input-output dependency structure. If None,
all output components can depend on all input components.
Default: None.
:type io_dependency: dict(list(str))
:return: random input-output dependencies
:rtype: dict(dict(array))
"""
error_msg = (
"Fill factor must be a number, "
"either -1 or a real number between 0 and 1."
)
r_io_dependency = {}
for function_name in self.outputs_names:
fill_factor = self.parameters["fill_factor"].get(function_name, 0.7)
if fill_factor != -1.0 and (fill_factor < 0.0 or fill_factor > 1):
raise TypeError(error_msg)
function_size = self.sizes.get(function_name)
r_io_dependency[function_name] = {}
for input_name in self.inputs_names:
input_size = self.sizes.get(input_name)
if input_name in io_dependency[function_name]:
if 0 <= fill_factor <= 1:
rand_dep = npr.choice(
2,
(function_size, input_size),
p=[1.0 - fill_factor, fill_factor],
)
else:
rand_dep = npr.rand(function_size, input_size)
r_io_dependency[function_name][input_name] = rand_dep
else:
zeros_dep = zeros((function_size, input_size))
r_io_dependency[function_name][input_name] = zeros_dep
return r_io_dependency

def __generate_random_output_map(self):
"""Generate the dependency between the original and new output
components for the different outputs.

:return: component dependencies
:rtype: dict(int)
"""
out_map = {}
for function_name in self.outputs_names:
original_function_size = self.original_sizes.get(function_name)
function_size = self.sizes.get(function_name)
out_map[function_name] = npr.randint(
original_function_size, size=function_size
)
return out_map

def __complete_random_dep(self, r_io_dep, dataname, index, io_dep):
"""Complete random dependency if row (input name)
or column (function name) of the random dependency matrix is empty

:param array inpt_dep: input-output dependency.
:param str dataname: name of the variable to check
if component is empty.
:param int index: component index of the variable.
:param io_dep: input-output dependency structure. If None,
all output components can depend on all input components.
Default: None.
:type io_dep: dict(list(str))
"""
is_input = dataname in self.inputs_names
if is_input:
varnames = []
for function_name, inputs in io_dep.items():
if dataname in inputs:
varnames.append(function_name)
inpt_dep_mat = hstack(
[r_io_dep[varname][dataname].T for varname in varnames]
)
else:
varnames = io_dep[dataname]
inpt_dep_mat = hstack([r_io_dep[dataname][varname] for varname in varnames])
if sum(inpt_dep_mat[index, :]) == 0:
prob = [self.sizes.get(varname, 1) for varname in varnames]
prob = [float(x) / sum(prob) for x in prob]
id_var = npr.choice(len(varnames), p=prob)
id_comp = npr.randint(0, self.sizes.get(varnames[id_var], 1))
if is_input:
varname = varnames[id_var]
r_io_dep[varname][dataname][id_comp, index] = 1
else:
varname = varnames[id_var]
r_io_dep[dataname][varname][index, id_comp] = 1

[docs]class ScalableDiagonalApproximation(object):
"""
Methodology that captures the trends of a physical problem, and
extends it into a problem that has scalable input and outputs dimensions
The original and the resulting scalable problem have the same interface:
all inputs and outputs have the same names; only their dimensions vary.
"""

def __init__(self, sizes, var_lb, var_ub, output_dependency, io_dependency, seed=0):
"""
Constructor:

:param sizes: sizes of both input and output variables.
:type sizes: dict
:param var_lb: lower bounds of the input variables.
:type var_lb: dict
:param var_ub: upper bounds of the input variables.
:type var_ub: dict
:param output_dependency: dependency between old and new outputs.
:type output_dependency: dict
:param io_dependency: dependency between new inputs and new outputs.
:type io_dependency: dict
"""
super(ScalableDiagonalApproximation, self).__init__()
# information about input and output variables
self.sizes = sizes
self.var_lb = var_lb
self.var_ub = var_ub
# dependency matrices
self.output_dependency = output_dependency
self.io_dependency = io_dependency
# dictionaries of interpolations and extrapolations
self.interpolation_dict = {}
self.d_interpolation_dict = {}
self.interpolators_dict = {}
self.scalable_functions = {}
self.scalable_dfunctions = {}
# seed for random generator
npr.seed(seed)

[docs]    def build_scalable_function(self, function_name, dataset, input_names, degree=3):
"""Build interpolation interpolation from a 1D input and output
function. Add the model to the local dictionary

:param str function_name: name of the output function
:param AbstractFullCache dataset: the input-output dataset
:param list(str) input_names: names of the input variables
:param int degree: degree of interpolation (Default value = 3)
"""
# 1) Get the unscaled data
x_unscaled = []
f_unscaled = []
for data in dataset.get_all_data(True):
f_unscaled.append(data[dataset.OUTPUTS_GROUP][function_name])
x_unscaled.append(data[dataset.INPUTS_GROUP])

# 2) Scale the input samples: [a1, b1] x ... x [an, bn] -> [0, 1]^n
x_scaled = []
for input_data in x_unscaled:
squared_scaled_input_data = []
for varname, value in input_data.items():
scaled_value = value - self.var_lb[varname]
scaled_value /= self.var_ub[varname] - self.var_lb[varname]
squared_scaled_value = scaled_value ** 2.0
squared_scaled_value = nan_to_num(squared_scaled_value)
squared_scaled_input_data += list(squared_scaled_value)
x_scaled.append(squared_scaled_input_data)

# 3) Map the scaled input samples to [0,1]
t_scaled = [atleast_1d(sqrt(mean(val))) for val in x_scaled]

# 4) Sort t_scaled and f_unscaled following the t_scaled ascending
# order
indices = argsort([val[0] for val in t_scaled])
t_scaled = [t_scaled[index] for index in indices]
f_unscaled = [f_unscaled[index] for index in indices]

# 5) scale the output samples: [a1, b1] x ... x [am, bm] -> [0, 1]^m
f_scaled = self.scale_samples(f_unscaled)

# 6) interpolate the (t_scaled, f_scaled) data
self._interpolate(function_name, t_scaled, f_scaled, degree)

# 7) compute the total input and output sizes
(input_size, output_size) = self._compute_sizes(function_name, input_names)

# 8) extrapolation
self._extrapolate(function_name, input_names, input_size, output_size)

return t_scaled, f_scaled

[docs]    def get_scalable_function(self, output_function):
"""Retrieve the scalable function generated from
the original discipline

:param str output_function: name of the output function
"""
return self.scalable_functions[output_function]

[docs]    def get_scalable_derivative(self, output_function):
"""Retrieve the (scalable) gradient of the scalable function generated
from the original discipline

:param str output_function: name of the output function
"""
return self.scalable_dfunctions[output_function]

[docs]    @staticmethod
def scale_samples(samples):
"""Scale samples of array into [0, 1]

:param samples: samples of multivariate array
:type samples: list(array)
:return: samples of multivariate array
:rtype: array
"""
samples = array(samples)
col_min = np_min(samples, 0)
col_max = np_max(samples, 0)
scaled_samples = samples - col_min
range_col = col_max - col_min
scaling = where(abs(range_col) > 1e-6, range_col, 1)
scaled_samples /= scaling
return scaled_samples

def _interpolate(self, function_name, t_scaled, f_scaled, degree=3):
"""Interpolate a set of samples (t, y(t)) with a polynomial spline

:param str function_name: name of the interpolated function
:param list(list(float)) t_scaled: set of points
:param list(list(float)) f_scaled: set of images
:param int degree: degree of the polynomial interpolation
"""
nb_components = f_scaled[0].size
list_interpolations = []
list_derivatives = []
for component in range(nb_components):
f_scaled_component = [output[component] for output in f_scaled]
# compute interpolation
interpolation = InterpolatedUnivariateSpline(
t_scaled, f_scaled_component, k=degree
)
# store spline and derivative
list_interpolations.append(interpolation)
list_derivatives.append(interpolation.derivative())
# store interpolation and derivatives
self.interpolation_dict[function_name] = list_interpolations
self.d_interpolation_dict[function_name] = list_derivatives

def _compute_sizes(self, function_name, input_names):
"""Determine the size of the vector input and output

:param str function_name: function name
:param list(str) input_names: input names
"""
input_size = 0
for input_name in input_names:
input_size += self.sizes.get(input_name, 1)
output_size = self.sizes.get(function_name, 1)  # default 1
return (input_size, output_size)

def _extrapolate(self, function_name, input_names, input_size, output_size):
"""Extrapolate a 1D function to arbitrary input and output dimensions.
Generate a function that produces an output with a given size from an
input with a given size, and its derivative.

:param str function_name: name of the output function
:param list(str) input_names: names of the inputs
:param int input_size: size of the input vector
:param int output_size: size of the output vector
"""
# crop the matrices to the correct sizes and convert to array
io_dependency = {
input_name: dep_mat[0:output_size, 0 : self.sizes[input_name]]
for (input_name, dep_mat) in self.io_dependency[function_name].items()
}
io_dependency = DataConversion.dict_to_array(io_dependency, input_names)

# Convert the input-output dependency matrix to a list
# where the i-th element is a list whose j-th element corresponds to
# the degree of dependence between the i-th output component and the
# i-th input.
coefficients = [list(row) for row in io_dependency]

# Get the 1D interpolation functions and their derivatives
interpolated_fun_1d = self.interpolation_dict[function_name]
interpolated_dfun_1d = self.d_interpolation_dict[function_name]

# Get the indices of the 1D interpolation functions
# associated with the components of the new output.
components_list = self.output_dependency[function_name]

def scalable_function(input_vars):
"""n-dimensional to size_output-dimensional extrapolated function

:param list(float) input_vars: vector of inputs
:returns: size_output extrapolated output
"""
result = zeros(output_size)
for out_id in range(output_size):
interp_id = components_list[out_id]
coeffs = coefficients[out_id]
tmp = [
coeffs[in_id] * interpolated_fun_1d[interp_id](in_val)
for in_id, in_val in enumerate(input_vars)
]
tmp = old_div(sum(tmp), sum(coeffs))
result[out_id] = tmp
return result

def scalable_derivative(input_vars):
"""n-dimensional to size_output-dimensional extrapolated
function Jacobian

:param list(float) input_vars: vector of inputs
:returns: size_output - sizeof input_vars extrapolated output
"""
dresult = zeros([output_size, input_size])
for out_id in range(output_size):
interp_id = components_list[out_id]
coeffs = coefficients[out_id]
tmp = [
coeffs[in_id] * interpolated_dfun_1d[interp_id](in_val)
for in_id, in_val in enumerate(input_vars)
]
tmp = old_div(array(tmp), sum(coeffs))
dresult[out_id, :] = tmp
return dresult

self.scalable_functions[function_name] = scalable_function
self.scalable_dfunctions[function_name] = scalable_derivative