|
- # Asterisk -- An open source telephony toolkit.
- #
- # Copyright (C) 2013, Digium, Inc.
- #
- # David M. Lee, II <dlee@digium.com>
- #
- # See http://www.asterisk.org for more information about
- # the Asterisk project. Please do not directly contact
- # any of the maintainers of this project for assistance;
- # the project provides a web site, mailing lists and IRC
- # channels for your use.
- #
- # This program is free software, distributed under the terms of
- # the GNU General Public License Version 2. See the LICENSE file
- # at the top of the source tree.
- #
- """Swagger data model objects.
- These objects should map directly to the Swagger api-docs, without a lot of
- additional fields. In the process of translation, it should also validate the
- model for consistency against the Swagger spec (i.e., fail if fields are
- missing, or have incorrect values).
- See https://github.com/wordnik/swagger-core/wiki/API-Declaration for the spec.
- """
- from __future__ import print_function
- import json
- import os.path
- import pprint
- import re
- import sys
- import traceback
- # We don't fully support Swagger 1.2, but we need it for subtyping
- SWAGGER_VERSIONS = ["1.1", "1.2"]
- SWAGGER_PRIMITIVES = [
- 'void',
- 'string',
- 'boolean',
- 'number',
- 'int',
- 'long',
- 'double',
- 'float',
- 'Date',
- ]
- class Stringify(object):
- """Simple mix-in to make the repr of the model classes more meaningful.
- """
- def __repr__(self):
- return "%s(%s)" % (self.__class__, pprint.saferepr(self.__dict__))
- def compare_versions(lhs, rhs):
- '''Performs a lexicographical comparison between two version numbers.
- This properly handles simple major.minor.whatever.sure.why.not version
- numbers, but fails miserably if there's any letters in there.
- For reference:
- 1.0 == 1.0
- 1.0 < 1.0.1
- 1.2 < 1.10
- @param lhs Left hand side of the comparison
- @param rhs Right hand side of the comparison
- @return < 0 if lhs < rhs
- @return == 0 if lhs == rhs
- @return > 0 if lhs > rhs
- '''
- lhs = [int(v) for v in lhs.split('.')]
- rhs = [int(v) for v in rhs.split('.')]
- return (lhs > rhs) - (lhs < rhs)
- class ParsingContext(object):
- """Context information for parsing.
- This object is immutable. To change contexts (like adding an item to the
- stack), use the next() and next_stack() functions to build a new one.
- """
- def __init__(self, swagger_version, stack):
- self.__swagger_version = swagger_version
- self.__stack = stack
- def __repr__(self):
- return "ParsingContext(swagger_version=%s, stack=%s)" % (
- self.swagger_version, self.stack)
- def get_swagger_version(self):
- return self.__swagger_version
- def get_stack(self):
- return self.__stack
- swagger_version = property(get_swagger_version)
- stack = property(get_stack)
- def version_less_than(self, ver):
- return compare_versions(self.swagger_version, ver) < 0
- def next_stack(self, json, id_field):
- """Returns a new item pushed to the stack.
- @param json: Current JSON object.
- @param id_field: Field identifying this object.
- @return New context with additional item in the stack.
- """
- if not id_field in json:
- raise SwaggerError("Missing id_field: %s" % id_field, self)
- new_stack = self.stack + ['%s=%s' % (id_field, str(json[id_field]))]
- return ParsingContext(self.swagger_version, new_stack)
- def next(self, version=None, stack=None):
- if version is None:
- version = self.version
- if stack is None:
- stack = self.stack
- return ParsingContext(version, stack)
- class SwaggerError(Exception):
- """Raised when an error is encountered mapping the JSON objects into the
- model.
- """
- def __init__(self, msg, context, cause=None):
- """Ctor.
- @param msg: String message for the error.
- @param context: ParsingContext object
- @param cause: Optional exception that caused this one.
- """
- super(Exception, self).__init__(msg, context, cause)
- class SwaggerPostProcessor(object):
- """Post processing interface for model objects. This processor can add
- fields to model objects for additional information to use in the
- templates.
- """
- def process_resource_api(self, resource_api, context):
- """Post process a ResourceApi object.
- @param resource_api: ResourceApi object.
- @param context: Current context in the API.
- """
- pass
- def process_api(self, api, context):
- """Post process an Api object.
- @param api: Api object.
- @param context: Current context in the API.
- """
- pass
- def process_operation(self, operation, context):
- """Post process a Operation object.
- @param operation: Operation object.
- @param context: Current context in the API.
- """
- pass
- def process_parameter(self, parameter, context):
- """Post process a Parameter object.
- @param parameter: Parameter object.
- @param context: Current context in the API.
- """
- pass
- def process_model(self, model, context):
- """Post process a Model object.
- @param model: Model object.
- @param context: Current context in the API.
- """
- pass
- def process_property(self, property, context):
- """Post process a Property object.
- @param property: Property object.
- @param context: Current context in the API.
- """
- pass
- def process_type(self, swagger_type, context):
- """Post process a SwaggerType object.
- @param swagger_type: ResourceListing object.
- @param context: Current context in the API.
- """
- pass
- def process_resource_listing(self, resource_listing, context):
- """Post process the overall ResourceListing object.
- @param resource_listing: ResourceListing object.
- @param context: Current context in the API.
- """
- pass
- class AllowableRange(Stringify):
- """Model of a allowableValues of type RANGE
- See https://github.com/wordnik/swagger-core/wiki/datatypes#complex-types
- """
- def __init__(self, min_value, max_value):
- self.min_value = min_value
- self.max_value = max_value
- def to_wiki(self):
- return "Allowed range: Min: {0}; Max: {1}".format(self.min_value, self.max_value)
- class AllowableList(Stringify):
- """Model of a allowableValues of type LIST
- See https://github.com/wordnik/swagger-core/wiki/datatypes#complex-types
- """
- def __init__(self, values):
- self.values = values
- def to_wiki(self):
- return "Allowed values: {0}".format(", ".join(self.values))
- def load_allowable_values(json, context):
- """Parse a JSON allowableValues object.
- This returns None, AllowableList or AllowableRange, depending on the
- valueType in the JSON. If the valueType is not recognized, a SwaggerError
- is raised.
- """
- if not json:
- return None
- if not 'valueType' in json:
- raise SwaggerError("Missing valueType field", context)
- value_type = json['valueType']
- if value_type == 'RANGE':
- if not 'min' in json and not 'max' in json:
- raise SwaggerError("Missing fields min/max", context)
- return AllowableRange(json.get('min'), json.get('max'))
- if value_type == 'LIST':
- if not 'values' in json:
- raise SwaggerError("Missing field values", context)
- return AllowableList(json['values'])
- raise SwaggerError("Unkown valueType %s" % value_type, context)
- class Parameter(Stringify):
- """Model of an operation's parameter.
- See https://github.com/wordnik/swagger-core/wiki/parameters
- """
- required_fields = ['name', 'paramType', 'dataType']
- def __init__(self):
- self.param_type = None
- self.name = None
- self.description = None
- self.data_type = None
- self.required = None
- self.allowable_values = None
- self.allow_multiple = None
- def load(self, parameter_json, processor, context):
- context = context.next_stack(parameter_json, 'name')
- validate_required_fields(parameter_json, self.required_fields, context)
- self.name = parameter_json.get('name')
- self.param_type = parameter_json.get('paramType')
- self.description = parameter_json.get('description') or ''
- self.data_type = parameter_json.get('dataType')
- self.required = parameter_json.get('required') or False
- self.default_value = parameter_json.get('defaultValue')
- self.allowable_values = load_allowable_values(
- parameter_json.get('allowableValues'), context)
- self.allow_multiple = parameter_json.get('allowMultiple') or False
- processor.process_parameter(self, context)
- if parameter_json.get('allowedValues'):
- raise SwaggerError(
- "Field 'allowedValues' invalid; use 'allowableValues'",
- context)
- return self
- def is_type(self, other_type):
- return self.param_type == other_type
- class ErrorResponse(Stringify):
- """Model of an error response.
- See https://github.com/wordnik/swagger-core/wiki/errors
- """
- required_fields = ['code', 'reason']
- def __init__(self):
- self.code = None
- self.reason = None
- def load(self, err_json, processor, context):
- context = context.next_stack(err_json, 'code')
- validate_required_fields(err_json, self.required_fields, context)
- self.code = err_json.get('code')
- self.reason = err_json.get('reason')
- return self
- class SwaggerType(Stringify):
- """Model of a data type.
- """
- def __init__(self):
- self.name = None
- self.is_discriminator = None
- self.is_list = None
- self.singular_name = None
- self.is_primitive = None
- def load(self, type_name, processor, context):
- # Some common errors
- if type_name == 'integer':
- raise SwaggerError("The type for integer should be 'int'", context)
- self.name = type_name
- type_param = get_list_parameter_type(self.name)
- self.is_list = type_param is not None
- if self.is_list:
- self.singular_name = type_param
- else:
- self.singular_name = self.name
- self.is_primitive = self.singular_name in SWAGGER_PRIMITIVES
- processor.process_type(self, context)
- return self
- class Operation(Stringify):
- """Model of an operation on an API
- See https://github.com/wordnik/swagger-core/wiki/API-Declaration#apis
- """
- required_fields = ['httpMethod', 'nickname', 'responseClass', 'summary']
- def __init__(self):
- self.http_method = None
- self.nickname = None
- self.response_class = None
- self.parameters = []
- self.summary = None
- self.notes = None
- self.error_responses = []
- def load(self, op_json, processor, context):
- context = context.next_stack(op_json, 'nickname')
- validate_required_fields(op_json, self.required_fields, context)
- self.http_method = op_json.get('httpMethod')
- self.nickname = op_json.get('nickname')
- response_class = op_json.get('responseClass')
- self.response_class = response_class and SwaggerType().load(
- response_class, processor, context)
- # Specifying WebSocket URL's is our own extension
- self.is_websocket = op_json.get('upgrade') == 'websocket'
- self.is_req = not self.is_websocket
- if self.is_websocket:
- self.websocket_protocol = op_json.get('websocketProtocol')
- if self.http_method != 'GET':
- raise SwaggerError(
- "upgrade: websocket is only valid on GET operations",
- context)
- params_json = op_json.get('parameters') or []
- self.parameters = [
- Parameter().load(j, processor, context) for j in params_json]
- self.query_parameters = [
- p for p in self.parameters if p.is_type('query')]
- self.has_query_parameters = self.query_parameters and True
- self.path_parameters = [
- p for p in self.parameters if p.is_type('path')]
- self.has_path_parameters = self.path_parameters and True
- self.header_parameters = [
- p for p in self.parameters if p.is_type('header')]
- self.has_header_parameters = self.header_parameters and True
- self.has_parameters = self.has_query_parameters or \
- self.has_path_parameters or self.has_header_parameters
- # Body param is different, since there's at most one
- self.body_parameter = [
- p for p in self.parameters if p.is_type('body')]
- if len(self.body_parameter) > 1:
- raise SwaggerError("Cannot have more than one body param", context)
- self.body_parameter = self.body_parameter and self.body_parameter[0]
- self.has_body_parameter = self.body_parameter and True
- self.summary = op_json.get('summary')
- self.notes = op_json.get('notes')
- err_json = op_json.get('errorResponses') or []
- self.error_responses = [
- ErrorResponse().load(j, processor, context) for j in err_json]
- self.has_error_responses = self.error_responses != []
- processor.process_operation(self, context)
- return self
- class Api(Stringify):
- """Model of a single API in an API declaration.
- See https://github.com/wordnik/swagger-core/wiki/API-Declaration
- """
- required_fields = ['path', 'operations']
- def __init__(self,):
- self.path = None
- self.description = None
- self.operations = []
- def load(self, api_json, processor, context):
- context = context.next_stack(api_json, 'path')
- validate_required_fields(api_json, self.required_fields, context)
- self.path = api_json.get('path')
- self.description = api_json.get('description')
- op_json = api_json.get('operations')
- self.operations = [
- Operation().load(j, processor, context) for j in op_json]
- self.has_websocket = any(op.is_websocket for op in self.operations)
- processor.process_api(self, context)
- return self
- def get_list_parameter_type(type_string):
- """Returns the type parameter if the given type_string is List[].
- @param type_string: Type string to parse
- @returns Type parameter of the list, or None if not a List.
- """
- list_match = re.match('^List\[(.*)\]$', type_string)
- return list_match and list_match.group(1)
- class Property(Stringify):
- """Model of a Swagger property.
- See https://github.com/wordnik/swagger-core/wiki/datatypes
- """
- required_fields = ['type']
- def __init__(self, name):
- self.name = name
- self.type = None
- self.description = None
- self.required = None
- def load(self, property_json, processor, context):
- validate_required_fields(property_json, self.required_fields, context)
- # Bit of a hack, but properties do not self-identify
- context = context.next_stack({'name': self.name}, 'name')
- self.description = property_json.get('description') or ''
- self.required = property_json.get('required') or False
- type = property_json.get('type')
- self.type = type and SwaggerType().load(type, processor, context)
- processor.process_property(self, context)
- return self
- class Model(Stringify):
- """Model of a Swagger model.
- See https://github.com/wordnik/swagger-core/wiki/datatypes
- """
- required_fields = ['description', 'properties']
- def __init__(self):
- self.id = None
- self.subtypes = []
- self.__subtype_types = []
- self.notes = None
- self.description = None
- self.__properties = None
- self.__discriminator = None
- self.__extends_type = None
- def load(self, id, model_json, processor, context):
- context = context.next_stack(model_json, 'id')
- validate_required_fields(model_json, self.required_fields, context)
- # The duplication of the model's id is required by the Swagger spec.
- self.id = model_json.get('id')
- if id != self.id:
- raise SwaggerError("Model id doesn't match name", context)
- self.subtypes = model_json.get('subTypes') or []
- if self.subtypes and context.version_less_than("1.2"):
- raise SwaggerError("Type extension support added in Swagger 1.2",
- context)
- self.description = model_json.get('description')
- props = model_json.get('properties').items() or []
- self.__properties = [
- Property(k).load(j, processor, context) for (k, j) in props]
- self.__properties = sorted(self.__properties, key=lambda p: p.name)
- discriminator = model_json.get('discriminator')
- if discriminator:
- if context.version_less_than("1.2"):
- raise SwaggerError("Discriminator support added in Swagger 1.2",
- context)
- discr_props = [p for p in self.__properties if p.name == discriminator]
- if not discr_props:
- raise SwaggerError(
- "Discriminator '%s' does not name a property of '%s'" % (
- discriminator, self.id),
- context)
- self.__discriminator = discr_props[0]
- self.model_json = json.dumps(model_json,
- indent=2, separators=(',', ': '))
- processor.process_model(self, context)
- return self
- def extends(self):
- return self.__extends_type and self.__extends_type.id
- def set_extends_type(self, extends_type):
- self.__extends_type = extends_type
- def set_subtype_types(self, subtype_types):
- self.__subtype_types = subtype_types
- def discriminator(self):
- """Returns the discriminator, digging through base types if needed.
- """
- return self.__discriminator or \
- self.__extends_type and self.__extends_type.discriminator()
- def properties(self):
- base_props = []
- if self.__extends_type:
- base_props = self.__extends_type.properties()
- return base_props + self.__properties
- def has_properties(self):
- return len(self.properties()) > 0
- def all_subtypes(self):
- """Returns the full list of all subtypes, including sub-subtypes.
- """
- res = self.__subtype_types + \
- [subsubtypes for subtype in self.__subtype_types
- for subsubtypes in subtype.all_subtypes()]
- return sorted(res, key=lambda m: m.id)
- def has_subtypes(self):
- """Returns True if type has any subtypes.
- """
- return len(self.subtypes) > 0
- class ApiDeclaration(Stringify):
- """Model class for an API Declaration.
- See https://github.com/wordnik/swagger-core/wiki/API-Declaration
- """
- required_fields = [
- 'swaggerVersion', '_author', '_copyright', 'apiVersion', 'basePath',
- 'resourcePath', 'apis', 'models'
- ]
- def __init__(self):
- self.swagger_version = None
- self.author = None
- self.copyright = None
- self.api_version = None
- self.base_path = None
- self.resource_path = None
- self.apis = []
- self.models = []
- def load_file(self, api_declaration_file, processor):
- context = ParsingContext(None, [api_declaration_file])
- try:
- return self.__load_file(api_declaration_file, processor, context)
- except SwaggerError:
- raise
- except Exception as e:
- print("Error: ", traceback.format_exc(), file=sys.stderr)
- raise SwaggerError(
- "Error loading %s" % api_declaration_file, context, e)
- def __load_file(self, api_declaration_file, processor, context):
- with open(api_declaration_file) as fp:
- self.load(json.load(fp), processor, context)
- expected_resource_path = '/api-docs/' + \
- os.path.basename(api_declaration_file) \
- .replace(".json", ".{format}")
- if self.resource_path != expected_resource_path:
- print("%s != %s" % (self.resource_path, expected_resource_path),
- file=sys.stderr)
- raise SwaggerError("resourcePath has incorrect value", context)
- return self
- def load(self, api_decl_json, processor, context):
- """Loads a resource from a single Swagger resource.json file.
- """
- # If the version doesn't match, all bets are off.
- self.swagger_version = api_decl_json.get('swaggerVersion')
- context = context.next(version=self.swagger_version)
- if not self.swagger_version in SWAGGER_VERSIONS:
- raise SwaggerError(
- "Unsupported Swagger version %s" % self.swagger_version, context)
- validate_required_fields(api_decl_json, self.required_fields, context)
- self.author = api_decl_json.get('_author')
- self.copyright = api_decl_json.get('_copyright')
- self.api_version = api_decl_json.get('apiVersion')
- self.base_path = api_decl_json.get('basePath')
- self.resource_path = api_decl_json.get('resourcePath')
- api_json = api_decl_json.get('apis') or []
- self.apis = [
- Api().load(j, processor, context) for j in api_json]
- paths = set()
- for api in self.apis:
- if api.path in paths:
- raise SwaggerError("API with duplicated path: %s" % api.path, context)
- paths.add(api.path)
- self.has_websocket = any(api.has_websocket for api in self.apis)
- models = api_decl_json.get('models').items() or []
- self.models = [Model().load(id, json, processor, context)
- for (id, json) in models]
- self.models = sorted(self.models, key=lambda m: m.id)
- # Now link all base/extended types
- model_dict = dict((m.id, m) for m in self.models)
- for m in self.models:
- def link_subtype(name):
- res = model_dict.get(name)
- if not res:
- raise SwaggerError("%s has non-existing subtype %s",
- m.id, name)
- res.set_extends_type(m)
- return res;
- if m.subtypes:
- m.set_subtype_types([
- link_subtype(subtype) for subtype in m.subtypes])
- return self
- class ResourceApi(Stringify):
- """Model of an API listing in the resources.json file.
- """
- required_fields = ['path', 'description']
- def __init__(self):
- self.path = None
- self.description = None
- self.api_declaration = None
- def load(self, api_json, processor, context):
- context = context.next_stack(api_json, 'path')
- validate_required_fields(api_json, self.required_fields, context)
- self.path = api_json['path'].replace('{format}', 'json')
- self.description = api_json['description']
- if not self.path or self.path[0] != '/':
- raise SwaggerError("Path must start with /", context)
- processor.process_resource_api(self, context)
- return self
- def load_api_declaration(self, base_dir, processor):
- self.file = (base_dir + self.path)
- self.api_declaration = ApiDeclaration().load_file(self.file, processor)
- processor.process_resource_api(self, [self.file])
- class ResourceListing(Stringify):
- """Model of Swagger's resources.json file.
- """
- required_fields = ['apiVersion', 'basePath', 'apis']
- def __init__(self):
- self.swagger_version = None
- self.api_version = None
- self.base_path = None
- self.apis = None
- def load_file(self, resource_file, processor):
- context = ParsingContext(None, [resource_file])
- try:
- return self.__load_file(resource_file, processor, context)
- except SwaggerError:
- raise
- except Exception as e:
- print("Error: ", traceback.format_exc(), file=sys.stderr)
- raise SwaggerError(
- "Error loading %s" % resource_file, context, e)
- def __load_file(self, resource_file, processor, context):
- with open(resource_file) as fp:
- return self.load(json.load(fp), processor, context)
- def load(self, resources_json, processor, context):
- # If the version doesn't match, all bets are off.
- self.swagger_version = resources_json.get('swaggerVersion')
- if not self.swagger_version in SWAGGER_VERSIONS:
- raise SwaggerError(
- "Unsupported Swagger version %s" % self.swagger_version, context)
- validate_required_fields(resources_json, self.required_fields, context)
- self.api_version = resources_json['apiVersion']
- self.base_path = resources_json['basePath']
- apis_json = resources_json['apis']
- self.apis = [
- ResourceApi().load(j, processor, context) for j in apis_json]
- processor.process_resource_listing(self, context)
- return self
- def validate_required_fields(json, required_fields, context):
- """Checks a JSON object for a set of required fields.
- If any required field is missing, a SwaggerError is raised.
- @param json: JSON object to check.
- @param required_fields: List of required fields.
- @param context: Current context in the API.
- """
- missing_fields = [f for f in required_fields if not f in json]
- if missing_fields:
- raise SwaggerError(
- "Missing fields: %s" % ', '.join(missing_fields), context)
|