Source code for gemseo.core.grammars.simple_grammar

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""Most basic grammar implementation."""
import logging
from collections import defaultdict
from typing import Any, Iterable, List, Mapping, Optional, Sequence, Union

from numpy import ndarray
from six import text_type

from gemseo.core.grammars.abstract_grammar import AbstractGrammar
from gemseo.core.grammars.errors import InvalidDataException
from gemseo.utils.py23_compat import Path
from gemseo.utils.string_tools import MultiLineString

LOGGER = logging.getLogger(__name__)


class SimpleGrammar(AbstractGrammar):
    """A grammar based on the names and types of the elements specified by a dictionary.

    Each element name is bound to a type specification,
    which is either a type or ``None`` (the value type is then not checked),
    and to a flag telling whether the element is required (``True`` by default).
    """

    def __init__(
        self,
        name,  # type: str
        names_to_types=None,  # type: Optional[Mapping[str,type]]
        required_names=None,  # type: Optional[Mapping[str,bool]]
        **kwargs  # type: Union[str,Path]
    ):  # type: (...) -> None
        """
        Args:
            name: The grammar name.
            names_to_types: The mapping defining the data names as keys,
                and data types as values.
                If None, the grammar is empty.
            required_names: The mapping defining the required data names as keys,
                bound to whether the data name is required.
                If None, all data names are required.
            **kwargs: Extra options; they are not used by this class.

        Raises:
            TypeError: When at least one type specification is neither a type
                nor None.
        """
        super(SimpleGrammar, self).__init__(name)
        # Work on a private copy so that later updates of the grammar
        # never mutate the mapping owned by the caller.
        if names_to_types is None:
            self._names_to_types = {}
        else:
            self._names_to_types = dict(names_to_types)
        self._check_types()
        self._default_callable = (
            lambda: True
        )  # Callable to be assigned to defaultdict.default_factory at init
        self._required_names = defaultdict(self._default_callable)
        # Every element is required unless stated otherwise:
        # store real booleans, not the type specifications.
        self._required_names.update(dict.fromkeys(self._names_to_types, True))
        if required_names is not None:
            self._required_names.update(required_names)

    def is_required(
        self,
        element_name,  # type: str
    ):  # type: (...) -> bool
        """Check whether an element of the grammar is required.

        Args:
            element_name: The name of the element.

        Returns:
            The required flag of the element;
            True when no flag has been set explicitly.

        Raises:
            ValueError: If the name does not correspond to an element
                of the grammar.
        """
        if element_name not in self._names_to_types:
            raise ValueError("Element {} is not in the grammar.".format(element_name))
        # ``get`` does not trigger the default factory,
        # hence it does not insert a new entry as a side effect.
        return self._required_names.get(element_name, True)

    def update_required_elements(
        self,
        **elements  # type: Mapping[str, bool]
    ):  # type: (...) -> None
        """Update the required flags of elements of the grammar.

        All the passed items are validated before any flag is changed.

        Args:
            **elements: The names of the elements bound to their required flags.

        Raises:
            KeyError: If a name does not correspond to an element of the grammar.
            TypeError: If a flag is not a boolean.
        """
        for element_name, element_value in elements.items():
            if element_name not in self._names_to_types:
                raise KeyError(
                    "Data named {} is not in the grammar.".format(element_name)
                )
            if not isinstance(element_value, bool):
                raise TypeError(
                    "Boolean is required for element {}.".format(element_name)
                )
        self._required_names.update(elements)

    @property
    def data_names(self):  # type: (...) -> List[str]
        """The names of the elements."""
        return list(self._names_to_types)

    @property
    def data_types(self):  # type: (...) -> List[type]
        """The types of the elements."""
        return list(self._names_to_types.values())

    def _check_types(
        self,
        names_to_types=None,  # type: Optional[Mapping[str,type]]
    ):  # type: (...) -> None
        """Check that a names to types mapping contains only acceptable type
        specifications, ie, are a type or None.

        Args:
            names_to_types: The mapping to be checked.
                If None, check the mapping of the grammar.

        Raises:
            TypeError: When at least one type specification is not a type.
        """
        if names_to_types is None:
            names_to_types = self._names_to_types
        for obj_name, obj in names_to_types.items():
            if obj is not None and not isinstance(obj, type):
                raise TypeError(
                    (
                        "{} is not a type and cannot be used as a"
                        " type specification for the element named {} in the grammar {}."
                    ).format(obj, obj_name, self.name)
                )

    def get_type_from_python_type(
        self,
        python_type,  # type: type
    ):  # type: (...) -> type
        """Return the grammar type associated with a Python type.

        Args:
            python_type: The Python type.

        Returns:
            ``six.text_type`` if the Python type is ``str``,
            otherwise the Python type itself.
        """
        return text_type if python_type == str else python_type

    def update_elements(
        self,
        python_typing=False,  # type: bool
        **elements  # type: Mapping[str,type]
    ):  # type: (...) -> None
        """Add elements to the grammar or update the types of existing ones.

        Args:
            python_typing: Whether to convert the passed types
                with :meth:`get_type_from_python_type`.
            **elements: The names of the elements bound to their types.

        Raises:
            TypeError: When at least one type specification is not a type;
                the grammar is left unchanged in that case.
        """
        if python_typing:
            elements = {
                element_name: self.get_type_from_python_type(element_type)
                for element_name, element_type in elements.items()
            }
        # Validate the merged mapping before committing anything,
        # so that a failure cannot leave invalid entries in the grammar.
        candidate = dict(self._names_to_types)
        candidate.update(elements)
        self._check_types(candidate)
        self._names_to_types.update(elements)

    def load_data(
        self,
        data,  # type: Mapping[str,Any]
        raise_exception=True,  # type: bool
    ):  # type: (...) -> Mapping[str,Any]
        """Check data against the grammar and return them unchanged.

        Args:
            data: The elements to be checked.
            raise_exception: Whether to raise an exception
                when the elements are invalid.

        Returns:
            The passed data.

        Raises:
            InvalidDataException: If the data are invalid
                and ``raise_exception`` is True.
        """
        self.check(data, raise_exception)
        return data

    def check(
        self,
        data,  # type: Mapping[str,Any]
        raise_exception=True,  # type: bool
    ):  # type: (...) -> None
        """Check the consistency (name and type) of elements with the grammar.

        The errors are always logged.

        Args:
            data: The elements to be checked.
            raise_exception: Whether to raise an exception
                when the elements are invalid.

        Raises:
            InvalidDataException:
                * If the passed data is not a mapping.
                * If a required element of the grammar is missing
                  from the passed data.
                * If the type of a value in the passed data does not have
                  the specified type in the grammar for the corresponding name.
        """
        if not isinstance(data, Mapping):
            LOGGER.error("Grammar data is not a mapping, in %s.", self.name)
            if raise_exception:
                raise InvalidDataException("Invalid data in: {}.".format(self.name))
            # The checks below require a mapping: nothing more can be verified.
            return

        failed = False
        error_message = MultiLineString()
        error_message.add("Invalid data in {}", self.name)
        for element_name, element_type in self._names_to_types.items():
            if element_name not in data:
                # Elements without an explicit flag are required.
                if self._required_names.get(element_name, True):
                    failed = True
                    error_message.add(
                        "Missing mandatory elements: {} in grammar {}".format(
                            element_name, self.name
                        )
                    )
            elif element_type is not None and not isinstance(
                data[element_name], element_type
            ):
                failed = True
                error_message.add(
                    "Wrong input type for: {} in {} got {} instead of {}.".format(
                        element_name, self.name, type(data[element_name]), element_type
                    )
                )

        if failed:
            LOGGER.error(error_message)
            if raise_exception:
                raise InvalidDataException(str(error_message))

    def initialize_from_base_dict(
        self,
        typical_data_dict,  # type: Mapping[str,Any]
    ):  # type: (...) -> None
        """Update the grammar from typical data.

        Each name is bound to the type of its typical value.

        Args:
            typical_data_dict: The names of the elements bound to typical values.
        """
        self.update_elements(
            **{name: type(value) for name, value in typical_data_dict.items()}
        )

    def get_data_names(self):  # type: (...) -> List[str]
        """Return the names of the elements.

        Returns:
            The names of the elements.
        """
        return self.data_names

    def is_all_data_names_existing(
        self,
        data_names,  # type: Iterable[str]
    ):  # type: (...) -> bool
        """Check whether all the passed names are element names of the grammar.

        Args:
            data_names: The names to be checked.

        Returns:
            Whether every name corresponds to an element of the grammar.
        """
        # Membership test rather than ``get(name) is None``:
        # None is a valid type specification for an existing element.
        return all(name in self._names_to_types for name in data_names)

    def _update_field(
        self,
        data_name,  # type: str
        data_type,  # type: type
    ):  # type: (...) -> None
        """Update the grammar elements from an element name and an element type.

        If there is no element with this name, create it and store its type.
        Otherwise, update its type.

        Args:
            data_name: The name of the element.
            data_type: The type of the element.
        """
        self._names_to_types[data_name] = data_type

    def get_type_of_data_named(
        self,
        data_name,  # type: str
    ):  # type: (...) -> Optional[type]
        """Return the element type associated to an element name.

        Args:
            data_name: The name of the element.

        Returns:
            The type of the element associated to the passed element name.

        Raises:
            ValueError: If the name does not correspond to an element name.
        """
        if data_name not in self._names_to_types:
            raise ValueError("Unknown data named: {}.".format(data_name))
        return self._names_to_types[data_name]

    def is_type_array(
        self,
        data_name,  # type: str
    ):  # type: (...) -> bool
        """Check whether the type of an element is a NumPy array type.

        Args:
            data_name: The name of the element.

        Returns:
            Whether the type of the element is a subclass of ``ndarray``;
            False when the element has no type specification.

        Raises:
            ValueError: If the name does not correspond to an element name.
        """
        element_type = self.get_type_of_data_named(data_name)
        # ``issubclass`` raises a TypeError when its first argument is None.
        return element_type is not None and issubclass(element_type, ndarray)

    def restrict_to(
        self,
        data_names,  # type: Sequence[str]
    ):  # type: (...) -> None
        """Remove the elements whose names are not in the passed names.

        Args:
            data_names: The names of the elements to be kept.
        """
        # ``self.data_names`` is a copy, so deleting while iterating is safe.
        for element_name in self.data_names:
            if element_name not in data_names:
                del self._names_to_types[element_name]
                self._required_names.pop(element_name, None)

    def remove_item(
        self,
        item_name,  # type: str
    ):  # type: (...) -> None
        """Remove an element from the grammar.

        Args:
            item_name: The name of the element to be removed.

        Raises:
            KeyError: If the name does not correspond to an element name.
        """
        del self._names_to_types[item_name]
        # Drop the flag as well so that it cannot leak into a later element
        # created with the same name.
        self._required_names.pop(item_name, None)

    def update_from(
        self,
        input_grammar,  # type: AbstractGrammar
    ):  # type: (...) -> None
        """Update the grammar with the elements of another grammar.

        Args:
            input_grammar: The grammar to take the elements from.

        Raises:
            TypeError: If the passed grammar is not an :class:`.AbstractGrammar`.
        """
        if not isinstance(input_grammar, AbstractGrammar):
            msg = self._get_update_error_msg(self, input_grammar)
            raise TypeError(msg)

        input_grammar = input_grammar.to_simple_grammar()
        self._names_to_types.update(input_grammar._names_to_types)

    def update_from_if_not_in(
        self,
        input_grammar,  # type: AbstractGrammar
        exclude_grammar,  # type: AbstractGrammar
    ):  # type: (...) -> None
        """Update the grammar with the elements of a grammar not in another one.

        An element of ``input_grammar`` that is also in ``exclude_grammar``
        is not added; both grammars must then agree on its type.

        Args:
            input_grammar: The grammar to take the elements from.
            exclude_grammar: The grammar whose elements are not to be added.

        Raises:
            TypeError: If a passed grammar is not an :class:`.AbstractGrammar`.
            ValueError: If types are inconsistent between both passed grammars.
        """
        if not isinstance(input_grammar, AbstractGrammar) or not isinstance(
            exclude_grammar, AbstractGrammar
        ):
            msg = self._get_update_error_msg(self, input_grammar, exclude_grammar)
            raise TypeError(msg)

        input_grammar = input_grammar.to_simple_grammar()
        exclude_grammar = exclude_grammar.to_simple_grammar()

        for element_name, element_type in zip(
            input_grammar.data_names, input_grammar.data_types
        ):
            if exclude_grammar.is_data_name_existing(element_name):
                ex_element_type = exclude_grammar.get_type_of_data_named(element_name)
                if element_type != ex_element_type:
                    raise ValueError(
                        "Inconsistent grammar update {} != {}.".format(
                            element_type, ex_element_type
                        )
                    )
            else:
                self._names_to_types[element_name] = element_type

    def is_data_name_existing(
        self,
        data_name,  # type: str
    ):  # type: (...) -> bool
        """Check whether a name is an element name of the grammar.

        Args:
            data_name: The name to be checked.

        Returns:
            Whether the name corresponds to an element of the grammar.
        """
        return data_name in self._names_to_types

    def clear(self):  # type: (...) -> None
        """Remove all the elements from the grammar."""
        self._names_to_types = {}
        # The flags of the removed elements are meaningless from now on.
        self._required_names.clear()

    def to_simple_grammar(self):  # type: (...) -> SimpleGrammar
        """Return this grammar, which is already a :class:`.SimpleGrammar`.

        Returns:
            The grammar itself.
        """
        return self