summaryrefslogtreecommitdiff
path: root/wsme/rest
diff options
context:
space:
mode:
Diffstat (limited to 'wsme/rest')
-rw-r--r--wsme/rest/__init__.py78
-rw-r--r--wsme/rest/args.py310
-rw-r--r--wsme/rest/json.py328
-rw-r--r--wsme/rest/protocol.py133
-rw-r--r--wsme/rest/xml.py298
5 files changed, 0 insertions, 1147 deletions
diff --git a/wsme/rest/__init__.py b/wsme/rest/__init__.py
deleted file mode 100644
index f35d6a9..0000000
--- a/wsme/rest/__init__.py
+++ /dev/null
@@ -1,78 +0,0 @@
-import inspect
-import wsme.api
-
-APIPATH_MAXLEN = 20
-
-
-class expose(object):
- def __init__(self, *args, **kwargs):
- self.signature = wsme.api.signature(*args, **kwargs)
-
- def __call__(self, func):
- return self.signature(func)
-
- @classmethod
- def with_method(cls, method, *args, **kwargs):
- kwargs['method'] = method
- return cls(*args, **kwargs)
-
- @classmethod
- def get(cls, *args, **kwargs):
- return cls.with_method('GET', *args, **kwargs)
-
- @classmethod
- def post(cls, *args, **kwargs):
- return cls.with_method('POST', *args, **kwargs)
-
- @classmethod
- def put(cls, *args, **kwargs):
- return cls.with_method('PUT', *args, **kwargs)
-
- @classmethod
- def delete(cls, *args, **kwargs):
- return cls.with_method('DELETE', *args, **kwargs)
-
-
-class validate(object):
- """
- Decorator that define the arguments types of a function.
-
-
- Example::
-
- class MyController(object):
- @expose(str)
- @validate(datetime.date, datetime.time)
- def format(self, d, t):
- return d.isoformat() + ' ' + t.isoformat()
- """
- def __init__(self, *param_types):
- self.param_types = param_types
-
- def __call__(self, func):
- argspec = wsme.api.getargspec(func)
- fd = wsme.api.FunctionDefinition.get(func)
- fd.set_arg_types(argspec, self.param_types)
- return func
-
-
-def scan_api(controller, path=[], objects=[]):
- """
- Recursively iterate a controller api entries.
- """
- for name in dir(controller):
- if name.startswith('_'):
- continue
- a = getattr(controller, name)
- if a in objects:
- continue
- if inspect.ismethod(a):
- if wsme.api.iswsmefunction(a):
- yield path + [name], a, []
- elif inspect.isclass(a):
- continue
- else:
- if len(path) > APIPATH_MAXLEN:
- raise ValueError("Path is too long: " + str(path))
- for i in scan_api(a, path + [name], objects + [a]):
- yield i
diff --git a/wsme/rest/args.py b/wsme/rest/args.py
deleted file mode 100644
index 9dc16c7..0000000
--- a/wsme/rest/args.py
+++ /dev/null
@@ -1,310 +0,0 @@
-import cgi
-import datetime
-import re
-
-from simplegeneric import generic
-
-from wsme.exc import ClientSideError, UnknownArgument, InvalidInput
-
-from wsme.types import iscomplex, list_attributes, Unset
-from wsme.types import UserType, ArrayType, DictType, File
-from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime
-import wsme.runtime
-
-from six import moves
-
-ARRAY_MAX_SIZE = 1000
-
-
-@generic
-def from_param(datatype, value):
- return datatype(value) if value is not None else None
-
-
-@from_param.when_object(datetime.date)
-def date_from_param(datatype, value):
- return parse_isodate(value) if value else None
-
-
-@from_param.when_object(datetime.time)
-def time_from_param(datatype, value):
- return parse_isotime(value) if value else None
-
-
-@from_param.when_object(datetime.datetime)
-def datetime_from_param(datatype, value):
- return parse_isodatetime(value) if value else None
-
-
-@from_param.when_object(File)
-def filetype_from_param(datatype, value):
- if isinstance(value, cgi.FieldStorage):
- return File(fieldstorage=value)
- return File(content=value)
-
-
-@from_param.when_type(UserType)
-def usertype_from_param(datatype, value):
- return datatype.frombasetype(
- from_param(datatype.basetype, value))
-
-
-@from_param.when_type(ArrayType)
-def array_from_param(datatype, value):
- if value is None:
- return value
- return [
- from_param(datatype.item_type, item)
- for item in value
- ]
-
-
-@generic
-def from_params(datatype, params, path, hit_paths):
- if iscomplex(datatype) and datatype is not File:
- objfound = False
- for key in params:
- if key.startswith(path + '.'):
- objfound = True
- break
- if objfound:
- r = datatype()
- for attrdef in list_attributes(datatype):
- value = from_params(
- attrdef.datatype,
- params, '%s.%s' % (path, attrdef.key), hit_paths
- )
- if value is not Unset:
- setattr(r, attrdef.key, value)
- return r
- else:
- if path in params:
- hit_paths.add(path)
- return from_param(datatype, params[path])
- return Unset
-
-
-@from_params.when_type(ArrayType)
-def array_from_params(datatype, params, path, hit_paths):
- if hasattr(params, 'getall'):
- # webob multidict
- def getall(params, path):
- return params.getall(path)
- elif hasattr(params, 'getlist'):
- # werkzeug multidict
- def getall(params, path): # noqa
- return params.getlist(path)
- if path in params:
- hit_paths.add(path)
- return [
- from_param(datatype.item_type, value)
- for value in getall(params, path)]
-
- if iscomplex(datatype.item_type):
- attributes = set()
- r = re.compile(r'^%s\.(?P<attrname>[^\.])' % re.escape(path))
- for p in params.keys():
- m = r.match(p)
- if m:
- attributes.add(m.group('attrname'))
- if attributes:
- value = []
- for attrdef in list_attributes(datatype.item_type):
- attrpath = '%s.%s' % (path, attrdef.key)
- hit_paths.add(attrpath)
- attrvalues = getall(params, attrpath)
- if len(value) < len(attrvalues):
- value[-1:] = [
- datatype.item_type()
- for i in moves.range(len(attrvalues) - len(value))
- ]
- for i, attrvalue in enumerate(attrvalues):
- setattr(
- value[i],
- attrdef.key,
- from_param(attrdef.datatype, attrvalue)
- )
- return value
-
- indexes = set()
- r = re.compile(r'^%s\[(?P<index>\d+)\]' % re.escape(path))
-
- for p in params.keys():
- m = r.match(p)
- if m:
- indexes.add(int(m.group('index')))
-
- if not indexes:
- return Unset
-
- indexes = list(indexes)
- indexes.sort()
-
- return [from_params(datatype.item_type, params,
- '%s[%s]' % (path, index), hit_paths)
- for index in indexes]
-
-
-@from_params.when_type(DictType)
-def dict_from_params(datatype, params, path, hit_paths):
-
- keys = set()
- r = re.compile(r'^%s\[(?P<key>[a-zA-Z0-9_\.]+)\]' % re.escape(path))
-
- for p in params.keys():
- m = r.match(p)
- if m:
- keys.add(from_param(datatype.key_type, m.group('key')))
-
- if not keys:
- return Unset
-
- return dict((
- (key, from_params(datatype.value_type,
- params, '%s[%s]' % (path, key), hit_paths))
- for key in keys))
-
-
-@from_params.when_type(UserType)
-def usertype_from_params(datatype, params, path, hit_paths):
- value = from_params(datatype.basetype, params, path, hit_paths)
- if value is not Unset:
- return datatype.frombasetype(value)
- return Unset
-
-
-def args_from_args(funcdef, args, kwargs):
- newargs = []
- for argdef, arg in zip(funcdef.arguments[:len(args)], args):
- try:
- newargs.append(from_param(argdef.datatype, arg))
- except Exception as e:
- if isinstance(argdef.datatype, UserType):
- datatype_name = argdef.datatype.name
- elif isinstance(argdef.datatype, type):
- datatype_name = argdef.datatype.__name__
- else:
- datatype_name = argdef.datatype.__class__.__name__
- raise InvalidInput(
- argdef.name,
- arg,
- "unable to convert to %(datatype)s. Error: %(error)s" % {
- 'datatype': datatype_name, 'error': e})
- newkwargs = {}
- for argname, value in kwargs.items():
- newkwargs[argname] = from_param(
- funcdef.get_arg(argname).datatype, value
- )
- return newargs, newkwargs
-
-
-def args_from_params(funcdef, params):
- kw = {}
- hit_paths = set()
- for argdef in funcdef.arguments:
- value = from_params(
- argdef.datatype, params, argdef.name, hit_paths)
- if value is not Unset:
- kw[argdef.name] = value
- paths = set(params.keys())
- unknown_paths = paths - hit_paths
- if '__body__' in unknown_paths:
- unknown_paths.remove('__body__')
- if not funcdef.ignore_extra_args and unknown_paths:
- raise UnknownArgument(', '.join(unknown_paths))
- return [], kw
-
-
-def args_from_body(funcdef, body, mimetype):
- from wsme.rest import json as restjson
- from wsme.rest import xml as restxml
-
- if funcdef.body_type is not None:
- datatypes = {funcdef.arguments[-1].name: funcdef.body_type}
- else:
- datatypes = dict(((a.name, a.datatype) for a in funcdef.arguments))
-
- if not body:
- return (), {}
- if mimetype == "application/x-www-form-urlencoded":
- # the parameters should have been parsed in params
- return (), {}
- elif mimetype in restjson.accept_content_types:
- dataformat = restjson
- elif mimetype in restxml.accept_content_types:
- dataformat = restxml
- else:
- raise ClientSideError("Unknown mimetype: %s" % mimetype,
- status_code=415)
-
- try:
- kw = dataformat.parse(
- body, datatypes, bodyarg=funcdef.body_type is not None
- )
- except UnknownArgument:
- if not funcdef.ignore_extra_args:
- raise
- kw = {}
-
- return (), kw
-
-
-def combine_args(funcdef, akw, allow_override=False):
- newargs, newkwargs = [], {}
- for args, kwargs in akw:
- for i, arg in enumerate(args):
- n = funcdef.arguments[i].name
- if not allow_override and n in newkwargs:
- raise ClientSideError(
- "Parameter %s was given several times" % n)
- newkwargs[n] = arg
- for name, value in kwargs.items():
- n = str(name)
- if not allow_override and n in newkwargs:
- raise ClientSideError(
- "Parameter %s was given several times" % n)
- newkwargs[n] = value
- return newargs, newkwargs
-
-
-def get_args(funcdef, args, kwargs, params, form, body, mimetype):
- """Combine arguments from :
- * the host framework args and kwargs
- * the request params
- * the request body
-
- Note that the host framework args and kwargs can be overridden
- by arguments from params of body
- """
- # get the body from params if not given directly
- if not body and '__body__' in params:
- body = params['__body__']
-
- # extract args from the host args and kwargs
- from_args = args_from_args(funcdef, args, kwargs)
-
- # extract args from the request parameters
- from_params = args_from_params(funcdef, params)
-
- # extract args from the form parameters
- if form:
- from_form_params = args_from_params(funcdef, form)
- else:
- from_form_params = (), {}
-
- # extract args from the request body
- from_body = args_from_body(funcdef, body, mimetype)
-
- # combine params and body arguments
- from_params_and_body = combine_args(
- funcdef,
- (from_params, from_form_params, from_body)
- )
-
- args, kwargs = combine_args(
- funcdef,
- (from_args, from_params_and_body),
- allow_override=True
- )
- wsme.runtime.check_arguments(funcdef, args, kwargs)
- return args, kwargs
diff --git a/wsme/rest/json.py b/wsme/rest/json.py
deleted file mode 100644
index 48bd082..0000000
--- a/wsme/rest/json.py
+++ /dev/null
@@ -1,328 +0,0 @@
-"""REST+Json protocol implementation."""
-from __future__ import absolute_import
-import datetime
-import decimal
-
-import six
-
-from simplegeneric import generic
-
-import wsme.exc
-import wsme.types
-from wsme.types import Unset
-import wsme.utils
-
-
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-
-
-content_type = 'application/json'
-accept_content_types = [
- content_type,
- 'text/javascript',
- 'application/javascript'
-]
-ENUM_TRUE = ('true', 't', 'yes', 'y', 'on', '1')
-ENUM_FALSE = ('false', 'f', 'no', 'n', 'off', '0')
-
-
-@generic
-def tojson(datatype, value):
- """
- A generic converter from python to jsonify-able datatypes.
-
- If a non-complex user specific type is to be used in the api,
- a specific tojson should be added::
-
- from wsme.protocol.restjson import tojson
-
- myspecialtype = object()
-
- @tojson.when_object(myspecialtype)
- def myspecialtype_tojson(datatype, value):
- return str(value)
- """
- if value is None:
- return None
- if wsme.types.iscomplex(datatype):
- d = dict()
- for attr in wsme.types.list_attributes(datatype):
- attr_value = getattr(value, attr.key)
- if attr_value is not Unset:
- d[attr.name] = tojson(attr.datatype, attr_value)
- return d
- elif wsme.types.isusertype(datatype):
- return tojson(datatype.basetype, datatype.tobasetype(value))
- return value
-
-
-@tojson.when_object(wsme.types.bytes)
-def bytes_tojson(datatype, value):
- if value is None:
- return None
- return value.decode('ascii')
-
-
-@tojson.when_type(wsme.types.ArrayType)
-def array_tojson(datatype, value):
- if value is None:
- return None
- return [tojson(datatype.item_type, item) for item in value]
-
-
-@tojson.when_type(wsme.types.DictType)
-def dict_tojson(datatype, value):
- if value is None:
- return None
- return dict((
- (tojson(datatype.key_type, item[0]),
- tojson(datatype.value_type, item[1]))
- for item in value.items()
- ))
-
-
-@tojson.when_object(decimal.Decimal)
-def decimal_tojson(datatype, value):
- if value is None:
- return None
- return str(value)
-
-
-@tojson.when_object(datetime.date)
-def date_tojson(datatype, value):
- if value is None:
- return None
- return value.isoformat()
-
-
-@tojson.when_object(datetime.time)
-def time_tojson(datatype, value):
- if value is None:
- return None
- return value.isoformat()
-
-
-@tojson.when_object(datetime.datetime)
-def datetime_tojson(datatype, value):
- if value is None:
- return None
- return value.isoformat()
-
-
-@generic
-def fromjson(datatype, value):
- """A generic converter from json base types to python datatype.
-
- If a non-complex user specific type is to be used in the api,
- a specific fromjson should be added::
-
- from wsme.protocol.restjson import fromjson
-
- class MySpecialType(object):
- pass
-
- @fromjson.when_object(MySpecialType)
- def myspecialtype_fromjson(datatype, value):
- return MySpecialType(value)
- """
- if value is None:
- return None
- if wsme.types.iscomplex(datatype):
- obj = datatype()
- attributes = wsme.types.list_attributes(datatype)
-
- # Here we check that all the attributes in the value are also defined
- # in our type definition, otherwise we raise an Error.
- v_keys = set(value.keys())
- a_keys = set(adef.name for adef in attributes)
- if not v_keys <= a_keys:
- raise wsme.exc.UnknownAttribute(None, v_keys - a_keys)
-
- for attrdef in attributes:
- if attrdef.name in value:
- try:
- val_fromjson = fromjson(attrdef.datatype,
- value[attrdef.name])
- except wsme.exc.UnknownAttribute as e:
- e.add_fieldname(attrdef.name)
- raise
- if getattr(attrdef, 'readonly', False):
- raise wsme.exc.InvalidInput(attrdef.name, val_fromjson,
- "Cannot set read only field.")
- setattr(obj, attrdef.key, val_fromjson)
- elif attrdef.mandatory:
- raise wsme.exc.InvalidInput(attrdef.name, None,
- "Mandatory field missing.")
-
- return wsme.types.validate_value(datatype, obj)
- elif wsme.types.isusertype(datatype):
- value = datatype.frombasetype(
- fromjson(datatype.basetype, value))
- return value
-
-
-@fromjson.when_type(wsme.types.ArrayType)
-def array_fromjson(datatype, value):
- if value is None:
- return None
- if not isinstance(value, list):
- raise ValueError("Value not a valid list: %s" % value)
- return [fromjson(datatype.item_type, item) for item in value]
-
-
-@fromjson.when_type(wsme.types.DictType)
-def dict_fromjson(datatype, value):
- if value is None:
- return None
- if not isinstance(value, dict):
- raise ValueError("Value not a valid dict: %s" % value)
- return dict((
- (fromjson(datatype.key_type, item[0]),
- fromjson(datatype.value_type, item[1]))
- for item in value.items()))
-
-
-@fromjson.when_object(six.binary_type)
-def str_fromjson(datatype, value):
- if (isinstance(value, six.string_types) or
- isinstance(value, six.integer_types) or
- isinstance(value, float)):
- return six.text_type(value).encode('utf8')
-
-
-@fromjson.when_object(wsme.types.text)
-def text_fromjson(datatype, value):
- if value is not None and isinstance(value, wsme.types.bytes):
- return wsme.types.text(value)
- return value
-
-
-@fromjson.when_object(*six.integer_types + (float,))
-def numeric_fromjson(datatype, value):
- """Convert string object to built-in types int, long or float."""
- if value is None:
- return None
- return datatype(value)
-
-
-@fromjson.when_object(bool)
-def bool_fromjson(datatype, value):
- """Convert to bool, restricting strings to just unambiguous values."""
- if value is None:
- return None
- if isinstance(value, six.integer_types + (bool,)):
- return bool(value)
- if value in ENUM_TRUE:
- return True
- if value in ENUM_FALSE:
- return False
- raise ValueError("Value not an unambiguous boolean: %s" % value)
-
-
-@fromjson.when_object(decimal.Decimal)
-def decimal_fromjson(datatype, value):
- if value is None:
- return None
- return decimal.Decimal(value)
-
-
-@fromjson.when_object(datetime.date)
-def date_fromjson(datatype, value):
- if value is None:
- return None
- return wsme.utils.parse_isodate(value)
-
-
-@fromjson.when_object(datetime.time)
-def time_fromjson(datatype, value):
- if value is None:
- return None
- return wsme.utils.parse_isotime(value)
-
-
-@fromjson.when_object(datetime.datetime)
-def datetime_fromjson(datatype, value):
- if value is None:
- return None
- return wsme.utils.parse_isodatetime(value)
-
-
-def parse(s, datatypes, bodyarg, encoding='utf8'):
- jload = json.load
- if not hasattr(s, 'read'):
- if six.PY3 and isinstance(s, six.binary_type):
- s = s.decode(encoding)
- jload = json.loads
- try:
- jdata = jload(s)
- except ValueError:
- raise wsme.exc.ClientSideError("Request is not in valid JSON format")
- if bodyarg:
- argname = list(datatypes.keys())[0]
- try:
- kw = {argname: fromjson(datatypes[argname], jdata)}
- except ValueError as e:
- raise wsme.exc.InvalidInput(argname, jdata, e.args[0])
- except wsme.exc.UnknownAttribute as e:
- # We only know the fieldname at this level, not in the
- # called function. We fill in this information here.
- e.add_fieldname(argname)
- raise
- else:
- kw = {}
- extra_args = []
- if not isinstance(jdata, dict):
- raise wsme.exc.ClientSideError("Request must be a JSON dict")
- for key in jdata:
- if key not in datatypes:
- extra_args.append(key)
- else:
- try:
- kw[key] = fromjson(datatypes[key], jdata[key])
- except ValueError as e:
- raise wsme.exc.InvalidInput(key, jdata[key], e.args[0])
- except wsme.exc.UnknownAttribute as e:
- # We only know the fieldname at this level, not in the
- # called function. We fill in this information here.
- e.add_fieldname(key)
- raise
- if extra_args:
- raise wsme.exc.UnknownArgument(', '.join(extra_args))
- return kw
-
-
-def encode_result(value, datatype, **options):
- jsondata = tojson(datatype, value)
- if options.get('nest_result', False):
- jsondata = {options.get('nested_result_attrname', 'result'): jsondata}
- return json.dumps(jsondata)
-
-
-def encode_error(context, errordetail):
- return json.dumps(errordetail)
-
-
-def encode_sample_value(datatype, value, format=False):
- r = tojson(datatype, value)
- content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0,
- sort_keys=format)
- return ('javascript', content)
-
-
-def encode_sample_params(params, format=False):
- kw = {}
- for name, datatype, value in params:
- kw[name] = tojson(datatype, value)
- content = json.dumps(kw, ensure_ascii=False, indent=4 if format else 0,
- sort_keys=format)
- return ('javascript', content)
-
-
-def encode_sample_result(datatype, value, format=False):
- r = tojson(datatype, value)
- content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0,
- sort_keys=format)
- return ('javascript', content)
diff --git a/wsme/rest/protocol.py b/wsme/rest/protocol.py
deleted file mode 100644
index 5201ccf..0000000
--- a/wsme/rest/protocol.py
+++ /dev/null
@@ -1,133 +0,0 @@
-import os.path
-import logging
-
-from wsme.utils import OrderedDict
-from wsme.protocol import CallContext, Protocol, media_type_accept
-
-import wsme.rest
-import wsme.rest.args
-import wsme.runtime
-
-log = logging.getLogger(__name__)
-
-
-class RestProtocol(Protocol):
- name = 'rest'
- displayname = 'REST'
- dataformats = ['json', 'xml']
- content_types = ['application/json', 'text/xml']
-
- def __init__(self, dataformats=None):
- if dataformats is None:
- dataformats = RestProtocol.dataformats
-
- self.dataformats = OrderedDict()
- self.content_types = []
-
- for dataformat in dataformats:
- __import__('wsme.rest.' + dataformat)
- dfmod = getattr(wsme.rest, dataformat)
- self.dataformats[dataformat] = dfmod
- self.content_types.extend(dfmod.accept_content_types)
-
- def accept(self, request):
- for dataformat in self.dataformats:
- if request.path.endswith('.' + dataformat):
- return True
- return media_type_accept(request, self.content_types)
-
- def iter_calls(self, request):
- context = CallContext(request)
- context.outformat = None
- ext = os.path.splitext(request.path.split('/')[-1])[1]
- inmime = request.content_type
- outmime = request.accept.best_match(self.content_types)
-
- outformat = None
- informat = None
- for dfname, df in self.dataformats.items():
- if ext == '.' + dfname:
- outformat = df
- if not inmime:
- informat = df
-
- if outformat is None and request.accept:
- for dfname, df in self.dataformats.items():
- if outmime in df.accept_content_types:
- outformat = df
- if not inmime:
- informat = df
-
- if outformat is None:
- for dfname, df in self.dataformats.items():
- if inmime == df.content_type:
- outformat = df
-
- context.outformat = outformat
- context.outformat_options = {
- 'nest_result': getattr(self, 'nest_result', False)
- }
- if not inmime and informat:
- inmime = informat.content_type
- log.debug("Inferred input type: %s" % inmime)
- context.inmime = inmime
- yield context
-
- def extract_path(self, context):
- path = context.request.path
- assert path.startswith(self.root._webpath)
- path = path[len(self.root._webpath):]
- path = path.strip('/').split('/')
-
- for dataformat in self.dataformats:
- if path[-1].endswith('.' + dataformat):
- path[-1] = path[-1][:-len(dataformat) - 1]
-
- # Check if the path is actually a function, and if not
- # see if the http method make a difference
- # TODO Re-think the function lookup phases. Here we are
- # doing the job that will be done in a later phase, which
- # is sub-optimal
- for p, fdef in self.root.getapi():
- if p == path:
- return path
-
- # No function at this path. Now check for function that have
- # this path as a prefix, and declared an http method
- for p, fdef in self.root.getapi():
- if len(p) == len(path) + 1 and p[:len(path)] == path and \
- fdef.extra_options.get('method') == context.request.method:
- return p
-
- return path
-
- def read_arguments(self, context):
- request = context.request
- funcdef = context.funcdef
-
- body = None
- if request.content_length not in (None, 0, '0'):
- body = request.body
- if not body and '__body__' in request.params:
- body = request.params['__body__']
-
- args, kwargs = wsme.rest.args.combine_args(
- funcdef,
- (wsme.rest.args.args_from_params(funcdef, request.params),
- wsme.rest.args.args_from_body(funcdef, body, context.inmime))
- )
- wsme.runtime.check_arguments(funcdef, args, kwargs)
- return kwargs
-
- def encode_result(self, context, result):
- out = context.outformat.encode_result(
- result, context.funcdef.return_type,
- **context.outformat_options
- )
- return out
-
- def encode_error(self, context, errordetail):
- out = context.outformat.encode_error(
- context, errordetail
- )
- return out
diff --git a/wsme/rest/xml.py b/wsme/rest/xml.py
deleted file mode 100644
index 286afa7..0000000
--- a/wsme/rest/xml.py
+++ /dev/null
@@ -1,298 +0,0 @@
-from __future__ import absolute_import
-
-import datetime
-
-import six
-
-import xml.etree.ElementTree as et
-
-from simplegeneric import generic
-
-import wsme.types
-from wsme.exc import UnknownArgument, InvalidInput
-
-import re
-
-content_type = 'text/xml'
-accept_content_types = [
- content_type,
-]
-
-time_re = re.compile(r'(?P<h>[0-2][0-9]):(?P<m>[0-5][0-9]):(?P<s>[0-6][0-9])')
-
-
-def xml_indent(elem, level=0):
- i = "\n" + level * " "
- if len(elem):
- if not elem.text or not elem.text.strip():
- elem.text = i + " "
- for e in elem:
- xml_indent(e, level + 1)
- if not e.tail or not e.tail.strip():
- e.tail = i
- if level and (not elem.tail or not elem.tail.strip()):
- elem.tail = i
-
-
-@generic
-def toxml(datatype, key, value):
- """
- A generic converter from python to xml elements.
-
- If a non-complex user specific type is to be used in the api,
- a specific toxml should be added::
-
- from wsme.protocol.restxml import toxml
-
- myspecialtype = object()
-
- @toxml.when_object(myspecialtype)
- def myspecialtype_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = str(value)
- return el
- """
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- if wsme.types.isusertype(datatype):
- return toxml(datatype.basetype,
- key, datatype.tobasetype(value))
- elif wsme.types.iscomplex(datatype):
- for attrdef in datatype._wsme_attributes:
- attrvalue = getattr(value, attrdef.key)
- if attrvalue is not wsme.types.Unset:
- el.append(toxml(attrdef.datatype, attrdef.name,
- attrvalue))
- else:
- el.text = six.text_type(value)
- return el
-
-
-@generic
-def fromxml(datatype, element):
- """
- A generic converter from xml elements to python datatype.
-
- If a non-complex user specific type is to be used in the api,
- a specific fromxml should be added::
-
- from wsme.protocol.restxml import fromxml
-
- class MySpecialType(object):
- pass
-
- @fromxml.when_object(MySpecialType)
- def myspecialtype_fromxml(datatype, element):
- if element.get('nil', False):
- return None
- return MySpecialType(element.text)
- """
- if element.get('nil', False):
- return None
- if wsme.types.isusertype(datatype):
- return datatype.frombasetype(fromxml(datatype.basetype, element))
- if wsme.types.iscomplex(datatype):
- obj = datatype()
- for attrdef in wsme.types.list_attributes(datatype):
- sub = element.find(attrdef.name)
- if sub is not None:
- val_fromxml = fromxml(attrdef.datatype, sub)
- if getattr(attrdef, 'readonly', False):
- raise InvalidInput(attrdef.name, val_fromxml,
- "Cannot set read only field.")
- setattr(obj, attrdef.key, val_fromxml)
- elif attrdef.mandatory:
- raise InvalidInput(attrdef.name, None,
- "Mandatory field missing.")
- return wsme.types.validate_value(datatype, obj)
- if datatype is wsme.types.bytes:
- return element.text.encode('ascii')
- return datatype(element.text)
-
-
-@toxml.when_type(wsme.types.ArrayType)
-def array_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- for item in value:
- el.append(toxml(datatype.item_type, 'item', item))
- return el
-
-
-@toxml.when_type(wsme.types.DictType)
-def dict_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- for item in value.items():
- key = toxml(datatype.key_type, 'key', item[0])
- value = toxml(datatype.value_type, 'value', item[1])
- node = et.Element('item')
- node.append(key)
- node.append(value)
- el.append(node)
- return el
-
-
-@toxml.when_object(wsme.types.bytes)
-def bytes_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = value.decode('ascii')
- return el
-
-
-@toxml.when_object(bool)
-def bool_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = value and 'true' or 'false'
- return el
-
-
-@toxml.when_object(datetime.date)
-def date_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = value.isoformat()
- return el
-
-
-@toxml.when_object(datetime.datetime)
-def datetime_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = value.isoformat()
- return el
-
-
-@fromxml.when_type(wsme.types.ArrayType)
-def array_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return [
- fromxml(datatype.item_type, item)
- for item in element.findall('item')
- ]
-
-
-@fromxml.when_object(bool)
-def bool_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return element.text.lower() != 'false'
-
-
-@fromxml.when_type(wsme.types.DictType)
-def dict_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return dict((
- (fromxml(datatype.key_type, item.find('key')),
- fromxml(datatype.value_type, item.find('value')))
- for item in element.findall('item')))
-
-
-@fromxml.when_object(wsme.types.text)
-def unicode_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return wsme.types.text(element.text) if element.text else six.u('')
-
-
-@fromxml.when_object(datetime.date)
-def date_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return wsme.utils.parse_isodate(element.text)
-
-
-@fromxml.when_object(datetime.time)
-def time_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return wsme.utils.parse_isotime(element.text)
-
-
-@fromxml.when_object(datetime.datetime)
-def datetime_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return wsme.utils.parse_isodatetime(element.text)
-
-
-def parse(s, datatypes, bodyarg):
- if hasattr(s, 'read'):
- tree = et.parse(s)
- else:
- tree = et.fromstring(s)
- if bodyarg:
- name = list(datatypes.keys())[0]
- return {name: fromxml(datatypes[name], tree)}
- else:
- kw = {}
- extra_args = []
- for sub in tree:
- if sub.tag not in datatypes:
- extra_args.append(sub.tag)
- kw[sub.tag] = fromxml(datatypes[sub.tag], sub)
- if extra_args:
- raise UnknownArgument(', '.join(extra_args))
- return kw
-
-
-def encode_result(value, datatype, **options):
- return et.tostring(toxml(
- datatype, options.get('nested_result_attrname', 'result'), value
- ))
-
-
-def encode_error(context, errordetail):
- el = et.Element('error')
- et.SubElement(el, 'faultcode').text = errordetail['faultcode']
- et.SubElement(el, 'faultstring').text = errordetail['faultstring']
- if 'debuginfo' in errordetail:
- et.SubElement(el, 'debuginfo').text = errordetail['debuginfo']
- return et.tostring(el)
-
-
-def encode_sample_value(datatype, value, format=False):
- r = toxml(datatype, 'value', value)
- if format:
- xml_indent(r)
- content = et.tostring(r)
- return ('xml', content)
-
-
-def encode_sample_params(params, format=False):
- node = et.Element('parameters')
- for name, datatype, value in params:
- node.append(toxml(datatype, name, value))
- if format:
- xml_indent(node)
- content = et.tostring(node)
- return ('xml', content)
-
-
-def encode_sample_result(datatype, value, format=False):
- r = toxml(datatype, 'result', value)
- if format:
- xml_indent(r)
- content = et.tostring(r)
- return ('xml', content)