# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Matthias De Lozzo
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
Scalable disciplines from Tedford and Martins (2010)
****************************************************
"""
from __future__ import absolute_import, division, unicode_literals
from future import standard_library
from numpy import array, atleast_2d, diag, dot
from numpy import mean as npmean
from numpy import ndarray
from numpy import sum as npsum
from past.utils import old_div
from gemseo.core.discipline import MDODiscipline
from gemseo.problems.scalable.scalable_tm.variables import (
OBJECTIVE_NAME,
X_SHARED_NAME,
get_constraint_name,
get_coupling_name,
get_x_local_name,
)
standard_library.install_aliases()
from gemseo import LOGGER
[docs]class TMSystem(MDODiscipline):
r"""The system discipline from the scalable problem introduced by Tedford
and Martins (2010) takes the local design parameters
:math:`x_1,x_2,\ldots,x_N` and the global design parameters :math:`z`
as inputs, as well as the coupling variables :math:`y_1,y_2,\ldots,y_N`
and returns the objective function value :math:`f(x,y(x,y))` to minimize as
well as the inequality constraints ones
:math:`c_1(y_1),c_2(y_2),\ldots,c_N(y_N)` which are expressed as:
.. math::
f(z,y) = |z|_2^2 + \sum_{i=1}^N |y_i|_2^2
and:
.. math::
c_i(y_i) = 1- C_i^{-T}Iy_i
"""
def __init__(self, c_constraint, default_inputs):
"""Constructor
:param list(array) c_constraint: constraint coefficients
:param dict default_inputs: default inputs
"""
super(TMSystem, self).__init__("TM_System")
self.input_grammar.initialize_from_base_dict(default_inputs)
default_outputs = {}
default_outputs[OBJECTIVE_NAME] = array([0.0])
for index, value in enumerate(c_constraint):
tmp = array([0.0] * len(value))
default_outputs[get_constraint_name(index)] = tmp
self.output_grammar.initialize_from_base_dict(default_outputs)
self.default_inputs = default_inputs
self.re_exec_policy = self.RE_EXECUTE_DONE_POLICY
self.coefficients = c_constraint
self.n_disciplines = len(c_constraint)
def _run(self):
x_shared = self.get_inputs_by_name(X_SHARED_NAME)
coupling = [
self.get_inputs_by_name(get_coupling_name(index))
for index in range(self.n_disciplines)
]
obj = npmean(x_shared ** 2)
obj += npmean(
[npmean(coupling[index] ** 2) for index in range(self.n_disciplines)]
)
data = {}
data[OBJECTIVE_NAME] = array([obj])
for index in range(self.n_disciplines):
constraint = get_constraint_name(index)
tmp = old_div(coupling[index], self.coefficients[index])
data[constraint] = 1 - tmp
self.store_local_data(**data)
def _compute_jacobian(self, inputs=None, outputs=None):
"""
Computes the jacobian
:param inputs: linearization should be performed with respect
to inputs list. If None, linearization should
be performed wrt all inputs (Default value = None)
:param outputs: linearization should be performed on outputs list.
If None, linearization should be performed
on all outputs (Default value = None)
"""
self._init_jacobian(inputs, outputs, with_zeros=True)
x_shared = self.get_inputs_by_name(X_SHARED_NAME)
coupling = [
self.get_inputs_by_name(get_coupling_name(index))
for index in range(self.n_disciplines)
]
tmp = old_div(2 * x_shared, len(x_shared))
self.jac[OBJECTIVE_NAME][X_SHARED_NAME] = atleast_2d(tmp)
for index in range(self.n_disciplines):
tmp = old_div(2 * coupling[index], len(coupling[index]))
tmp /= self.n_disciplines
self.jac[OBJECTIVE_NAME][get_coupling_name(index)] = atleast_2d(tmp)
tmp = diag(old_div(-1.0, self.coefficients[index]))
constraint = get_constraint_name(index)
coupling_name = get_coupling_name(index)
self.jac[constraint][coupling_name] = atleast_2d(tmp)
[docs]class TMDiscipline(MDODiscipline):
r"""An elementary discipline from the scalable problem introduced by
Tedford and Martins (2010) takes local design parameters :math:`x_i`
and shared design parameters :math:`z` in input as well as coupling
variables :math:`\left(y_i\right)_{1\leq j \leq N\atop j\neq i}`
from :math:`N-1` elementary disciplines,
and returns the coupling variables:
.. math::
y_i =\frac{\tilde{y}_i+C_{z,i}.1+C_{x_i}.1}{\sum_{j=1 \atop j
\neq i}^NC_{y_j,i}.1+C_{z,i}.1+C_{x_i}.1} \in [0,1]^{n_{y_i}}
where:
.. math::
\tilde{y}_i = - C_{z,i}.z - C_{x_i}.x_i +
\sum_{j=1 \atop j \neq i}^N C_{y_j,i}.y_j
"""
def __init__(self, index, c_shared, c_local, c_coupling, default_inputs):
"""Constructor
:param int index: discipline index for naming.
:param array c_shared: weights for the shared design parameters.
:param array c_local: weights for the local design parameters.
:param dict(array) c_coupling: weights for the coupling parameters.
:param dict default_inputs: default inputs
"""
self.index = index
self.c_shared = c_shared
self.c_local = c_local
self.c_coupling = c_coupling
self.n_disciplines = len(c_coupling)
self._check_consistency(default_inputs)
super(TMDiscipline, self).__init__(name="TM_Discipline_" + str(index))
self.input_grammar.initialize_from_base_dict(default_inputs)
value = array([0.0] * self.c_shared.shape[0])
default_outputs = {get_coupling_name(index): value}
self.output_grammar.initialize_from_base_dict(default_outputs)
self.default_inputs = default_inputs
self.re_exec_policy = self.RE_EXECUTE_DONE_POLICY
def _check_consistency(self, default_inputs):
"""Check consistency of default inputs.
:param dict default_inputs: default inputs.
"""
assert isinstance(self.c_shared, ndarray)
assert len(self.c_shared.shape) == 2
assert self.c_shared.shape[1] == len(default_inputs[X_SHARED_NAME])
nout = self.c_shared.shape[0]
assert isinstance(self.c_local, ndarray)
assert len(self.c_local.shape) == 2
assert self.c_local.shape[0] == nout
xlocal = get_x_local_name(self.index)
assert self.c_local.shape[1] == len(default_inputs[xlocal])
assert isinstance(self.c_coupling, dict)
for key, val in self.c_coupling.items():
assert isinstance(val, ndarray)
assert len(val.shape) == 2
assert val.shape[0] == nout
assert val.shape[1] == len(default_inputs[key])
def _run(self):
index = self.index
x_shared = self.get_inputs_by_name(X_SHARED_NAME)
x_local = self.get_inputs_by_name(get_x_local_name(index))
coupling = {
cpl_name: self.get_inputs_by_name(cpl_name)
for cpl_name in list(self.c_coupling.keys())
}
data = -dot(self.c_shared, x_shared.reshape((-1, 1)))
data -= dot(self.c_local, x_local.reshape((-1, 1)))
cpl_sum = 0
for cpl_name, cpl_value in self.c_coupling.items():
data += dot(cpl_value, coupling[cpl_name].reshape((-1, 1)))
cpl_sum += npsum(self.c_coupling[cpl_name], 1)
data += npsum(self.c_shared, 1).reshape((-1, 1))
data += npsum(self.c_local, 1).reshape((-1, 1))
norm = cpl_sum.reshape((-1, 1))
norm += npsum(self.c_shared, 1).reshape((-1, 1))
norm += npsum(self.c_local, 1).reshape((-1, 1))
data /= norm
data = data.flatten()
self.store_local_data(**{get_coupling_name(index): data})
def _compute_jacobian(self, inputs=None, outputs=None):
"""
Computes the jacobian
:param inputs: linearization should be performed with respect
to inputs list. If None, linearization should
be performed wrt all inputs (Default value = None)
:param outputs: linearization should be performed on outputs list.
If None, linearization should be performed
on all outputs (Default value = None)
"""
self._init_jacobian(inputs, outputs, with_zeros=True)
other_indices = set(range(self.n_disciplines)) - set([self.index])
other_indices = list(other_indices)
cpl_sum = 0
for cpl_value in self.c_coupling.values():
cpl_sum += npsum(cpl_value, 1)
norm = cpl_sum.reshape((-1, 1))
norm += npsum(self.c_shared, 1).reshape((-1, 1))
norm += npsum(self.c_local, 1).reshape((-1, 1))
der = old_div(-self.c_local, norm)
self.jac[get_coupling_name(self.index)][get_x_local_name(self.index)] = der
der = old_div(-self.c_shared, norm)
self.jac[get_coupling_name(self.index)][X_SHARED_NAME] = der
for cpl_name, cpl_value in self.c_coupling.items():
self.jac[get_coupling_name(self.index)][cpl_name] = old_div(cpl_value, norm)