Source code for gemseo.core.grammars.json_grammar

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or initial
#                         documentation
#        :author: Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES

"""A Grammar based on JSON schema."""

from __future__ import division, unicode_literals

import json
import logging
from numbers import Number
from typing import (
    Dict,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Union,
)

from numpy import generic, ndarray, zeros

from gemseo.core.grammars.abstract_grammar import AbstractGrammar
from gemseo.core.grammars.errors import InvalidDataException
from gemseo.core.grammars.json_schema import MutableMappingSchemaBuilder
from gemseo.core.grammars.simple_grammar import SimpleGrammar
from gemseo.utils.py23_compat import PY2, JsonSchemaException, Path, compile_schema
from gemseo.utils.string_tools import MultiLineString

if PY2:
    import jsonschema
    from jsonschema import ValidationError
else:
    # TODO: remove when py27 is gone
[docs] class ValidationError(BaseException): pass
LOGGER = logging.getLogger(__name__) ElementType = Union[str, float, bool, Sequence[Union[str, float, bool]]] NumPyNestedMappingType = Mapping[ str, Union[ElementType, ndarray, generic, "NumPyNestedMappingType"] ] MappingSchemaType = Dict[ str, Union[ElementType, List[ElementType], "MappingSchemaType"] ] DictSchemaType = Mapping[str, Union[ElementType, List[ElementType], "DictSchemaType"]] SerializedGrammarType = Dict[ str, Union[ElementType, List[ElementType], "SerializedGrammarType"] ]
[docs]class JSONGrammar(AbstractGrammar): """A grammar based on a JSON schema. Attributes: schema (MutableMappingSchemaBuilder): The JSON schema. """ PROPERTIES_FIELD = "properties" REQUIRED_FIELD = "required" TYPE_FIELD = "type" OBJECT_FIELD = "object" TYPES_MAP = { "array": ndarray, "float": float, "string": str, "integer": int, "boolean": bool, "number": Number, } def __init__( self, name, # type: str schema_file=None, # type: Optional[Union[str,Path]] schema=None, # type: Optional[MappingSchemaType] descriptions=None, # type: Optional[Mapping[str,str]] ): """ Args: schema_file: The JSON schema file. If None, do not initialize the grammar from a JSON schema file. schema: A genson schema to initialize the grammar. If None, do not initialize the grammar from a JSON schema. descriptions: The descriptions of the elements, in the form: ``{element_name: element_meaning}``. If None, use the descriptions available in the JSON schema if any. """ super(JSONGrammar, self).__init__(name) self._validator = None self.schema = None self._schema_dict = None self._properties_dict = None self._init_schema() if schema is not None: self.schema.add_schema(schema) elif schema_file is not None: self.init_from_schema_file(schema_file, descriptions=descriptions) else: self.initialize_from_base_dict({}, description_dict=descriptions) def __repr__(self): # type: (...) -> str return "{}, schema: {}".format(self, self.schema.to_json()) def _init_schema(self): # type: (...) -> None """Initialize the schema.""" self.schema = MutableMappingSchemaBuilder() self._schema_dict = None self._properties_dict = None @property def schema_dict(self): # type: (...) -> Dict[str,DictSchemaType] """The dictionary representation of the schema.""" if self._schema_dict is None: self._schema_dict = self.schema.to_schema() return self._schema_dict @property def properties_dict(self): # type: (...) -> Dict[str,DictSchemaType] """The dictionnary representation of the properties of the schema. Raises: ValueError: When the schema has no properties. """ if self._properties_dict is None: self._properties_dict = self.schema_dict.get("properties") if self._properties_dict is None: raise ValueError( "Schema has no properties: {}.".format(self.schema_dict) ) return self._properties_dict
[docs] def clear(self): # type: (...) -> None self.__set_grammar_from_dict({})
def _init_validator(self): # type: (...) -> None """Initialize the validator.""" self.schema_dict.pop("id", None) if PY2: # Use jsonschema instead of fastjsonschema when a property has anyOf. for value in self.schema_dict.get("properties", {}).values(): if "anyOf" in value: self._validator = jsonschema.validators.validator_for( self.schema_dict )(self.schema_dict).validate return self._validator = compile_schema(self.schema_dict)
[docs] @classmethod def cast_array_to_list( cls, data_dict, # type: NumPyNestedMappingType ): # type: (...) -> DictSchemaType """Cast the NumPy arrays to lists for dictionary values. Args: data_dict: The data mapping. Returns: The original mapping casted to a dictionary where NumPy arrays have been replaced with lists. """ dict_of_list = dict(data_dict) for key, value in data_dict.items(): if isinstance(value, (ndarray, generic)): dict_of_list[key] = value.real.tolist() elif isinstance(value, Mapping): dict_of_list[key] = cls.cast_array_to_list(value) return dict_of_list
[docs] def is_required( self, element_name # type: str ): # type: (...) -> bool required_element_names = self.schema_dict.get("required", [element_name]) return element_name in required_element_names
[docs] def load_data( self, data, # type: MutableMapping[str,ElementType] raise_exception=True, # type: bool ): # type: (...) -> MutableMapping[str,ElementType] """ Raises: InvalidDataException: * If the passed data is not a dictionary. * If the data is not consistent with the grammar. """ if not isinstance(data, MutableMapping): raise InvalidDataException( "Data must be a mutable mapping; " "got a {} instead.".format(type(data)) ) if self._validator is None: self._init_validator() data_to_check = self.cast_array_to_list(data) try: self._validator(data_to_check) except (JsonSchemaException, ValidationError) as error: log_message = MultiLineString() log_message.add("Invalid data in: {}".format(self.name)) error_message = error.args[0] if error_message.startswith("data must contain"): # Error messages are not clear enough when missing elements # All keys are put in the message missing_elements = set(self.get_data_names()) - set(data.keys()) if missing_elements: log_message.add( "Missing mandatory elements: {}".format( ",".join(sorted(missing_elements)) ) ) else: log_message.add(", error: {}".format(error_message)) else: log_message.add(", error: {}".format(error_message)) LOGGER.error(log_message) if raise_exception: raise InvalidDataException(str(log_message)) # Check a copy to keep types and arrays but store initial dict for complex # Add defaults for key, value in data_to_check.items(): data.setdefault(key, value) return data
[docs] def init_from_schema_file( self, schema_path, # type: Union[str,Path] descriptions=None, # type: Optional[Mapping[str,str]] ): # type: (...) -> None """Set the grammar from a file. Args: schema_path: The path to the schema file. descriptions: The descriptions for the elements of the grammar, in the form: ``{element_name: element_meaning}``. If None, use the descriptions from the schema file. Raises: FileNotFoundError: If the schema file does not exist. """ schema_path = Path(schema_path) if not schema_path.exists(): raise FileNotFoundError( "Try to initialize grammar " "with not existing file: {}.".format(schema_path) ) schema = json.loads(schema_path.read_text()) self.__set_grammar_from_dict(schema, descriptions)
def __set_grammar_from_dict( self, schema, # type: Union[MappingSchemaType,MutableMappingSchemaBuilder] descriptions=None, # type: Optional[Mapping[str,str]] ): # type: (...) -> None """Set the grammar from a dictionary. Args: schema: The schema to set the grammar with. descriptions: The descriptions for the elements of the grammar, in the form: ``{element_name: element_meaning}``. If None, use the ``schema`` ones. """ self._init_schema() self.__update_grammar_from_dict(schema, descriptions) def __update_grammar_from_dict( self, schema, # type: Union[MappingSchemaType,MutableMappingSchemaBuilder] descriptions=None, # type: Optional[Mapping[str,str]] ): # type: (...) -> None """Update the grammar from a dictionary. Args: schema: The schema to update the grammar with. descriptions: The descriptions for the elements of the grammar, in the form: ``{element_name: element_meaning}``. If None, use the ``schema`` ones. """ if descriptions is not None: if not isinstance(schema, dict): schema = schema.to_schema() for property_name, property_schema in schema["properties"].items(): descr = descriptions.get(property_name) if descr is not None: self.__add_description_to_types(descr, property_schema) self.__merge_schema(schema) def __add_description_to_types( self, description, # type: str property_schema, # type: Mapping[str, str] ): # type: (...) -> None """Add the description for all the types found in the schema of a parameter. Args: description: The description of the parameter. property_schema: The schema of the parameter. """ if "anyOf" in property_schema: for each_type in property_schema["anyOf"]: each_type["description"] = description else: property_schema["description"] = description def __merge_schema( self, schema, # type: MappingSchemaType ): # type: (...) -> None """Merge a schema in the current one. Args: schema: The schema to be merge, could be a schema object or a dictionary. """ self.schema.add_schema(schema) self.__reset_schema_attrs()
[docs] def initialize_from_data_names( self, data_names, # type: Iterable[str] descriptions=None, # type: Optional[Mapping[str,str]] ): # type: (...) -> None """Initialize the grammar from the names and descriptions of the elements. Use float type. Args: descriptions: The descriptions of the elements, in the form: ``{element_name: element_meaning}``. If None, do not initialize the elements with descriptions. """ element_value = zeros(1) elements_values = {element_name: element_value for element_name in data_names} self.initialize_from_base_dict(elements_values, description_dict=descriptions)
[docs] def initialize_from_base_dict( self, typical_data_dict, # type: Mapping[str,ElementType] description_dict=None, # type: Optional[Mapping[str,str]] ): # type: (...) -> None """Initialize the grammar with types and names from a typical data entry. The keys of the ``typical_data_dict`` are the names of the elements. The types of the values of the ``typical_data_dict`` will be converted to JSON Schema types and define the elements of the JSON Schema. Args: description_dict: The descriptions of the data names, in the form: ``{element_name: element_meaning}``. If None, do not initialize the elements with descriptions. """ # Convert arrays to list as for check list_data_dict = self.cast_array_to_list(typical_data_dict) self.schema.add_object(list_data_dict) self.__set_grammar_from_dict(self.schema, description_dict)
[docs] def get_data_names(self): # type: (...) -> List[str] return list(self.schema.keys())
[docs] def is_data_name_existing( self, data_name, # type: str ): # type: (...) -> bool return data_name in self.schema._properties
[docs] def is_type_array( self, data_name # type: str ): # type: (...) -> bool if not self.is_data_name_existing(data_name): raise ValueError("{} is not in the grammar.".format(data_name)) prop = self.properties_dict.get(data_name) return "array" == prop.get("type")
[docs] def is_all_data_names_existing( self, data_names, # type: Iterable[str] ): # type: (...) -> bool properties = self.schema._properties for data_name in data_names: if data_name not in properties: return False return True
[docs] def update_from( self, input_grammar, # type: JSONGrammar ): # type: (...) -> None """ Raises: TypeError: If the passed grammar is not a JSONGrammar. """ if not isinstance(input_grammar, JSONGrammar): msg = ( "A {} is expected as input, but an object of type {} " "has been provided.".format(self.__class__, type(input_grammar)) ) raise TypeError(msg) self.__merge_schema(input_grammar.schema)
[docs] def to_simple_grammar(self): # type: (...) -> SimpleGrammar """Convert to the base :class:`.SimpleGrammar` type. Ignore the features of JSONGrammar that are not supported by SimpleGrammar. Returns: A :class:`.SimpleGrammar` equivalent to the current grammar. """ grammar = SimpleGrammar(self.name) schema_dict = self.schema_dict properties = schema_dict.get(self.PROPERTIES_FIELD, {}) names_to_types = {} for property_name, property_description in properties.items(): property_json_type = property_description.get("type") if property_json_type not in self.TYPES_MAP: property_type = None else: property_type = self.TYPES_MAP[property_description["type"]] names_to_types[property_name] = property_type if property_json_type == "array" and "items" in property_description: property_json_sub_type = property_description["items"].get("type") if property_json_sub_type not in ["number", "integer", None]: message = ( "Unsupported type '{}' in JSONGrammar '{}' " "for property '{}' in conversion to simple grammar." ).format(property_json_sub_type, self.name, property_name) LOGGER.warning(message) for feature in ["minItems", "maxItems", "additionalItems", "contains"]: if feature in property_description: message = ( "Unsupported feature '{}' in JSONGrammar '{}' " "for property '{}' in conversion to simple grammar." ).format(feature, self.name, property_name) LOGGER.warning(message) grammar.update_elements(**names_to_types) required_data_names = self.schema_dict.get("required", []) grammar.update_required_elements(**dict.fromkeys(required_data_names, True)) optional_data_names = set(self.schema_dict.get("properties", [])) - set( required_data_names ) grammar.update_required_elements(**dict.fromkeys(optional_data_names, False)) return grammar
[docs] def update_from_if_not_in( self, input_grammar, # type: JSONGrammar exclude_grammar, # type: JSONGrammar ): # type: (...) -> None if not ( isinstance(input_grammar, self.__class__) and isinstance(exclude_grammar, self.__class__) ): msg = self._get_update_error_msg(self, input_grammar, exclude_grammar) raise TypeError(msg) schema = MutableMappingSchemaBuilder() schema.add_schema(input_grammar.schema) for element_name in exclude_grammar.schema.keys(): try: del schema[element_name] except KeyError: pass self.__merge_schema(schema)
[docs] def restrict_to( self, data_names, # type: Sequence[str] ): # type: (...) -> None for element_name in list(self.schema.keys()): if element_name not in data_names: del self.schema[element_name] self.__reset_schema_attrs()
[docs] def remove_item( self, item_name, # type: str ): # type: (...) -> None del self.schema[item_name] self.__reset_schema_attrs()
def __reset_schema_attrs(self): # type: (...) -> None """Resets the validator, properties dict and schema dict conversions.""" self._validator = None self._properties_dict = None self._schema_dict = None
[docs] def set_item_value( self, item_name, # type: str item_value, # type: Dict[str,str] ): # type: (...) -> None """Set the value of an element. Args: item_name: The name of the element. item_value: The value of the element. Raises: ValueError: If the item is not in the grammar. """ if not self.is_data_name_existing(item_name): raise ValueError("Item {} not in grammar {}.".format(item_name, self.name)) schema = self.schema_dict schema[self.PROPERTIES_FIELD][item_name] = item_value self.__set_grammar_from_dict(schema)
[docs] def write_schema( self, path=None, # type: Optional[Path,str] ): # type: (...) -> None """Write the schema to a file. Args: path: The file path. If None, then write to a file named after the grammar and with .json extension. """ if path is None: path = Path(self.name).with_suffix(".json") else: path = Path(path) schema_json = self.schema.to_json() if PY2: # workaround, see https://stackoverflow.com/a/36003774 x = json.dumps( schema_json, ensure_ascii=False, ) if isinstance(x, str): x = unicode(x, "UTF-8") # noqa: F821 path.write_text(x) else: json.dump( schema_json, path.open("w", encoding="utf-8"), )
def __getstate__(self): # type: (...) -> SerializedGrammarType """Used by pickle to define what to serialize. Returns: The dict to serialize. """ deserialized_grammar = dict(self.__dict__) deserialized_grammar.pop("_validator") # genson schema cannot be pickled: use its dictionary representation deserialized_grammar["schema"] = self.schema_dict return deserialized_grammar def __setstate__( self, serialized_grammar, # type: SerializedGrammarType ): # type: (...) -> None """Used by pickle to define what to deserialize. Args: data_dict: update self dict from data_dict to deserialize. """ self.__dict__.update(serialized_grammar) # genson schema cannot be pickled: use its dictionary representation self.__set_grammar_from_dict(serialized_grammar.pop("schema"))