Source code for gemseo.core.mdofunctions.mdo_discipline_adapter

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                        documentation
#        :author: Francois Gallard, Charlie Vanaret
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""A function computing some outputs of a discipline from some of its inputs."""

from __future__ import annotations

from numbers import Number
from typing import TYPE_CHECKING
from typing import Callable
from typing import Union

from numpy import array
from numpy import empty
from numpy import ndarray

from gemseo.core.mdofunctions.linear_candidate_function import LinearCandidateFunction
from gemseo.utils.compatibility.scipy import sparse_classes

if TYPE_CHECKING:
    from collections.abc import Mapping
    from collections.abc import MutableMapping
    from collections.abc import Sequence

    from gemseo.core.discipline import MDODiscipline
    from gemseo.typing import NumberArray

OperandType = Union[ndarray, Number]
OperatorType = Callable[[OperandType, OperandType], OperandType]


[docs] class MDODisciplineAdapter(LinearCandidateFunction): """An :class:`.MDOFunction` executing a discipline for some inputs and outputs.""" __linear_candidate: bool """Whether the final MDOFunction could be linear.""" __input_dimension: int | None """The input variable dimension, needed for linear candidates.""" def __init__( self, input_names: Sequence[str], output_names: Sequence[str], default_inputs: Mapping[str, ndarray] | None, discipline: MDODiscipline, names_to_sizes: MutableMapping[str, int] | None = None, linear_candidate: bool = False, ) -> None: """ Args: input_names: The names of the inputs. output_names: The names of the outputs. default_inputs: The default input values to overload the ones of the discipline at each evaluation of the outputs with :meth:`._fun` or their derivatives with :meth:`._jac`. If ``None``, do not overload them. discipline: The discipline to be adapted. names_to_sizes: The sizes of the input variables. If ``None``, determine them from the default inputs and local data of the discipline :class:`.MDODiscipline`. linear_candidate: Whether the final MDOFunction could be linear. """ # noqa: D205, D212, D415 self.__input_names = input_names self.__output_names = output_names self.__default_inputs = default_inputs if default_inputs is not None else {} self.__input_size = 0 self.__output_names_to_slices = {} self.__jacobian = array(()) self.__discipline = discipline self.__input_names_to_slices = {} self.__input_names_to_sizes = ( names_to_sizes if names_to_sizes is not None else {} ) self.__linear_candidate = linear_candidate self.__input_dimension = self.__compute_input_dimension( default_inputs, discipline, input_names ) super().__init__( self._func_to_wrap, jac=self._jac_to_wrap, name="_".join(self.__output_names), input_names=self.__input_names, output_names=self.__output_names, ) @property def linear_candidate(self) -> bool: # noqa: D102 return self.__linear_candidate @property def input_dimension(self) -> int | None: # noqa: D102 return self.__input_dimension def __compute_input_dimension( self, default_inputs: Mapping[str, ndarray] | None, discipline: MDODiscipline, input_names: Sequence[str], ) -> int | None: """Compute the input dimension. Args: default_inputs: : The default input values to overload the ones of the discipline at each evaluation of the outputs with :meth:`._fun` or their derivatives with :meth:`._jac`. If ``None``, do not overload them. discipline: The discipline to be adapted. input_names: The names of the inputs. Returns: The input dimension. """ if default_inputs and all(inpt in default_inputs for inpt in input_names): return sum([ len(default_inputs[inpt]) if isinstance(default_inputs[inpt], ndarray) else 1 for inpt in input_names ]) if len(self.__input_names_to_sizes) > 0: return sum(self.__input_names_to_sizes.values()) if all(inpt in discipline.default_inputs for inpt in input_names): return sum([ len(discipline.default_inputs[inpt]) if isinstance(discipline.default_inputs[inpt], ndarray) else 1 for inpt in input_names ]) return None def __create_output_names_to_slices(self) -> int: """Compute the indices of the input variables in the Jacobian array. Returns: The size of the inputs. """ self.__output_names_to_slices = output_names_to_slices = {} start = 0 output_size = 0 jac_row_id = self.__input_names[0] jac = self.__discipline.jac for name in self.__output_names: output_size += jac[name][jac_row_id].shape[0] output_names_to_slices[name] = slice(start, output_size) start = output_size return output_size def _func_to_wrap(self, x_vect: NumberArray) -> ndarray | Number: """Compute an output vector from an input one. Args: x_vect: The input vector. Returns: The output vector or a scalar if the vector has only one component. """ self.__discipline.reset_statuses_for_run() input_data = self.__create_discipline_input_data(x_vect) output_data = self.__discipline.execute(input_data) output_array = ( self.__discipline.output_grammar.data_converter.convert_data_to_array( self.__output_names, output_data, ) ) if output_array.size == 1: # The function is scalar. return output_array[0] return output_array def _jac_to_wrap(self, x_vect: NumberArray) -> NumberArray: """Compute the Jacobian value from an input vector. Args: x_vect: The input vector. Returns: The Jacobian value. """ self.__discipline.linearize(self.__create_discipline_input_data(x_vect)) if len(self.__jacobian) == 0: output_size = self.__create_output_names_to_slices() if output_size == 1: shape = self.__input_size else: shape = (output_size, self.__input_size) self.__jacobian = empty(shape) if self.__jacobian.ndim == 1 or self.__jacobian.shape[0] == 1: output_name = self.__output_names[0] jac_output = self.__discipline.jac[output_name] for input_name in self.__input_names: input_slice = self.__input_names_to_slices[input_name] jac = jac_output[input_name] # TODO: This precaution is meant to disappear when sparse 1-D array will # be available. This is also mandatory since self.__jacobian is # initialized as a dense array. if isinstance(jac, sparse_classes): first_row = jac.getrow(0).todense().flatten() else: first_row = jac[0, :] self.__jacobian[input_slice] = first_row else: for output_name in self.__output_names: output_slice = self.__output_names_to_slices[output_name] jac_output = self.__discipline.jac[output_name] for input_name in self.__input_names: input_slice = self.__input_names_to_slices[input_name] jac = jac_output[input_name] # TODO: This is mandatory since self.__jacobian is initialized as a # dense array. Performance improvement could be obtained if one is # able to infer the type of jac. if isinstance(jac, sparse_classes): jac = jac.toarray() self.__jacobian[output_slice, input_slice] = jac return self.__jacobian def __create_input_names_to_slices(self) -> None: """Create the map from discipline input names to input vector slices. Raises: ValueError: When a discipline input has no default value. """ input_data = self.__discipline.get_input_data() input_data.update(self.__discipline.default_inputs) missing_names = ( set(self.__input_names) .difference(self.__input_names_to_sizes.keys()) .difference(input_data.keys()) ) if missing_names: msg = ( f"The size of the input {','.join(missing_names)} cannot be guessed " f"from the discipline {self.__discipline.name}, " f"nor from its default inputs or from its local data." ) raise ValueError(msg) ( self.__input_names_to_slices, self.__input_size, ) = self.__discipline.input_grammar.data_converter.compute_names_to_slices( self.__input_names, input_data, self.__input_names_to_sizes ) def __create_discipline_input_data( self, x_vect: ndarray, ) -> dict[str, ndarray]: """Return the input data for the underlying discipline. The variables in the input data are cast according to the types defined in the design space. Args: x_vect: The input vector of the function. Returns: The input data of the underlying discipline. Raises: ValueError: When a discipline input has no default value. """ if self.__default_inputs: self.__discipline.default_inputs.update(self.__default_inputs) if not self.__input_names_to_slices: self.__create_input_names_to_slices() input_data = ( self.__discipline.input_grammar.data_converter.convert_array_to_data( x_vect, self.__input_names_to_slices ) ) variable_types = x_vect.dtype.metadata if variable_types is not None: # Restore the proper data types as declared in the design space. for name, type_ in variable_types.items(): input_data[name] = input_data[name].astype(type_, copy=False) return input_data