diff options
Diffstat (limited to 'wsme/rest')
-rw-r--r-- | wsme/rest/__init__.py | 78 | ||||
-rw-r--r-- | wsme/rest/args.py | 310 | ||||
-rw-r--r-- | wsme/rest/json.py | 328 | ||||
-rw-r--r-- | wsme/rest/protocol.py | 133 | ||||
-rw-r--r-- | wsme/rest/xml.py | 298 |
5 files changed, 0 insertions, 1147 deletions
diff --git a/wsme/rest/__init__.py b/wsme/rest/__init__.py deleted file mode 100644 index f35d6a9..0000000 --- a/wsme/rest/__init__.py +++ /dev/null @@ -1,78 +0,0 @@ -import inspect -import wsme.api - -APIPATH_MAXLEN = 20 - - -class expose(object): - def __init__(self, *args, **kwargs): - self.signature = wsme.api.signature(*args, **kwargs) - - def __call__(self, func): - return self.signature(func) - - @classmethod - def with_method(cls, method, *args, **kwargs): - kwargs['method'] = method - return cls(*args, **kwargs) - - @classmethod - def get(cls, *args, **kwargs): - return cls.with_method('GET', *args, **kwargs) - - @classmethod - def post(cls, *args, **kwargs): - return cls.with_method('POST', *args, **kwargs) - - @classmethod - def put(cls, *args, **kwargs): - return cls.with_method('PUT', *args, **kwargs) - - @classmethod - def delete(cls, *args, **kwargs): - return cls.with_method('DELETE', *args, **kwargs) - - -class validate(object): - """ - Decorator that define the arguments types of a function. - - - Example:: - - class MyController(object): - @expose(str) - @validate(datetime.date, datetime.time) - def format(self, d, t): - return d.isoformat() + ' ' + t.isoformat() - """ - def __init__(self, *param_types): - self.param_types = param_types - - def __call__(self, func): - argspec = wsme.api.getargspec(func) - fd = wsme.api.FunctionDefinition.get(func) - fd.set_arg_types(argspec, self.param_types) - return func - - -def scan_api(controller, path=[], objects=[]): - """ - Recursively iterate a controller api entries. - """ - for name in dir(controller): - if name.startswith('_'): - continue - a = getattr(controller, name) - if a in objects: - continue - if inspect.ismethod(a): - if wsme.api.iswsmefunction(a): - yield path + [name], a, [] - elif inspect.isclass(a): - continue - else: - if len(path) > APIPATH_MAXLEN: - raise ValueError("Path is too long: " + str(path)) - for i in scan_api(a, path + [name], objects + [a]): - yield i diff --git a/wsme/rest/args.py b/wsme/rest/args.py deleted file mode 100644 index 9dc16c7..0000000 --- a/wsme/rest/args.py +++ /dev/null @@ -1,310 +0,0 @@ -import cgi -import datetime -import re - -from simplegeneric import generic - -from wsme.exc import ClientSideError, UnknownArgument, InvalidInput - -from wsme.types import iscomplex, list_attributes, Unset -from wsme.types import UserType, ArrayType, DictType, File -from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime -import wsme.runtime - -from six import moves - -ARRAY_MAX_SIZE = 1000 - - -@generic -def from_param(datatype, value): - return datatype(value) if value is not None else None - - -@from_param.when_object(datetime.date) -def date_from_param(datatype, value): - return parse_isodate(value) if value else None - - -@from_param.when_object(datetime.time) -def time_from_param(datatype, value): - return parse_isotime(value) if value else None - - -@from_param.when_object(datetime.datetime) -def datetime_from_param(datatype, value): - return parse_isodatetime(value) if value else None - - -@from_param.when_object(File) -def filetype_from_param(datatype, value): - if isinstance(value, cgi.FieldStorage): - return File(fieldstorage=value) - return File(content=value) - - -@from_param.when_type(UserType) -def usertype_from_param(datatype, value): - return datatype.frombasetype( - from_param(datatype.basetype, value)) - - -@from_param.when_type(ArrayType) -def array_from_param(datatype, value): - if value is None: - return value - return [ - from_param(datatype.item_type, item) - for item in value - ] - - -@generic -def from_params(datatype, params, path, hit_paths): - if iscomplex(datatype) and datatype is not File: - objfound = False - for key in params: - if key.startswith(path + '.'): - objfound = True - break - if objfound: - r = datatype() - for attrdef in list_attributes(datatype): - value = from_params( - attrdef.datatype, - params, '%s.%s' % (path, attrdef.key), hit_paths - ) - if value is not Unset: - setattr(r, attrdef.key, value) - return r - else: - if path in params: - hit_paths.add(path) - return from_param(datatype, params[path]) - return Unset - - -@from_params.when_type(ArrayType) -def array_from_params(datatype, params, path, hit_paths): - if hasattr(params, 'getall'): - # webob multidict - def getall(params, path): - return params.getall(path) - elif hasattr(params, 'getlist'): - # werkzeug multidict - def getall(params, path): # noqa - return params.getlist(path) - if path in params: - hit_paths.add(path) - return [ - from_param(datatype.item_type, value) - for value in getall(params, path)] - - if iscomplex(datatype.item_type): - attributes = set() - r = re.compile(r'^%s\.(?P<attrname>[^\.])' % re.escape(path)) - for p in params.keys(): - m = r.match(p) - if m: - attributes.add(m.group('attrname')) - if attributes: - value = [] - for attrdef in list_attributes(datatype.item_type): - attrpath = '%s.%s' % (path, attrdef.key) - hit_paths.add(attrpath) - attrvalues = getall(params, attrpath) - if len(value) < len(attrvalues): - value[-1:] = [ - datatype.item_type() - for i in moves.range(len(attrvalues) - len(value)) - ] - for i, attrvalue in enumerate(attrvalues): - setattr( - value[i], - attrdef.key, - from_param(attrdef.datatype, attrvalue) - ) - return value - - indexes = set() - r = re.compile(r'^%s\[(?P<index>\d+)\]' % re.escape(path)) - - for p in params.keys(): - m = r.match(p) - if m: - indexes.add(int(m.group('index'))) - - if not indexes: - return Unset - - indexes = list(indexes) - indexes.sort() - - return [from_params(datatype.item_type, params, - '%s[%s]' % (path, index), hit_paths) - for index in indexes] - - -@from_params.when_type(DictType) -def dict_from_params(datatype, params, path, hit_paths): - - keys = set() - r = re.compile(r'^%s\[(?P<key>[a-zA-Z0-9_\.]+)\]' % re.escape(path)) - - for p in params.keys(): - m = r.match(p) - if m: - keys.add(from_param(datatype.key_type, m.group('key'))) - - if not keys: - return Unset - - return dict(( - (key, from_params(datatype.value_type, - params, '%s[%s]' % (path, key), hit_paths)) - for key in keys)) - - -@from_params.when_type(UserType) -def usertype_from_params(datatype, params, path, hit_paths): - value = from_params(datatype.basetype, params, path, hit_paths) - if value is not Unset: - return datatype.frombasetype(value) - return Unset - - -def args_from_args(funcdef, args, kwargs): - newargs = [] - for argdef, arg in zip(funcdef.arguments[:len(args)], args): - try: - newargs.append(from_param(argdef.datatype, arg)) - except Exception as e: - if isinstance(argdef.datatype, UserType): - datatype_name = argdef.datatype.name - elif isinstance(argdef.datatype, type): - datatype_name = argdef.datatype.__name__ - else: - datatype_name = argdef.datatype.__class__.__name__ - raise InvalidInput( - argdef.name, - arg, - "unable to convert to %(datatype)s. Error: %(error)s" % { - 'datatype': datatype_name, 'error': e}) - newkwargs = {} - for argname, value in kwargs.items(): - newkwargs[argname] = from_param( - funcdef.get_arg(argname).datatype, value - ) - return newargs, newkwargs - - -def args_from_params(funcdef, params): - kw = {} - hit_paths = set() - for argdef in funcdef.arguments: - value = from_params( - argdef.datatype, params, argdef.name, hit_paths) - if value is not Unset: - kw[argdef.name] = value - paths = set(params.keys()) - unknown_paths = paths - hit_paths - if '__body__' in unknown_paths: - unknown_paths.remove('__body__') - if not funcdef.ignore_extra_args and unknown_paths: - raise UnknownArgument(', '.join(unknown_paths)) - return [], kw - - -def args_from_body(funcdef, body, mimetype): - from wsme.rest import json as restjson - from wsme.rest import xml as restxml - - if funcdef.body_type is not None: - datatypes = {funcdef.arguments[-1].name: funcdef.body_type} - else: - datatypes = dict(((a.name, a.datatype) for a in funcdef.arguments)) - - if not body: - return (), {} - if mimetype == "application/x-www-form-urlencoded": - # the parameters should have been parsed in params - return (), {} - elif mimetype in restjson.accept_content_types: - dataformat = restjson - elif mimetype in restxml.accept_content_types: - dataformat = restxml - else: - raise ClientSideError("Unknown mimetype: %s" % mimetype, - status_code=415) - - try: - kw = dataformat.parse( - body, datatypes, bodyarg=funcdef.body_type is not None - ) - except UnknownArgument: - if not funcdef.ignore_extra_args: - raise - kw = {} - - return (), kw - - -def combine_args(funcdef, akw, allow_override=False): - newargs, newkwargs = [], {} - for args, kwargs in akw: - for i, arg in enumerate(args): - n = funcdef.arguments[i].name - if not allow_override and n in newkwargs: - raise ClientSideError( - "Parameter %s was given several times" % n) - newkwargs[n] = arg - for name, value in kwargs.items(): - n = str(name) - if not allow_override and n in newkwargs: - raise ClientSideError( - "Parameter %s was given several times" % n) - newkwargs[n] = value - return newargs, newkwargs - - -def get_args(funcdef, args, kwargs, params, form, body, mimetype): - """Combine arguments from : - * the host framework args and kwargs - * the request params - * the request body - - Note that the host framework args and kwargs can be overridden - by arguments from params of body - """ - # get the body from params if not given directly - if not body and '__body__' in params: - body = params['__body__'] - - # extract args from the host args and kwargs - from_args = args_from_args(funcdef, args, kwargs) - - # extract args from the request parameters - from_params = args_from_params(funcdef, params) - - # extract args from the form parameters - if form: - from_form_params = args_from_params(funcdef, form) - else: - from_form_params = (), {} - - # extract args from the request body - from_body = args_from_body(funcdef, body, mimetype) - - # combine params and body arguments - from_params_and_body = combine_args( - funcdef, - (from_params, from_form_params, from_body) - ) - - args, kwargs = combine_args( - funcdef, - (from_args, from_params_and_body), - allow_override=True - ) - wsme.runtime.check_arguments(funcdef, args, kwargs) - return args, kwargs diff --git a/wsme/rest/json.py b/wsme/rest/json.py deleted file mode 100644 index 48bd082..0000000 --- a/wsme/rest/json.py +++ /dev/null @@ -1,328 +0,0 @@ -"""REST+Json protocol implementation.""" -from __future__ import absolute_import -import datetime -import decimal - -import six - -from simplegeneric import generic - -import wsme.exc -import wsme.types -from wsme.types import Unset -import wsme.utils - - -try: - import simplejson as json -except ImportError: - import json # noqa - - -content_type = 'application/json' -accept_content_types = [ - content_type, - 'text/javascript', - 'application/javascript' -] -ENUM_TRUE = ('true', 't', 'yes', 'y', 'on', '1') -ENUM_FALSE = ('false', 'f', 'no', 'n', 'off', '0') - - -@generic -def tojson(datatype, value): - """ - A generic converter from python to jsonify-able datatypes. - - If a non-complex user specific type is to be used in the api, - a specific tojson should be added:: - - from wsme.protocol.restjson import tojson - - myspecialtype = object() - - @tojson.when_object(myspecialtype) - def myspecialtype_tojson(datatype, value): - return str(value) - """ - if value is None: - return None - if wsme.types.iscomplex(datatype): - d = dict() - for attr in wsme.types.list_attributes(datatype): - attr_value = getattr(value, attr.key) - if attr_value is not Unset: - d[attr.name] = tojson(attr.datatype, attr_value) - return d - elif wsme.types.isusertype(datatype): - return tojson(datatype.basetype, datatype.tobasetype(value)) - return value - - -@tojson.when_object(wsme.types.bytes) -def bytes_tojson(datatype, value): - if value is None: - return None - return value.decode('ascii') - - -@tojson.when_type(wsme.types.ArrayType) -def array_tojson(datatype, value): - if value is None: - return None - return [tojson(datatype.item_type, item) for item in value] - - -@tojson.when_type(wsme.types.DictType) -def dict_tojson(datatype, value): - if value is None: - return None - return dict(( - (tojson(datatype.key_type, item[0]), - tojson(datatype.value_type, item[1])) - for item in value.items() - )) - - -@tojson.when_object(decimal.Decimal) -def decimal_tojson(datatype, value): - if value is None: - return None - return str(value) - - -@tojson.when_object(datetime.date) -def date_tojson(datatype, value): - if value is None: - return None - return value.isoformat() - - -@tojson.when_object(datetime.time) -def time_tojson(datatype, value): - if value is None: - return None - return value.isoformat() - - -@tojson.when_object(datetime.datetime) -def datetime_tojson(datatype, value): - if value is None: - return None - return value.isoformat() - - -@generic -def fromjson(datatype, value): - """A generic converter from json base types to python datatype. - - If a non-complex user specific type is to be used in the api, - a specific fromjson should be added:: - - from wsme.protocol.restjson import fromjson - - class MySpecialType(object): - pass - - @fromjson.when_object(MySpecialType) - def myspecialtype_fromjson(datatype, value): - return MySpecialType(value) - """ - if value is None: - return None - if wsme.types.iscomplex(datatype): - obj = datatype() - attributes = wsme.types.list_attributes(datatype) - - # Here we check that all the attributes in the value are also defined - # in our type definition, otherwise we raise an Error. - v_keys = set(value.keys()) - a_keys = set(adef.name for adef in attributes) - if not v_keys <= a_keys: - raise wsme.exc.UnknownAttribute(None, v_keys - a_keys) - - for attrdef in attributes: - if attrdef.name in value: - try: - val_fromjson = fromjson(attrdef.datatype, - value[attrdef.name]) - except wsme.exc.UnknownAttribute as e: - e.add_fieldname(attrdef.name) - raise - if getattr(attrdef, 'readonly', False): - raise wsme.exc.InvalidInput(attrdef.name, val_fromjson, - "Cannot set read only field.") - setattr(obj, attrdef.key, val_fromjson) - elif attrdef.mandatory: - raise wsme.exc.InvalidInput(attrdef.name, None, - "Mandatory field missing.") - - return wsme.types.validate_value(datatype, obj) - elif wsme.types.isusertype(datatype): - value = datatype.frombasetype( - fromjson(datatype.basetype, value)) - return value - - -@fromjson.when_type(wsme.types.ArrayType) -def array_fromjson(datatype, value): - if value is None: - return None - if not isinstance(value, list): - raise ValueError("Value not a valid list: %s" % value) - return [fromjson(datatype.item_type, item) for item in value] - - -@fromjson.when_type(wsme.types.DictType) -def dict_fromjson(datatype, value): - if value is None: - return None - if not isinstance(value, dict): - raise ValueError("Value not a valid dict: %s" % value) - return dict(( - (fromjson(datatype.key_type, item[0]), - fromjson(datatype.value_type, item[1])) - for item in value.items())) - - -@fromjson.when_object(six.binary_type) -def str_fromjson(datatype, value): - if (isinstance(value, six.string_types) or - isinstance(value, six.integer_types) or - isinstance(value, float)): - return six.text_type(value).encode('utf8') - - -@fromjson.when_object(wsme.types.text) -def text_fromjson(datatype, value): - if value is not None and isinstance(value, wsme.types.bytes): - return wsme.types.text(value) - return value - - -@fromjson.when_object(*six.integer_types + (float,)) -def numeric_fromjson(datatype, value): - """Convert string object to built-in types int, long or float.""" - if value is None: - return None - return datatype(value) - - -@fromjson.when_object(bool) -def bool_fromjson(datatype, value): - """Convert to bool, restricting strings to just unambiguous values.""" - if value is None: - return None - if isinstance(value, six.integer_types + (bool,)): - return bool(value) - if value in ENUM_TRUE: - return True - if value in ENUM_FALSE: - return False - raise ValueError("Value not an unambiguous boolean: %s" % value) - - -@fromjson.when_object(decimal.Decimal) -def decimal_fromjson(datatype, value): - if value is None: - return None - return decimal.Decimal(value) - - -@fromjson.when_object(datetime.date) -def date_fromjson(datatype, value): - if value is None: - return None - return wsme.utils.parse_isodate(value) - - -@fromjson.when_object(datetime.time) -def time_fromjson(datatype, value): - if value is None: - return None - return wsme.utils.parse_isotime(value) - - -@fromjson.when_object(datetime.datetime) -def datetime_fromjson(datatype, value): - if value is None: - return None - return wsme.utils.parse_isodatetime(value) - - -def parse(s, datatypes, bodyarg, encoding='utf8'): - jload = json.load - if not hasattr(s, 'read'): - if six.PY3 and isinstance(s, six.binary_type): - s = s.decode(encoding) - jload = json.loads - try: - jdata = jload(s) - except ValueError: - raise wsme.exc.ClientSideError("Request is not in valid JSON format") - if bodyarg: - argname = list(datatypes.keys())[0] - try: - kw = {argname: fromjson(datatypes[argname], jdata)} - except ValueError as e: - raise wsme.exc.InvalidInput(argname, jdata, e.args[0]) - except wsme.exc.UnknownAttribute as e: - # We only know the fieldname at this level, not in the - # called function. We fill in this information here. - e.add_fieldname(argname) - raise - else: - kw = {} - extra_args = [] - if not isinstance(jdata, dict): - raise wsme.exc.ClientSideError("Request must be a JSON dict") - for key in jdata: - if key not in datatypes: - extra_args.append(key) - else: - try: - kw[key] = fromjson(datatypes[key], jdata[key]) - except ValueError as e: - raise wsme.exc.InvalidInput(key, jdata[key], e.args[0]) - except wsme.exc.UnknownAttribute as e: - # We only know the fieldname at this level, not in the - # called function. We fill in this information here. - e.add_fieldname(key) - raise - if extra_args: - raise wsme.exc.UnknownArgument(', '.join(extra_args)) - return kw - - -def encode_result(value, datatype, **options): - jsondata = tojson(datatype, value) - if options.get('nest_result', False): - jsondata = {options.get('nested_result_attrname', 'result'): jsondata} - return json.dumps(jsondata) - - -def encode_error(context, errordetail): - return json.dumps(errordetail) - - -def encode_sample_value(datatype, value, format=False): - r = tojson(datatype, value) - content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0, - sort_keys=format) - return ('javascript', content) - - -def encode_sample_params(params, format=False): - kw = {} - for name, datatype, value in params: - kw[name] = tojson(datatype, value) - content = json.dumps(kw, ensure_ascii=False, indent=4 if format else 0, - sort_keys=format) - return ('javascript', content) - - -def encode_sample_result(datatype, value, format=False): - r = tojson(datatype, value) - content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0, - sort_keys=format) - return ('javascript', content) diff --git a/wsme/rest/protocol.py b/wsme/rest/protocol.py deleted file mode 100644 index 5201ccf..0000000 --- a/wsme/rest/protocol.py +++ /dev/null @@ -1,133 +0,0 @@ -import os.path -import logging - -from wsme.utils import OrderedDict -from wsme.protocol import CallContext, Protocol, media_type_accept - -import wsme.rest -import wsme.rest.args -import wsme.runtime - -log = logging.getLogger(__name__) - - -class RestProtocol(Protocol): - name = 'rest' - displayname = 'REST' - dataformats = ['json', 'xml'] - content_types = ['application/json', 'text/xml'] - - def __init__(self, dataformats=None): - if dataformats is None: - dataformats = RestProtocol.dataformats - - self.dataformats = OrderedDict() - self.content_types = [] - - for dataformat in dataformats: - __import__('wsme.rest.' + dataformat) - dfmod = getattr(wsme.rest, dataformat) - self.dataformats[dataformat] = dfmod - self.content_types.extend(dfmod.accept_content_types) - - def accept(self, request): - for dataformat in self.dataformats: - if request.path.endswith('.' + dataformat): - return True - return media_type_accept(request, self.content_types) - - def iter_calls(self, request): - context = CallContext(request) - context.outformat = None - ext = os.path.splitext(request.path.split('/')[-1])[1] - inmime = request.content_type - outmime = request.accept.best_match(self.content_types) - - outformat = None - informat = None - for dfname, df in self.dataformats.items(): - if ext == '.' + dfname: - outformat = df - if not inmime: - informat = df - - if outformat is None and request.accept: - for dfname, df in self.dataformats.items(): - if outmime in df.accept_content_types: - outformat = df - if not inmime: - informat = df - - if outformat is None: - for dfname, df in self.dataformats.items(): - if inmime == df.content_type: - outformat = df - - context.outformat = outformat - context.outformat_options = { - 'nest_result': getattr(self, 'nest_result', False) - } - if not inmime and informat: - inmime = informat.content_type - log.debug("Inferred input type: %s" % inmime) - context.inmime = inmime - yield context - - def extract_path(self, context): - path = context.request.path - assert path.startswith(self.root._webpath) - path = path[len(self.root._webpath):] - path = path.strip('/').split('/') - - for dataformat in self.dataformats: - if path[-1].endswith('.' + dataformat): - path[-1] = path[-1][:-len(dataformat) - 1] - - # Check if the path is actually a function, and if not - # see if the http method make a difference - # TODO Re-think the function lookup phases. Here we are - # doing the job that will be done in a later phase, which - # is sub-optimal - for p, fdef in self.root.getapi(): - if p == path: - return path - - # No function at this path. Now check for function that have - # this path as a prefix, and declared an http method - for p, fdef in self.root.getapi(): - if len(p) == len(path) + 1 and p[:len(path)] == path and \ - fdef.extra_options.get('method') == context.request.method: - return p - - return path - - def read_arguments(self, context): - request = context.request - funcdef = context.funcdef - - body = None - if request.content_length not in (None, 0, '0'): - body = request.body - if not body and '__body__' in request.params: - body = request.params['__body__'] - - args, kwargs = wsme.rest.args.combine_args( - funcdef, - (wsme.rest.args.args_from_params(funcdef, request.params), - wsme.rest.args.args_from_body(funcdef, body, context.inmime)) - ) - wsme.runtime.check_arguments(funcdef, args, kwargs) - return kwargs - - def encode_result(self, context, result): - out = context.outformat.encode_result( - result, context.funcdef.return_type, - **context.outformat_options - ) - return out - - def encode_error(self, context, errordetail): - out = context.outformat.encode_error( - context, errordetail - ) - return out diff --git a/wsme/rest/xml.py b/wsme/rest/xml.py deleted file mode 100644 index 286afa7..0000000 --- a/wsme/rest/xml.py +++ /dev/null @@ -1,298 +0,0 @@ -from __future__ import absolute_import - -import datetime - -import six - -import xml.etree.ElementTree as et - -from simplegeneric import generic - -import wsme.types -from wsme.exc import UnknownArgument, InvalidInput - -import re - -content_type = 'text/xml' -accept_content_types = [ - content_type, -] - -time_re = re.compile(r'(?P<h>[0-2][0-9]):(?P<m>[0-5][0-9]):(?P<s>[0-6][0-9])') - - -def xml_indent(elem, level=0): - i = "\n" + level * " " - if len(elem): - if not elem.text or not elem.text.strip(): - elem.text = i + " " - for e in elem: - xml_indent(e, level + 1) - if not e.tail or not e.tail.strip(): - e.tail = i - if level and (not elem.tail or not elem.tail.strip()): - elem.tail = i - - -@generic -def toxml(datatype, key, value): - """ - A generic converter from python to xml elements. - - If a non-complex user specific type is to be used in the api, - a specific toxml should be added:: - - from wsme.protocol.restxml import toxml - - myspecialtype = object() - - @toxml.when_object(myspecialtype) - def myspecialtype_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = str(value) - return el - """ - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - if wsme.types.isusertype(datatype): - return toxml(datatype.basetype, - key, datatype.tobasetype(value)) - elif wsme.types.iscomplex(datatype): - for attrdef in datatype._wsme_attributes: - attrvalue = getattr(value, attrdef.key) - if attrvalue is not wsme.types.Unset: - el.append(toxml(attrdef.datatype, attrdef.name, - attrvalue)) - else: - el.text = six.text_type(value) - return el - - -@generic -def fromxml(datatype, element): - """ - A generic converter from xml elements to python datatype. - - If a non-complex user specific type is to be used in the api, - a specific fromxml should be added:: - - from wsme.protocol.restxml import fromxml - - class MySpecialType(object): - pass - - @fromxml.when_object(MySpecialType) - def myspecialtype_fromxml(datatype, element): - if element.get('nil', False): - return None - return MySpecialType(element.text) - """ - if element.get('nil', False): - return None - if wsme.types.isusertype(datatype): - return datatype.frombasetype(fromxml(datatype.basetype, element)) - if wsme.types.iscomplex(datatype): - obj = datatype() - for attrdef in wsme.types.list_attributes(datatype): - sub = element.find(attrdef.name) - if sub is not None: - val_fromxml = fromxml(attrdef.datatype, sub) - if getattr(attrdef, 'readonly', False): - raise InvalidInput(attrdef.name, val_fromxml, - "Cannot set read only field.") - setattr(obj, attrdef.key, val_fromxml) - elif attrdef.mandatory: - raise InvalidInput(attrdef.name, None, - "Mandatory field missing.") - return wsme.types.validate_value(datatype, obj) - if datatype is wsme.types.bytes: - return element.text.encode('ascii') - return datatype(element.text) - - -@toxml.when_type(wsme.types.ArrayType) -def array_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - for item in value: - el.append(toxml(datatype.item_type, 'item', item)) - return el - - -@toxml.when_type(wsme.types.DictType) -def dict_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - for item in value.items(): - key = toxml(datatype.key_type, 'key', item[0]) - value = toxml(datatype.value_type, 'value', item[1]) - node = et.Element('item') - node.append(key) - node.append(value) - el.append(node) - return el - - -@toxml.when_object(wsme.types.bytes) -def bytes_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = value.decode('ascii') - return el - - -@toxml.when_object(bool) -def bool_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = value and 'true' or 'false' - return el - - -@toxml.when_object(datetime.date) -def date_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = value.isoformat() - return el - - -@toxml.when_object(datetime.datetime) -def datetime_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = value.isoformat() - return el - - -@fromxml.when_type(wsme.types.ArrayType) -def array_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return [ - fromxml(datatype.item_type, item) - for item in element.findall('item') - ] - - -@fromxml.when_object(bool) -def bool_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return element.text.lower() != 'false' - - -@fromxml.when_type(wsme.types.DictType) -def dict_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return dict(( - (fromxml(datatype.key_type, item.find('key')), - fromxml(datatype.value_type, item.find('value'))) - for item in element.findall('item'))) - - -@fromxml.when_object(wsme.types.text) -def unicode_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return wsme.types.text(element.text) if element.text else six.u('') - - -@fromxml.when_object(datetime.date) -def date_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return wsme.utils.parse_isodate(element.text) - - -@fromxml.when_object(datetime.time) -def time_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return wsme.utils.parse_isotime(element.text) - - -@fromxml.when_object(datetime.datetime) -def datetime_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return wsme.utils.parse_isodatetime(element.text) - - -def parse(s, datatypes, bodyarg): - if hasattr(s, 'read'): - tree = et.parse(s) - else: - tree = et.fromstring(s) - if bodyarg: - name = list(datatypes.keys())[0] - return {name: fromxml(datatypes[name], tree)} - else: - kw = {} - extra_args = [] - for sub in tree: - if sub.tag not in datatypes: - extra_args.append(sub.tag) - kw[sub.tag] = fromxml(datatypes[sub.tag], sub) - if extra_args: - raise UnknownArgument(', '.join(extra_args)) - return kw - - -def encode_result(value, datatype, **options): - return et.tostring(toxml( - datatype, options.get('nested_result_attrname', 'result'), value - )) - - -def encode_error(context, errordetail): - el = et.Element('error') - et.SubElement(el, 'faultcode').text = errordetail['faultcode'] - et.SubElement(el, 'faultstring').text = errordetail['faultstring'] - if 'debuginfo' in errordetail: - et.SubElement(el, 'debuginfo').text = errordetail['debuginfo'] - return et.tostring(el) - - -def encode_sample_value(datatype, value, format=False): - r = toxml(datatype, 'value', value) - if format: - xml_indent(r) - content = et.tostring(r) - return ('xml', content) - - -def encode_sample_params(params, format=False): - node = et.Element('parameters') - for name, datatype, value in params: - node.append(toxml(datatype, name, value)) - if format: - xml_indent(node) - content = et.tostring(node) - return ('xml', content) - - -def encode_sample_result(datatype, value, format=False): - r = toxml(datatype, 'result', value) - if format: - xml_indent(r) - content = et.tostring(r) - return ('xml', content) |