# -*- coding: utf-8 -*-
"""
JSON Schema URI resolution scopes and dereferencing
https://tools.ietf.org/id/draft-zyp-json-schema-04.html#rfc.section.7
Code adapted from https://github.com/Julian/jsonschema
"""
from __future__ import unicode_literals
import contextlib
import json
import re
import requests
from gemseo.utils.py23_compat import PY3
from .exceptions import JsonSchemaException
if PY3:
import urllib.parse as urlparse
from urllib.request import urlopen
from urllib.parse import unquote
else:
from urllib2 import urlopen
import urlparse
from urllib import unquote
[docs]def resolve_path(schema, fragment):
"""
Return definition from path.
Path is unescaped according https://tools.ietf.org/html/rfc6901
"""
fragment = fragment.lstrip("/")
parts = unquote(fragment).split("/") if fragment else []
for part in parts:
part = part.replace("~1", "/").replace("~0", "~")
if isinstance(schema, list):
schema = schema[int(part)]
elif part in schema:
schema = schema[part]
else:
raise JsonSchemaException("Unresolvable ref: {}".format(part))
return schema
[docs]def normalize(uri):
return urlparse.urlsplit(uri).geturl()
[docs]def resolve_remote(uri, handlers):
"""
Resolve a remote ``uri``.
.. note::
Requests_ library is used to fetch ``http`` or ``https``
requests from the remote ``uri``, if handlers does not
define otherwise.
For unknown schemes urlib is used with UTF-8 encoding.
.. _Requests: http://pypi.python.org/pypi/requests/
"""
scheme = urlparse.urlsplit(uri).scheme
if scheme in handlers:
result = handlers[scheme](uri)
elif scheme in ["http", "https"]:
result = requests.get(uri).json()
else:
result = json.loads(urlopen(uri).read().decode("utf-8"))
return result
[docs]class RefResolver(object):
"""
Resolve JSON References.
"""
# pylint: disable=dangerous-default-value,too-many-arguments
def __init__(self, base_uri, schema, store={}, cache=True, handlers={}):
"""
`base_uri` is URI of the referring document from the `schema`.
"""
self.base_uri = base_uri
self.resolution_scope = base_uri
self.schema = schema
self.store = store
self.cache = cache
self.handlers = handlers
self.walk(schema)
[docs] @classmethod
def from_schema(cls, schema, handlers={}, **kwargs):
"""
Construct a resolver from a JSON schema object.
"""
return cls(
schema.get("$id", schema.get("id", "")) if isinstance(schema, dict) else "",
schema,
handlers=handlers,
**kwargs
)
[docs] @contextlib.contextmanager
def in_scope(self, scope):
"""
Context manager to handle current scope.
"""
old_scope = self.resolution_scope
self.resolution_scope = urlparse.urljoin(old_scope, scope)
try:
yield
finally:
self.resolution_scope = old_scope
[docs] @contextlib.contextmanager
def resolving(self, ref):
"""
Context manager which resolves a JSON ``ref`` and enters the
resolution scope of this ref.
"""
new_uri = urlparse.urljoin(self.resolution_scope, ref)
uri, fragment = urlparse.urldefrag(new_uri)
if normalize(uri) in self.store:
schema = self.store[normalize(uri)]
elif not uri or uri == self.base_uri:
schema = self.schema
else:
schema = resolve_remote(uri, self.handlers)
if self.cache:
self.store[normalize(uri)] = schema
old_base_uri, old_schema = self.base_uri, self.schema
self.base_uri, self.schema = uri, schema
try:
with self.in_scope(uri):
yield resolve_path(schema, fragment)
finally:
self.base_uri, self.schema = old_base_uri, old_schema
[docs] def get_uri(self):
return normalize(self.resolution_scope)
[docs] def get_scope_name(self):
"""
Get current scope and return it as a valid function name.
"""
name = "validate_" + unquote(self.resolution_scope).replace("~1", "_").replace(
"~0", "_"
)
name = re.sub(r"[:/#\.\-\%]", "_", name)
name = name.lower().rstrip("_")
return name
[docs] def walk(self, node):
"""
Walk thru schema and dereferencing ``id`` and ``$ref`` instances
"""
if isinstance(node, bool):
pass
elif "$ref" in node and isinstance(node["$ref"], unicode):
ref = node["$ref"]
node["$ref"] = urlparse.urljoin(self.resolution_scope, ref)
elif "id" in node and isinstance(node["id"], unicode):
with self.in_scope(node["id"]):
self.store[normalize(self.resolution_scope)] = node
for _, item in node.items():
if isinstance(item, dict):
self.walk(item)
else:
for _, item in node.items():
if isinstance(item, dict):
self.walk(item)