Source code for gemseo.third_party.genson_generator

# -*- coding: utf-8 -*-
# The MIT License (MIT)
#
# Copyright (c) 2014 Jon Wolverton github.com/wolverdude
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
A JSON schema generation tool
*****************************
"""

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from builtins import dict, str, super
from collections import defaultdict
import json
import re

from future import standard_library
from six import string_types


standard_library.install_aliases()


JS_TYPES = {dict: 'object',
            list: 'array',
            str: 'string',
            type('a'): 'string',
            type(u'a'): 'string',
            int: 'integer',
            float: 'number',
            bool: 'boolean',
            type(None): 'null'}

for typ in string_types:
    JS_TYPES[typ] = "string"


[docs]class SchemaDefaultDict(defaultdict): """A defaultdict implementation that is serializable with pickle and that returns an empty Schema by default """ def __init__(self, *args, **kwargs): """ Constructor """ super(SchemaDefaultDict, self).__init__() self.options = {} def __missing__(self, key): value = Schema(**self.options) self[key] = value return value
[docs]class Schema(object): """Basic schema generator class. Schema objects can be loaded up with existing schemas and objects before being serialized. """ def __init__(self, merge_arrays=True, additional_items=True, additional_props=True, match_props=[], exclude_at_merge=None): """Builds a schema generator object. * `merge_arrays` (default `True`): Assume all array items share the same schema (as they should). The alternate behavior is to create a different schema for each item in every array. * `additional_items` (default 'True'): If True, allow tuple-validated arrays to be followed by unvalidated items. If False, generate an "additionalItems": False element so that array items not specified in the schema cause a ValidationError. Not used with list-validated (merged) arrays. * `additional_props` (default 'True'): If True, allow objects to include unvalidated properties. If False, generate an "additionalProperties": False element so that properties not specified in the schema cause a ValidationError. * `match_props` (default '[]'): List of regular expressions to compare with property keys. Properties with matching keys share the same "patternProperties" schema. """ self._options = { 'merge_arrays': merge_arrays, 'additional_items': additional_items, 'additional_props': additional_props, 'match_props': match_props, } self._type = set() self._description = None self._required = None self._exclude_at_merge = exclude_at_merge if self._exclude_at_merge is None: self._exclude_at_merge = [] # Original implementation # self._properties = DictWithDefaultSchema(**self._options) # self._patternProperties = DictWithDefaultSchema(**self._options) self._patternProperties = SchemaDefaultDict() self._patternProperties.options = self._options self._properties = SchemaDefaultDict() self._properties.options = self._options self._items = [] self._other = {}
[docs] def add_schema(self, schema): """Merges in an existing schema. :param schema: required :param an: existing JSON Schema to merge """ # serialize instances of Schema before parsing if isinstance(schema, Schema): schema = schema.to_dict() # parse properties and add them individually for prop, val in list(schema.items()): if prop == 'type': self._add_type(val) elif prop == 'required': self._add_required(val) elif prop in ['properties', 'patternProperties']: self._add_properties(prop, val, 'add_schema') elif prop == 'items': self._add_items(val, 'add_schema') elif prop == "description": self.add_description(val) elif prop not in self._other: self._other[prop] = val elif self._other[prop] != val and prop not in\ self._exclude_at_merge: e = prop + ': ' + str(self._other[prop]) + ' ^= ' + str(val) raise SchemaError('schema incompatible -- ' + e) # make sure the 'required' key gets set regardless if 'required' not in schema: self._add_required([]) # return self for easy method chaining return self
[docs] def add_object(self, obj, descr=None): """Modify the schema to accomodate an object. :param obj: required :param a: JSON object to use in generate the schema """ if isinstance(obj, dict): self._generate_object(obj, descr) elif isinstance(obj, list): self._generate_array(obj, descr) else: self._generate_basic(obj, descr) # return self for easy method chaining return self
[docs] def to_dict(self): """Convert the current schema to a `dict`. """ # start with existing fields schema = dict(self._other) if 'additionalItems' in schema: if schema['additionalItems'] == True or not isinstance( self._items, list): del(schema['additionalItems']) if 'additionalProperties' in schema and schema[ 'additionalProperties'] == True: del(schema['additionalProperties']) # unpack the type field if self._type: schema['type'] = self._get_type() if self._description: schema["description"] = self._get_description() # call recursively on subschemas if object or array if 'object' in self._type: props = self._get_properties() # include unnecessary but valid "properties": {} if props[0] or not props[1]: schema['properties'] = props[0] if props[1]: schema['patternProperties'] = props[1] if self._required: schema['required'] = self._get_required() if 'array' in self._type: items = self._get_items() if items or isinstance(items, dict): schema['items'] = items return schema
[docs] def to_json(self, *args, **kwargs): """Convert the current schema directly to serialized JSON. :param args: :param kwargs: """ return json.dumps(self.to_dict(), *args, **kwargs)
# private methods # getters def _get_type(self): """ """ schema_type = self._type | set() # get a copy # remove any redundant integer type if 'integer' in schema_type and 'number' in schema_type: schema_type.remove('integer') # unwrap if only one item, else convert to array if len(schema_type) == 1: (schema_type,) = schema_type else: schema_type = sorted(schema_type) return schema_type def _get_description(self): return self._description def _get_required(self): """ """ return sorted(self._required) if self._required else [] def _get_properties(self): """ """ properties, patprops = {}, {} for prop, subschema in list(self._properties.items()): properties[prop] = subschema.to_dict() for prop, subschema in list(self._patternProperties.items()): patprops[prop] = subschema.to_dict() return ((properties, patprops)) def _get_items(self): """ Lists items """ if isinstance(self._items, list): return [subschema.to_dict() for subschema in self._items] else: return self._items.to_dict() # setters def _add_type(self, val_type): """ :param val_type: """ if isinstance(val_type, string_types): self._type.add(val_type) else: self._type |= set(val_type) def _add_required(self, required): """ :param required: """ if self._required is None: # if not already set, set to this self._required = set(required) else: # use intersection to limit to properties present in both self._required.update(set(required)) def _add_properties(self, ptype, properties, func): """ :param ptype: param properties: :param func: :param properties: """ # recursively modify subschemas pattern = self._options['match_props'] if pattern and not ptype: self._add_properties_merge(pattern, properties, func) else: self._add_properties_sep(ptype or 'properties', properties, func) def _add_properties_merge(self, pattern, properties, func): """ :param pattern: param properties: :param func: :param properties: """ err = None for prop, val in list(properties.items()): match = [] for pat in pattern: m = re.search(pat, prop) if m: getattr(self._patternProperties[pat], func)(val) match.append(pat) self._required -= set((prop,)) if len(match) > 1: err = (prop, match) elif len(match) < 1: getattr(self._properties[prop], func)(val) if err: raise SchemaError('patternProperties multiple match: ' + err[0] + ' ~= ' + ', '.join(err[1])) def _add_properties_sep(self, ptype, properties, func): """ :param ptype: param properties: :param func: :param properties: """ pdict = self._properties if ptype == 'properties' else self._patternProperties for prop, val in list(properties.items()): getattr(pdict[prop], func)(val) def _add_items(self, items, func): """ :param items: param func: :param func: """ if self._options['merge_arrays']: self._add_items_merge(items, func) else: self._add_items_sep(items, func)
[docs] def add_description(self, descr): """ :param descr: tue description """ self._description = descr
def _add_items_merge(self, items, func): """ :param items: param func: :param func: """ if not self._items: self._items = Schema(**self._options) method = getattr(self._items, func) if isinstance(items, list): for item in items: method(item) else: method(items) def _add_items_sep(self, items, func): """ :param items: param func: :param func: """ for item in items: subschema = Schema(**self._options) getattr(subschema, func)(item) self._items.append(subschema) def _add_additionalItems(self): """ """ self._other['additionalItems'] = self._options['additional_items'] def _add_additionalProperties(self): """ """ self._other['additionalProperties'] = self._options['additional_props'] # generate from object def _generate_object(self, obj, descr=None): """ :param obj: """ self._add_type('object') self._add_required(list(obj.keys())) self._add_properties(None, obj, 'add_object') self._add_additionalProperties() if descr is not None: self._add_properties(None, descr, 'add_description') def _generate_array(self, array, descr=None): """ :param array: """ self._add_type('array') self._add_items(array, 'add_object') self._add_additionalItems() if descr is not None: self.add_description(descr) def _generate_basic(self, val, descr=None): """ :param val: """ val_type = JS_TYPES[type(val)] if val_type != "null": self._add_type(val_type) if descr is not None: self.add_description(descr)
[docs]class SchemaError(Exception): """ """ def __init__(self, value): self.value = value def __str__(self): return repr(self.value)