# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Matthias De Lozzo
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
Parametric estimation of statistics from a dataset
==================================================
Overview
--------
The :class:`.ParametricStatistics` class inherits from the
abstract :class:`.Statistics` class and aims to estimate statistics
from a :class:`.Dataset`, based on a collection of
candidate parametric distribution calibrated from this :class:`.Dataset`.
For each variable, parameters of these distributions are calibrated
from the :class:`.Dataset`
and the fitted parametric :class:`.Distribution` which is optimal
in the sense of a goodness-of-fit criterion and a selection criterion
is selected to estimate :class:`.Statistics` associated with this variable.
The :class:`.ParametricStatistics` relies on the OpenTURNS library through
the :class:`.OTDistribution` and :class:`.OTDistributionFitter` classes.
Construction
------------
The :class:`.ParametricStatistics` is built from two mandatory arguments:
- a dataset,
- a list of distributions names,
and can consider optional arguments:
- a subset of variables names (by default, statistics are computed
for all variables),
- a fitting criterion name (by default, BIC is used;
see :meth:`.ParametricStatistics.get_available_criteria`
and :meth:`.ParametricStatistics.get_significance_tests`
for more information),
- a level associated with the fitting criterion,
- a selection criterion:
- 'best': select the distribution minimizing (or maximizing, depending
on the criterion) the criterion,
- 'first': Select the first distribution for which the criterion is
greater (or lower, depending on the criterion) than the level,
- a name for the :class:`.ParametricStatistics` object (by default,
the name is the concatenation of 'ParametricStatistics'
and the name of the :class:`.Dataset`).
Capabilities
------------
By inheritance, a :class:`.ParametricStatistics` object has the
same capabilities as :class:`.Statistics`. Additional ones are:
- :meth:`.get_fitting_matrix`: this method shows the values
of the fitting criterion for the different variables and
candidate probability distributions
as well as the selected probability distribution,
- :meth:`.plot_criteria`: this method plots the criterion values
for a given variable.
"""
from __future__ import absolute_import, division, unicode_literals
import os
import matplotlib.pyplot as plt
import openturns as ot
from future import standard_library
from numpy import array, exp, inf, linspace, log
from past.utils import old_div
from gemseo.third_party.prettytable.prettytable import PrettyTable
from gemseo.uncertainty.distributions.ot_dist import OTNormalDistribution
from gemseo.uncertainty.distributions.ot_fdist import OTDistributionFitter
from gemseo.uncertainty.statistics.statistics import Statistics
standard_library.install_aliases()
from gemseo import LOGGER
class ParametricStatistics(Statistics):
    """Parametric estimation of statistics.

    Fits a collection of candidate parametric distributions to each variable
    of a dataset, selects the best one according to a goodness-of-fit
    criterion and a selection criterion, and estimates statistics from the
    selected distributions.
    """

    def __init__(
        self,
        dataset,
        distributions,
        variables_names=None,
        fitting_criterion="BIC",
        level=0.05,
        selection_criterion="best",
        name=None,
    ):
        """Constructor.

        :param Dataset dataset: dataset.
        :param list(str) distributions: list of candidate distributions names.
        :param list(str) variables_names: list of variables names.
            If None, the method considers all variables from the loaded
            dataset. Default: None.
        :param str fitting_criterion: goodness-of-fit criterion.
            Default: 'BIC'.
        :param float level: risk of committing a Type 1 error,
            that is an incorrect rejection of a true null hypothesis,
            for criteria based on test hypothesis. Default: 0.05.
        :param str selection_criterion: selection criterion,
            either 'best' or 'first'. Default: 'best'.
        :param str name: name of the object.
            If None, use the concatenation of class and dataset names.
            Default: None.
        """
        super(ParametricStatistics, self).__init__(dataset, variables_names, name)
        significance_tests = OTDistributionFitter.SIGNIFICANCE_TESTS
        self.fitting_criterion = fitting_criterion
        self.selection_criterion = selection_criterion
        LOGGER.info("| Set goodness-of-fit criterion: %s.", fitting_criterion)
        # The significance level only makes sense for hypothesis-test based
        # criteria; otherwise it stays undefined.
        if self.fitting_criterion in significance_tests:
            self.level = level
            LOGGER.info("| Set significance level of hypothesis test: %s.", level)
        else:
            self.level = None
        self.build_distributions(distributions)
[docs] def build_distributions(self, distributions):
"""Build distributions from a list of distributions names,
a test level and the stored dataset.
:param list(str) distributions: list of distributions names.
"""
self._all_distributions = self._fit_distributions(distributions)
self.distributions = self._select_best_distributions(distributions)
[docs] def get_fitting_matrix(self):
"""Get the fitting matrix. This matrix contains goodness-of-fit
measures for each pair < variable, distribution >."""
rownames = sorted(self._all_distributions.keys())
colnames = list(self._all_distributions[rownames[0]].keys())
table = PrettyTable(["Variable"] + colnames + ["Selection"])
for varname in rownames:
row, _ = self.get_criteria(varname)
row = [varname] + [row[distribution] for distribution in colnames]
row = row + [self.distributions[varname]["name"]]
table.add_row(row)
return str(table)
[docs] def get_criteria(self, varname):
"""Get criteria for a given variable name.
:param str varname: variable name.
"""
varname_dist = self._all_distributions[varname]
criteria = {
distribution: result["criterion"]
for distribution, result in varname_dist.items()
}
is_pvalue = False
significance_tests = OTDistributionFitter.SIGNIFICANCE_TESTS
if self.fitting_criterion in significance_tests:
criteria = {
distribution: result[1]["p-value"]
for distribution, result in criteria.items()
}
is_pvalue = True
return criteria, is_pvalue
[docs] def plot_criteria(
self, varname, title=None, save=False, show=True, n_legend_cols=4, directory="."
):
"""Plot criteria for a given variable name
:param str varname: name of the variable
:param str title: title. Default: None.
:param bool save: save the plot into a file. Default: False.
:param bool show: show the plot. Default: True.
:param int n_legend_cols: number of text columns in the upper legend.
Default: 4.
:param str directory: directory absolute or relative path.
Default: '.'.
"""
if varname not in self.names:
raise ValueError(
varname + " is not a variable of the dataset."
"Available ones are:" + ", ".join(self.names)
)
criteria, is_pvalue = self.get_criteria(varname)
xvals = []
yvals = []
labels = []
xval = 0
for distribution, criterion in criteria.items():
xval += 1
xvals.append(xval)
yvals.append(criterion)
labels.append(distribution)
plt.subplot(121)
plt.bar(xvals, yvals, tick_label=labels, align="center")
if is_pvalue:
plt.ylabel("p-value from " + self.fitting_criterion + " test")
plt.axhline(self.level, color="r", linewidth=2.0)
plt.grid(True, "both")
plt.subplot(122)
data = array(self.dataset[varname])
data_min = min(data)
data_max = max(data)
xvals = linspace(data_min, data_max, 1000)
distributions = self._all_distributions[varname]
try:
plt.hist(data, density=True)
except AttributeError:
plt.hist(data, normed=True)
for dist_name, dist_value in distributions.items():
pdf = dist_value["fitted_distribution"].distribution.computePDF
yvals = [pdf([xval])[0] for xval in xvals]
plt.plot(xvals, yvals, label=dist_name, linewidth=2.0)
plt.legend(
bbox_to_anchor=(0.0, 1.02, 1.0, 0.102),
loc="lower left",
ncol=n_legend_cols,
mode="expand",
borderaxespad=0.0,
)
plt.grid(True, "both")
if title is not None:
plt.suptitle(title)
filename = os.path.join(directory, "criteria.pdf")
if save:
plt.savefig(filename)
if show:
plt.show()
plt.close()
def _select_best_distributions(self, distributions_names):
""" Select the best distributions for the different variables."""
LOGGER.info("Select the best distribution for each variable.")
distributions = {}
for varname in self.names:
varname_dist = self._all_distributions[varname]
criteria = [
varname_dist[distribution]["criterion"]
for distribution in distributions_names
]
select_from_results = OTDistributionFitter.select_from_results
index = select_from_results(
criteria, self.fitting_criterion, self.level, self.selection_criterion
)
name = distributions_names[index]
value = varname_dist[name]["fitted_distribution"]
distributions[varname] = {"name": name, "value": value}
LOGGER.info("| The best distribution for %s is %s.", varname, value)
return distributions
def _fit_distributions(self, distributions):
"""Fit distributions for the different dataset marginals
among a given collection of distributions names.
:param list(str) distributions: list of distributions names
"""
dist_list = ", ".join(distributions)
LOGGER.info(
"Fit different distributions (%s) per variable "
"and compute the goodness-of-fit criterion.",
dist_list,
)
results = {}
for varname in self.names:
LOGGER.info("| Fit different distributions for %s.", varname)
dataset = self.dataset[varname]
results[varname] = self._fit_marginal_distributions(
varname, dataset, distributions
)
return results
def _fit_marginal_distributions(self, variable, sample, distributions):
"""Fit distributions for a given dataset marginal
among a given collection of distributions names
:param str variable: variable name
:param array dataset: sample
:param list(str) distributions: list of distributions names
"""
result = {}
factory = OTDistributionFitter(variable, sample)
for distribution in distributions:
fitted_distribution = factory.fit(distribution)
test_result = factory.measure(
fitted_distribution, self.fitting_criterion, self.level
)
result[distribution] = {}
result[distribution]["fitted_distribution"] = fitted_distribution
result[distribution]["criterion"] = test_result
return result
[docs] def maximum(self):
"""Get the maximum.
:return: maximum
"""
result = {
name: self.distributions[name]["value"].math_upper_bound
for name in self.names
}
return result
[docs] def mean(self):
"""Get the mean.
:return: mean
"""
result = {name: self.distributions[name]["value"].mean for name in self.names}
return result
[docs] def minimum(self):
"""Get the minimum.
:return: minimum
"""
result = {
name: self.distributions[name]["value"].math_lower_bound
for name in self.names
}
return result
[docs] def probability(self, thresh, greater=False):
"""Compute a probability associated to a threshold.
:param float thresh: threshold
:param bool greater: if True, compute the probability the probability
of exceeding the threshold, if False, compute the reverse.
Default: True.
:return: probability
"""
dist = self.distributions
if greater:
result = {
name: 1 - dist[name]["value"].cdf(thresh[name])[0]
for name in self.names
}
else:
result = {
name: dist[name]["value"].cdf(thresh[name])[0] for name in self.names
}
return result
[docs] def tolerance_interval(self, coverage, confidence=0.95, side="both"):
"""Compute the tolerance interval (TI) for a given minimum percentage
of the population and a given confidence level.
:param float coverage: minimum percentage of belonging to the TI.
:param float confidence: level of confidence in [0,1]. Default: 0.95.
:param str side: kind of interval: 'lower' for lower-sided TI,
'upper' for upper-sided TI and 'both for both-sided TI.
:return: tolerance limits
"""
if side not in ["upper", "lower", "both"]:
raise ValueError(
"The argument 'side' represents the type"
"of tolerance bounds. Available ones are: 'upper'"
"'lower' and 'both'."
)
if not 0.0 <= coverage <= 1.0:
raise ValueError("The argument 'coverage'" " must be number in [0,1].")
if not 0.0 <= confidence <= 1.0:
raise ValueError("The argument 'confidence'" " must be number in [0,1].")
limits = {}
for varname in self.names:
dist_name = self.distributions[varname]["name"]
dist = self.distributions[varname]["value"]
if dist_name == "Normal":
lower, upper = self._normal_tolerance_interval(
dist, coverage, confidence, side
)
elif dist_name == "Uniform":
lower, upper = self._uniform_tolerance_interval(
dist, coverage, confidence, side
)
elif dist_name == "LogNormal":
lower, upper = self._lognormal_tolerance_interval(
dist, coverage, confidence, side
)
elif dist_name == "WeibullMin":
lower, upper = self._weibull_tolerance_interval(
dist, coverage, confidence, side
)
elif dist_name == "Exponential":
lower, upper = self._exponential_tolerance_interval(
dist, coverage, confidence, side
)
else:
raise ValueError(
"Tolerance interval is not implemented for"
'distribution "' + dist_name + '".'
)
limits[varname] = (lower, upper)
return limits
    def _weibull_tolerance_interval(
        self, dist, coverage, confidence=0.95, side="lower"
    ):
        """Compute the tolerance interval (TI) for a given minimum percentage
        of the population and a given confidence level, when data are
        Weibull distributed.

        Works in the log-space of the shifted data and uses quantiles of a
        non-central Student distribution; the limits are mapped back with
        exp() plus the location parameter.

        :param WeibullMin dist: OT WeibullMin distribution.
        :param float coverage: minimum percentage of belonging to the TI.
        :param float confidence: level of confidence in [0,1]. Default: 0.95.
        :param str side: kind of interval: 'lower' for one-sided lower TI,
            'upper' for one-sided upper TI or 'both' for two-sided TI.
        :return: tolerance limits as a (lower, upper) pair of 1-element
            arrays.
        """
        # NOTE(review): OpenTURNS WeibullMin parameters are ordered
        # (scale, shape, location); confirm that 'alpha' (index 0) and
        # 'beta' (index 1) below are mapped to the intended roles.
        alpha = dist.marginals[0].getParameter()[0]
        beta = dist.marginals[0].getParameter()[1]
        gamma = dist.marginals[0].getParameter()[2]
        # Log-space location and scale of the extreme-value representation.
        x_i = log(beta)
        delta = 1.0 / alpha
        if side == "upper":
            lbd = log(-log(1 - coverage))
            # Non-centrality parameter of the Student distribution.
            offset = -self.n_samples ** 0.5 * lbd
            dof = self.n_samples - 1
            student = ot.Student(dof, offset)
            upper = delta * student.computeQuantile(confidence)[0]
            upper /= (self.n_samples - 1) ** 0.5
            upper = x_i - upper
            lower = -inf
        elif side == "lower":
            lbd = log(-log(coverage))
            offset = -self.n_samples ** 0.5 * lbd
            dof = self.n_samples - 1
            student = ot.Student(dof, offset)
            lower = delta * student.computeQuantile(1 - confidence)[0]
            lower /= (self.n_samples - 1) ** 0.5
            lower = x_i - lower
            upper = inf
        else:
            # Two-sided case: combine two one-sided intervals, each with
            # adjusted coverage and half the risk.
            coverage = (coverage + 1) / 2.0
            alpha = (1 - confidence) / 2.0
            lbd = log(-log(1 - coverage))
            offset = -self.n_samples ** 0.5 * lbd
            dof = self.n_samples - 1
            student = ot.Student(dof, offset)
            upper = delta * student.computeQuantile(1 - alpha)[0]
            upper /= (self.n_samples - 1) ** 0.5
            upper = x_i - upper
            lbd = log(-log(coverage))
            offset = -self.n_samples ** 0.5 * lbd
            dof = self.n_samples - 1
            student = ot.Student(dof, offset)
            lower = delta * student.computeQuantile(alpha)[0]
            lower /= (self.n_samples - 1) ** 0.5
            lower = x_i - lower
        # Map the log-space limits back to the original space.
        limits = (array([exp(lower) + gamma]), array([exp(upper) + gamma]))
        return limits
def _uniform_tolerance_interval(
self, dist, coverage, confidence=0.95, side="lower"
):
"""Compute the tolerance interval (TI) for a given minimum percentage
of the population and a given confidence level, when data are
uniformly distributed
:param Distribution dist: OT uniform distribution
:param float coverage: minimum percentage of belonging to the TI.
:param float confidence: level of confidence in [0,1]. Default: 0.95.
:param str side: kind of interval: 'lower' for one-sided lower TI,
'upper' for one-sided upper TI or 'both' for two-sided TI.
:return: tolerance limits
"""
minimum = dist.marginals[0].getParameter()[0]
maximum = dist.marginals[0].getParameter()[1]
if side == "upper":
upper = (maximum - minimum) * coverage
upper /= (1 - confidence) ** (1.0 / self.n_samples)
upper += minimum
limits = (array([-inf]), array([upper]))
elif side == "lower":
lower = (maximum - minimum) * (1 - coverage)
lower /= confidence ** (1.0 / self.n_samples)
lower += minimum
limits = (array([lower]), array([inf]))
else:
upper = (maximum - minimum) * (coverage + 1) / 2.0
upper /= ((1 - confidence) / 2.0) ** (1.0 / self.n_samples)
upper += minimum
lower = (maximum - minimum) * (1 - (coverage + 1) / 2.0)
lower /= (1 - (1 - confidence) / 2.0) ** (1.0 / self.n_samples)
lower += minimum
limits = (array([lower]), array([upper]))
return limits
def _exponential_tolerance_interval(
self, dist, coverage, confidence=0.95, side="lower"
):
"""Compute the tolerance interval (TI) for a given minimum percentage
of the population and a given confidence level, when data are
exponentially distributed
:param Exponential dist: OT exponential distribution
:param float coverage: minimum percentage of belonging to the TI.
:param float confidence: level of confidence in [0,1]. Default: 0.95.
:param str side: kind of interval: 'lower' for one-sided lower TI,
'upper' for one-sided upper TI or 'both' for two-sided TI.
:return: tolerance limits
"""
lmbda = dist.marginals[0].getParameter()[0]
gamma = dist.marginals[0].getParameter()[1]
if side == "upper":
chisq = ot.ChiSquare(2 * self.n_samples)
chisq = chisq.computeQuantile(confidence)[0]
upper = -2 * self.n_samples * log(coverage) * lmbda
upper /= chisq
lower = 0.0
elif side == "lower":
chisq = ot.ChiSquare(2 * self.n_samples)
chisq = chisq.computeQuantile(confidence)[0]
lower = -2 * self.n_samples * log(1 - coverage) * lmbda
lower /= chisq
upper = inf
else:
coverage = (coverage + 1) / 2.0
alpha = (1 - confidence) / 2.0
chisq = ot.ChiSquare(2 * self.n_samples)
chisq = chisq.computeQuantile(1 - alpha)[0]
upper = -2 * self.n_samples * log(1 - coverage) * lmbda
upper /= chisq
lower = -2 * self.n_samples * log(coverage) * lmbda
lower /= chisq
limits = (array([lower + gamma]), array([upper + gamma]))
return limits
    def _normal_tolerance_interval(self, dist, coverage, confidence=0.95, side="lower"):
        """Compute the tolerance interval (TI) for a given minimum percentage
        of the population and a given confidence level, when data are
        normally distributed.

        One-sided limits use a non-central Student quantile; two-sided
        limits use a chi-square based approximation of the tolerance factor.

        :param Normal dist: OT normal distribution.
        :param float coverage: minimum percentage of belonging to the TI.
        :param float confidence: level of confidence in [0,1]. Default: 0.95.
        :param str side: kind of interval: 'lower' for one-sided lower TI,
            'upper' for one-sided upper TI or 'both' for two-sided TI.
        :return: tolerance limits as a (lower, upper) pair of 1-element
            arrays.
        """
        mean = dist.marginals[0].getParameter()[0]
        std = dist.marginals[0].getParameter()[1]
        if side in ["upper", "lower"]:
            # One-sided factor from the non-central Student distribution
            # with non-centrality z_p * sqrt(n).
            z_p = ot.Normal().computeQuantile(coverage)[0]
            delta = z_p * self.n_samples ** 0.5
            dof = self.n_samples - 1
            student = ot.Student(dof, delta)
            student_quantile = student.computeQuantile(confidence)[0]
            tolerance_factor = old_div(student_quantile, self.n_samples ** 0.5)
            if side == "upper":
                upper = mean + tolerance_factor * std
                limits = (array([-inf]), array([upper]))
            else:
                lower = mean - tolerance_factor * std
                limits = (array([lower]), array([inf]))
        else:
            # Two-sided factor: product of three terms built from a normal
            # quantile and a chi-square quantile with n-1 degrees of freedom.
            z_p = ot.Normal().computeQuantile((1 + coverage) / 2.0)[0]
            left = (1 + 1.0 / self.n_samples) ** 0.5 * z_p
            chisq = ot.ChiSquare(self.n_samples - 1)
            right = old_div(
                (self.n_samples - 1), chisq.computeQuantile(1 - confidence)[0]
            )
            right = right ** 0.5
            # Correction weight refining the two-sided tolerance factor.
            weight = self.n_samples - 3 - chisq.computeQuantile(1 - confidence)[0]
            weight /= 2 * (self.n_samples + 1) ** 2
            weight = (1 + weight) ** 0.5
            tolerance_factor = left * right * weight
            lower = mean - tolerance_factor * std
            upper = mean + tolerance_factor * std
            limits = (array([lower]), array([upper]))
        return limits
def _lognormal_tolerance_interval(
self, dist, coverage, confidence=0.95, side="lower"
):
"""Compute the tolerance interval (TI) for a given minimum percentage
of the population and a given confidence level, when data are
normally distributed
:param LogNormal dist: OT LogNormal distribution
:param float coverage: minimum percentage of belonging to the TI.
:param float confidence: level of confidence in [0,1]. Default: 0.95.
:param str side: kind of interval: 'lower' for one-sided lower TI,
'upper' for one-sided upper TI or 'both' for two-sided TI.
:return: tolerance limits
"""
dist = OTNormalDistribution(
"x",
dist.marginals[0].getParameter()[0],
dist.marginals[0].getParameter()[1],
)
lower, upper = self._normal_tolerance_interval(dist, coverage, confidence, side)
limits = (exp(lower), exp(upper))
return limits
[docs] def quantile(self, prob):
"""Get the quantile associated to a given probability.
:param float prob: probability
:return: quantile
:rtype: float or list(float)
"""
prob = array([prob])
result = {
name: self.distributions[name]["value"].inverse_cdf(prob)
for name in self.names
}
return result
[docs] def standard_deviation(self):
"""Get the standard deviation.
:return: standard deviation
:rtype: float or list(float)
"""
result = {
name: self.distributions[name]["value"].standard_deviation
for name in self.names
}
return result
[docs] def variance(self):
"""Get the variance.
:return: variance
:rtype: float or list(float)
"""
result = {
name: self.distributions[name]["value"].standard_deviation ** 2
for name in self.names
}
return result
[docs] def moment(self, order):
"""Compute the moment for a given order, either centered or not.
:param int order: moment index
:return: moment
:rtype: float or list(float)
"""
dist = self.distributions
result = [
dist[varname]["value"].distribution.getMoment(order)[0]
for varname in self.names
]
return result
[docs] def range(self):
"""Get the range of variables.
:return: range of variables
"""
result = {}
for name in self.names:
dist = self.distributions[name]["value"]
result[name] = dist.math_upper_bound - dist.math_lower_bound
return result
[docs] @classmethod
def get_available_distributions(cls):
""" Get available distributions. """
return sorted(OTDistributionFitter.AVAILABLE_FACTORIES.keys())
[docs] @classmethod
def get_available_criteria(cls):
""" Get available goodness-of-fit criteria. """
return sorted(OTDistributionFitter.AVAILABLE_FITTING_TESTS.keys())
[docs] @classmethod
def get_significance_tests(cls):
""" Get significance tests. """
return sorted(OTDistributionFitter.SIGNIFICANCE_TESTS)