# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Matthias De Lozzo
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Class for the parametric estimation of statistics from a dataset.
Overview
--------
The :class:`.ParametricStatistics` class inherits
from the abstract :class:`.BaseStatistics` class
and aims to estimate statistics from a :class:`.Dataset`,
based on candidate parametric distributions calibrated from this :class:`.Dataset`.
For each variable,
1. the parameters of these distributions are calibrated from the :class:`.Dataset`,
2. the fitted parametric :class:`.BaseDistribution` which is optimal
in the sense of a goodness-of-fit criterion and a selection criterion
is selected to estimate the statistics related to this variable.
The :class:`.ParametricStatistics` relies on the OpenTURNS library
through the :class:`.OTDistribution` and :class:`.OTDistributionFitter` classes.
Construction
------------

The :class:`.ParametricStatistics` is built from two mandatory arguments:

- a dataset,
- a list of distribution names,

and can consider optional arguments:

- a subset of variable names
  (by default, statistics are computed for all variables),
- a fitting criterion name
  (by default, BIC is used;
  see :attr:`.FittingCriterion`
  and :attr:`.SignificanceTest`
  for more information),
- a level associated with the fitting criterion,
- a selection criterion:

  - 'best':
    select the distribution minimizing
    (or maximizing, depending on the criterion)
    the criterion,
  - 'first':
    select the first distribution
    for which the criterion is greater
    (or lower, depending on the criterion)
    than the level,

- a name for the :class:`.ParametricStatistics` object
  (by default, the name is the concatenation of 'ParametricStatistics'
  and the name of the :class:`.Dataset`).
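
For example,
a minimal construction sketch,
where ``dataset`` is a placeholder
for an existing :class:`.Dataset` of samples
(not an object defined in this module):

>>> from gemseo.uncertainty.statistics.parametric_statistics import (
...     ParametricStatistics,
... )
>>> statistics = ParametricStatistics(dataset, ["Normal", "Uniform", "Triangular"])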

Capabilities
------------

By inheritance,
a :class:`.ParametricStatistics` object has
the same capabilities as :class:`.BaseStatistics`.
Additional ones are:

- :meth:`.get_fitting_matrix`:
  this method displays the values of the fitting criterion
  for the different variables
  and candidate probability distributions,
  as well as the selected probability distribution,
- :meth:`.plot_criteria`:
  this method plots the criterion values for a given variable.
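
For instance,
continuing the construction sketch above
(``statistics`` and the variable name ``"y1"`` are assumptions):

>>> print(statistics.get_fitting_matrix())
>>> figure = statistics.plot_criteria("y1", show=False)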
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING
from typing import NamedTuple
from typing import Union

import matplotlib.pyplot as plt
from numpy import array
from numpy import linspace
from prettytable import PrettyTable

from gemseo.uncertainty.distributions.openturns.distribution import OTDistribution
from gemseo.uncertainty.distributions.openturns.fitting import MeasureType
from gemseo.uncertainty.distributions.openturns.fitting import OTDistributionFitter
from gemseo.uncertainty.statistics.base_statistics import BaseStatistics
from gemseo.uncertainty.statistics.tolerance_interval.distribution import (
    BaseToleranceInterval,
)
from gemseo.uncertainty.statistics.tolerance_interval.factory import (
    ToleranceIntervalFactory,
)
from gemseo.utils.matplotlib_figure import FigSizeType
from gemseo.utils.matplotlib_figure import save_show_figure
from gemseo.utils.string_tools import pretty_str
from gemseo.utils.string_tools import repr_variable

if TYPE_CHECKING:
    from collections.abc import Iterable
    from collections.abc import Mapping
    from collections.abc import Sequence

    from matplotlib.figure import Figure

    from gemseo.datasets.dataset import Dataset
    from gemseo.typing import RealArray

LOGGER = logging.getLogger(__name__)

DistributionType = dict[str, Union[str, OTDistribution]]


class _Distribution(NamedTuple):
    """A probability distribution."""

    name: str
    """The name of the probability distribution."""

    value: OTDistribution
    """The probability distribution."""


class ParametricStatistics(BaseStatistics):
    """A toolbox to compute statistics based on probability distribution-fitting.

    Unless otherwise stated,
    the statistics are computed *variable-wise* and *component-wise*,
    i.e. variable-by-variable and component-by-component.
    So, for the sake of readability,
    the methods named as :meth:`compute_statistic`
    return ``dict[str, RealArray]`` objects
    whose keys are the names of the variables
    and whose values are the statistics estimated for their different components.

    Examples:
        >>> from gemseo import (
        ...     create_discipline,
        ...     create_parameter_space,
        ...     create_scenario,
        ... )
        >>> from gemseo.uncertainty.statistics.parametric_statistics import (
        ...     ParametricStatistics,
        ... )
        >>>
        >>> expressions = {"y1": "x1+2*x2", "y2": "x1-3*x2"}
        >>> discipline = create_discipline(
        ...     "AnalyticDiscipline", expressions=expressions
        ... )
        >>>
        >>> parameter_space = create_parameter_space()
        >>> parameter_space.add_random_variable(
        ...     "x1", "OTUniformDistribution", minimum=-1, maximum=1
        ... )
        >>> parameter_space.add_random_variable(
        ...     "x2", "OTNormalDistribution", mu=0.5, sigma=2
        ... )
        >>>
        >>> scenario = create_scenario(
        ...     [discipline],
        ...     "y1",
        ...     parameter_space,
        ...     formulation_name="DisciplinaryOpt",
        ...     scenario_type="DOE",
        ... )
        >>> scenario.execute(algo_name="OT_MONTE_CARLO", n_samples=100)
        >>>
        >>> dataset = scenario.to_dataset(opt_naming=False)
        >>>
        >>> statistics = ParametricStatistics(
        ...     dataset, ["Normal", "Uniform", "Triangular"]
        ... )
        >>> fitting_matrix = statistics.get_fitting_matrix()
        >>> mean = statistics.compute_mean()
    """

    DistributionName = OTDistributionFitter.DistributionName
    FittingCriterion = OTDistributionFitter.FittingCriterion
    SignificanceTest = OTDistributionFitter.SignificanceTest
    SelectionCriterion = OTDistributionFitter.SelectionCriterion

    fitting_criterion: FittingCriterion
    """The goodness-of-fit criterion, measuring how the distribution fits the data."""

    level: float
    """The test level used by the selection criteria that are significance tests.

    In statistical hypothesis testing,
    the test level corresponds to the risk of committing a type 1 error,
    that is an incorrect rejection of the null hypothesis.
    """

    selection_criterion: SelectionCriterion
    """The selection criterion to select a distribution from a list of candidates."""

    distributions: dict[str, DistributionType | list[DistributionType]]
    """The probability distributions of the random variables.

    When a random variable is a random vector,
    its probability distribution is expressed as a list of marginal distributions.
    Otherwise,
    its probability distribution is expressed as the unique marginal distribution.
    """

    def __init__(
        self,
        dataset: Dataset,
        distributions: Sequence[DistributionName],
        variable_names: Iterable[str] = (),
        fitting_criterion: FittingCriterion = FittingCriterion.BIC,
        level: float = 0.05,
        selection_criterion: SelectionCriterion = SelectionCriterion.BEST,
        name: str = "",
    ) -> None:
        """
        Args:
            distributions: The names of the distributions.
            fitting_criterion: The name of
                the goodness-of-fit criterion,
                measuring how the distribution fits the data.
                Use :meth:`.ParametricStatistics.get_criteria`
                to get the available criteria.
            level: A test level,
                i.e. the risk of committing a type 1 error,
                that is an incorrect rejection of a true null hypothesis,
                for criteria based on hypothesis testing.
            selection_criterion: The selection criterion
                to select a distribution from a list of candidates.
        """  # noqa: D205,D212,D415
        super().__init__(dataset, variable_names, name)
        self.fitting_criterion = fitting_criterion
        self.selection_criterion = selection_criterion
        LOGGER.info("| Set goodness-of-fit criterion: %s.", fitting_criterion)
        self.level = level
        if self.fitting_criterion in self.SignificanceTest.__members__:
            LOGGER.info("| Set significance level of hypothesis test: %s.", level)

        self._all_distributions = self._fit_distributions(distributions)
        self.__distributions = {}
        self.distributions = self._select_best_distributions(distributions)

    def get_fitting_matrix(self) -> str:
        """Get the fitting matrix.

        This matrix contains goodness-of-fit measures
        for each pair (variable, distribution).

        Returns:
            The printable fitting matrix.
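
        Examples:
            A minimal usage sketch,
            assuming a ``statistics`` instance
            built as in the class docstring:

            >>> print(statistics.get_fitting_matrix())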
"""
variables = sorted(self._all_distributions.keys())
distributions = list(self._all_distributions[variables[0]][0].keys())
table = PrettyTable(["Variable", *distributions, "Selection"])
for variable in variables:
for index in range(self.dataset.variable_names_to_n_components[variable]):
row = (
[variable]
+ [
str(self.get_criteria(variable, index)[0][distribution])
for distribution in distributions
]
+ [self.__distributions[variable][index].name]
)
table.add_row(row)
return str(table)

    def get_criteria(
        self, variable: str, index: int = 0
    ) -> tuple[dict[str, float], bool]:
        """Get the value of the fitting criterion for the different distributions.

        Args:
            variable: The name of the variable.
            index: The component of the variable.

        Returns:
            The value of the fitting criterion
            for the given variable name and component
            and the different distributions,
            as well as whether this fitting criterion is a statistical test
            and so this value is a p-value.
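
        Examples:
            A minimal usage sketch,
            assuming a ``statistics`` instance
            built as in the class docstring
            (``"y1"`` is an assumed variable name):

            >>> criteria, is_p_value = statistics.get_criteria("y1")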
"""
distribution_names_to_criterion_values = {
name: result["criterion"]
for name, result in self._all_distributions[variable][index].items()
}
criterion_value_is_p_value = False
if self.fitting_criterion in self.SignificanceTest.__members__:
distribution_names_to_criterion_values = {
name: result[1]["p-value"]
for name, result in distribution_names_to_criterion_values.items()
}
criterion_value_is_p_value = True
return distribution_names_to_criterion_values, criterion_value_is_p_value

    def plot_criteria(
        self,
        variable: str,
        title: str = "",
        save: bool = False,
        show: bool = True,
        directory: str | Path = ".",
        index: int = 0,
        fig_size: FigSizeType = (6.4, 3.2),
    ) -> Figure:
        """Plot the criteria for a given variable name.

        Args:
            variable: The name of the variable.
            title: The title of the plot, if any.
            save: If ``True``, save the plot on the disk.
            show: If ``True``, show the plot.
            directory: The directory path, either absolute or relative.
            index: The index of the component of the variable.
            fig_size: The width and height of the figure in inches, e.g. ``(w, h)``.

        Returns:
            The figure plotting the criteria.

        Raises:
            ValueError: If the variable is missing from the dataset.
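
        Examples:
            A minimal usage sketch,
            assuming a ``statistics`` instance
            built as in the class docstring
            (``"y1"`` is an assumed variable name):

            >>> figure = statistics.plot_criteria("y1", show=False)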
"""
if variable not in self.names:
msg = (
f"The variable '{variable}' is missing from the dataset; "
f"available ones are: {pretty_str(self.names)}."
)
raise ValueError(msg)
criteria, is_p_value = self.get_criteria(variable, index)
x_values = []
y_values = []
labels = []
x_value = 0
for distribution, criterion in criteria.items():
x_value += 1 # noqa: SIM113
x_values.append(x_value)
y_values.append(criterion)
labels.append(distribution)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=fig_size)
ax1.bar(x_values, y_values, tick_label=labels)
if is_p_value:
ax1.set_title(f"{self.fitting_criterion} (p-value)")
ax1.axhline(self.level, color="r", linewidth=2.0)
else:
ax1.set_title(self.fitting_criterion)
ax1.grid(True, "both")
ax1.set_box_aspect(1)
ax1.set_xlabel("Probability distributions")
data = self.dataset.get_view(variable_names=variable).to_numpy()
data_min = min(data)
data_max = max(data)
x_values = linspace(data_min, data_max, 1000)
distributions = self._all_distributions[variable][index]
ax2.hist(data, density=True)
for dist_name, dist_value in distributions.items():
pdf = dist_value["fitted_distribution"].distribution.computePDF
y_values = [pdf([x_value])[0] for x_value in x_values]
ax2.plot(x_values, y_values, label=dist_name, linewidth=2.0)
ax2.set_box_aspect(1)
ax2.legend()
ax2.grid(True, "both")
ax2.set_title("Probability density function")
ax2.set_xlabel(
repr_variable(
variable, index, self.dataset.variable_names_to_n_components[variable]
)
)
if title:
plt.suptitle(title)
save_show_figure(fig, show, Path(directory) / "criteria.pdf" if save else "")
return fig

    def _select_best_distributions(
        self, distribution_names: Sequence[DistributionName]
    ) -> dict[str, DistributionType | list[DistributionType]]:
        """Select the best distributions for the different variables.

        Args:
            distribution_names: The distribution names.

        Returns:
            The best distributions for the different variables.
        """
        LOGGER.info("Select the best distribution for each variable.")
        distributions = {}
        select_from_measures = OTDistributionFitter.select_from_measures
        for variable in self.names:
            selected_distribution_names = []
            marginal_distributions = []
            for component, all_distributions in enumerate(
                self._all_distributions[variable]
            ):
                distribution_name = distribution_names[
                    select_from_measures(
                        [
                            all_distributions[distribution]["criterion"]
                            for distribution in distribution_names
                        ],
                        self.fitting_criterion,
                        self.level,
                        self.selection_criterion,
                    )
                ]
                best_dist = all_distributions[distribution_name]["fitted_distribution"]
                selected_distribution_names.append(distribution_name)
                marginal_distributions.append(best_dist)
                LOGGER.info(
                    "| The best distribution for %s[%s] is %s.",
                    variable,
                    component,
                    best_dist,
                )

            self.__distributions[variable] = list(
                map(_Distribution, selected_distribution_names, marginal_distributions)
            )
            if len(marginal_distributions) == 1:
                distributions[variable] = self.__distributions[variable][0]
            else:
                distributions[variable] = self.__distributions[variable]

        return distributions

    def _fit_distributions(
        self,
        distributions: Iterable[DistributionName],
    ) -> dict[str, list[dict[str, dict[str, OTDistribution | MeasureType]]]]:
        """Fit different distributions for the different marginals.

        Args:
            distributions: The names of the distributions.

        Returns:
            The distributions for the different variables.
        """
        LOGGER.info(
            "Fit different distributions (%s) per variable "
            "and compute the goodness-of-fit criterion.",
            ", ".join(distributions),
        )
        results = {}
        for name in self.names:
            LOGGER.info("| Fit different distributions for %s.", name)
            dataset_values = self.dataset.get_view(variable_names=name).to_numpy()
            size = self.dataset.variable_names_to_n_components[name]
            results[name] = [
                self._fit_marginal_distributions(
                    repr_variable(name, index, size), column, distributions
                )
                for index, column in enumerate(dataset_values.T)
            ]

        return results

    def _fit_marginal_distributions(
        self,
        variable: str,
        sample: RealArray,
        distributions: Iterable[DistributionName],
    ) -> dict[str, dict[str, OTDistribution | MeasureType]]:
        """Fit different distributions for a given dataset marginal.

        Args:
            variable: A variable name.
            sample: A data array.
            distributions: The names of the distributions.

        Returns:
            The fitted distributions for the given marginal.
        """
        factory = OTDistributionFitter(variable, sample)
        result = {}
        for distribution in distributions:
            fitted_distribution = factory.fit(distribution)
            result[distribution] = {
                "fitted_distribution": fitted_distribution,
                "criterion": factory.compute_measure(
                    fitted_distribution, self.fitting_criterion, self.level
                ),
            }

        return result

    def compute_maximum(self) -> dict[str, RealArray]:  # noqa: D102
        return {
            name: array([
                distribution.value.math_upper_bound
                for distribution in self.__distributions[name]
            ])
            for name in self.names
        }

    def compute_mean(self) -> dict[str, RealArray]:  # noqa: D102
        return {
            name: array([
                distribution.value.mean for distribution in self.__distributions[name]
            ])
            for name in self.names
        }

    def compute_minimum(self) -> dict[str, RealArray]:  # noqa: D102
        return {
            name: array([
                distribution.value.math_lower_bound
                for distribution in self.__distributions[name]
            ])
            for name in self.names
        }

    def compute_probability(  # noqa: D102
        self, thresh: Mapping[str, float | RealArray], greater: bool = True
    ) -> dict[str, RealArray]:
        func = lambda x: 1 - x if greater else x  # noqa: E731
        new_thresh = {}
        for name, value in thresh.items():
            if isinstance(value, float):
                new_thresh[name] = [
                    value
                ] * self.dataset.variable_names_to_n_components[name]
            elif len(value) == 1:
                new_thresh[name] = [
                    value[0]
                ] * self.dataset.variable_names_to_n_components[name]
            else:
                new_thresh[name] = value

        return {
            name: array([
                func(distribution.value.compute_cdf(new_thresh[name][index]))
                for index, distribution in enumerate(self.__distributions[name])
            ])
            for name in self.names
        }

    def compute_joint_probability(  # noqa: D102
        self, thresh: Mapping[str, float | RealArray], greater: bool = True
    ) -> dict[str, float]:
        raise NotImplementedError

    def compute_tolerance_interval(  # noqa: D102
        self,
        coverage: float,
        confidence: float = 0.95,
        side: BaseToleranceInterval.ToleranceIntervalSide = BaseToleranceInterval.ToleranceIntervalSide.BOTH,  # noqa: E501
    ) -> dict[str, list[BaseToleranceInterval.Bounds]]:
        if not 0.0 <= coverage <= 1.0:
            msg = "The argument 'coverage' must be a number in [0,1]."
            raise ValueError(msg)

        if not 0.0 <= confidence <= 1.0:
            msg = "The argument 'confidence' must be a number in [0,1]."
            raise ValueError(msg)

        tolerance_interval_factory = ToleranceIntervalFactory()
        return {
            name: [
                tolerance_interval_factory.get_class(distribution.name)(
                    self.n_samples,
                    *distribution.value.distribution.getParameter(),
                ).compute(coverage, confidence, side)
                for distribution in self.__distributions[name]
            ]
            for name in self.names
        }

    def compute_quantile(self, prob: float) -> dict[str, RealArray]:  # noqa: D102
        prob = array([prob])
        return {
            name: array([
                distribution.value.compute_inverse_cdf(prob)[0]
                for distribution in self.__distributions[name]
            ])
            for name in self.names
        }

    def compute_standard_deviation(self) -> dict[str, RealArray]:  # noqa: D102
        return {
            name: array([
                distribution.value.standard_deviation
                for distribution in self.__distributions[name]
            ])
            for name in self.names
        }

    def compute_variance(self) -> dict[str, RealArray]:  # noqa: D102
        return {
            name: array([
                distribution.value.standard_deviation**2
                for distribution in self.__distributions[name]
            ])
            for name in self.names
        }

    def compute_moment(self, order: int) -> dict[str, RealArray]:  # noqa: D102
        return {
            name: array([
                distribution.value.distribution.getMoment(order)[0]
                for distribution in self.__distributions[name]
            ])
            for name in self.names
        }

    def compute_range(self) -> dict[str, RealArray]:  # noqa: D102
        return {
            name: array([
                distribution.value.math_upper_bound
                - distribution.value.math_lower_bound
                for distribution in self.__distributions[name]
            ])
            for name in self.names
        }

    def plot(
        self,
        save: bool = False,
        show: bool = True,
        directory_path: str | Path = "",
        file_format: str = "png",
    ) -> dict[str, Figure]:
        """Visualize the cumulative distribution and probability density functions.

        Args:
            save: Whether to save the figures.
            show: Whether to show the figures.
            directory_path: The path to save the figures.
            file_format: The file extension.

        Returns:
            The cumulative distribution and probability density functions
            for each variable.
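
        Examples:
            A minimal usage sketch,
            assuming a ``statistics`` instance
            built as in the class docstring:

            >>> figures = statistics.plot(show=False)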
"""
plots = {}
for name in self.names:
size = self.dataset.variable_names_to_n_components[name]
for index, distribution in enumerate(self.__distributions[name]):
plots[repr_variable(name, index, size)] = distribution.value.plot(
save=save,
show=show,
directory_path=directory_path,
file_extension=file_format,
)
return plots