# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Francois Gallard
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
A Grammar based on JSON schema
******************************
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import json
import os
from future import standard_library
from numpy import generic, ndarray, zeros
from gemseo import LOGGER
from gemseo.core.grammar import AbstractGrammar, InvalidDataException
from gemseo.third_party.genson_generator import Schema
from gemseo.utils.py23_compat import JsonSchemaException, compile_schema
standard_library.install_aliases()
[docs]class JSONGrammar(AbstractGrammar):
"""A grammar subclass that stores the input or output data types
and structure a MDODiscipline using a JSON format
It is able to check the inputs and outputs against a JSON schema.
"""
PROPERTIES_FIELD = "properties"
DEFAULTS_FIELD = "defaults"
REQUIRED_FIELD = "required"
TYPE_FIELD = "type"
OBJECT_FIELD = "object"
def __init__(
self,
name,
schema_file=None,
schema=None,
grammar_type=AbstractGrammar.INPUT_GRAMMAR,
):
"""
Constructor
:param name : grammar name
:param schema : a genson Schema to initialize self
:param schema_file : the json schema file
:param grammar_type : the type of grammar : input or output
"""
super(JSONGrammar, self).__init__()
self.schema = None
self._properties = None
self._validator = None
self.data = None
self.name = name
self.grammar_type = grammar_type
self._init_schema()
if schema is not None:
self.schema.add_schema(schema)
self._update_properties()
if schema_file is not None:
self.init_from_schema_file(schema_file)
if schema is None and schema_file is None:
self.initialize_from_base_dict(typical_data_dict={})
def _init_schema(self):
"""Initializes the genson schema"""
self.schema = Schema(merge_arrays=True, exclude_at_merge=["name", "id"])
def _update_properties(self):
"""Updates the properties after a change in the schema"""
schema_dict = self.schema.to_dict()
self._properties = schema_dict.get(self.PROPERTIES_FIELD)
if self._properties is None:
self._properties = {}
self._validator = None
@property
def properties(self):
"""Accessor for the properties of the schema"""
return self._properties
[docs] def clear(self):
"""Clears the data to produce an empty grammar"""
self._init_schema()
self._update_properties()
self._validator = None
def _init_validator(self):
"""Initializes the validator according to self"""
schema_dict = self.schema.to_dict()
schema_dict.pop("id", None)
self._validator = compile_schema(schema_dict)
[docs] @staticmethod
def cast_array_to_list(data_dict):
"""
Casts the numpy arrays in data_dict to lists
:param data_dict : the data dictionary
:returns: The dict with casted arrays
"""
out_d = data_dict.copy()
for key, value in data_dict.items():
if isinstance(value, (ndarray, generic)):
out_d[key] = value.real.tolist()
elif isinstance(value, dict):
out_d[key] = JSONGrammar.cast_array_to_list(value)
return out_d
[docs] def load_data(self, data_dict, raise_exception=True):
"""Loads the data dictionary in the grammar
and checks it against json schema
:param data_dict: the input data
:param raise_exception: if False, no exception is raised
when data is invalid (Default value = True)
"""
if not isinstance(data_dict, dict):
raise InvalidDataException(
"Input data must be a python dictionary, got "
+ str(type(data_dict))
+ " instead."
)
error_exist = False
data_to_check = self.cast_array_to_list(data_dict)
if self._validator is None:
self._init_validator()
try:
self._validator(data_to_check)
except JsonSchemaException as error:
error_exist = True
msg = "Invalid data in : " + str(self.name)
if error.args[0].startswith("data must contain"):
# Error messages are not clear enough when missing properties
# All keys are put in the message
diff = set(set(self.get_data_names()) - set(data_dict.keys()))
if len(diff) > 0:
msg += "\nMissing mandatory properties: " + str(list(diff))
else:
msg += "\n', error : " + str(error.args[0])
else:
msg += "\n', error : " + str(error.args[0])
LOGGER.error(msg)
if error_exist and raise_exception:
raise InvalidDataException("Invalid data from grammar " + str(self.name))
# Check a copy to keep types and arrays but store initial dict for
# complex
self.data = data_dict
# Add defaults
for key, value in data_to_check.items():
k_utf = key
self.data.setdefault(k_utf, value)
return self.data
[docs] def init_from_schema_file(self, schema_file="input.json"):
"""Initializes grammar from
:param schema_file: path to the schema input file
(Default value = "input.json")
"""
if not os.path.exists(schema_file):
raise ValueError(
"Try to initialize grammar with not existing file : " + str(schema_file)
)
# Refs are not supported any more for simplification
# curr_folder = os.path.dirname(schema_file)
# registry = providers.FilesystemProvider(curr_folder)
with open(schema_file, "r") as in_file:
json_content = json.loads(in_file.read())
# Refs not supported any more for simplification
# Resolve #ref tags in json
# try:
# resolved_json = resolve(json_content, '#/properties', registry)
# except ValueError as err_json:
# LOGGER.error(err_json)
# raise Exception(
# "Cannot resolve referenced json properties of file : " +
# str(schema_file))
self.schema.add_schema(json_content)
self._update_properties()
def __str__(self):
msg = "Grammar named :" + str(self.name)
msg += ", schema = " + self.schema.to_json()
return msg
[docs] def initialize_from_data_names(
self, data_names, schema_file=None, write_schema=False
):
"""Initializes a JSONGrammar from a list of data.
All data of the grammar will be set as arrays
:param data_names: a data names list
:param schema_file: the output json file path. If None : input.json or
output.json depending on grammar type.
(Default value = None)
:param write_schema: if True, writes the schema files
(Default value = False)
"""
data = zeros(1)
typical_data_dict = {k: data for k in data_names}
self.initialize_from_base_dict(typical_data_dict, schema_file, write_schema)
[docs] def initialize_from_base_dict(
self,
typical_data_dict,
schema_file=None,
write_schema=False,
description_dict=None,
):
"""Initialize a json grammar with types and names from a
typical data entry.
The keys of the typical_data_dict will be the names of the
data in the grammar.
The types of the values of the typical_data_dict will be converted
to JSON Schema types and define the properties of the JSON Schema.
:param typical_data_dict: a data dictionary with keys as data names
and values as a typical value for this data
:param schema_file: the output json file path. If None : input.json or
output.json depending on grammar type.
(Default value = None)
:param write_schema: if True, writes the schema files
(Default value = False)
:param description_dict: dictionary of descriptions,
{name:meaning} structure
"""
# Convert arrays to list as for check
list_data_dict = self.cast_array_to_list(typical_data_dict)
# if PY2:
# list_data_dict = self.cast_str_val_ascii(list_data_dict)
self.schema.add_object(list_data_dict, description_dict)
if write_schema:
if schema_file is None:
schema_file = self.name + ".json"
with open(schema_file, "w") as outf:
outf.write(self.schema.to_json())
self._update_properties()
# @staticmethod
# def cast_str_val_ascii(data_dict):
# """
# Casts the values of dict to ascii
#
# :param: input data dict
# :returns: ourput data dict
# """
# out_d = data_dict.copy()
# for key, value in data_dict.items():
# if isinstance(value, string_types):
# out_d[key] = value.encode('ascii', 'ignore')
# elif isinstance(value, dict):
# out_d[key] = JSONGrammar.cast_str_val_ascii(value)
# return out_d
[docs] def add_description(self, description_dict):
"""
Add a description to the properties
:param description_dict: dictionary of descriptions,
{name:meaning} structure
"""
descr_filtered = {
k: v for k, v in description_dict.items() if k in self._properties
}
self.schema.add_object({}, descr_filtered)
[docs] def get_data_names(self):
"""Returns the list of data names
:returns: the data names, as a dict keys set
"""
return self._properties.keys()
[docs] def is_data_name_existing(self, data_name):
"""Checks if data_name is present in grammar
:param data_name: the data name
:returns: True if data is in grammar
"""
return data_name in self._properties
[docs] def is_all_data_names_existing(self, data_names):
"""Checks if data_names are present in grammar
:param data_names: the data names list
:returns: True if all data are in grammar
"""
exists = self.is_data_name_existing
for data_name in data_names:
if not exists(data_name):
return False
return True
[docs] def update_from(self, input_grammar):
"""Adds properties coming from another grammar
:param input_grammar: the grammar to take inputs from
"""
if not isinstance(input_grammar, JSONGrammar):
msg = "Cannot update grammar " "{} of type {} with {} of type {} ".format(
self.name,
type(self).__name__,
input_grammar.name,
type(input_grammar).__name__,
)
raise ValueError(msg)
schema_dct = input_grammar.schema.to_dict()
self.schema.add_schema(schema_dct)
self._update_properties()
[docs] def update_from_if_not_in(self, input_grammar, exclude_grammar):
"""Adds objects coming from input_grammar if they are not in
exclude_grammar
:param input_grammar: the grammar to take inputs from
:param exclude_grammar: exclusion grammar
"""
if isinstance(input_grammar, JSONGrammar) and isinstance(
exclude_grammar, JSONGrammar
):
in_schema_dct = input_grammar.schema.to_dict()
in_schema_prop = in_schema_dct.get(self.PROPERTIES_FIELD, {})
in_required = in_schema_dct.get(self.REQUIRED_FIELD, [])
ex_schema_dct = exclude_grammar.schema.to_dict()
ex_schema_prop = ex_schema_dct.get(self.PROPERTIES_FIELD, {})
merged_required = []
merged_prop = {}
for prop_name, prop_schema in in_schema_prop.items():
if prop_name not in ex_schema_prop:
merged_prop[prop_name] = prop_schema
if prop_name in in_required:
merged_required.append(prop_name)
merged_schema = {
self.TYPE_FIELD: self.OBJECT_FIELD,
self.PROPERTIES_FIELD: merged_prop,
self.REQUIRED_FIELD: merged_required,
}
self.schema.add_schema(merged_schema)
self._update_properties()
else:
msg = "Cannot update grammar " + str(self.name)
msg += " of type JSONGrammar with " + str(input_grammar.name)
msg += " of type " + str(type(input_grammar).__name__)
msg += " and " + str(exclude_grammar.name)
msg += ", of type " + str(type(exclude_grammar).__name__)
raise TypeError(msg)
[docs] def restrict_to(self, data_names):
"""Restricts the grammar to a sublist of data names
:param data_names: the names of the data to restrict the grammar to
"""
self_schema_dct = self.schema.to_dict()
self_schema_prop = self_schema_dct.get(self.PROPERTIES_FIELD, {})
self_required = self_schema_dct.get(self.REQUIRED_FIELD, [])
for prop_name in list(self_schema_prop.keys()):
if prop_name not in data_names:
del self_schema_prop[prop_name]
if prop_name in self_required:
self_required.remove(prop_name)
self_schema = {
self.TYPE_FIELD: self.OBJECT_FIELD,
self.PROPERTIES_FIELD: self_schema_prop,
self.REQUIRED_FIELD: self_required,
}
self._init_schema()
self.schema.add_schema(self_schema)
self._update_properties()
[docs] def remove_item(self, item_name):
"""Removes an item from the grammar
:param item_name: the item name to be removed
"""
if not self.is_data_name_existing(item_name):
raise ValueError("Item " + str(item_name) + " not in grammar " + self.name)
self_dict = self.schema.to_dict()
del self_dict[self.PROPERTIES_FIELD][item_name]
self_required = self_dict.get(self.REQUIRED_FIELD, [])
if item_name in self_required:
self_required.remove(item_name)
self._init_schema()
self.schema.add_schema(self_dict)
self._update_properties()
[docs] def set_item_value(self, item_name, item_value):
"""
Sets the value of an item
:param item_name: the item name to be modified
:param item_value: value of the item
"""
if not self.is_data_name_existing(item_name):
raise ValueError("Item " + str(item_name) + " not in grammar " + self.name)
self_dict = self.schema.to_dict()
self_dict[self.PROPERTIES_FIELD][item_name] = item_value
self._init_schema()
self.schema.add_schema(self_dict)
self._update_properties()
def __getstate__(self):
"""
Used by pickle to define what to serialize
:returns: the dict to serialize
"""
out_d = {}
out_d.update(self.__dict__)
out_d.pop("_validator")
return out_d
def __setstate__(self, data_dict):
"""
Used by pickle to define what to deserialize
:param data_dict : update self dict from data_dict to deserialize
"""
self.__dict__.update(data_dict)
self._update_properties()