# Source code for gemseo.uncertainty.distributions.openturns.composed

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                           documentation
#        :author: Matthias De Lozzo
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""The OpenTURNS-based joint probability distribution."""

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Callable

import openturns as ots

if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Sequence

from gemseo.uncertainty.distributions.openturns.distribution import OTDistribution

from numpy import array
from numpy import ndarray

from gemseo.uncertainty.distributions.composed import ComposedDistribution

[docs]
class OTComposedDistribution(ComposedDistribution):
"""The OpenTURNS-based joint probability distribution."""

def __init__(  # noqa: D107
self,
distributions: Sequence[OTDistribution],
copula: ots.Distribution | None = None,
variable: str = "",
) -> None:
super().__init__(distributions, copula=copula, variable=variable)
marginals = [
marginal
for distribution in distributions
for marginal in distribution.marginals
]
if copula is None:
copula = ots.IndependentCopula(len(marginals))
self.distribution = ots.ComposedDistribution(marginals, copula)
self._mapping = {}
index = 0
for distribution_index, distribution in enumerate(distributions):
for marginal_index in range(distribution.dimension):
self._mapping[index] = (distribution_index, marginal_index)
index += 1

self._set_bounds(distributions)

[docs]
def compute_samples(  # noqa: D102
self,
n_samples: int = 1,
) -> ndarray:
return array(self.distribution.getSample(n_samples))

[docs]
def compute_cdf(  # noqa: D102
self,
vector: Iterable[float],
) -> ndarray:
tmp = []
for index, value in enumerate(vector):
id1 = self._mapping[index][0]
id2 = self._mapping[index][1]
value = ots.Point([value])
tmp.append(self.marginals[id1].marginals[id2].computeCDF(value))
return array(tmp)

[docs]
def compute_inverse_cdf(  # noqa: D102
self,
vector: ndarray,
) -> Iterable[float]:
tmp = []
for index, value in enumerate(vector):
id1 = self._mapping[index][0]
id2 = self._mapping[index][1]
tmp.append(self.marginals[id1].marginals[id2].computeQuantile(value)[0])
return array(tmp)

def _pdf(  # noqa: D102
self,
index: int,
) -> Callable:
id1 = self._mapping[index][0]
id2 = self._mapping[index][1]

def pdf(
point: float,
) -> float:
"""Probability Density Function (PDF).

Args:
point: An evaluation point.

Returns:
The PDF value at the evaluation point.
"""
return self.marginals[id1].marginals[id2].computePDF(point)

return pdf

def _cdf(  # noqa: D102
self,
index: int,
) -> Callable:
id1 = self._mapping[index][0]
id2 = self._mapping[index][1]

def cdf(
level: float,
) -> float:
"""Cumulative Density Function (CDF).

Args:
level: A probability level.

Returns:
The CDF value for the probability level.
"""
return self.marginals[id1].marginals[id2].computeCDF(level)

return cdf