r"""
RBF regression
==============

The radial basis function surrogate discipline expresses the model output
as a weighted sum of kernel functions centered on the learning input data:

.. math::

y = w_1K(\|x-x_1\|;\epsilon) + w_2K(\|x-x_2\|;\epsilon) + ...
+ w_nK(\|x-x_n\|;\epsilon)

and the coefficients :math:(w_1, w_2, ..., w_n)
are estimated by least square regression.

Dependence
----------
The RBF model relies on the Rbf class
of the scipy library <https://docs.scipy.org/doc/scipy/reference/generated/
scipy.interpolate.Rbf.html>_.
"""
from __future__ import absolute_import, division, unicode_literals

import pickle
from os.path import join

from future import standard_library
from numpy import array, average, exp, finfo, hstack, log, sqrt
from numpy.linalg import norm
from scipy.interpolate import Rbf
from six import string_types

from gemseo.mlearning.regression.regression import MLRegressionAlgo
from gemseo.utils.py23_compat import PY3

standard_library.install_aliases()

from gemseo import LOGGER

[docs]class RBFRegression(MLRegressionAlgo):
""" Regression based on radial basis functions. """

LIBRARY = "scipy"
ABBR = "RBF"

EUCLIDEAN = "euclidean"

GAUSSIAN = "gaussian"
LINEAR = "linear"
CUBIC = "cubic"
QUINTIC = "quintic"
THIN_PLATE = "thin_plate"

AVAILABLE_FUNCTIONS = [
GAUSSIAN,
LINEAR,
CUBIC,
QUINTIC,
THIN_PLATE,
]

def __init__(
self,
data,
transformer=None,
input_names=None,
output_names=None,
der_function=None,
epsilon=None,
**parameters
):
r"""Constructor.

:param data: learning dataset
:type data: Dataset
:param transformer: transformation strategy for data groups.
If None, do not transform data. Default: None.
:type transformer: dict(str)
:param input_names: names of the input variables. Default: None.
:type input_names: list(str)
:param output_names: names of the output variables. Default: None.
:type output_names: list(str)
:type function: str or callable
:param der_function: derivative of radial basis function, only to be
provided if function is callable and not str. The der_function
should take three arguments (input_data, norm_input_data, eps). For
a RBF of the form function(:math:r),
der_function(:math:x, :math:|x|, :math:\epsilon) should
return :math:\epsilon^{-1} x/|x| f'(|x|/\epsilon). Default: None.
:type der_function: callable
:param epsilon: Adjustable constant for Gaussian or
:type epsilon: float
:param parameters: other RBF parameters (sklearn).
"""
if isinstance(function, string_types):
function = str(function)
super(RBFRegression, self).__init__(
data,
transformer=transformer,
input_names=input_names,
output_names=output_names,
function=function,
epsilon=epsilon,
**parameters
)
self.y_average = 0.0
self.der_function = der_function

[docs]    class RBFDerivatives(object):
r"""Derivatives of functions used in RBFRegression.

For a RBF of the form :math:f(r), :math:r scalar,
the derivative functions are defined by :math:d(f(r))/dx,
with :math:r=|x|/\epsilon. The functions are thus defined
by :math:df/dx = \epsilon^{-1} x/|x| f'(|x|/\epsilon).
This convention is chosen to avoid division by :math:|x| when
the terms may be cancelled out, as :math:f'(r) often has a term
in :math:r.
"""

TOL = finfo(float).eps

[docs]        @classmethod
r"""Compute derivative w.r.t. :math:x of the function
:math:f(r) = \sqrt{r^2 + 1}.

:param float input_data: Input variable (vector).
:param float norm_input_data: Norm of input variable.
:return: Derivative of the function.
:rtype: float
"""
return input_data / eps ** 2 / sqrt((norm_input_data / eps) ** 2 + 1)

[docs]        @classmethod
r"""Compute derivative w.r.t. :math:x of the function
:math:f(r) = 1/\sqrt{r^2 + 1}.

:param float input_data: Input variable (vector).
:param float norm_input_data: Norm of input variable.
:return: Derivative of the function.
:rtype: float
"""
return -input_data / eps ** 2 / ((norm_input_data / eps) ** 2 + 1) ** 1.5

[docs]        @classmethod
def der_gaussian(cls, input_data, norm_input_data, eps):
r"""Compute derivative w.r.t. :math:x of the function
:math:f(r) = \exp(-r^2).

:param float input_data: Input variable (vector).
:param float norm_input_data: Norm of input variable.
:return: Derivative of the function.
:rtype: float
"""
return -2 * input_data / eps ** 2 * exp(-((norm_input_data / eps) ** 2))

[docs]        @classmethod
def der_linear(cls, input_data, norm_input_data, eps):
"""Compute derivative w.r.t. :math:x of the function
:math:f(r) = r.
If :math:x=0, return 0 (determined up to a tolerance).

:param float input_data: Input variable (vector).
:param float norm_input_data: Norm of input variable.
:return: Derivative of the function.
:rtype: float
"""
return (
(norm_input_data > cls.TOL)
* input_data
/ eps
/ (norm_input_data + cls.TOL)
)

[docs]        @classmethod
def der_cubic(cls, input_data, norm_input_data, eps):
"""Compute derivative w.r.t. :math:x of the function
:math:f(r) = r^3.

:param float input_data: Input variable (vector) :math:x.
:param float norm_input_data: Norm of input variable :math:|x|.
:return: Derivative of the function.
:rtype: float
"""
return 3 * norm_input_data * input_data / eps ** 3

[docs]        @classmethod
def der_quintic(cls, input_data, norm_input_data, eps):
"""Compute derivative w.r.t. :math:x of the function
:math:f(r) = r^5.

:param float input_data: Input variable (vector).
:param float norm_input_data: Norm of input variable.
:return: Derivative of the function.
:rtype: float
"""
return 5 * norm_input_data ** 3 * input_data / eps ** 5

[docs]        @classmethod
def der_thin_plate(cls, input_data, norm_input_data, eps):
r"""Compute derivative w.r.t. :math:x of the function
:math:f(r) = r^2 \log(r).
If :math:x=0, return 0 (determined up to a tolerance).

:param float input_data: Input variable (vector).
:param float norm_input_data: Norm of input variable.
:return: Derivative of the function.
:rtype: float
"""
return (
(norm_input_data > cls.TOL)
* input_data
/ eps ** 2
* (1 + 2 * log(norm_input_data / eps + cls.TOL))
)

def _fit(self, input_data, output_data):
"""Fit the regression model.

:param ndarray input_data: input data (2D)
:param ndarray output_data: output data (2D)
"""
self.y_average = average(output_data, axis=0)
output_data -= self.y_average
if PY3:
args = list(input_data.T) + [output_data]
self.algo = Rbf(*args, mode="N-D", smooth=0.0, **self.parameters)
else:
self.algo = []
for output in range(output_data.shape[1]):
args = hstack([input_data, output_data[:, [output]]])
rbf = Rbf(*args.T, smooth=0.0, **self.parameters)
self.algo.append(rbf)

def _predict(self, input_data):
"""Predict output for given input data.

:param ndarray input_data: input data (2D).
:return: output prediction (2D).
:rtype: ndarray
"""
if PY3:
output_data = self.algo(*input_data.T)
if len(output_data.shape) == 1:
output_data = output_data[:, None]  # n_outputs=1, rbf reduces
output_data = output_data + self.y_average
else:
output_data = [rbf(*input_data.T) for rbf in self.algo]
output_data = array(output_data).T + self.y_average
return output_data

def _predict_jacobian(self, input_data):
"""Predict Jacobian of the regression model for the given input data.

:param ndarray input_data: input_data (2D).
:return: Jacobian matrices (3D, one for each sample).
:rtype: ndarray
"""
self._check_available_jacobian()
der_func = self.der_function or getattr(
self.RBFDerivatives, "der_{}".format(self.function)
)
#             predict_samples                        learn_samples
# Dimensions : ( n_samples , n_outputs , n_inputs , n_learn_samples )
# input_data : ( n_samples ,           , n_inputs ,                 )
# ref_points : (           ,           , n_inputs , n_learn_samples )
# nodes      : (           , n_outputs ,          , n_learn_samples )
# jacobians  : ( n_samples , n_outputs , n_inputs ,                 )
if PY3:
eps = self.algo.epsilon
ref_points = self.algo.xi[None, None]
nodes = self.algo.nodes.T[None, :, None]
else:
eps = [rbf.epsilon for rbf in self.algo]
eps = array(eps)[None, :, None, None]  # 1 epsilon for each output
ref_points = self.algo[0].xi  # same xi for all algos
nodes = array([rbf.nodes for rbf in self.algo])[None, :, None]
input_data = input_data[:, None, :, None]
diffs = input_data - ref_points
dists = norm(diffs, axis=2)[:, :, None]
contributions = nodes * der_func(diffs, dists, eps=eps)
jacobians = contributions.sum(-1)
return jacobians

def _check_available_jacobian(self):
""" Check if the Jacobian is available for the given setup. """
if PY3:
norm_name = self.algo.norm
else:
norm_name = self.algo[0].norm

if norm_name != self.EUCLIDEAN:
raise NotImplementedError(
"Jacobian is only implemented for " "euclidean norm."
)

if callable(self.function) and self.der_function is None:
raise NotImplementedError(
"No der_function is provided. Add "
"der_function in RBFRegression "
"constructor."
)

def _save_algo(self, directory):
"""Save external machine learning algorithm.

:param str directory: algorithm directory.
"""
if PY3:
super(RBFRegression, self)._save_algo(directory)
else:
filename = join(directory, "algo.pkl")
with open(filename, "wb") as handle:
pickled_rbf = pickle.Pickler(handle, protocol=2)
pickled_rbf_list = []
for rbf in self.algo:
pickled_rbf_list.append({})
for key in rbf.__dict__.keys():
if key not in ["_function", "norm"]:
pickled_rbf_list[-1][key] = rbf.__getattribute__(key)
pickled_rbf.dump(pickled_rbf_list)

:param str directory: algorithm directory.
"""
if PY3:
else:
filename = join(directory, "algo.pkl")
self.algo = []
with open(filename, "rb") as handle:
unpickled_rbf = pickle.Unpickler(handle)
for rbf in unpickled_rbf_list:
algo_i = Rbf(
array([1, 2, 3]),
array([10, 20, 30]),
array([100, 200, 300]),
function=rbf["function"],
)
for key, value in rbf.items():
algo_i.__setattr__(key, value)
self.algo.append(algo_i)

def _get_objects_to_save(self):
""" Get objects to save. """
objects = super(RBFRegression, self)._get_objects_to_save()
objects["y_average"] = self.y_average
objects["der_function"] = self.der_function
return objects

@property
def function(self):
"""Kernel function name.

The name is possibly different from self.parameters['function'], as it
is mapped (scipy). Examples:

return: Name of kernel function.
rtype: str
"""
if PY3:
function = self.algo.function
else:
function = self.algo[0].function
return function

[docs]    @classmethod
def get_available_functions(cls):
""" Get available RBFs. """
return cls.AVAILABLE_FUNCTIONS