# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Matthias De Lozzo
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""The scalable problem."""
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import ClassVar
from typing import Final
from numpy import concatenate
from numpy import eye
from numpy import full
from numpy import newaxis
from numpy import ones
from numpy import quantile
from numpy import vstack
from numpy import zeros
from numpy.linalg import inv
from numpy.random import default_rng
from gemseo.problems.mdo.scalable.parametric.core.default_settings import DEFAULT_D_0
from gemseo.problems.mdo.scalable.parametric.core.disciplines.main_discipline import (
MainDiscipline,
)
from gemseo.problems.mdo.scalable.parametric.core.disciplines.scalable_discipline import ( # noqa: E501
ScalableDiscipline,
)
from gemseo.problems.mdo.scalable.parametric.core.quadratic_programming_problem import (
QuadraticProgrammingProblem,
)
from gemseo.problems.mdo.scalable.parametric.core.scalable_design_space import (
ScalableDesignSpace,
)
from gemseo.problems.mdo.scalable.parametric.core.scalable_discipline_settings import (
DEFAULT_SCALABLE_DISCIPLINE_SETTINGS,
)
from gemseo.problems.mdo.scalable.parametric.core.scalable_discipline_settings import (
ScalableDisciplineSettings,
)
from gemseo.problems.mdo.scalable.parametric.core.variable_names import (
SHARED_DESIGN_VARIABLE_NAME,
)
from gemseo.problems.mdo.scalable.parametric.core.variable_names import (
get_constraint_name,
)
from gemseo.problems.mdo.scalable.parametric.core.variable_names import (
get_coupling_name,
)
from gemseo.problems.mdo.scalable.parametric.core.variable_names import get_u_local_name
from gemseo.problems.mdo.scalable.parametric.core.variable_names import get_x_local_name
from gemseo.utils.seeder import SEED
from gemseo.utils.string_tools import MultiLineString
if TYPE_CHECKING:
from collections.abc import Sequence
from gemseo.typing import RealArray
[docs]
class ScalableProblem:
r"""The scalable problem.
It builds a set of strongly coupled scalable disciplines completed by a system
discipline computing the objective function and the constraints.
These disciplines are defined on a unit design space, i.e. design variables belongs
to :math:`[0, 1]`.
"""
_MAIN_DISCIPLINE_CLASS: ClassVar[type] = MainDiscipline
"""The class of the main discipline."""
_SCALABLE_DISCIPLINE_CLASS: ClassVar[type] = ScalableDiscipline
"""The class of the scalable discipline."""
_DESIGN_SPACE_CLASS: ClassVar[type] = ScalableDesignSpace
"""The class of the design space."""
disciplines: list[_MAIN_DISCIPLINE_CLASS | _SCALABLE_DISCIPLINE_CLASS]
"""The disciplines."""
design_space: _DESIGN_SPACE_CLASS
"""The design space."""
qp_problem: QuadraticProgrammingProblem
"""The quadratic programming problem."""
__N_SAMPLES: Final[int] = 100000
"""The number of samples to estimate the quantile-based constraint threshold."""
__alpha: RealArray
r"""The matrix :math:`\alpha` to compute :math:`y=\alpha+\beta x."""
__beta: RealArray
r"""The matrix :math:`\beta` to compute :math:`y=\alpha+\beta x."""
def __init__(
self,
discipline_settings: Sequence[
ScalableDisciplineSettings
] = DEFAULT_SCALABLE_DISCIPLINE_SETTINGS,
d_0: int = DEFAULT_D_0,
add_random_variables: bool = False,
alpha: float = 0.50,
seed: int = SEED,
) -> None:
r"""
Args:
discipline_settings: The configurations
of the different scalable disciplines.
d_0: The size of the shared design variable :math:`x_0`.
add_random_variables: Whether to add a centered random variable :math:`u_i`
on the output of the :math:`i`-th scalable discipline.
alpha: The proportion of feasible design points.
seed: The seed for reproducibility.
""" # noqa: D205 D212
rng = default_rng(seed)
# The output sizes of the scalable disciplines.
p_i = [d.p_i for d in discipline_settings]
self._p = sum(p_i)
# The input sizes of the scalable disciplines.
d_i = [d.d_i for d in discipline_settings]
d = sum(d_i) + d_0
# Generate realizations of the random matrices.
# - a_i ~ U{[0,1], (p_i,)}
# - D_i0 ~ U{[0,1], (p_i, d_0)}
# - D_ii ~ U{[0,1], (p_i, d_i)}
# - C_ij ~ U{[0,1], (p_i, p_j)}
a_i = []
D_i0 = [] # noqa:N806
D_ii = [] # noqa:N806
C_ij = [] # noqa:N806
N = len(discipline_settings) # noqa:N806
all_discipline_indices = set(range(N))
for i, i_th_disc_settings in enumerate(discipline_settings):
other_discipline_indices = all_discipline_indices - {i}
p_i_ = i_th_disc_settings.p_i
D_i0.append(rng.random((p_i_, d_0)))
D_ii.append(rng.random((p_i_, i_th_disc_settings.d_i)))
C_ij.append({
get_coupling_name(j + 1): rng.random((p_i_, discipline_settings[j].p_i))
for j in other_discipline_indices
})
a_i.append(rng.random(p_i_))
# Define the matrix C and compute its inverse.
C = eye(self._p) # noqa: N806
row_start = 0
for i, p_i_ in enumerate(p_i):
C_ij_i = C_ij[i] # noqa: N806
col_start = 0
row_end = row_start + p_i_
for j, _p_j in enumerate(p_i):
col_end = col_start + _p_j
if j != i:
name = get_coupling_name(j + 1)
C[row_start:row_end, col_start:col_end] = -C_ij_i[name]
col_start = col_end
row_start = row_end
self._inv_C = inv(C) # noqa: N806
# Define the matrices \alpha and \beta.
D = zeros((self._p, d)) # noqa: N806
row_start = 0
col_start = d_0
for i, p_i_ in enumerate(p_i):
row_end = row_start + p_i_
col_end = col_start + d_i[i]
D[row_start:row_end, 0:d_0] = D_i0[i]
D[row_start:row_end, col_start:col_end] = D_ii[i]
row_start = row_end
col_start = col_end
self.__alpha = self._inv_C @ concatenate(a_i)
self.__beta = -self._inv_C @ D
q = quantile(
[
self.compute_y(x).min()
for x in rng.random((self.__N_SAMPLES, sum(d_i) + d_0))
],
1 - alpha,
)
t_i = [[q] * n for n in p_i]
# Define the QP problem with the matrices
# Q = 2(Q_{x_0}+\beta^T\beta)
# c = 2*\beta^T\alpha
# d = \alpha^T\alpha
# b = [-\beta,\diag(d),-\diag(d)]
Q_x0 = zeros((d, d)) # noqa: N806
Q_x0[0:d_0, 0:d_0] = eye(d_0)
self.qp_problem = QuadraticProgrammingProblem(
2 * (Q_x0 + self.__beta.T @ self.__beta),
(2 * self.__beta.T @ self.__alpha)[:, newaxis],
self.__alpha.T @ self.__alpha,
vstack([-self.__beta, eye(d), -eye(d)]),
concatenate([self.__alpha - concatenate(t_i), ones(d), zeros(d)]),
)
# Define the default values of the input variables.
default_input_values = {SHARED_DESIGN_VARIABLE_NAME: zeros(d_0) + 0.5}
for index, _discipline_settings in enumerate(discipline_settings):
discipline_index = index + 1
d_i = _discipline_settings.d_i
p_i = _discipline_settings.p_i
default_input_values[get_x_local_name(discipline_index)] = full(d_i, 0.5)
default_input_values[get_coupling_name(discipline_index)] = full(p_i, 0.5)
default_input_values[get_constraint_name(discipline_index)] = full(p_i, 0.5)
default_input_values[get_u_local_name(discipline_index)] = zeros(p_i)
# Instantiate the main discipline
names = [SHARED_DESIGN_VARIABLE_NAME]
names.extend([get_coupling_name(index) for index in range(1, N + 1)])
self.disciplines = [
self._MAIN_DISCIPLINE_CLASS(
*t_i,
**{k: v.copy() for k, v in default_input_values.items() if k in names},
)
]
# Instantiate the scalable disciplines
for discipline_index in range(1, N + 1):
names = [SHARED_DESIGN_VARIABLE_NAME, get_x_local_name(discipline_index)]
names.extend([
get_coupling_name(other_discipline_index)
for other_discipline_index in range(1, N + 1)
if other_discipline_index != discipline_index
])
if add_random_variables:
names.append(get_u_local_name(discipline_index))
self.disciplines.append(
self._SCALABLE_DISCIPLINE_CLASS(
discipline_index,
a_i[discipline_index - 1],
D_i0[discipline_index - 1],
D_ii[discipline_index - 1],
C_ij[discipline_index - 1],
**{
k: v.copy()
for k, v in default_input_values.items()
if k in names
},
)
)
self.design_space = self._DESIGN_SPACE_CLASS(discipline_settings, d_0)
[docs]
def compute_y(self, x: RealArray, u: RealArray | None = None) -> RealArray:
r"""Compute the coupling vector :math:`y`.
Args:
x: A design point.
u: An uncertain point, if any.
Returns:
The coupling vector associated with the design point :math:`x`
and the uncertain vector :math:`U` if any.
"""
y = self.__alpha + self.__beta @ x
if u is not None:
y += self._inv_C @ u
return y
@property
def main_discipline(self) -> _MAIN_DISCIPLINE_CLASS:
"""The main discipline."""
return self.disciplines[0]
@property
def scalable_disciplines(self) -> list[_SCALABLE_DISCIPLINE_CLASS]:
"""The scalable disciplines."""
return self.disciplines[1:]
@property
def __string_representation(self) -> MultiLineString:
"""The string representation of the scalable problem."""
mls = MultiLineString()
mls.add("Scalable problem")
mls.indent()
for discipline in self.disciplines:
mls.add(discipline.name)
mls.indent()
mls.add("Inputs")
mls.indent()
for name in discipline.input_names:
mls.add(f"{name} ({discipline.names_to_sizes[name]})")
mls.dedent()
mls.add("Outputs")
mls.indent()
for name in discipline.output_names:
mls.add(f"{name} ({discipline.names_to_sizes[name]})")
mls.dedent()
mls.dedent()
return mls
def __repr__(self) -> str:
return str(self.__string_representation)
def _repr_html_(self) -> str:
return self.__string_representation._repr_html_()