# Source code for gemseo.formulations.idf

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or
#                        initial documentation
#        :author: Francois Gallard, Charlie Vanaret
#    OTHER AUTHORS   - MACROSCOPIC CHANGES

"""
The Individual Discipline Feasible formulation
**********************************************
"""
from __future__ import absolute_import, division, print_function, unicode_literals

from builtins import str, super

from future import standard_library
from numpy import abs as np_abs
from numpy import concatenate, eye, ones_like, zeros

from gemseo import LOGGER
from gemseo.core.chain import MDOParallelChain
from gemseo.core.coupling_structure import MDOCouplingStructure
from gemseo.core.execution_sequence import ExecutionSequenceFactory
from gemseo.core.formulation import MDOFormulation
from gemseo.core.function import MDOFunction
from gemseo.mda.mda_chain import MDAChain

standard_library.install_aliases()

[docs]class IDF(MDOFormulation):
"""The Individual Discipline Feasible formulation.

This formulation draws an optimization architecture where the coupling
equality constraints on the coupling variables at top level, the
optimization problem w.r.t. local, global design variables and coupling
variables is made at the top level. Disciplinary analysis is made at a each
optimization iteration. Multidisciplinary analysis is made at the optimum.
"""

def __init__(
self,
disciplines,
objective_name,
design_space,
maximize_objective=False,
normalize_constraints=True,
parallel_exec=False,
start_at_equilibrium=False,
):
"""
Constructor, initializes the objective functions and constraints

:param disciplines: the disciplines list.
:type disciplines: list(MDODiscipline)
:param objective_name: the objective function data name.
:type objective_name: str
:param design_space: the design space.
:type design_space: DesignSpace
:param maximize_objective: if True, the objective function
is maximized, by default, a minimization is performed.
:type maximize_objective: bool
:param normalize_constraints: if True, outputs of the coupling
consistency contraints are scaled.
:type normalize_constraints: bool
:param parallel_exec: if True, all constraints and objectives
are computed in parallel. At every iteration, all disciplines
are executed in parallel. Otherwise, a separate constraint is
created for each discipline with couplings.
:type parallel_exec: bool
:param use_threading: if True and parallel_exec=True,
the disciplines are run in parallel using multi-threading,
if False and parallel_exec=True, multi-processing is used.
:param start_at_equilibrium: if True, an MDA is used to initialize
coupling variables.
:type start_at_equilibrium: bool
"""
super(IDF, self).__init__(
disciplines,
objective_name,
design_space,
maximize_objective=maximize_objective,
)
if parallel_exec:
self._parallel_exec = MDOParallelChain(
)
else:
self._parallel_exec = None

self.coupling_structure = MDOCouplingStructure(disciplines)
self.all_couplings = self.coupling_structure.get_all_couplings()
self._update_design_space()
self.normalize_constraints = normalize_constraints
self._build_constraints()
self._build_objective_from_disc(objective_name)

if start_at_equilibrium:
self._compute_equilibrium()

def _compute_equilibrium(self):
"""Run an MDA to compute initial target couplings at equilibrium.

Values at equlibrium are set in the initial design space.
"""
current_x = self.design_space.get_current_x_dict()
# run MDA to initialize target coupling variables
mda = MDAChain(self.disciplines)
res = mda.execute(current_x)

for name in self.all_couplings:
value = res[name]
LOGGER.info(
"IDF: changing the initial value of %s " "from %s to %s (equilibrium)",
name,
str(current_x[name]),
str(value),
)
self.design_space.set_current_variable(name, value)

# Reset the number of evaluations
for disc in self.disciplines:
disc.n_calls = 0
disc.n_calls_linearize = 0

def _update_design_space(self):
"""Updates the design space with the required variables"""
strong_couplings = set(self.all_couplings)
variables_names = set(self.opt_problem.design_space.variables_names)
if not strong_couplings.issubset(variables_names):
missing = strong_couplings - variables_names
raise ValueError(
"IDF formulation needs coupling variables"
+ " as design variables, missing variables : "
+ str(missing)
)
self._set_defaultinputs_from_ds()

[docs]    def get_top_level_disc(self):
# All functions and constraints are built from the top level disc
# If we are in parallel mode: return the parallel execution
if self._parallel_exec is not None:
return [self._parallel_exec]
# Otherwise the disciplines are top level
return self.disciplines

def _get_normalization_factor(self, output_couplings):
"""Returns [abs(ub-lb)] for all outputs_couplings.

:param output_couplings: names of variables for normalization
"""
norm_fact = []
for output in output_couplings:
u_b = self.design_space.get_upper_bound(output)
l_b = self.design_space.get_lower_bound(output)
norm_fact.append(np_abs(u_b - l_b))
return concatenate(norm_fact)

def _generate_consistency_cstr(self, output_couplings):
"""Generates the consistency constraints for a discipline.

:param output_coupl: the output coupling variables
of the discipline
:param output_couplings: returns: mdo_c :
function pointer to consistency constraints
:returns: mdo_c : function pointer to consistency constraints

"""
coupl_func = self._get_function_from(output_couplings)
dv_names_of_disc = coupl_func.args

if self.normalize_constraints:
norm_fact = self._get_normalization_factor(output_couplings)
else:
norm_fact = 1.0

def coupl_min_x(x_vec):
"""Function to compute consistency constraints.

:param x: design variable vector
:param x_vec: returns: value of consistency constraints
(=0 if disciplines are at equilibrium)
:returns: value of consistency constraints
(=0 if disciplines are at equilibrium)
"""
coupl = coupl_func(x_vec)
if self.normalize_constraints:
return (coupl - x_sw) / norm_fact
return coupl - x_sw

def coupl_min_x_jac(x_vec):
"""Function to compute consistency constraints gradient.

:param x: design variable vector
:param x_vec: returns: gradient of consistency constraints
"""
coupl_jac = coupl_func.jac(x_vec)  # pylint: disable=E1102

if len(coupl_jac.shape) > 1:
# IN this case it is harder since a block diagonal
# matrix with -Id should be placed for each output
# coupling, at the right place
n_outs = coupl_jac.shape[0]
x_jac_2d = zeros((n_outs, len(x_vec)), dtype=x_vec.dtype)
x_names = self.get_optim_variables_names()
o_min = 0
o_max = 0
for out in output_couplings:
# self._reference_input_data[out].size
o_len = self._get_dv_length(out)
i_min = 0
i_max = 0
o_max += o_len
for x_i in x_names:
# self._reference_input_data[x_i].size
x_len = self._get_dv_length(x_i)
i_max += x_len
if x_i == out:
x_jac_2d[o_min:o_max, i_min:i_max] = eye(x_len)
i_min = i_max
o_min = o_max
x_jac = x_jac_2d
else:
# This is surprising but there is a duality between the masking
# operation in the function inputs and the unmasking of its
# outputs
if self.normalize_constraints:
return (coupl_jac - x_jac) / norm_fact[:, None]
return coupl_jac - x_jac

expr = ""
for out_c in output_couplings:
expr += out_c + "(" + ", ".join(dv_names_of_disc) + ") - "
expr += str(out_c) + "" + "\n"

name = coupl_func.name
return MDOFunction(
coupl_min_x,
name,
args=dv_names_of_disc,
expr=expr,
jac=coupl_min_x_jac,
outvars=coupl_func.outvars,
f_type=MDOFunction.TYPE_EQ,
)

def _build_constraints(self):
"""In IDF formulation, the consistency constraints are "y - y_copy = 0"
fills self._eq_constraints attributes
"""
# Building constraints per generator couplings
for discipline in self.disciplines:
couplings = self.coupling_structure.output_couplings(
discipline, strong=False
)
if couplings:
cstr = self._generate_consistency_cstr(couplings)

[docs]    def get_expected_workflow(self):
"""
Returns the expected execution sequence,
used for xdsm representation
See MDOFormulation.get_expected_workflow
"""
return ExecutionSequenceFactory.parallel(self.disciplines)

[docs]    def get_expected_dataflow(self):
"""
Returns the expected data exchange sequence,
used for xdsm representation
See MDOFormulation.get_expected_dataflow
"""
return []