# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Francois Gallard, Matthias De Lozzo
# OTHER AUTHORS - MACROSCOPIC CHANGES
r"""Gaussian process regression model.
Overview
--------
The Gaussian process regression (GPR) model
expresses the model output as a weighted sum of kernel functions
centered on the learning input data:
.. math::
y = \mu
+ w_1\kappa(\|x-x_1\|;\epsilon)
+ w_2\kappa(\|x-x_2\|;\epsilon)
+ ...
+ w_N\kappa(\|x-x_N\|;\epsilon)
Details
-------
The GPR model relies on the assumption
that the original model :math:`f` to replace
is an instance of a Gaussian process (GP) with mean :math:`\mu`
and covariance :math:`\sigma^2\kappa(\|x-x'\|;\epsilon)`.
Then, the GP conditioned by the learning set
:math:`(x_i,y_i)_{1\leq i \leq N}`
is entirely defined by its expectation:
.. math::
\hat{f}(x) = \hat{\mu} + \hat{w}^T k(x)
and its covariance:
.. math::
\hat{c}(x,x') = \hat{\sigma}^2 - k(x)^T K^{-1} k(x')
where :math:`[\hat{\mu};\hat{w}]=([1_N~K]^T[1_N~K])^{-1}[1_N~K]^TY` with
:math:`K_{ij}=\kappa(\|x_i-x_j\|;\hat{\epsilon})`,
:math:`k_i(x)=\kappa(\|x-x_i\|;\hat{\epsilon})`
and :math:`Y_i=y_i`.
The correlation length vector :math:`\epsilon`
is estimated by numerical non-linear optimization.
Surrogate model
---------------
The expectation :math:`\hat{f}` is the surrogate model of :math:`f`.
Error measure
-------------
The standard deviation :math:`\hat{s}` is a local error measure
of :math:`\hat{f}`:
.. math::
\hat{s}(x):=\sqrt{\hat{c}(x,x)}
Interpolation or regression
---------------------------
The GPR model can be regressive or interpolative
according to the value of the nugget effect :math:`\alpha\geq 0`
which is a regularization term
applied to the correlation matrix :math:`K`.
When :math:`\alpha = 0`,
the surrogate model interpolates the learning data.
Dependence
----------
The GPR model relies on the GaussianProcessRegressor class
of the `scikit-learn library <https://scikit-learn.org/stable/modules/
generated/sklearn.gaussian_process.GaussianProcessRegressor.html>`_.
"""
from __future__ import annotations
import logging
from typing import Callable
from typing import ClassVar
from typing import Iterable
from typing import Mapping
from typing import Tuple
import sklearn.gaussian_process
from numpy import atleast_2d
from numpy import ndarray
from numpy import repeat
from sklearn.gaussian_process.kernels import Kernel
from gemseo.core.dataset import Dataset
from gemseo.mlearning.core.ml_algo import DataType
from gemseo.mlearning.core.ml_algo import TransformerType
from gemseo.mlearning.regression.regression import MLRegressionAlgo
from gemseo.utils.data_conversion import concatenate_dict_of_arrays_to_array
from gemseo.utils.python_compatibility import Final
LOGGER = logging.getLogger(__name__)
__Bounds = Tuple[float, float]
[docs]class GaussianProcessRegressor(MLRegressionAlgo):
"""Gaussian process regression model."""
SHORT_ALGO_NAME: ClassVar[str] = "GPR"
LIBRARY: Final[str] = "scikit-learn"
__DEFAULT_BOUNDS: Final[tuple[float, float]] = (0.01, 100.0)
def __init__(
self,
data: Dataset,
transformer: Mapping[str, TransformerType] | None = None,
input_names: Iterable[str] | None = None,
output_names: Iterable[str] | None = None,
kernel: Kernel | None = None,
bounds: __Bounds | Mapping[str, __Bounds] | None = None,
alpha: float | ndarray = 1e-10,
optimizer: str | Callable = "fmin_l_bfgs_b",
n_restarts_optimizer: int = 10,
random_state: int | None = None,
) -> None:
"""
Args:
kernel: The kernel specifying the covariance model.
If ``None``, use a Matérn(2.5).
bounds: The lower and upper bounds of the parameter length scales
when ``kernel`` is ``None``.
Either a unique lower-upper pair common to all the inputs
or lower-upper pairs for some of them.
When ``bounds`` is ``None`` or when an input has no pair,
the lower bound is 0.01 and the upper bound is 100.
alpha: The nugget effect to regularize the model.
optimizer: The optimization algorithm to find the parameter length scales.
n_restarts_optimizer: The number of restarts of the optimizer.
random_state: The seed used to initialize the centers.
If None, the random number generator is the RandomState instance
used by `numpy.random`.
"""
super().__init__(
data,
transformer=transformer,
input_names=input_names,
output_names=output_names,
kernel=kernel,
alpha=alpha,
optimizer=optimizer,
n_restarts_optimizer=n_restarts_optimizer,
random_state=random_state,
)
if kernel is None:
kernel = sklearn.gaussian_process.kernels.Matern(
(1.0,) * self._reduced_dimensions[0],
self.__compute_parameter_length_scale_bounds(bounds),
nu=2.5,
)
self.algo = sklearn.gaussian_process.GaussianProcessRegressor(
normalize_y=False,
kernel=kernel,
copy_X_train=True,
alpha=alpha,
optimizer=optimizer,
n_restarts_optimizer=n_restarts_optimizer,
random_state=random_state,
)
self.parameters["kernel"] = kernel.__class__.__name__
@property
def kernel(self): # (...) -> Kernel
"""The kernel used for prediction."""
if self.is_trained:
return self.algo.kernel_
else:
return self.algo.kernel
def __compute_parameter_length_scale_bounds(
self,
bounds: __Bounds | Mapping[str, __Bounds] | None,
) -> list[tuple[float, float]]:
"""Return the lower and upper bounds for the parameter length scales.
Args:
bounds: The lower and upper bounds of the parameter length scales.
Either a unique lower-upper pair common to all the inputs
or lower-upper pairs for some of them.
When an input has no pair,
the lower bound is 0.01 and the upper bound is 100.
Returns:
The lower and upper bounds of the parameter length scales.
"""
dimension = self._reduced_dimensions[0]
if bounds is None:
return [self.__DEFAULT_BOUNDS] * dimension
if isinstance(bounds, tuple):
return [bounds] * dimension
bounds_ = []
for name in self.input_names:
name_bounds = bounds.get(name, self.__DEFAULT_BOUNDS)
bounds_.extend([name_bounds] * self.sizes[name])
return bounds_
def _fit(
self,
input_data: ndarray,
output_data: ndarray,
) -> None:
self.algo.fit(input_data, output_data)
def _predict(
self,
input_data: ndarray,
) -> ndarray:
output_data = self.algo.predict(input_data)
if output_data.ndim == 1:
output_data = output_data[:, None]
return output_data
[docs] def predict_std(
self,
input_data: DataType,
) -> ndarray:
"""Predict the standard deviation from input data.
The user can specify these input data either as a NumPy array,
e.g. ``array([1., 2., 3.])``
or as a dictionary of NumPy arrays,
e.g. ``{'a': array([1.]), 'b': array([2., 3.])}``.
If the NumPy arrays are of dimension 2,
their i-th rows represent the input data of the i-th sample;
while if the NumPy arrays are of dimension 1,
there is a single sample.
Args:
input_data: The input data.
Returns:
The standard deviation at the query points.
Warning:
If the output variables are transformed before the training stage,
then the standard deviation is related to this transformed output space
unlike :meth:`.predict` which returns values in the original output space.
"""
if isinstance(input_data, Mapping):
input_data = concatenate_dict_of_arrays_to_array(
input_data, self.input_names
)
input_data = atleast_2d(input_data)
transformer = self.transformer.get(self.learning_set.INPUT_GROUP)
if transformer:
input_data = transformer.transform(input_data)
output_data = self.algo.predict(input_data, return_std=True)[1]
if output_data.ndim == 1:
output_data = repeat(output_data[:, None], self._reduced_dimensions[1], 1)
return output_data