"""A Grammar based on JSON schema."""

from __future__ import division, unicode_literals

import json
import logging
from numbers import Number

from numpy import generic, ndarray, zeros

from gemseo.core.grammars.abstract_grammar import AbstractGrammar
from gemseo.core.grammars.errors import InvalidDataException
from gemseo.core.grammars.json_schema import MutableMappingSchemaBuilder
from gemseo.core.grammars.simple_grammar import SimpleGrammar
from gemseo.utils.py23_compat import PY2, JsonSchemaException, Path, compile_schema

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

class JSONGrammar(AbstractGrammar):
    """A grammar that describes data names and types with a JSON schema.

    The grammar stores the structure of the input or output data of an
    MDODiscipline in a schema builder and is able to check data
    dictionaries against the resulting JSON schema.
    """

    PROPERTIES_FIELD = "properties"
    REQUIRED_FIELD = "required"
    TYPE_FIELD = "type"
    OBJECT_FIELD = "object"
    # Mapping from JSON schema type names to Python types,
    # used when converting to a SimpleGrammar.
    TYPES_MAP = {
        "array": ndarray,
        "float": float,
        "string": str,
        "integer": int,
        "boolean": bool,
        "number": Number,
    }

    def __init__(
        self,
        name,
        schema_file=None,
        schema=None,
        grammar_type=None,
        descriptions=None,
    ):
        """Constructor.

        The schema is taken from ``schema`` if given, otherwise from
        ``schema_file`` if given, otherwise the grammar starts empty.

        :param name: grammar name
        :param schema_file: the json schema file
        :param schema: a genson Schema to initialize self
        :param grammar_type: not used by this constructor
        :param descriptions: descriptions of the properties,
            {name: meaning} structure; not applied when ``schema`` is given
        """
        super(JSONGrammar, self).__init__(name)
        self._validator = None
        self.schema = None
        self._init_schema()
        if schema is not None:
            self.schema.add_schema(schema)
        elif schema_file is not None:
            self.init_from_schema_file(schema_file, descriptions=descriptions)
        else:
            self.initialize_from_base_dict({}, description_dict=descriptions)

    def __repr__(self):
        # NOTE(review): formatting ``self`` with "{}" relies on a ``__str__``
        # that is presumably provided by AbstractGrammar -- confirm.
        return "{}, schema: {}".format(self, self.schema.to_json())

    def _init_schema(self):
        """Initialize the schema."""
        self.schema = MutableMappingSchemaBuilder()

    def clear(self):
        """Clears the data to produce an empty grammar."""
        self.__set_grammar_from_dict({})

    def _init_validator(self):
        """Initialize the validator."""
        schema_dict = self.schema.to_schema()
        # The "id" entry, if any, is dropped before compiling the schema.
        schema_dict.pop("id", None)
        self._validator = compile_schema(schema_dict)

    @classmethod
    def cast_array_to_list(cls, data_dict):
        """Casts the numpy arrays to lists for dictionary values.

        Only the real part of numpy values is kept. Nested dictionaries
        are processed recursively. The input dictionary is not modified.

        :param data_dict: the data dictionary
        :returns: The dict with casted arrays
        """
        out_d = data_dict.copy()
        for key, value in data_dict.items():
            if isinstance(value, (ndarray, generic)):
                out_d[key] = value.real.tolist()
            elif isinstance(value, dict):
                out_d[key] = cls.cast_array_to_list(value)
        return out_d

    def load_data(self, data, raise_exception=True):
        """Loads the data dictionary in the grammar and checks it against json schema.

        :param data: the input data
        :param raise_exception: if False, no exception is raised
            when data is invalid (Default value = True)
        :returns: the input data, completed with the entries
            of the checked copy that it did not already contain
        :raises InvalidDataException: if data is not a dictionary, or if
            data is invalid and raise_exception is True
        """
        if not isinstance(data, dict):
            raise InvalidDataException(
                "Input data must be a dictionary: got a {} instead".format(type(data))
            )

        if self._validator is None:
            self._init_validator()

        error_exist = False
        # Check a copy to keep types and arrays
        # but store initial dict for complex
        data_to_check = self.cast_array_to_list(data)
        try:
            self._validator(data_to_check)
        except JsonSchemaException as error:
            error_exist = True
            detail = error.args[0]
            msg = "Invalid data in: {}".format(self.name)
            missing = []
            if detail.startswith("data must contain"):
                # Error messages are not clear enough when missing properties
                # All keys are put in the message
                missing = sorted(set(self.get_data_names()) - set(data.keys()))
            if missing:
                msg += "\nMissing mandatory properties: {}".format(",".join(missing))
            else:
                msg += "\n', error: {}".format(detail)
            LOGGER.error(msg)

        if error_exist and raise_exception:
            err = "Invalid data from grammar {}".format(self.name)
            raise InvalidDataException(err)

        # Add defaults
        for key, value in data_to_check.items():
            data.setdefault(key, value)
        return data

    def init_from_schema_file(self, schema_path, descriptions=None):
        """Set the grammar from a file.

        :param schema_path: path to the schema file.
        :param descriptions: descriptions of the properties,
            {name: meaning} structure
        :raises FileNotFoundError: if the schema file does not exist
        """
        schema_path = Path(schema_path)
        if not schema_path.exists():
            msg = "Try to initialize grammar with not existing file: {}".format(
                schema_path
            )
            # NOTE(review): FileNotFoundError is not a Python 2 builtin --
            # confirm it is provided elsewhere when running under PY2.
            raise FileNotFoundError(msg)
        schema_dict = json.loads(schema_path.read_text())
        self.__set_grammar_from_dict(schema_dict, descriptions)

    def __set_grammar_from_dict(self, schema_dict, descriptions=None):
        """Set the grammar from a dictionary.

        The current schema is discarded first.

        Args:
            schema_dict: Schema dictionary.
            descriptions: Properties descriptions, optional.
        """
        self._init_schema()
        self.__update_grammar_from_dict(schema_dict, descriptions)

    def __update_grammar_from_dict(self, schema_dict, descriptions=None):
        """Update the grammar from a dictionary.

        When descriptions are given, they are written in place into the
        properties of the schema dictionary before it is merged.

        Args:
            schema_dict: Schema dictionary, or a schema object
                providing ``to_schema``.
            descriptions: Properties descriptions, optional.
        """
        if descriptions is not None:
            if not isinstance(schema_dict, dict):
                schema_dict = schema_dict.to_schema()
            # A schema without any property has nothing to describe.
            properties = schema_dict.get(self.PROPERTIES_FIELD, {})
            for ppty_name, ppty_schema in properties.items():
                try:
                    ppty_schema["description"] = descriptions[ppty_name]
                except KeyError:
                    LOGGER.debug(
                        "skipping description for unknown property %s", ppty_name
                    )
        self.__merge_schema(schema_dict)

    def __merge_schema(self, schema):
        """Merge a schema in the current one.

        Args:
            schema: Schema to merge, could be a schema object or a dictionary.
        """
        self.schema.add_schema(schema)
        # The compiled validator is outdated: rebuild it on next use.
        self._validator = None

    def initialize_from_data_names(
        self,
        data_names,
        descriptions=None,
    ):
        """Initializes a JSONGrammar from a list of data.

        All data of the grammar will be set as arrays.

        :param data_names: a data names list
        :param descriptions: dictionary of descriptions,
            {name: meaning} structure
        """
        data = zeros(1)
        typical_data_dict = {k: data for k in data_names}
        self.initialize_from_base_dict(typical_data_dict, description_dict=descriptions)

    def initialize_from_base_dict(
        self,
        typical_data_dict,
        description_dict=None,
    ):
        """Initialize a json grammar with types and names from a typical data entry.

        The keys of the typical_data_dict will be the names of the data in
        the grammar. The types of the values of the typical_data_dict will
        be converted to JSON Schema types and define the properties of the
        JSON Schema.

        :param typical_data_dict: a data dictionary with keys as data names
            and values as a typical value for this data
        :param description_dict: dictionary of descriptions,
            {name:meaning} structure
        """
        # Convert arrays to list as for check
        list_data_dict = self.cast_array_to_list(typical_data_dict)
        self.schema.add_object(list_data_dict)
        self.__set_grammar_from_dict(self.schema, description_dict)

    def get_data_names(self):
        """Returns the list of data names.

        :returns: the data names, as a list
        """
        return list(self.schema.keys())

    def is_data_name_existing(self, data_name):
        """Checks if data_name is present in grammar.

        :param data_name: the data name
        :returns: True if data is in grammar
        """
        return data_name in self.get_data_names()

    def is_all_data_names_existing(self, data_names):
        """Checks if data_names are present in grammar.

        :param data_names: the data names list
        :returns: True if all data are in grammar
        """
        exists = self.is_data_name_existing
        return all(exists(data_name) for data_name in data_names)

    def update_from(self, input_grammar):
        """Adds properties coming from another grammar.

        :param input_grammar: the grammar to take inputs from
        :raises TypeError: if input_grammar is not a JSONGrammar
        """
        if not isinstance(input_grammar, JSONGrammar):
            msg = (
                "A {} is expected as input, but an object of type {} "
                "has been provided.".format(self.__class__, type(input_grammar))
            )
            raise TypeError(msg)
        self.__merge_schema(input_grammar.schema)

    def to_simple_grammar(self):
        """Creates a SimpleGrammar from self, preserving the features if possible.

        Ignores the features of JSONGrammar that are not supported by
        SimpleGrammar; a warning is logged for the ones that are detected.

        :returns: a SimpleGrammar instance
        """
        grammar = SimpleGrammar(self.name)
        schema_dict = self.schema.to_schema()
        properties = schema_dict.get(self.PROPERTIES_FIELD, {})
        for prop, desc in properties.items():
            grammar.data_names.append(prop)
            json_type = desc.get(self.TYPE_FIELD)
            # Missing or unknown JSON types are mapped to None.
            d_type = self.TYPES_MAP.get(json_type)
            grammar.data_types.append(d_type)

            # Compare the JSON type name: d_type is a Python type
            # and is never equal to the string "array".
            if json_type == "array":
                items = desc.get("items", {})
                sub_type = items.get("type") if isinstance(items, dict) else None
                if sub_type not in ["number", "integer", None]:
                    msg = "Unsupported type {sub_type} in JSONGrammar {name} "
                    msg += "for property {prop} in conversion to simple grammar"
                    err = msg.format(sub_type=sub_type, name=self.name, prop=prop)
                    LOGGER.warning(err)

            for prop_name in ["minItems", "maxItems", "additionalItems", "contains"]:
                if prop_name in desc:
                    msg = "Unsupported feature {desc} in JSONGrammar {name} "
                    msg += "for property {prop} in conversion to simple grammar"
                    err = msg.format(desc=desc, name=self.name, prop=prop)
                    LOGGER.warning(err)
        return grammar

    def update_from_if_not_in(self, input_grammar, exclude_grammar):
        """Adds objects coming from input_grammar if they are not in exclude_grammar.

        :param input_grammar: the grammar to take inputs from
        :param exclude_grammar: exclusion grammar
        :raises TypeError: if one of the grammars is not an instance
            of the class of self
        """
        if not (
            isinstance(input_grammar, self.__class__)
            and isinstance(exclude_grammar, self.__class__)
        ):
            msg = self._get_update_error_msg(self, input_grammar, exclude_grammar)
            raise TypeError(msg)

        # Work on a copy so that input_grammar is left untouched.
        schema = MutableMappingSchemaBuilder()
        schema.add_schema(input_grammar.schema)
        for name in exclude_grammar.schema.keys():
            try:
                del schema[name]
            except KeyError:
                # The excluded name is not in input_grammar: nothing to remove.
                pass
        self.__merge_schema(schema)

    def restrict_to(self, data_names):
        """Restrict the grammar to given names.

        :param data_names: the names of the data to restrict the grammar to
        """
        # Iterate over a copy of the keys since items are removed on the fly.
        for name in list(self.schema.keys()):
            if name not in data_names:
                self.remove_item(name)

    def remove_item(self, item_name):
        """Removes an item from the grammar.

        :param item_name: the item name to be removed
        """
        del self.schema[item_name]

    def set_item_value(self, item_name, item_value):
        """Sets the value of an item.

        :param item_name: the item name to be modified
        :param item_value: value of the item
        :raises ValueError: if the item is not in the grammar
        """
        if not self.is_data_name_existing(item_name):
            raise ValueError(
                "Item {} not in grammar {}".format(item_name, self.name)
            )
        self_dict = self.schema.to_schema()
        self_dict[self.PROPERTIES_FIELD][item_name] = item_value
        self.__set_grammar_from_dict(self_dict)

    def write_schema(self, path=None):
        """Write the schema to a file.

        Args:
            path: Write to this path, if None then write to a file named
                after the grammar and with .json extension.
        """
        if path is None:
            path = Path(self.name).with_suffix(".json")
        else:
            path = Path(path)
        # NOTE(review): if to_json() already returns a JSON string, dumping it
        # again below stores a quoted string rather than an object -- confirm.
        schema_json = self.schema.to_json()
        if PY2:
            # Python 2 workaround: with ensure_ascii=False, json.dumps may
            # return a byte string, which is decoded before being written.
            text = json.dumps(
                schema_json,
                ensure_ascii=False,
            )
            if isinstance(text, str):
                text = unicode(text, "UTF-8")  # noqa: F821
            path.write_text(text)
        else:
            # The context manager closes the file even if dumping fails.
            with path.open("w", encoding="utf-8") as schema_file:
                json.dump(schema_json, schema_file)

    def __getstate__(self):
        """Used by pickle to define what to serialize.

        :returns: the dict to serialize
        """
        out_d = dict(self.__dict__)
        out_d.pop("_validator")
        # genson schema cannot be pickled: use its dictionary representation
        out_d["schema"] = self.schema.to_schema()
        return out_d

    def __setstate__(self, data_dict):
        """Used by pickle to define what to deserialize.

        :param data_dict: update self dict from data_dict to deserialize
        """
        self.__dict__.update(data_dict)
        # genson schema cannot be pickled: use its dictionary representation
        self.__set_grammar_from_dict(data_dict.pop("schema"))