Source code for gemseo.core.analytic_discipline

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry,
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or
#                      initial documentation
#        :author:  Francois Gallard
Analytic MDODiscipline based on symbolic expressions
from __future__ import division, unicode_literals

import logging

from numpy import array, float64, heaviside, zeros
from six import string_types
from sympy import Expr, lambdify, symbols
from sympy.parsing.sympy_parser import parse_expr

from gemseo.core.discipline import MDODiscipline
from gemseo.utils.py23_compat import PY2

LOGGER = logging.getLogger(__name__)

[docs]class AnalyticDiscipline(MDODiscipline): """Discipline based on analytic expressions list, using the symbolic calculation SymPy engine. Automatically differentiates the expressions to obtain the Jacobian matrices. See also -------- gemseo.core.discipline.MDODiscipline : abstract class defining the key concept of discipline """ def __init__(self, name=None, expressions_dict=None, fast_evaluation=True): """Constructor. :param name: name of the discipline. :type name: str :param expressions_dict: dictionary of outputs and their expressions for instance : { 'y_1':'2*x**2', 'y_2':'4*x**2+5+z**3'} will create a discipline with outputs y_1, y_2 and inputs x, and z. Expressions may be passed as strings or SymPy expressions. :type expressions_dict: dict(str or sympy.Expr) :param fast_evaluation: if True then sympy.lambdify is applied to the expressions to accelerate their numerical evaluation (requires NumPy), otherwise the expressions are evaluated with sympy.Expr.evalf. :type fast_evaluation: bool """ super(AnalyticDiscipline, self).__init__(name) if not expressions_dict: raise ValueError("expressions_dict is a mandatory argument") self.expressions_dict = expressions_dict self.expr_symbols_dict = {} self.input_names = [] self._sympy_exprs = {} self._sympy_funcs = {} self._sympy_jac_exprs = {} self._sympy_jac_funcs = {} self._fast_evaluation = fast_evaluation self._init_expressions() self._init_grammars() self._init_default_inputs() self.re_exec_policy = self.RE_EXECUTE_DONE_POLICY def _init_grammars(self): """Initialize the |g| grammars from the expressions dictionary.""" zero = zeros(2) in_dict = {k: zero for k in self.input_names} self.input_grammar.initialize_from_base_dict(in_dict) out_dict = {k: zero for k in self.expressions_dict} self.output_grammar.initialize_from_base_dict(out_dict) def _init_expressions(self): """Parse the expressions (get SymPy expressions from string expressions) and their Jacobian expressions.""" input_symbols = [] for out_var, expr in self.expressions_dict.items(): if isinstance(expr, Expr): parsed = expr elif isinstance(expr, string_types): parsed = parse_expr(expr) else: raise TypeError("Expression must be a SymPy expression or a string") args = list(parsed.free_symbols) self._sympy_exprs[out_var] = parsed input_symbols += args args_names = [ for arg in args] self.expr_symbols_dict[out_var] = args_names self._sympy_jac_exprs[out_var] = {} for arg in args_names: jac_expr = parsed.diff(arg) self._sympy_jac_exprs[out_var][arg] = jac_expr self.input_names = sorted([ for symb in set(input_symbols)]) if self._fast_evaluation: self._lambdify_expressions() def _lambdify_expressions(self): """Lambdify the SymPy expressions.""" numpy_str = "numpy" if PY2: numpy_str = numpy_str.encode("ascii") modules = [numpy_str, {"Heaviside": lambda x: heaviside(x, 1)}] for out_var, expr in self._sympy_exprs.items(): args_names = self.expr_symbols_dict[out_var] args = symbols(args_names) self._sympy_funcs[out_var] = lambdify(args, expr) self._sympy_jac_funcs[out_var] = dict() for in_var in args_names: jac_expr = self._sympy_jac_exprs[out_var][in_var] self._sympy_jac_funcs[out_var][in_var] = lambdify( args, jac_expr, modules ) def _init_default_inputs(self): """Initialize the default inputs of the discipline with zeros.""" zeros_array = zeros(1) self.default_inputs = {k: zeros_array for k in self.input_names} def _run(self): """Run the discipline.""" outputs = {} # Do not pass useless tokens to the expr, this may # fail when tokens contains dots, or slow down the process filtered_inputs = {key: float(val.real) for key, val in self.local_data.items()} if self._fast_evaluation: for out_var, func in self._sympy_funcs.items(): args_names = self.expr_symbols_dict[out_var] args = (filtered_inputs[name] for name in args_names) out_val = func(*args) outputs[out_var] = array([out_val], dtype=float64) else: for out_var, expr in self._sympy_exprs.items(): try: out_val = expr.evalf(subs=filtered_inputs) outputs[out_var] = array([out_val], dtype=float64) except TypeError: LOGGER.error("Failed to evaluate expression : %s", str(expr)) LOGGER.error("With inputs : %s", str(self.local_data)) raise self.store_local_data(**outputs) def _compute_jacobian(self, inputs=None, outputs=None): """Compute the Jacobian. :param inputs: names of the differentiation inputs (Default value = None) :type inputs: list(str) :param outputs: names of the outputs to be differentiated (Default value = None) :type outputs: list(str) """ # otherwise there may be missing terms # if some formula have no dependency self._init_jacobian(inputs, outputs, with_zeros=True) filtered_inputs = {key: float(val.real) for key, val in self.local_data.items()} if self._fast_evaluation: for out_var, grad in self._sympy_jac_funcs.items(): args_names = self.expr_symbols_dict[out_var] for in_var, func in grad.items(): args = (filtered_inputs[name] for name in args_names) jac_val = func(*args) self.jac[out_var][in_var] = array([[jac_val]], dtype=float64) else: for out_var, expr in self._sympy_exprs.items(): for arg in expr.free_symbols: j_expr = self._sympy_jac_exprs[out_var][] jac_val = j_expr.evalf(subs=filtered_inputs) self.jac[out_var][] = array([[jac_val]], dtype=float64)