Source code for gemseo.mlearning.regression.rbf

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry,
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                         documentation
#        :author: Francois Gallard, Matthias De Lozzo
RBF regression

The radial basis function surrogate discipline expresses the model output
as a weighted sum of kernel functions centered on the learning input data:

.. math::

    y = w_1K(\|x-x_1\|;\epsilon) + w_2K(\|x-x_2\|;\epsilon) + ...
        + w_nK(\|x-x_n\|;\epsilon)

and the coefficients :math:`(w_1, w_2, ..., w_n)`
are estimated by least square regression.

The RBF model relies on the Rbf class
of the `scipy library <
from __future__ import absolute_import, division, unicode_literals

import pickle
from os.path import join

from future import standard_library
from numpy import array, average, exp, finfo, hstack, log, sqrt
from numpy.linalg import norm
from scipy.interpolate import Rbf
from six import string_types

from gemseo.mlearning.regression.regression import MLRegressionAlgo
from gemseo.utils.py23_compat import PY3


from gemseo import LOGGER

[docs]class RBFRegression(MLRegressionAlgo): """ Regression based on radial basis functions. """ LIBRARY = "scipy" ABBR = "RBF" EUCLIDEAN = "euclidean" MULTIQUADRIC = "multiquadric" INVERSE_MULTIQUADRIC = "inverse_multiquadric" GAUSSIAN = "gaussian" LINEAR = "linear" CUBIC = "cubic" QUINTIC = "quintic" THIN_PLATE = "thin_plate" AVAILABLE_FUNCTIONS = [ MULTIQUADRIC, INVERSE_MULTIQUADRIC, GAUSSIAN, LINEAR, CUBIC, QUINTIC, THIN_PLATE, ] def __init__( self, data, transformer=None, input_names=None, output_names=None, function=MULTIQUADRIC, der_function=None, epsilon=None, **parameters ): r"""Constructor. :param data: learning dataset :type data: Dataset :param transformer: transformation strategy for data groups. If None, do not transform data. Default: None. :type transformer: dict(str) :param input_names: names of the input variables. Default: None. :type input_names: list(str) :param output_names: names of the output variables. Default: None. :type output_names: list(str) :param function: radial basis function. Default: 'multiquadric'. :type function: str or callable :param der_function: derivative of radial basis function, only to be provided if function is callable and not str. The der_function should take three arguments (input_data, norm_input_data, eps). For a RBF of the form function(:math:`r`), der_function(:math:`x`, :math:`|x|`, :math:`\epsilon`) should return :math:`\epsilon^{-1} x/|x| f'(|x|/\epsilon)`. Default: None. :type der_function: callable :param epsilon: Adjustable constant for Gaussian or multiquadrics functions. Default: None. :type epsilon: float :param parameters: other RBF parameters (sklearn). """ if isinstance(function, string_types): function = str(function) super(RBFRegression, self).__init__( data, transformer=transformer, input_names=input_names, output_names=output_names, function=function, epsilon=epsilon, **parameters ) self.y_average = 0.0 self.der_function = der_function
[docs] class RBFDerivatives(object): r"""Derivatives of functions used in RBFRegression. For a RBF of the form :math:`f(r)`, :math:`r` scalar, the derivative functions are defined by :math:`d(f(r))/dx`, with :math:`r=|x|/\epsilon`. The functions are thus defined by :math:`df/dx = \epsilon^{-1} x/|x| f'(|x|/\epsilon)`. This convention is chosen to avoid division by :math:`|x|` when the terms may be cancelled out, as :math:`f'(r)` often has a term in :math:`r`. """ TOL = finfo(float).eps
[docs] @classmethod def der_multiquadric(cls, input_data, norm_input_data, eps): r"""Compute derivative w.r.t. :math:`x` of the function :math:`f(r) = \sqrt{r^2 + 1}`. :param float input_data: Input variable (vector). :param float norm_input_data: Norm of input variable. :return: Derivative of the function. :rtype: float """ return input_data / eps ** 2 / sqrt((norm_input_data / eps) ** 2 + 1)
[docs] @classmethod def der_inverse_multiquadric(cls, input_data, norm_input_data, eps): r"""Compute derivative w.r.t. :math:`x` of the function :math:`f(r) = 1/\sqrt{r^2 + 1}`. :param float input_data: Input variable (vector). :param float norm_input_data: Norm of input variable. :return: Derivative of the function. :rtype: float """ return -input_data / eps ** 2 / ((norm_input_data / eps) ** 2 + 1) ** 1.5
[docs] @classmethod def der_gaussian(cls, input_data, norm_input_data, eps): r"""Compute derivative w.r.t. :math:`x` of the function :math:`f(r) = \exp(-r^2)`. :param float input_data: Input variable (vector). :param float norm_input_data: Norm of input variable. :return: Derivative of the function. :rtype: float """ return -2 * input_data / eps ** 2 * exp(-((norm_input_data / eps) ** 2))
[docs] @classmethod def der_linear(cls, input_data, norm_input_data, eps): """Compute derivative w.r.t. :math:`x` of the function :math:`f(r) = r`. If :math:`x=0`, return 0 (determined up to a tolerance). :param float input_data: Input variable (vector). :param float norm_input_data: Norm of input variable. :return: Derivative of the function. :rtype: float """ return ( (norm_input_data > cls.TOL) * input_data / eps / (norm_input_data + cls.TOL) )
[docs] @classmethod def der_cubic(cls, input_data, norm_input_data, eps): """Compute derivative w.r.t. :math:`x` of the function :math:`f(r) = r^3`. :param float input_data: Input variable (vector) :math:`x`. :param float norm_input_data: Norm of input variable :math:`|x|`. :return: Derivative of the function. :rtype: float """ return 3 * norm_input_data * input_data / eps ** 3
[docs] @classmethod def der_quintic(cls, input_data, norm_input_data, eps): """Compute derivative w.r.t. :math:`x` of the function :math:`f(r) = r^5`. :param float input_data: Input variable (vector). :param float norm_input_data: Norm of input variable. :return: Derivative of the function. :rtype: float """ return 5 * norm_input_data ** 3 * input_data / eps ** 5
[docs] @classmethod def der_thin_plate(cls, input_data, norm_input_data, eps): r"""Compute derivative w.r.t. :math:`x` of the function :math:`f(r) = r^2 \log(r)`. If :math:`x=0`, return 0 (determined up to a tolerance). :param float input_data: Input variable (vector). :param float norm_input_data: Norm of input variable. :return: Derivative of the function. :rtype: float """ return ( (norm_input_data > cls.TOL) * input_data / eps ** 2 * (1 + 2 * log(norm_input_data / eps + cls.TOL)) )
def _fit(self, input_data, output_data): """Fit the regression model. :param ndarray input_data: input data (2D) :param ndarray output_data: output data (2D) """ self.y_average = average(output_data, axis=0) output_data -= self.y_average if PY3: args = list(input_data.T) + [output_data] self.algo = Rbf(*args, mode="N-D", smooth=0.0, **self.parameters) else: self.algo = [] for output in range(output_data.shape[1]): args = hstack([input_data, output_data[:, [output]]]) rbf = Rbf(*args.T, smooth=0.0, **self.parameters) self.algo.append(rbf) def _predict(self, input_data): """Predict output for given input data. :param ndarray input_data: input data (2D). :return: output prediction (2D). :rtype: ndarray """ if PY3: output_data = self.algo(*input_data.T) if len(output_data.shape) == 1: output_data = output_data[:, None] # n_outputs=1, rbf reduces output_data = output_data + self.y_average else: output_data = [rbf(*input_data.T) for rbf in self.algo] output_data = array(output_data).T + self.y_average return output_data def _predict_jacobian(self, input_data): """Predict Jacobian of the regression model for the given input data. :param ndarray input_data: input_data (2D). :return: Jacobian matrices (3D, one for each sample). :rtype: ndarray """ self._check_available_jacobian() der_func = self.der_function or getattr( self.RBFDerivatives, "der_{}".format(self.function) ) # predict_samples learn_samples # Dimensions : ( n_samples , n_outputs , n_inputs , n_learn_samples ) # input_data : ( n_samples , , n_inputs , ) # ref_points : ( , , n_inputs , n_learn_samples ) # nodes : ( , n_outputs , , n_learn_samples ) # jacobians : ( n_samples , n_outputs , n_inputs , ) if PY3: eps = self.algo.epsilon ref_points = self.algo.xi[None, None] nodes = self.algo.nodes.T[None, :, None] else: eps = [rbf.epsilon for rbf in self.algo] eps = array(eps)[None, :, None, None] # 1 epsilon for each output ref_points = self.algo[0].xi # same xi for all algos nodes = array([rbf.nodes for rbf in self.algo])[None, :, None] input_data = input_data[:, None, :, None] diffs = input_data - ref_points dists = norm(diffs, axis=2)[:, :, None] contributions = nodes * der_func(diffs, dists, eps=eps) jacobians = contributions.sum(-1) return jacobians def _check_available_jacobian(self): """ Check if the Jacobian is available for the given setup. """ if PY3: norm_name = self.algo.norm else: norm_name = self.algo[0].norm if norm_name != self.EUCLIDEAN: raise NotImplementedError( "Jacobian is only implemented for " "euclidean norm." ) if callable(self.function) and self.der_function is None: raise NotImplementedError( "No der_function is provided. Add " "der_function in RBFRegression " "constructor." ) def _save_algo(self, directory): """Save external machine learning algorithm. :param str directory: algorithm directory. """ if PY3: super(RBFRegression, self)._save_algo(directory) else: filename = join(directory, "algo.pkl") with open(filename, "wb") as handle: pickled_rbf = pickle.Pickler(handle, protocol=2) pickled_rbf_list = [] for rbf in self.algo: pickled_rbf_list.append({}) for key in rbf.__dict__.keys(): if key not in ["_function", "norm"]: pickled_rbf_list[-1][key] = rbf.__getattribute__(key) pickled_rbf.dump(pickled_rbf_list)
[docs] def load_algo(self, directory): """Load external machine learning algorithm. :param str directory: algorithm directory. """ if PY3: super(RBFRegression, self).load_algo(directory) else: filename = join(directory, "algo.pkl") self.algo = [] with open(filename, "rb") as handle: unpickled_rbf = pickle.Unpickler(handle) unpickled_rbf_list = unpickled_rbf.load() for rbf in unpickled_rbf_list: algo_i = Rbf( array([1, 2, 3]), array([10, 20, 30]), array([100, 200, 300]), function=rbf["function"], ) for key, value in rbf.items(): algo_i.__setattr__(key, value) self.algo.append(algo_i)
def _get_objects_to_save(self): """ Get objects to save. """ objects = super(RBFRegression, self)._get_objects_to_save() objects["y_average"] = self.y_average objects["der_function"] = self.der_function return objects @property def function(self): """Kernel function name. The name is possibly different from self.parameters['function'], as it is mapped (scipy). Examples: 'inverse' -> 'inverse_multiquadric' 'InverSE MULtiQuadRIC' -> 'inverse_multiquadric' return: Name of kernel function. rtype: str """ if PY3: function = self.algo.function else: function = self.algo[0].function return function
[docs] @classmethod def get_available_functions(cls): """ Get available RBFs. """ return cls.AVAILABLE_FUNCTIONS