summaryrefslogtreecommitdiff
path: root/wsme
diff options
context:
space:
mode:
authorJim Rollenhagen <jim@jimrollenhagen.com>2019-09-26 09:43:27 -0400
committerJim Rollenhagen <jim@jimrollenhagen.com>2019-09-26 09:43:27 -0400
commite9c6edfe510f4ed407f8d2d84b4b931a382b48b3 (patch)
tree94bbd6a34bcf09e99f7ae1be88b19960192d6adb /wsme
parent1d73d6e50411ebc45fb96a6ed3c63ca91a500323 (diff)
downloadwsme-master.tar.gz
Retire github mirror, repo moved to opendevHEADmaster
Diffstat (limited to 'wsme')
-rw-r--r--wsme/__init__.py10
-rw-r--r--wsme/api.py237
-rw-r--r--wsme/exc.py92
-rw-r--r--wsme/protocol.py147
-rw-r--r--wsme/rest/__init__.py78
-rw-r--r--wsme/rest/args.py310
-rw-r--r--wsme/rest/json.py328
-rw-r--r--wsme/rest/protocol.py133
-rw-r--r--wsme/rest/xml.py298
-rw-r--r--wsme/root.py372
-rw-r--r--wsme/runtime.py9
-rw-r--r--wsme/spore.py64
-rw-r--r--wsme/tests/__init__.py0
-rw-r--r--wsme/tests/protocol.py720
-rw-r--r--wsme/tests/test_api.py419
-rw-r--r--wsme/tests/test_exc.py40
-rw-r--r--wsme/tests/test_protocols.py70
-rw-r--r--wsme/tests/test_protocols_commons.py159
-rw-r--r--wsme/tests/test_restjson.py779
-rw-r--r--wsme/tests/test_restxml.py211
-rw-r--r--wsme/tests/test_root.py122
-rw-r--r--wsme/tests/test_spore.py51
-rw-r--r--wsme/tests/test_types.py667
-rw-r--r--wsme/tests/test_utils.py97
-rw-r--r--wsme/types.py840
-rw-r--r--wsme/utils.py118
26 files changed, 0 insertions, 6371 deletions
diff --git a/wsme/__init__.py b/wsme/__init__.py
deleted file mode 100644
index 35109ec..0000000
--- a/wsme/__init__.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from wsme.api import signature
-from wsme.rest import expose, validate
-from wsme.root import WSRoot
-from wsme.types import wsattr, wsproperty, Unset
-
-__all__ = [
- 'expose', 'validate', 'signature',
- 'WSRoot',
- 'wsattr', 'wsproperty', 'Unset'
-]
diff --git a/wsme/api.py b/wsme/api.py
deleted file mode 100644
index 67263a0..0000000
--- a/wsme/api.py
+++ /dev/null
@@ -1,237 +0,0 @@
-import traceback
-import functools
-import inspect
-import logging
-import six
-
-import wsme.exc
-import wsme.types
-
-from wsme import utils
-
-log = logging.getLogger(__name__)
-
-
-def iswsmefunction(f):
- return hasattr(f, '_wsme_definition')
-
-
-def wrapfunc(f):
- @functools.wraps(f)
- def wrapper(*args, **kwargs):
- return f(*args, **kwargs)
- wrapper._wsme_original_func = f
- return wrapper
-
-
-def getargspec(f):
- f = getattr(f, '_wsme_original_func', f)
- return inspect.getargspec(f)
-
-
-class FunctionArgument(object):
- """
- An argument definition of an api entry
- """
- def __init__(self, name, datatype, mandatory, default):
- #: argument name
- self.name = name
-
- #: Data type
- self.datatype = datatype
-
- #: True if the argument is mandatory
- self.mandatory = mandatory
-
- #: Default value if argument is omitted
- self.default = default
-
- def resolve_type(self, registry):
- self.datatype = registry.resolve_type(self.datatype)
-
-
-class FunctionDefinition(object):
- """
- An api entry definition
- """
- def __init__(self, func):
- #: Function name
- self.name = func.__name__
-
- #: Function documentation
- self.doc = func.__doc__
-
- #: Return type
- self.return_type = None
-
- #: The function arguments (list of :class:`FunctionArgument`)
- self.arguments = []
-
- #: If the body carry the datas of a single argument, its type
- self.body_type = None
-
- #: Status code
- self.status_code = 200
-
- #: True if extra arguments should be ignored, NOT inserted in
- #: the kwargs of the function and not raise UnknownArgument
- #: exceptions
- self.ignore_extra_args = False
-
- #: name of the function argument to pass the host request object.
- #: Should be set by using the :class:`wsme.types.HostRequest` type
- #: in the function @\ :function:`signature`
- self.pass_request = False
-
- #: Dictionnary of protocol-specific options.
- self.extra_options = None
-
- @staticmethod
- def get(func):
- """
- Returns the :class:`FunctionDefinition` of a method.
- """
- if not hasattr(func, '_wsme_definition'):
- fd = FunctionDefinition(func)
- func._wsme_definition = fd
-
- return func._wsme_definition
-
- def get_arg(self, name):
- """
- Returns a :class:`FunctionArgument` from its name
- """
- for arg in self.arguments:
- if arg.name == name:
- return arg
- return None
-
- def resolve_types(self, registry):
- self.return_type = registry.resolve_type(self.return_type)
- self.body_type = registry.resolve_type(self.body_type)
- for arg in self.arguments:
- arg.resolve_type(registry)
-
- def set_options(self, body=None, ignore_extra_args=False, status_code=200,
- rest_content_types=('json', 'xml'), **extra_options):
- self.body_type = body
- self.status_code = status_code
- self.ignore_extra_args = ignore_extra_args
- self.rest_content_types = rest_content_types
- self.extra_options = extra_options
-
- def set_arg_types(self, argspec, arg_types):
- args, varargs, keywords, defaults = argspec
- if args[0] == 'self':
- args = args[1:]
- arg_types = list(arg_types)
- if self.body_type is not None:
- arg_types.append(self.body_type)
- for i, argname in enumerate(args):
- datatype = arg_types[i]
- mandatory = defaults is None or i < (len(args) - len(defaults))
- default = None
- if not mandatory:
- default = defaults[i - (len(args) - len(defaults))]
- if datatype is wsme.types.HostRequest:
- self.pass_request = argname
- else:
- self.arguments.append(FunctionArgument(argname, datatype,
- mandatory, default))
-
-
-class signature(object):
-
- """Decorator that specify the argument types of an exposed function.
-
- :param return_type: Type of the value returned by the function
- :param argN: Type of the Nth argument
- :param body: If the function takes a final argument that is supposed to be
- the request body by itself, its type.
- :param status_code: HTTP return status code of the function.
- :param ignore_extra_args: Allow extra/unknow arguments (default to False)
-
- Most of the time this decorator is not supposed to be used directly,
- unless you are not using WSME on top of another framework.
-
- If an adapter is used, it will provide either a specialised version of this
- decororator, either a new decorator named @wsexpose that takes the same
- parameters (it will in addition expose the function, hence its name).
- """
-
- def __init__(self, *types, **options):
- self.return_type = types[0] if types else None
- self.arg_types = []
- if len(types) > 1:
- self.arg_types.extend(types[1:])
- if 'body' in options:
- self.arg_types.append(options['body'])
- self.wrap = options.pop('wrap', False)
- self.options = options
-
- def __call__(self, func):
- argspec = getargspec(func)
- if self.wrap:
- func = wrapfunc(func)
- fd = FunctionDefinition.get(func)
- if fd.extra_options is not None:
- raise ValueError("This function is already exposed")
- fd.return_type = self.return_type
- fd.set_options(**self.options)
- if self.arg_types:
- fd.set_arg_types(argspec, self.arg_types)
- return func
-
-
-sig = signature
-
-
-class Response(object):
- """
- Object to hold the "response" from a view function
- """
- def __init__(self, obj, status_code=None, error=None,
- return_type=wsme.types.Unset):
- #: Store the result object from the view
- self.obj = obj
-
- #: Store an optional status_code
- self.status_code = status_code
-
- #: Return error details
- #: Must be a dictionnary with the following keys: faultcode,
- #: faultstring and an optional debuginfo
- self.error = error
-
- #: Return type
- #: Type of the value returned by the function
- #: If the return type is wsme.types.Unset it will be ignored
- #: and the default return type will prevail.
- self.return_type = return_type
-
-
-def format_exception(excinfo, debug=False):
- """Extract informations that can be sent to the client."""
- error = excinfo[1]
- code = getattr(error, 'code', None)
- if code and utils.is_valid_code(code) and utils.is_client_error(code):
- faultstring = (error.faultstring if hasattr(error, 'faultstring')
- else six.text_type(error))
- r = dict(faultcode="Client",
- faultstring=faultstring)
- log.debug("Client-side error: %s" % r['faultstring'])
- r['debuginfo'] = None
- return r
- else:
- faultstring = six.text_type(error)
- debuginfo = "\n".join(traceback.format_exception(*excinfo))
-
- log.error('Server-side error: "%s". Detail: \n%s' % (
- faultstring, debuginfo))
-
- r = dict(faultcode="Server", faultstring=faultstring)
- if debug:
- r['debuginfo'] = debuginfo
- else:
- r['debuginfo'] = None
- return r
diff --git a/wsme/exc.py b/wsme/exc.py
deleted file mode 100644
index 81dcbf8..0000000
--- a/wsme/exc.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import six
-
-from wsme.utils import _
-
-
-class ClientSideError(RuntimeError):
- def __init__(self, msg=None, status_code=400):
- self.msg = msg
- self.code = status_code
- super(ClientSideError, self).__init__(self.faultstring)
-
- @property
- def faultstring(self):
- if self.msg is None:
- return str(self)
- elif isinstance(self.msg, six.text_type):
- return self.msg
- else:
- return six.u(self.msg)
-
-
-class InvalidInput(ClientSideError):
- def __init__(self, fieldname, value, msg=''):
- self.fieldname = fieldname
- self.value = value
- super(InvalidInput, self).__init__(msg)
-
- @property
- def faultstring(self):
- return _(six.u(
- "Invalid input for field/attribute %s. Value: '%s'. %s")
- ) % (self.fieldname, self.value, self.msg)
-
-
-class MissingArgument(ClientSideError):
- def __init__(self, argname, msg=''):
- self.argname = argname
- super(MissingArgument, self).__init__(msg)
-
- @property
- def faultstring(self):
- return _(six.u('Missing argument: "%s"%s')) % (
- self.argname, self.msg and ": " + self.msg or "")
-
-
-class UnknownArgument(ClientSideError):
- def __init__(self, argname, msg=''):
- self.argname = argname
- super(UnknownArgument, self).__init__(msg)
-
- @property
- def faultstring(self):
- return _(six.u('Unknown argument: "%s"%s')) % (
- self.argname, self.msg and ": " + self.msg or "")
-
-
-class UnknownFunction(ClientSideError):
- def __init__(self, name):
- self.name = name
- super(UnknownFunction, self).__init__()
-
- @property
- def faultstring(self):
- return _(six.u("Unknown function name: %s")) % (self.name)
-
-
-class UnknownAttribute(ClientSideError):
- def __init__(self, fieldname, attributes, msg=''):
- self.fieldname = fieldname
- self.attributes = attributes
- self.msg = msg
- super(UnknownAttribute, self).__init__(self.msg)
-
- @property
- def faultstring(self):
- error = _("Unknown attribute for argument %(argn)s: %(attrs)s")
- if len(self.attributes) > 1:
- error = _("Unknown attributes for argument %(argn)s: %(attrs)s")
- str_attrs = ", ".join(self.attributes)
- return error % {'argn': self.fieldname, 'attrs': str_attrs}
-
- def add_fieldname(self, name):
- """Add a fieldname to concatenate the full name.
-
- Add a fieldname so that the whole hierarchy is displayed. Successive
- calls to this method will prepend ``name`` to the hierarchy of names.
- """
- if self.fieldname is not None:
- self.fieldname = "{}.{}".format(name, self.fieldname)
- else:
- self.fieldname = name
- super(UnknownAttribute, self).__init__(self.msg)
diff --git a/wsme/protocol.py b/wsme/protocol.py
deleted file mode 100644
index b0107ab..0000000
--- a/wsme/protocol.py
+++ /dev/null
@@ -1,147 +0,0 @@
-import weakref
-
-import pkg_resources
-
-from wsme.exc import ClientSideError
-
-
-__all__ = [
- 'CallContext',
-
- 'register_protocol', 'getprotocol',
-]
-
-registered_protocols = {}
-
-
-def _cfg(f):
- cfg = getattr(f, '_cfg', None)
- if cfg is None:
- f._cfg = cfg = {}
- return cfg
-
-
-class expose(object):
- def __init__(self, path, content_type):
- self.path = path
- self.content_type = content_type
-
- def __call__(self, func):
- func.exposed = True
- cfg = _cfg(func)
- cfg['content-type'] = self.content_type
- cfg.setdefault('paths', []).append(self.path)
- return func
-
-
-class CallContext(object):
- def __init__(self, request):
- self._request = weakref.ref(request)
- self.path = None
-
- self.func = None
- self.funcdef = None
-
- @property
- def request(self):
- return self._request()
-
-
-class ObjectDict(object):
- def __init__(self, obj):
- self.obj = obj
-
- def __getitem__(self, name):
- return getattr(self.obj, name)
-
-
-class Protocol(object):
- name = None
- displayname = None
- content_types = []
-
- def resolve_path(self, path):
- if '$' in path:
- from string import Template
- s = Template(path)
- path = s.substitute(ObjectDict(self))
- return path
-
- def iter_routes(self):
- for attrname in dir(self):
- attr = getattr(self, attrname)
- if getattr(attr, 'exposed', False):
- for path in _cfg(attr)['paths']:
- yield self.resolve_path(path), attr
-
- def accept(self, request):
- return request.headers.get('Content-Type') in self.content_types
-
- def iter_calls(self, request):
- pass
-
- def extract_path(self, context):
- pass
-
- def read_arguments(self, context):
- pass
-
- def encode_result(self, context, result):
- pass
-
- def encode_sample_value(self, datatype, value, format=False):
- return ('none', 'N/A')
-
- def encode_sample_params(self, params, format=False):
- return ('none', 'N/A')
-
- def encode_sample_result(self, datatype, value, format=False):
- return ('none', 'N/A')
-
-
-def register_protocol(protocol):
- registered_protocols[protocol.name] = protocol
-
-
-def getprotocol(name, **options):
- protocol_class = registered_protocols.get(name)
- if protocol_class is None:
- for entry_point in pkg_resources.iter_entry_points(
- 'wsme.protocols', name):
- if entry_point.name == name:
- protocol_class = entry_point.load()
- if protocol_class is None:
- raise ValueError("Cannot find protocol '%s'" % name)
- registered_protocols[name] = protocol_class
- return protocol_class(**options)
-
-
-def media_type_accept(request, content_types):
- """Validate media types against request.method.
-
- When request.method is GET or HEAD compare with the Accept header.
- When request.method is POST, PUT or PATCH compare with the Content-Type
- header.
- When request.method is DELETE media type is irrelevant, so return True.
- """
- if request.method in ['GET', 'HEAD']:
- if request.accept:
- if request.accept.best_match(content_types):
- return True
- error_message = ('Unacceptable Accept type: %s not in %s'
- % (request.accept, content_types))
- raise ClientSideError(error_message, status_code=406)
- elif request.method in ['PUT', 'POST', 'PATCH']:
- content_type = request.headers.get('Content-Type')
- if content_type:
- for ct in content_types:
- if request.headers.get('Content-Type', '').startswith(ct):
- return True
- error_message = ('Unacceptable Content-Type: %s not in %s'
- % (content_type, content_types))
- raise ClientSideError(error_message, status_code=415)
- else:
- raise ClientSideError('missing Content-Type header')
- elif request.method in ['DELETE']:
- return True
- return False
diff --git a/wsme/rest/__init__.py b/wsme/rest/__init__.py
deleted file mode 100644
index f35d6a9..0000000
--- a/wsme/rest/__init__.py
+++ /dev/null
@@ -1,78 +0,0 @@
-import inspect
-import wsme.api
-
-APIPATH_MAXLEN = 20
-
-
-class expose(object):
- def __init__(self, *args, **kwargs):
- self.signature = wsme.api.signature(*args, **kwargs)
-
- def __call__(self, func):
- return self.signature(func)
-
- @classmethod
- def with_method(cls, method, *args, **kwargs):
- kwargs['method'] = method
- return cls(*args, **kwargs)
-
- @classmethod
- def get(cls, *args, **kwargs):
- return cls.with_method('GET', *args, **kwargs)
-
- @classmethod
- def post(cls, *args, **kwargs):
- return cls.with_method('POST', *args, **kwargs)
-
- @classmethod
- def put(cls, *args, **kwargs):
- return cls.with_method('PUT', *args, **kwargs)
-
- @classmethod
- def delete(cls, *args, **kwargs):
- return cls.with_method('DELETE', *args, **kwargs)
-
-
-class validate(object):
- """
- Decorator that define the arguments types of a function.
-
-
- Example::
-
- class MyController(object):
- @expose(str)
- @validate(datetime.date, datetime.time)
- def format(self, d, t):
- return d.isoformat() + ' ' + t.isoformat()
- """
- def __init__(self, *param_types):
- self.param_types = param_types
-
- def __call__(self, func):
- argspec = wsme.api.getargspec(func)
- fd = wsme.api.FunctionDefinition.get(func)
- fd.set_arg_types(argspec, self.param_types)
- return func
-
-
-def scan_api(controller, path=[], objects=[]):
- """
- Recursively iterate a controller api entries.
- """
- for name in dir(controller):
- if name.startswith('_'):
- continue
- a = getattr(controller, name)
- if a in objects:
- continue
- if inspect.ismethod(a):
- if wsme.api.iswsmefunction(a):
- yield path + [name], a, []
- elif inspect.isclass(a):
- continue
- else:
- if len(path) > APIPATH_MAXLEN:
- raise ValueError("Path is too long: " + str(path))
- for i in scan_api(a, path + [name], objects + [a]):
- yield i
diff --git a/wsme/rest/args.py b/wsme/rest/args.py
deleted file mode 100644
index 9dc16c7..0000000
--- a/wsme/rest/args.py
+++ /dev/null
@@ -1,310 +0,0 @@
-import cgi
-import datetime
-import re
-
-from simplegeneric import generic
-
-from wsme.exc import ClientSideError, UnknownArgument, InvalidInput
-
-from wsme.types import iscomplex, list_attributes, Unset
-from wsme.types import UserType, ArrayType, DictType, File
-from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime
-import wsme.runtime
-
-from six import moves
-
-ARRAY_MAX_SIZE = 1000
-
-
-@generic
-def from_param(datatype, value):
- return datatype(value) if value is not None else None
-
-
-@from_param.when_object(datetime.date)
-def date_from_param(datatype, value):
- return parse_isodate(value) if value else None
-
-
-@from_param.when_object(datetime.time)
-def time_from_param(datatype, value):
- return parse_isotime(value) if value else None
-
-
-@from_param.when_object(datetime.datetime)
-def datetime_from_param(datatype, value):
- return parse_isodatetime(value) if value else None
-
-
-@from_param.when_object(File)
-def filetype_from_param(datatype, value):
- if isinstance(value, cgi.FieldStorage):
- return File(fieldstorage=value)
- return File(content=value)
-
-
-@from_param.when_type(UserType)
-def usertype_from_param(datatype, value):
- return datatype.frombasetype(
- from_param(datatype.basetype, value))
-
-
-@from_param.when_type(ArrayType)
-def array_from_param(datatype, value):
- if value is None:
- return value
- return [
- from_param(datatype.item_type, item)
- for item in value
- ]
-
-
-@generic
-def from_params(datatype, params, path, hit_paths):
- if iscomplex(datatype) and datatype is not File:
- objfound = False
- for key in params:
- if key.startswith(path + '.'):
- objfound = True
- break
- if objfound:
- r = datatype()
- for attrdef in list_attributes(datatype):
- value = from_params(
- attrdef.datatype,
- params, '%s.%s' % (path, attrdef.key), hit_paths
- )
- if value is not Unset:
- setattr(r, attrdef.key, value)
- return r
- else:
- if path in params:
- hit_paths.add(path)
- return from_param(datatype, params[path])
- return Unset
-
-
-@from_params.when_type(ArrayType)
-def array_from_params(datatype, params, path, hit_paths):
- if hasattr(params, 'getall'):
- # webob multidict
- def getall(params, path):
- return params.getall(path)
- elif hasattr(params, 'getlist'):
- # werkzeug multidict
- def getall(params, path): # noqa
- return params.getlist(path)
- if path in params:
- hit_paths.add(path)
- return [
- from_param(datatype.item_type, value)
- for value in getall(params, path)]
-
- if iscomplex(datatype.item_type):
- attributes = set()
- r = re.compile(r'^%s\.(?P<attrname>[^\.])' % re.escape(path))
- for p in params.keys():
- m = r.match(p)
- if m:
- attributes.add(m.group('attrname'))
- if attributes:
- value = []
- for attrdef in list_attributes(datatype.item_type):
- attrpath = '%s.%s' % (path, attrdef.key)
- hit_paths.add(attrpath)
- attrvalues = getall(params, attrpath)
- if len(value) < len(attrvalues):
- value[-1:] = [
- datatype.item_type()
- for i in moves.range(len(attrvalues) - len(value))
- ]
- for i, attrvalue in enumerate(attrvalues):
- setattr(
- value[i],
- attrdef.key,
- from_param(attrdef.datatype, attrvalue)
- )
- return value
-
- indexes = set()
- r = re.compile(r'^%s\[(?P<index>\d+)\]' % re.escape(path))
-
- for p in params.keys():
- m = r.match(p)
- if m:
- indexes.add(int(m.group('index')))
-
- if not indexes:
- return Unset
-
- indexes = list(indexes)
- indexes.sort()
-
- return [from_params(datatype.item_type, params,
- '%s[%s]' % (path, index), hit_paths)
- for index in indexes]
-
-
-@from_params.when_type(DictType)
-def dict_from_params(datatype, params, path, hit_paths):
-
- keys = set()
- r = re.compile(r'^%s\[(?P<key>[a-zA-Z0-9_\.]+)\]' % re.escape(path))
-
- for p in params.keys():
- m = r.match(p)
- if m:
- keys.add(from_param(datatype.key_type, m.group('key')))
-
- if not keys:
- return Unset
-
- return dict((
- (key, from_params(datatype.value_type,
- params, '%s[%s]' % (path, key), hit_paths))
- for key in keys))
-
-
-@from_params.when_type(UserType)
-def usertype_from_params(datatype, params, path, hit_paths):
- value = from_params(datatype.basetype, params, path, hit_paths)
- if value is not Unset:
- return datatype.frombasetype(value)
- return Unset
-
-
-def args_from_args(funcdef, args, kwargs):
- newargs = []
- for argdef, arg in zip(funcdef.arguments[:len(args)], args):
- try:
- newargs.append(from_param(argdef.datatype, arg))
- except Exception as e:
- if isinstance(argdef.datatype, UserType):
- datatype_name = argdef.datatype.name
- elif isinstance(argdef.datatype, type):
- datatype_name = argdef.datatype.__name__
- else:
- datatype_name = argdef.datatype.__class__.__name__
- raise InvalidInput(
- argdef.name,
- arg,
- "unable to convert to %(datatype)s. Error: %(error)s" % {
- 'datatype': datatype_name, 'error': e})
- newkwargs = {}
- for argname, value in kwargs.items():
- newkwargs[argname] = from_param(
- funcdef.get_arg(argname).datatype, value
- )
- return newargs, newkwargs
-
-
-def args_from_params(funcdef, params):
- kw = {}
- hit_paths = set()
- for argdef in funcdef.arguments:
- value = from_params(
- argdef.datatype, params, argdef.name, hit_paths)
- if value is not Unset:
- kw[argdef.name] = value
- paths = set(params.keys())
- unknown_paths = paths - hit_paths
- if '__body__' in unknown_paths:
- unknown_paths.remove('__body__')
- if not funcdef.ignore_extra_args and unknown_paths:
- raise UnknownArgument(', '.join(unknown_paths))
- return [], kw
-
-
-def args_from_body(funcdef, body, mimetype):
- from wsme.rest import json as restjson
- from wsme.rest import xml as restxml
-
- if funcdef.body_type is not None:
- datatypes = {funcdef.arguments[-1].name: funcdef.body_type}
- else:
- datatypes = dict(((a.name, a.datatype) for a in funcdef.arguments))
-
- if not body:
- return (), {}
- if mimetype == "application/x-www-form-urlencoded":
- # the parameters should have been parsed in params
- return (), {}
- elif mimetype in restjson.accept_content_types:
- dataformat = restjson
- elif mimetype in restxml.accept_content_types:
- dataformat = restxml
- else:
- raise ClientSideError("Unknown mimetype: %s" % mimetype,
- status_code=415)
-
- try:
- kw = dataformat.parse(
- body, datatypes, bodyarg=funcdef.body_type is not None
- )
- except UnknownArgument:
- if not funcdef.ignore_extra_args:
- raise
- kw = {}
-
- return (), kw
-
-
-def combine_args(funcdef, akw, allow_override=False):
- newargs, newkwargs = [], {}
- for args, kwargs in akw:
- for i, arg in enumerate(args):
- n = funcdef.arguments[i].name
- if not allow_override and n in newkwargs:
- raise ClientSideError(
- "Parameter %s was given several times" % n)
- newkwargs[n] = arg
- for name, value in kwargs.items():
- n = str(name)
- if not allow_override and n in newkwargs:
- raise ClientSideError(
- "Parameter %s was given several times" % n)
- newkwargs[n] = value
- return newargs, newkwargs
-
-
-def get_args(funcdef, args, kwargs, params, form, body, mimetype):
- """Combine arguments from :
- * the host framework args and kwargs
- * the request params
- * the request body
-
- Note that the host framework args and kwargs can be overridden
- by arguments from params of body
- """
- # get the body from params if not given directly
- if not body and '__body__' in params:
- body = params['__body__']
-
- # extract args from the host args and kwargs
- from_args = args_from_args(funcdef, args, kwargs)
-
- # extract args from the request parameters
- from_params = args_from_params(funcdef, params)
-
- # extract args from the form parameters
- if form:
- from_form_params = args_from_params(funcdef, form)
- else:
- from_form_params = (), {}
-
- # extract args from the request body
- from_body = args_from_body(funcdef, body, mimetype)
-
- # combine params and body arguments
- from_params_and_body = combine_args(
- funcdef,
- (from_params, from_form_params, from_body)
- )
-
- args, kwargs = combine_args(
- funcdef,
- (from_args, from_params_and_body),
- allow_override=True
- )
- wsme.runtime.check_arguments(funcdef, args, kwargs)
- return args, kwargs
diff --git a/wsme/rest/json.py b/wsme/rest/json.py
deleted file mode 100644
index 48bd082..0000000
--- a/wsme/rest/json.py
+++ /dev/null
@@ -1,328 +0,0 @@
-"""REST+Json protocol implementation."""
-from __future__ import absolute_import
-import datetime
-import decimal
-
-import six
-
-from simplegeneric import generic
-
-import wsme.exc
-import wsme.types
-from wsme.types import Unset
-import wsme.utils
-
-
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-
-
-content_type = 'application/json'
-accept_content_types = [
- content_type,
- 'text/javascript',
- 'application/javascript'
-]
-ENUM_TRUE = ('true', 't', 'yes', 'y', 'on', '1')
-ENUM_FALSE = ('false', 'f', 'no', 'n', 'off', '0')
-
-
-@generic
-def tojson(datatype, value):
- """
- A generic converter from python to jsonify-able datatypes.
-
- If a non-complex user specific type is to be used in the api,
- a specific tojson should be added::
-
- from wsme.protocol.restjson import tojson
-
- myspecialtype = object()
-
- @tojson.when_object(myspecialtype)
- def myspecialtype_tojson(datatype, value):
- return str(value)
- """
- if value is None:
- return None
- if wsme.types.iscomplex(datatype):
- d = dict()
- for attr in wsme.types.list_attributes(datatype):
- attr_value = getattr(value, attr.key)
- if attr_value is not Unset:
- d[attr.name] = tojson(attr.datatype, attr_value)
- return d
- elif wsme.types.isusertype(datatype):
- return tojson(datatype.basetype, datatype.tobasetype(value))
- return value
-
-
-@tojson.when_object(wsme.types.bytes)
-def bytes_tojson(datatype, value):
- if value is None:
- return None
- return value.decode('ascii')
-
-
-@tojson.when_type(wsme.types.ArrayType)
-def array_tojson(datatype, value):
- if value is None:
- return None
- return [tojson(datatype.item_type, item) for item in value]
-
-
-@tojson.when_type(wsme.types.DictType)
-def dict_tojson(datatype, value):
- if value is None:
- return None
- return dict((
- (tojson(datatype.key_type, item[0]),
- tojson(datatype.value_type, item[1]))
- for item in value.items()
- ))
-
-
-@tojson.when_object(decimal.Decimal)
-def decimal_tojson(datatype, value):
- if value is None:
- return None
- return str(value)
-
-
-@tojson.when_object(datetime.date)
-def date_tojson(datatype, value):
- if value is None:
- return None
- return value.isoformat()
-
-
-@tojson.when_object(datetime.time)
-def time_tojson(datatype, value):
- if value is None:
- return None
- return value.isoformat()
-
-
-@tojson.when_object(datetime.datetime)
-def datetime_tojson(datatype, value):
- if value is None:
- return None
- return value.isoformat()
-
-
-@generic
-def fromjson(datatype, value):
- """A generic converter from json base types to python datatype.
-
- If a non-complex user specific type is to be used in the api,
- a specific fromjson should be added::
-
- from wsme.protocol.restjson import fromjson
-
- class MySpecialType(object):
- pass
-
- @fromjson.when_object(MySpecialType)
- def myspecialtype_fromjson(datatype, value):
- return MySpecialType(value)
- """
- if value is None:
- return None
- if wsme.types.iscomplex(datatype):
- obj = datatype()
- attributes = wsme.types.list_attributes(datatype)
-
- # Here we check that all the attributes in the value are also defined
- # in our type definition, otherwise we raise an Error.
- v_keys = set(value.keys())
- a_keys = set(adef.name for adef in attributes)
- if not v_keys <= a_keys:
- raise wsme.exc.UnknownAttribute(None, v_keys - a_keys)
-
- for attrdef in attributes:
- if attrdef.name in value:
- try:
- val_fromjson = fromjson(attrdef.datatype,
- value[attrdef.name])
- except wsme.exc.UnknownAttribute as e:
- e.add_fieldname(attrdef.name)
- raise
- if getattr(attrdef, 'readonly', False):
- raise wsme.exc.InvalidInput(attrdef.name, val_fromjson,
- "Cannot set read only field.")
- setattr(obj, attrdef.key, val_fromjson)
- elif attrdef.mandatory:
- raise wsme.exc.InvalidInput(attrdef.name, None,
- "Mandatory field missing.")
-
- return wsme.types.validate_value(datatype, obj)
- elif wsme.types.isusertype(datatype):
- value = datatype.frombasetype(
- fromjson(datatype.basetype, value))
- return value
-
-
-@fromjson.when_type(wsme.types.ArrayType)
-def array_fromjson(datatype, value):
- if value is None:
- return None
- if not isinstance(value, list):
- raise ValueError("Value not a valid list: %s" % value)
- return [fromjson(datatype.item_type, item) for item in value]
-
-
-@fromjson.when_type(wsme.types.DictType)
-def dict_fromjson(datatype, value):
- if value is None:
- return None
- if not isinstance(value, dict):
- raise ValueError("Value not a valid dict: %s" % value)
- return dict((
- (fromjson(datatype.key_type, item[0]),
- fromjson(datatype.value_type, item[1]))
- for item in value.items()))
-
-
-@fromjson.when_object(six.binary_type)
-def str_fromjson(datatype, value):
- if (isinstance(value, six.string_types) or
- isinstance(value, six.integer_types) or
- isinstance(value, float)):
- return six.text_type(value).encode('utf8')
-
-
-@fromjson.when_object(wsme.types.text)
-def text_fromjson(datatype, value):
- if value is not None and isinstance(value, wsme.types.bytes):
- return wsme.types.text(value)
- return value
-
-
-@fromjson.when_object(*six.integer_types + (float,))
-def numeric_fromjson(datatype, value):
- """Convert string object to built-in types int, long or float."""
- if value is None:
- return None
- return datatype(value)
-
-
-@fromjson.when_object(bool)
-def bool_fromjson(datatype, value):
- """Convert to bool, restricting strings to just unambiguous values."""
- if value is None:
- return None
- if isinstance(value, six.integer_types + (bool,)):
- return bool(value)
- if value in ENUM_TRUE:
- return True
- if value in ENUM_FALSE:
- return False
- raise ValueError("Value not an unambiguous boolean: %s" % value)
-
-
-@fromjson.when_object(decimal.Decimal)
-def decimal_fromjson(datatype, value):
- if value is None:
- return None
- return decimal.Decimal(value)
-
-
-@fromjson.when_object(datetime.date)
-def date_fromjson(datatype, value):
- if value is None:
- return None
- return wsme.utils.parse_isodate(value)
-
-
-@fromjson.when_object(datetime.time)
-def time_fromjson(datatype, value):
- if value is None:
- return None
- return wsme.utils.parse_isotime(value)
-
-
-@fromjson.when_object(datetime.datetime)
-def datetime_fromjson(datatype, value):
- if value is None:
- return None
- return wsme.utils.parse_isodatetime(value)
-
-
-def parse(s, datatypes, bodyarg, encoding='utf8'):
- jload = json.load
- if not hasattr(s, 'read'):
- if six.PY3 and isinstance(s, six.binary_type):
- s = s.decode(encoding)
- jload = json.loads
- try:
- jdata = jload(s)
- except ValueError:
- raise wsme.exc.ClientSideError("Request is not in valid JSON format")
- if bodyarg:
- argname = list(datatypes.keys())[0]
- try:
- kw = {argname: fromjson(datatypes[argname], jdata)}
- except ValueError as e:
- raise wsme.exc.InvalidInput(argname, jdata, e.args[0])
- except wsme.exc.UnknownAttribute as e:
- # We only know the fieldname at this level, not in the
- # called function. We fill in this information here.
- e.add_fieldname(argname)
- raise
- else:
- kw = {}
- extra_args = []
- if not isinstance(jdata, dict):
- raise wsme.exc.ClientSideError("Request must be a JSON dict")
- for key in jdata:
- if key not in datatypes:
- extra_args.append(key)
- else:
- try:
- kw[key] = fromjson(datatypes[key], jdata[key])
- except ValueError as e:
- raise wsme.exc.InvalidInput(key, jdata[key], e.args[0])
- except wsme.exc.UnknownAttribute as e:
- # We only know the fieldname at this level, not in the
- # called function. We fill in this information here.
- e.add_fieldname(key)
- raise
- if extra_args:
- raise wsme.exc.UnknownArgument(', '.join(extra_args))
- return kw
-
-
-def encode_result(value, datatype, **options):
- jsondata = tojson(datatype, value)
- if options.get('nest_result', False):
- jsondata = {options.get('nested_result_attrname', 'result'): jsondata}
- return json.dumps(jsondata)
-
-
-def encode_error(context, errordetail):
- return json.dumps(errordetail)
-
-
-def encode_sample_value(datatype, value, format=False):
- r = tojson(datatype, value)
- content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0,
- sort_keys=format)
- return ('javascript', content)
-
-
-def encode_sample_params(params, format=False):
- kw = {}
- for name, datatype, value in params:
- kw[name] = tojson(datatype, value)
- content = json.dumps(kw, ensure_ascii=False, indent=4 if format else 0,
- sort_keys=format)
- return ('javascript', content)
-
-
-def encode_sample_result(datatype, value, format=False):
- r = tojson(datatype, value)
- content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0,
- sort_keys=format)
- return ('javascript', content)
diff --git a/wsme/rest/protocol.py b/wsme/rest/protocol.py
deleted file mode 100644
index 5201ccf..0000000
--- a/wsme/rest/protocol.py
+++ /dev/null
@@ -1,133 +0,0 @@
-import os.path
-import logging
-
-from wsme.utils import OrderedDict
-from wsme.protocol import CallContext, Protocol, media_type_accept
-
-import wsme.rest
-import wsme.rest.args
-import wsme.runtime
-
-log = logging.getLogger(__name__)
-
-
-class RestProtocol(Protocol):
- name = 'rest'
- displayname = 'REST'
- dataformats = ['json', 'xml']
- content_types = ['application/json', 'text/xml']
-
- def __init__(self, dataformats=None):
- if dataformats is None:
- dataformats = RestProtocol.dataformats
-
- self.dataformats = OrderedDict()
- self.content_types = []
-
- for dataformat in dataformats:
- __import__('wsme.rest.' + dataformat)
- dfmod = getattr(wsme.rest, dataformat)
- self.dataformats[dataformat] = dfmod
- self.content_types.extend(dfmod.accept_content_types)
-
- def accept(self, request):
- for dataformat in self.dataformats:
- if request.path.endswith('.' + dataformat):
- return True
- return media_type_accept(request, self.content_types)
-
- def iter_calls(self, request):
- context = CallContext(request)
- context.outformat = None
- ext = os.path.splitext(request.path.split('/')[-1])[1]
- inmime = request.content_type
- outmime = request.accept.best_match(self.content_types)
-
- outformat = None
- informat = None
- for dfname, df in self.dataformats.items():
- if ext == '.' + dfname:
- outformat = df
- if not inmime:
- informat = df
-
- if outformat is None and request.accept:
- for dfname, df in self.dataformats.items():
- if outmime in df.accept_content_types:
- outformat = df
- if not inmime:
- informat = df
-
- if outformat is None:
- for dfname, df in self.dataformats.items():
- if inmime == df.content_type:
- outformat = df
-
- context.outformat = outformat
- context.outformat_options = {
- 'nest_result': getattr(self, 'nest_result', False)
- }
- if not inmime and informat:
- inmime = informat.content_type
- log.debug("Inferred input type: %s" % inmime)
- context.inmime = inmime
- yield context
-
- def extract_path(self, context):
- path = context.request.path
- assert path.startswith(self.root._webpath)
- path = path[len(self.root._webpath):]
- path = path.strip('/').split('/')
-
- for dataformat in self.dataformats:
- if path[-1].endswith('.' + dataformat):
- path[-1] = path[-1][:-len(dataformat) - 1]
-
- # Check if the path is actually a function, and if not
- # see if the http method make a difference
- # TODO Re-think the function lookup phases. Here we are
- # doing the job that will be done in a later phase, which
- # is sub-optimal
- for p, fdef in self.root.getapi():
- if p == path:
- return path
-
- # No function at this path. Now check for function that have
- # this path as a prefix, and declared an http method
- for p, fdef in self.root.getapi():
- if len(p) == len(path) + 1 and p[:len(path)] == path and \
- fdef.extra_options.get('method') == context.request.method:
- return p
-
- return path
-
- def read_arguments(self, context):
- request = context.request
- funcdef = context.funcdef
-
- body = None
- if request.content_length not in (None, 0, '0'):
- body = request.body
- if not body and '__body__' in request.params:
- body = request.params['__body__']
-
- args, kwargs = wsme.rest.args.combine_args(
- funcdef,
- (wsme.rest.args.args_from_params(funcdef, request.params),
- wsme.rest.args.args_from_body(funcdef, body, context.inmime))
- )
- wsme.runtime.check_arguments(funcdef, args, kwargs)
- return kwargs
-
- def encode_result(self, context, result):
- out = context.outformat.encode_result(
- result, context.funcdef.return_type,
- **context.outformat_options
- )
- return out
-
- def encode_error(self, context, errordetail):
- out = context.outformat.encode_error(
- context, errordetail
- )
- return out
diff --git a/wsme/rest/xml.py b/wsme/rest/xml.py
deleted file mode 100644
index 286afa7..0000000
--- a/wsme/rest/xml.py
+++ /dev/null
@@ -1,298 +0,0 @@
-from __future__ import absolute_import
-
-import datetime
-
-import six
-
-import xml.etree.ElementTree as et
-
-from simplegeneric import generic
-
-import wsme.types
-from wsme.exc import UnknownArgument, InvalidInput
-
-import re
-
-content_type = 'text/xml'
-accept_content_types = [
- content_type,
-]
-
-time_re = re.compile(r'(?P<h>[0-2][0-9]):(?P<m>[0-5][0-9]):(?P<s>[0-6][0-9])')
-
-
-def xml_indent(elem, level=0):
- i = "\n" + level * " "
- if len(elem):
- if not elem.text or not elem.text.strip():
- elem.text = i + " "
- for e in elem:
- xml_indent(e, level + 1)
- if not e.tail or not e.tail.strip():
- e.tail = i
- if level and (not elem.tail or not elem.tail.strip()):
- elem.tail = i
-
-
-@generic
-def toxml(datatype, key, value):
- """
- A generic converter from python to xml elements.
-
- If a non-complex user specific type is to be used in the api,
- a specific toxml should be added::
-
- from wsme.protocol.restxml import toxml
-
- myspecialtype = object()
-
- @toxml.when_object(myspecialtype)
- def myspecialtype_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = str(value)
- return el
- """
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- if wsme.types.isusertype(datatype):
- return toxml(datatype.basetype,
- key, datatype.tobasetype(value))
- elif wsme.types.iscomplex(datatype):
- for attrdef in datatype._wsme_attributes:
- attrvalue = getattr(value, attrdef.key)
- if attrvalue is not wsme.types.Unset:
- el.append(toxml(attrdef.datatype, attrdef.name,
- attrvalue))
- else:
- el.text = six.text_type(value)
- return el
-
-
-@generic
-def fromxml(datatype, element):
- """
- A generic converter from xml elements to python datatype.
-
- If a non-complex user specific type is to be used in the api,
- a specific fromxml should be added::
-
- from wsme.protocol.restxml import fromxml
-
- class MySpecialType(object):
- pass
-
- @fromxml.when_object(MySpecialType)
- def myspecialtype_fromxml(datatype, element):
- if element.get('nil', False):
- return None
- return MySpecialType(element.text)
- """
- if element.get('nil', False):
- return None
- if wsme.types.isusertype(datatype):
- return datatype.frombasetype(fromxml(datatype.basetype, element))
- if wsme.types.iscomplex(datatype):
- obj = datatype()
- for attrdef in wsme.types.list_attributes(datatype):
- sub = element.find(attrdef.name)
- if sub is not None:
- val_fromxml = fromxml(attrdef.datatype, sub)
- if getattr(attrdef, 'readonly', False):
- raise InvalidInput(attrdef.name, val_fromxml,
- "Cannot set read only field.")
- setattr(obj, attrdef.key, val_fromxml)
- elif attrdef.mandatory:
- raise InvalidInput(attrdef.name, None,
- "Mandatory field missing.")
- return wsme.types.validate_value(datatype, obj)
- if datatype is wsme.types.bytes:
- return element.text.encode('ascii')
- return datatype(element.text)
-
-
-@toxml.when_type(wsme.types.ArrayType)
-def array_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- for item in value:
- el.append(toxml(datatype.item_type, 'item', item))
- return el
-
-
-@toxml.when_type(wsme.types.DictType)
-def dict_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- for item in value.items():
- key = toxml(datatype.key_type, 'key', item[0])
- value = toxml(datatype.value_type, 'value', item[1])
- node = et.Element('item')
- node.append(key)
- node.append(value)
- el.append(node)
- return el
-
-
-@toxml.when_object(wsme.types.bytes)
-def bytes_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = value.decode('ascii')
- return el
-
-
-@toxml.when_object(bool)
-def bool_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = value and 'true' or 'false'
- return el
-
-
-@toxml.when_object(datetime.date)
-def date_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = value.isoformat()
- return el
-
-
-@toxml.when_object(datetime.datetime)
-def datetime_toxml(datatype, key, value):
- el = et.Element(key)
- if value is None:
- el.set('nil', 'true')
- else:
- el.text = value.isoformat()
- return el
-
-
-@fromxml.when_type(wsme.types.ArrayType)
-def array_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return [
- fromxml(datatype.item_type, item)
- for item in element.findall('item')
- ]
-
-
-@fromxml.when_object(bool)
-def bool_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return element.text.lower() != 'false'
-
-
-@fromxml.when_type(wsme.types.DictType)
-def dict_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return dict((
- (fromxml(datatype.key_type, item.find('key')),
- fromxml(datatype.value_type, item.find('value')))
- for item in element.findall('item')))
-
-
-@fromxml.when_object(wsme.types.text)
-def unicode_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return wsme.types.text(element.text) if element.text else six.u('')
-
-
-@fromxml.when_object(datetime.date)
-def date_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return wsme.utils.parse_isodate(element.text)
-
-
-@fromxml.when_object(datetime.time)
-def time_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return wsme.utils.parse_isotime(element.text)
-
-
-@fromxml.when_object(datetime.datetime)
-def datetime_fromxml(datatype, element):
- if element.get('nil') == 'true':
- return None
- return wsme.utils.parse_isodatetime(element.text)
-
-
-def parse(s, datatypes, bodyarg):
- if hasattr(s, 'read'):
- tree = et.parse(s)
- else:
- tree = et.fromstring(s)
- if bodyarg:
- name = list(datatypes.keys())[0]
- return {name: fromxml(datatypes[name], tree)}
- else:
- kw = {}
- extra_args = []
- for sub in tree:
- if sub.tag not in datatypes:
- extra_args.append(sub.tag)
- kw[sub.tag] = fromxml(datatypes[sub.tag], sub)
- if extra_args:
- raise UnknownArgument(', '.join(extra_args))
- return kw
-
-
-def encode_result(value, datatype, **options):
- return et.tostring(toxml(
- datatype, options.get('nested_result_attrname', 'result'), value
- ))
-
-
-def encode_error(context, errordetail):
- el = et.Element('error')
- et.SubElement(el, 'faultcode').text = errordetail['faultcode']
- et.SubElement(el, 'faultstring').text = errordetail['faultstring']
- if 'debuginfo' in errordetail:
- et.SubElement(el, 'debuginfo').text = errordetail['debuginfo']
- return et.tostring(el)
-
-
-def encode_sample_value(datatype, value, format=False):
- r = toxml(datatype, 'value', value)
- if format:
- xml_indent(r)
- content = et.tostring(r)
- return ('xml', content)
-
-
-def encode_sample_params(params, format=False):
- node = et.Element('parameters')
- for name, datatype, value in params:
- node.append(toxml(datatype, name, value))
- if format:
- xml_indent(node)
- content = et.tostring(node)
- return ('xml', content)
-
-
-def encode_sample_result(datatype, value, format=False):
- r = toxml(datatype, 'result', value)
- if format:
- xml_indent(r)
- content = et.tostring(r)
- return ('xml', content)
diff --git a/wsme/root.py b/wsme/root.py
deleted file mode 100644
index e71f1f7..0000000
--- a/wsme/root.py
+++ /dev/null
@@ -1,372 +0,0 @@
-import logging
-import sys
-import weakref
-
-from six import u, b
-import six
-
-import webob
-
-from wsme.exc import ClientSideError, UnknownFunction
-from wsme.protocol import getprotocol
-from wsme.rest import scan_api
-from wsme import spore
-import wsme.api
-import wsme.types
-
-log = logging.getLogger(__name__)
-
-html_body = u("""
-<html>
-<head>
- <style type='text/css'>
- %(css)s
- </style>
-</head>
-<body>
-%(content)s
-</body>
-</html>
-""")
-
-
-def default_prepare_response_body(request, results):
- r = None
- sep = None
- for value in results:
- if sep is None:
- if isinstance(value, six.text_type):
- sep = u('\n')
- r = u('')
- else:
- sep = b('\n')
- r = b('')
- else:
- r += sep
- r += value
- return r
-
-
-class DummyTransaction:
- def commit(self):
- pass
-
- def abort(self):
- pass
-
-
-class WSRoot(object):
- """
- Root controller for webservices.
-
- :param protocols: A list of protocols to enable (see :meth:`addprotocol`)
- :param webpath: The web path where the webservice is published.
-
- :type transaction: A `transaction
- <http://pypi.python.org/pypi/transaction>`_-like
- object or ``True``.
- :param transaction: If specified, a transaction will be created and
- handled on a per-call base.
-
- This option *can* be enabled along with `repoze.tm2
- <http://pypi.python.org/pypi/repoze.tm2>`_
- (it will only make it void).
-
- If ``True``, the default :mod:`transaction`
- module will be imported and used.
-
- """
- __registry__ = wsme.types.registry
-
- def __init__(self, protocols=[], webpath='', transaction=None,
- scan_api=scan_api):
- self._debug = True
- self._webpath = webpath
- self.protocols = []
- self._scan_api = scan_api
-
- self._transaction = transaction
- if self._transaction is True:
- import transaction
- self._transaction = transaction
-
- for protocol in protocols:
- self.addprotocol(protocol)
-
- self._api = None
-
- def wsgiapp(self):
- """Returns a wsgi application"""
- from webob.dec import wsgify
- return wsgify(self._handle_request)
-
- def begin(self):
- if self._transaction:
- return self._transaction.begin()
- else:
- return DummyTransaction()
-
- def addprotocol(self, protocol, **options):
- """
- Enable a new protocol on the controller.
-
- :param protocol: A registered protocol name or an instance
- of a protocol.
- """
- if isinstance(protocol, str):
- protocol = getprotocol(protocol, **options)
- self.protocols.append(protocol)
- protocol.root = weakref.proxy(self)
-
- def getapi(self):
- """
- Returns the api description.
-
- :rtype: list of (path, :class:`FunctionDefinition`)
- """
- if self._api is None:
- self._api = [
- (path, f, f._wsme_definition, args)
- for path, f, args in self._scan_api(self)
- ]
- for path, f, fdef, args in self._api:
- fdef.resolve_types(self.__registry__)
- return [
- (path, fdef)
- for path, f, fdef, args in self._api
- ]
-
- def _get_protocol(self, name):
- for protocol in self.protocols:
- if protocol.name == name:
- return protocol
-
- def _select_protocol(self, request):
- log.debug("Selecting a protocol for the following request :\n"
- "headers: %s\nbody: %s", request.headers.items(),
- request.content_length and (
- request.content_length > 512 and
- request.body[:512] or
- request.body) or '')
- protocol = None
- error = ClientSideError(status_code=406)
- path = str(request.path)
- assert path.startswith(self._webpath)
- path = path[len(self._webpath) + 1:]
- if 'wsmeproto' in request.params:
- return self._get_protocol(request.params['wsmeproto'])
- else:
-
- for p in self.protocols:
- try:
- if p.accept(request):
- protocol = p
- break
- except ClientSideError as e:
- error = e
- # If we could not select a protocol, we raise the last exception
- # that we got, or the default one.
- if not protocol:
- raise error
- return protocol
-
- def _do_call(self, protocol, context):
- request = context.request
- request.calls.append(context)
- try:
- if context.path is None:
- context.path = protocol.extract_path(context)
-
- if context.path is None:
- raise ClientSideError(u(
- 'The %s protocol was unable to extract a function '
- 'path from the request') % protocol.name)
-
- context.func, context.funcdef, args = \
- self._lookup_function(context.path)
- kw = protocol.read_arguments(context)
- args = list(args)
-
- txn = self.begin()
- try:
- result = context.func(*args, **kw)
- txn.commit()
- except Exception:
- txn.abort()
- raise
-
- else:
- # TODO make sure result type == a._wsme_definition.return_type
- return protocol.encode_result(context, result)
-
- except Exception as e:
- infos = wsme.api.format_exception(sys.exc_info(), self._debug)
- if isinstance(e, ClientSideError):
- request.client_errorcount += 1
- request.client_last_status_code = e.code
- else:
- request.server_errorcount += 1
- return protocol.encode_error(context, infos)
-
- def find_route(self, path):
- for p in self.protocols:
- for routepath, func in p.iter_routes():
- if path.startswith(routepath):
- return routepath, func
- return None, None
-
- def _handle_request(self, request):
- res = webob.Response()
- res_content_type = None
-
- path = request.path
- if path.startswith(self._webpath):
- path = path[len(self._webpath):]
- routepath, func = self.find_route(path)
- if routepath:
- content = func()
- if isinstance(content, six.text_type):
- res.text = content
- elif isinstance(content, six.binary_type):
- res.body = content
- res.content_type = func._cfg['content-type']
- return res
-
- if request.path == self._webpath + '/api.spore':
- res.body = spore.getdesc(self, request.host_url)
- res.content_type = 'application/json'
- return res
-
- try:
- msg = None
- error_status = 500
- protocol = self._select_protocol(request)
- except ClientSideError as e:
- error_status = e.code
- msg = e.faultstring
- protocol = None
- except Exception as e:
- msg = ("Unexpected error while selecting protocol: %s" % str(e))
- log.exception(msg)
- protocol = None
- error_status = 500
-
- if protocol is None:
- if not msg:
- msg = ("None of the following protocols can handle this "
- "request : %s" % ','.join((
- p.name for p in self.protocols)))
- res.status = error_status
- res.content_type = 'text/plain'
- try:
- res.text = u(msg)
- except TypeError:
- res.text = msg
- log.error(msg)
- return res
-
- request.calls = []
- request.client_errorcount = 0
- request.client_last_status_code = None
- request.server_errorcount = 0
-
- try:
-
- context = None
-
- if hasattr(protocol, 'prepare_response_body'):
- prepare_response_body = protocol.prepare_response_body
- else:
- prepare_response_body = default_prepare_response_body
-
- body = prepare_response_body(request, (
- self._do_call(protocol, context)
- for context in protocol.iter_calls(request)))
-
- if isinstance(body, six.text_type):
- res.text = body
- else:
- res.body = body
-
- if len(request.calls) == 1:
- if hasattr(protocol, 'get_response_status'):
- res.status = protocol.get_response_status(request)
- else:
- if request.client_errorcount == 1:
- res.status = request.client_last_status_code
- elif request.client_errorcount:
- res.status = 400
- elif request.server_errorcount:
- res.status = 500
- else:
- res.status = 200
- else:
- res.status = protocol.get_response_status(request)
- res_content_type = protocol.get_response_contenttype(request)
- except ClientSideError as e:
- request.server_errorcount += 1
- res.status = e.code
- res.text = e.faultstring
- except Exception:
- infos = wsme.api.format_exception(sys.exc_info(), self._debug)
- request.server_errorcount += 1
- res.text = protocol.encode_error(context, infos)
- res.status = 500
-
- if res_content_type is None:
- # Attempt to correctly guess what content-type we should return.
- ctypes = [ct for ct in protocol.content_types if ct]
- if ctypes:
- res_content_type = request.accept.best_match(ctypes)
-
- # If not we will attempt to convert the body to an accepted
- # output format.
- if res_content_type is None:
- if "text/html" in request.accept:
- res.text = self._html_format(res.body, protocol.content_types)
- res_content_type = "text/html"
-
- # TODO should we consider the encoding asked by
- # the web browser ?
- res.headers['Content-Type'] = "%s; charset=UTF-8" % res_content_type
-
- return res
-
- def _lookup_function(self, path):
- if not self._api:
- self.getapi()
-
- for fpath, f, fdef, args in self._api:
- if path == fpath:
- return f, fdef, args
- raise UnknownFunction('/'.join(path))
-
- def _html_format(self, content, content_types):
- try:
- from pygments import highlight
- from pygments.lexers import get_lexer_for_mimetype
- from pygments.formatters import HtmlFormatter
-
- lexer = None
- for ct in content_types:
- try:
- lexer = get_lexer_for_mimetype(ct)
- break
- except Exception:
- pass
-
- if lexer is None:
- raise ValueError("No lexer found")
- formatter = HtmlFormatter()
- return html_body % dict(
- css=formatter.get_style_defs(),
- content=highlight(content, lexer, formatter).encode('utf8'))
- except Exception as e:
- log.warning(
- "Could not pygment the content because of the following "
- "error :\n%s" % e)
- return html_body % dict(
- css='',
- content=u('<pre>%s</pre>') %
- content.replace(b('>'), b('&gt;'))
- .replace(b('<'), b('&lt;')))
diff --git a/wsme/runtime.py b/wsme/runtime.py
deleted file mode 100644
index e114d13..0000000
--- a/wsme/runtime.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from wsme.exc import MissingArgument
-
-
-def check_arguments(funcdef, args, kw):
- """Check if some arguments are missing"""
- assert len(args) == 0
- for arg in funcdef.arguments:
- if arg.mandatory and arg.name not in kw:
- raise MissingArgument(arg.name)
diff --git a/wsme/spore.py b/wsme/spore.py
deleted file mode 100644
index 8de0ed2..0000000
--- a/wsme/spore.py
+++ /dev/null
@@ -1,64 +0,0 @@
-from wsme import types
-
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-
-
-def getdesc(root, host_url=''):
- methods = {}
-
- for path, funcdef in root.getapi():
- method = funcdef.extra_options.get('method', None)
- name = '_'.join(path)
- if method is not None:
- path = path[:-1]
- else:
- method = 'GET'
- for argdef in funcdef.arguments:
- if types.iscomplex(argdef.datatype) \
- or types.isarray(argdef.datatype) \
- or types.isdict(argdef.datatype):
- method = 'POST'
- break
-
- required_params = []
- optional_params = []
- for argdef in funcdef.arguments:
- if method == 'GET' and argdef.mandatory:
- required_params.append(argdef.name)
- else:
- optional_params.append(argdef.name)
-
- methods[name] = {
- 'method': method,
- 'path': '/'.join(path)
- }
- if required_params:
- methods[name]['required_params'] = required_params
- if optional_params:
- methods[name]['optional_params'] = optional_params
- if funcdef.doc:
- methods[name]['documentation'] = funcdef.doc
-
- formats = []
- for p in root.protocols:
- if p.name == 'restxml':
- formats.append('xml')
- if p.name == 'restjson':
- formats.append('json')
-
- api = {
- 'base_url': host_url + root._webpath,
- 'version': '0.1',
- 'name': getattr(root, 'name', 'name'),
- 'authority': '',
- 'formats': [
- 'json',
- 'xml'
- ],
- 'methods': methods
- }
-
- return json.dumps(api, indent=4)
diff --git a/wsme/tests/__init__.py b/wsme/tests/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wsme/tests/__init__.py
+++ /dev/null
diff --git a/wsme/tests/protocol.py b/wsme/tests/protocol.py
deleted file mode 100644
index 1c41f8c..0000000
--- a/wsme/tests/protocol.py
+++ /dev/null
@@ -1,720 +0,0 @@
-# coding=utf-8
-
-import unittest
-import warnings
-import datetime
-import decimal
-import six
-
-from six import u, b
-
-from webtest import TestApp
-
-from wsme import WSRoot, Unset
-from wsme import expose, validate
-import wsme.types
-import wsme.utils
-
-warnings.filterwarnings('ignore', module='webob.dec')
-
-binarysample = b('\x00\xff\x43')
-
-try:
- 1 / 0
-except ZeroDivisionError as e:
- zerodivisionerrormsg = str(e)
-
-
-class CallException(RuntimeError):
- def __init__(self, faultcode, faultstring, debuginfo):
- self.faultcode = faultcode
- self.faultstring = faultstring
- self.debuginfo = debuginfo
-
- def __str__(self):
- return 'faultcode=%s, faultstring=%s, debuginfo=%s' % (
- self.faultcode, self.faultstring, self.debuginfo
- )
-
-
-myenumtype = wsme.types.Enum(wsme.types.bytes, 'v1', 'v2')
-
-
-class NestedInner(object):
- aint = int
-
- def __init__(self, aint=None):
- self.aint = aint
-
-
-class NestedOuter(object):
- inner = NestedInner
- inner_array = wsme.types.wsattr([NestedInner])
- inner_dict = {wsme.types.text: NestedInner}
-
- def __init__(self):
- self.inner = NestedInner(0)
-
-
-class NamedAttrsObject(object):
- def __init__(self, v1=Unset, v2=Unset):
- self.attr_1 = v1
- self.attr_2 = v2
-
- attr_1 = wsme.types.wsattr(int, name='attr.1')
- attr_2 = wsme.types.wsattr(int, name='attr.2')
-
-
-class CustomObject(object):
- aint = int
- name = wsme.types.text
-
-
-class ExtendedInt(wsme.types.UserType):
- basetype = int
- name = "Extended integer"
-
-
-class NestedInnerApi(object):
- @expose(bool)
- def deepfunction(self):
- return True
-
-
-class NestedOuterApi(object):
- inner = NestedInnerApi()
-
-
-class ReturnTypes(object):
- @expose(wsme.types.bytes)
- def getbytes(self):
- return b("astring")
-
- @expose(wsme.types.text)
- def gettext(self):
- return u('\xe3\x81\xae')
-
- @expose(int)
- def getint(self):
- return 2
-
- @expose(float)
- def getfloat(self):
- return 3.14159265
-
- @expose(decimal.Decimal)
- def getdecimal(self):
- return decimal.Decimal('3.14159265')
-
- @expose(datetime.date)
- def getdate(self):
- return datetime.date(1994, 1, 26)
-
- @expose(bool)
- def getbooltrue(self):
- return True
-
- @expose(bool)
- def getboolfalse(self):
- return False
-
- @expose(datetime.time)
- def gettime(self):
- return datetime.time(12, 0, 0)
-
- @expose(datetime.datetime)
- def getdatetime(self):
- return datetime.datetime(1994, 1, 26, 12, 0, 0)
-
- @expose(wsme.types.binary)
- def getbinary(self):
- return binarysample
-
- @expose(NestedOuter)
- def getnested(self):
- n = NestedOuter()
- return n
-
- @expose([wsme.types.bytes])
- def getbytesarray(self):
- return [b("A"), b("B"), b("C")]
-
- @expose([NestedOuter])
- def getnestedarray(self):
- return [NestedOuter(), NestedOuter()]
-
- @expose({wsme.types.bytes: NestedOuter})
- def getnesteddict(self):
- return {b('a'): NestedOuter(), b('b'): NestedOuter()}
-
- @expose(NestedOuter)
- def getobjectarrayattribute(self):
- obj = NestedOuter()
- obj.inner_array = [NestedInner(12), NestedInner(13)]
- return obj
-
- @expose(NestedOuter)
- def getobjectdictattribute(self):
- obj = NestedOuter()
- obj.inner_dict = {
- '12': NestedInner(12),
- '13': NestedInner(13)
- }
- return obj
-
- @expose(myenumtype)
- def getenum(self):
- return b('v2')
-
- @expose(NamedAttrsObject)
- def getnamedattrsobj(self):
- return NamedAttrsObject(5, 6)
-
-
-class ArgTypes(object):
- def assertEqual(self, a, b):
- if not (a == b):
- raise AssertionError('%s != %s' % (a, b))
-
- def assertIsInstance(self, value, v_type):
- assert isinstance(value, v_type), ("%s is not instance of type %s" %
- (value, v_type))
-
- @expose(wsme.types.bytes)
- @validate(wsme.types.bytes)
- def setbytes(self, value):
- print(repr(value))
- self.assertEqual(type(value), wsme.types.bytes)
- return value
-
- @expose(wsme.types.text)
- @validate(wsme.types.text)
- def settext(self, value):
- print(repr(value))
- self.assertEqual(type(value), wsme.types.text)
- return value
-
- @expose(wsme.types.text)
- @validate(wsme.types.text)
- def settextnone(self, value):
- print(repr(value))
- self.assertEqual(type(value), type(None))
- return value
-
- @expose(bool)
- @validate(bool)
- def setbool(self, value):
- print(repr(value))
- self.assertEqual(type(value), bool)
- return value
-
- @expose(int)
- @validate(int)
- def setint(self, value):
- print(repr(value))
- self.assertEqual(type(value), int)
- return value
-
- @expose(float)
- @validate(float)
- def setfloat(self, value):
- print(repr(value))
- self.assertEqual(type(value), float)
- return value
-
- @expose(decimal.Decimal)
- @validate(decimal.Decimal)
- def setdecimal(self, value):
- print(repr(value))
- self.assertEqual(type(value), decimal.Decimal)
- return value
-
- @expose(datetime.date)
- @validate(datetime.date)
- def setdate(self, value):
- print(repr(value))
- self.assertEqual(type(value), datetime.date)
- return value
-
- @expose(datetime.time)
- @validate(datetime.time)
- def settime(self, value):
- print(repr(value))
- self.assertEqual(type(value), datetime.time)
- return value
-
- @expose(datetime.datetime)
- @validate(datetime.datetime)
- def setdatetime(self, value):
- print(repr(value))
- self.assertEqual(type(value), datetime.datetime)
- return value
-
- @expose(wsme.types.binary)
- @validate(wsme.types.binary)
- def setbinary(self, value):
- print(repr(value))
- self.assertEqual(type(value), six.binary_type)
- return value
-
- @expose([wsme.types.bytes])
- @validate([wsme.types.bytes])
- def setbytesarray(self, value):
- print(repr(value))
- self.assertEqual(type(value), list)
- self.assertEqual(type(value[0]), wsme.types.bytes)
- return value
-
- @expose([wsme.types.text])
- @validate([wsme.types.text])
- def settextarray(self, value):
- print(repr(value))
- self.assertEqual(type(value), list)
- self.assertEqual(type(value[0]), wsme.types.text)
- return value
-
- @expose([datetime.datetime])
- @validate([datetime.datetime])
- def setdatetimearray(self, value):
- print(repr(value))
- self.assertEqual(type(value), list)
- self.assertEqual(type(value[0]), datetime.datetime)
- return value
-
- @expose(NestedOuter)
- @validate(NestedOuter)
- def setnested(self, value):
- print(repr(value))
- self.assertEqual(type(value), NestedOuter)
- return value
-
- @expose([NestedOuter])
- @validate([NestedOuter])
- def setnestedarray(self, value):
- print(repr(value))
- self.assertEqual(type(value), list)
- self.assertEqual(type(value[0]), NestedOuter)
- return value
-
- @expose({wsme.types.bytes: NestedOuter})
- @validate({wsme.types.bytes: NestedOuter})
- def setnesteddict(self, value):
- print(repr(value))
- self.assertEqual(type(value), dict)
- self.assertEqual(type(list(value.keys())[0]), wsme.types.bytes)
- self.assertEqual(type(list(value.values())[0]), NestedOuter)
- return value
-
- @expose(myenumtype)
- @validate(myenumtype)
- def setenum(self, value):
- print(value)
- self.assertEqual(type(value), wsme.types.bytes)
- return value
-
- @expose(NamedAttrsObject)
- @validate(NamedAttrsObject)
- def setnamedattrsobj(self, value):
- print(value)
- self.assertEqual(type(value), NamedAttrsObject)
- self.assertEqual(value.attr_1, 10)
- self.assertEqual(value.attr_2, 20)
- return value
-
- @expose(CustomObject)
- @validate(CustomObject)
- def setcustomobject(self, value):
- self.assertIsInstance(value, CustomObject)
- self.assertIsInstance(value.name, wsme.types.text)
- self.assertIsInstance(value.aint, int)
- return value
-
- @expose(ExtendedInt())
- @validate(ExtendedInt())
- def setextendedint(self, value):
- self.assertEqual(isinstance(value, ExtendedInt.basetype), True)
- return value
-
-
-class BodyTypes(object):
- def assertEqual(self, a, b):
- if not (a == b):
- raise AssertionError('%s != %s' % (a, b))
-
- @expose(int, body={wsme.types.text: int})
- @validate(int)
- def setdict(self, body):
- print(body)
- self.assertEqual(type(body), dict)
- self.assertEqual(type(body['test']), int)
- self.assertEqual(body['test'], 10)
- return body['test']
-
- @expose(int, body=[int])
- @validate(int)
- def setlist(self, body):
- print(body)
- self.assertEqual(type(body), list)
- self.assertEqual(type(body[0]), int)
- self.assertEqual(body[0], 10)
- return body[0]
-
-
-class WithErrors(object):
- @expose()
- def divide_by_zero(self):
- 1 / 0
-
-
-class MiscFunctions(object):
- @expose(int)
- @validate(int, int)
- def multiply(self, a, b):
- return a * b
-
-
-class WSTestRoot(WSRoot):
- argtypes = ArgTypes()
- returntypes = ReturnTypes()
- bodytypes = BodyTypes()
- witherrors = WithErrors()
- nested = NestedOuterApi()
- misc = MiscFunctions()
-
- def reset(self):
- self._touched = False
-
- @expose()
- def touch(self):
- self._touched = True
-
-
-class ProtocolTestCase(unittest.TestCase):
- protocol_options = {}
-
- def assertTypedEquals(self, a, b, convert):
- if isinstance(a, six.string_types):
- a = convert(a)
- if isinstance(b, six.string_types):
- b = convert(b)
- self.assertEqual(a, b)
-
- def assertDateEquals(self, a, b):
- self.assertTypedEquals(a, b, wsme.utils.parse_isodate)
-
- def assertTimeEquals(self, a, b):
- self.assertTypedEquals(a, b, wsme.utils.parse_isotime)
-
- def assertDateTimeEquals(self, a, b):
- self.assertTypedEquals(a, b, wsme.utils.parse_isodatetime)
-
- def assertIntEquals(self, a, b):
- self.assertTypedEquals(a, b, int)
-
- def assertFloatEquals(self, a, b):
- self.assertTypedEquals(a, b, float)
-
- def assertDecimalEquals(self, a, b):
- self.assertTypedEquals(a, b, decimal.Decimal)
-
- def setUp(self):
- if self.__class__.__name__ != 'ProtocolTestCase':
- self.root = WSTestRoot()
- self.root.getapi()
- self.root.addprotocol(self.protocol, **self.protocol_options)
-
- self.app = TestApp(self.root.wsgiapp())
-
- def test_invalid_path(self):
- try:
- res = self.call('invalid_function')
- print(res)
- assert "No error raised"
- except CallException as e:
- self.assertEqual(e.faultcode, 'Client')
- self.assertEqual(e.faultstring.lower(),
- u('unknown function name: invalid_function'))
-
- def test_serverside_error(self):
- try:
- res = self.call('witherrors/divide_by_zero')
- print(res)
- assert "No error raised"
- except CallException as e:
- self.assertEqual(e.faultcode, 'Server')
- self.assertEqual(e.faultstring, zerodivisionerrormsg)
- assert e.debuginfo is not None
-
- def test_serverside_error_nodebug(self):
- self.root._debug = False
- try:
- res = self.call('witherrors/divide_by_zero')
- print(res)
- assert "No error raised"
- except CallException as e:
- self.assertEqual(e.faultcode, 'Server')
- self.assertEqual(e.faultstring, zerodivisionerrormsg)
- assert e.debuginfo is None
-
- def test_touch(self):
- r = self.call('touch')
- assert r is None, r
-
- def test_return_bytes(self):
- r = self.call('returntypes/getbytes', _rt=wsme.types.bytes)
- self.assertEqual(r, b('astring'))
-
- def test_return_text(self):
- r = self.call('returntypes/gettext', _rt=wsme.types.text)
- self.assertEqual(r, u('\xe3\x81\xae'))
-
- def test_return_int(self):
- r = self.call('returntypes/getint')
- self.assertIntEquals(r, 2)
-
- def test_return_float(self):
- r = self.call('returntypes/getfloat')
- self.assertFloatEquals(r, 3.14159265)
-
- def test_return_decimal(self):
- r = self.call('returntypes/getdecimal')
- self.assertDecimalEquals(r, '3.14159265')
-
- def test_return_bool_true(self):
- r = self.call('returntypes/getbooltrue', _rt=bool)
- assert r
-
- def test_return_bool_false(self):
- r = self.call('returntypes/getboolfalse', _rt=bool)
- assert not r
-
- def test_return_date(self):
- r = self.call('returntypes/getdate')
- self.assertDateEquals(r, datetime.date(1994, 1, 26))
-
- def test_return_time(self):
- r = self.call('returntypes/gettime')
- self.assertTimeEquals(r, datetime.time(12))
-
- def test_return_datetime(self):
- r = self.call('returntypes/getdatetime')
- self.assertDateTimeEquals(r, datetime.datetime(1994, 1, 26, 12))
-
- def test_return_binary(self):
- r = self.call('returntypes/getbinary', _rt=wsme.types.binary)
- self.assertEqual(r, binarysample)
-
- def test_return_nested(self):
- r = self.call('returntypes/getnested', _rt=NestedOuter)
- self.assertEqual(r, {'inner': {'aint': 0}})
-
- def test_return_bytesarray(self):
- r = self.call('returntypes/getbytesarray', _rt=[six.binary_type])
- self.assertEqual(r, [b('A'), b('B'), b('C')])
-
- def test_return_nestedarray(self):
- r = self.call('returntypes/getnestedarray', _rt=[NestedOuter])
- self.assertEqual(r, [{'inner': {'aint': 0}}, {'inner': {'aint': 0}}])
-
- def test_return_nesteddict(self):
- r = self.call('returntypes/getnesteddict',
- _rt={wsme.types.bytes: NestedOuter})
- self.assertEqual(r, {
- b('a'): {'inner': {'aint': 0}},
- b('b'): {'inner': {'aint': 0}}
- })
-
- def test_return_objectarrayattribute(self):
- r = self.call('returntypes/getobjectarrayattribute', _rt=NestedOuter)
- self.assertEqual(r, {
- 'inner': {'aint': 0},
- 'inner_array': [{'aint': 12}, {'aint': 13}]
- })
-
- def test_return_objectdictattribute(self):
- r = self.call('returntypes/getobjectdictattribute', _rt=NestedOuter)
- self.assertEqual(r, {
- 'inner': {'aint': 0},
- 'inner_dict': {
- '12': {'aint': 12},
- '13': {'aint': 13}
- }
- })
-
- def test_return_enum(self):
- r = self.call('returntypes/getenum', _rt=myenumtype)
- self.assertEqual(r, b('v2'), r)
-
- def test_return_namedattrsobj(self):
- r = self.call('returntypes/getnamedattrsobj', _rt=NamedAttrsObject)
- self.assertEqual(r, {'attr.1': 5, 'attr.2': 6})
-
- def test_setbytes(self):
- assert self.call('argtypes/setbytes', value=b('astring'),
- _rt=wsme.types.bytes) == b('astring')
-
- def test_settext(self):
- assert self.call('argtypes/settext', value=u('\xe3\x81\xae'),
- _rt=wsme.types.text) == u('\xe3\x81\xae')
-
- def test_settext_empty(self):
- assert self.call('argtypes/settext', value=u(''),
- _rt=wsme.types.text) == u('')
-
- def test_settext_none(self):
- self.assertEqual(
- None,
- self.call('argtypes/settextnone', value=None, _rt=wsme.types.text)
- )
-
- def test_setint(self):
- r = self.call('argtypes/setint', value=3, _rt=int)
- self.assertEqual(r, 3)
-
- def test_setfloat(self):
- assert self.call('argtypes/setfloat', value=3.54,
- _rt=float) == 3.54
-
- def test_setbool_true(self):
- r = self.call('argtypes/setbool', value=True, _rt=bool)
- assert r
-
- def test_setbool_false(self):
- r = self.call('argtypes/setbool', value=False, _rt=bool)
- assert not r
-
- def test_setdecimal(self):
- value = decimal.Decimal('3.14')
- assert self.call('argtypes/setdecimal', value=value,
- _rt=decimal.Decimal) == value
-
- def test_setdate(self):
- value = datetime.date(2008, 4, 6)
- r = self.call('argtypes/setdate', value=value,
- _rt=datetime.date)
- self.assertEqual(r, value)
-
- def test_settime(self):
- value = datetime.time(12, 12, 15)
- r = self.call('argtypes/settime', value=value,
- _rt=datetime.time)
- self.assertEqual(r, datetime.time(12, 12, 15))
-
- def test_setdatetime(self):
- value = datetime.datetime(2008, 4, 6, 12, 12, 15)
- r = self.call('argtypes/setdatetime', value=value,
- _rt=datetime.datetime)
- self.assertEqual(r, datetime.datetime(2008, 4, 6, 12, 12, 15))
-
- def test_setbinary(self):
- value = binarysample
- r = self.call('argtypes/setbinary', value=(value, wsme.types.binary),
- _rt=wsme.types.binary) == value
- print(r)
-
- def test_setnested(self):
- value = {'inner': {'aint': 54}}
- r = self.call('argtypes/setnested',
- value=(value, NestedOuter),
- _rt=NestedOuter)
- self.assertEqual(r, value)
-
- def test_setnested_nullobj(self):
- value = {'inner': None}
- r = self.call(
- 'argtypes/setnested',
- value=(value, NestedOuter),
- _rt=NestedOuter
- )
- self.assertEqual(r, value)
-
- def test_setbytesarray(self):
- value = [b("1"), b("2"), b("three")]
- r = self.call('argtypes/setbytesarray',
- value=(value, [wsme.types.bytes]),
- _rt=[wsme.types.bytes])
- self.assertEqual(r, value)
-
- def test_settextarray(self):
- value = [u("1")]
- r = self.call('argtypes/settextarray',
- value=(value, [wsme.types.text]),
- _rt=[wsme.types.text])
- self.assertEqual(r, value)
-
- def test_setdatetimearray(self):
- value = [
- datetime.datetime(2008, 3, 6, 12, 12, 15),
- datetime.datetime(2008, 4, 6, 2, 12, 15),
- ]
- r = self.call('argtypes/setdatetimearray',
- value=(value, [datetime.datetime]),
- _rt=[datetime.datetime])
- self.assertEqual(r, value)
-
- def test_setnestedarray(self):
- value = [
- {'inner': {'aint': 54}},
- {'inner': {'aint': 55}},
- ]
- r = self.call('argtypes/setnestedarray',
- value=(value, [NestedOuter]),
- _rt=[NestedOuter])
- self.assertEqual(r, value)
-
- def test_setnesteddict(self):
- value = {
- b('o1'): {'inner': {'aint': 54}},
- b('o2'): {'inner': {'aint': 55}},
- }
- r = self.call('argtypes/setnesteddict',
- value=(value, {six.binary_type: NestedOuter}),
- _rt={six.binary_type: NestedOuter})
- print(r)
- self.assertEqual(r, value)
-
- def test_setenum(self):
- value = b('v1')
- r = self.call('argtypes/setenum', value=value,
- _rt=myenumtype)
- self.assertEqual(r, value)
-
- def test_setnamedattrsobj(self):
- value = {'attr.1': 10, 'attr.2': 20}
- r = self.call('argtypes/setnamedattrsobj',
- value=(value, NamedAttrsObject),
- _rt=NamedAttrsObject)
- self.assertEqual(r, value)
-
- def test_nested_api(self):
- r = self.call('nested/inner/deepfunction', _rt=bool)
- assert r is True
-
- def test_missing_argument(self):
- try:
- r = self.call('argtypes/setdatetime')
- print(r)
- assert "No error raised"
- except CallException as e:
- self.assertEqual(e.faultcode, 'Client')
- self.assertEqual(e.faultstring, u('Missing argument: "value"'))
-
- def test_misc_multiply(self):
- self.assertEqual(self.call('misc/multiply', a=5, b=2, _rt=int), 10)
-
- def test_html_format(self):
- res = self.call('argtypes/setdatetime', _accept="text/html",
- _no_result_decode=True)
- self.assertEqual(res.content_type, 'text/html')
-
-
-class RestOnlyProtocolTestCase(ProtocolTestCase):
- def test_body_list(self):
- r = self.call('bodytypes/setlist', body=([10], [int]), _rt=int)
- self.assertEqual(r, 10)
-
- def test_body_dict(self):
- r = self.call('bodytypes/setdict',
- body=({'test': 10}, {wsme.types.text: int}),
- _rt=int)
- self.assertEqual(r, 10)
diff --git a/wsme/tests/test_api.py b/wsme/tests/test_api.py
deleted file mode 100644
index fc1e157..0000000
--- a/wsme/tests/test_api.py
+++ /dev/null
@@ -1,419 +0,0 @@
-# encoding=utf8
-
-from six import b
-
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-import webtest
-
-from wsme import WSRoot, expose, validate
-from wsme.rest import scan_api
-from wsme import types
-from wsme import exc
-import wsme.api as wsme_api
-import wsme.types
-
-from wsme.tests.test_protocols import DummyProtocol
-
-
-class TestController(unittest.TestCase):
- def test_expose(self):
- class MyWS(WSRoot):
- @expose(int)
- def getint(self):
- return 1
-
- assert MyWS.getint._wsme_definition.return_type == int
-
- def test_validate(self):
- class ComplexType(object):
- attr = int
-
- class MyWS(object):
- @expose(int)
- @validate(int, int, int)
- def add(self, a, b, c=0):
- return a + b + c
-
- @expose(bool)
- @validate(ComplexType)
- def setcplx(self, obj):
- pass
-
- MyWS.add._wsme_definition.resolve_types(wsme.types.registry)
- MyWS.setcplx._wsme_definition.resolve_types(wsme.types.registry)
- args = MyWS.add._wsme_definition.arguments
-
- assert args[0].name == 'a'
- assert args[0].datatype == int
- assert args[0].mandatory
- assert args[0].default is None
-
- assert args[1].name == 'b'
- assert args[1].datatype == int
- assert args[1].mandatory
- assert args[1].default is None
-
- assert args[2].name == 'c'
- assert args[2].datatype == int
- assert not args[2].mandatory
- assert args[2].default == 0
-
- assert types.iscomplex(ComplexType)
-
- def test_validate_enum_with_none(self):
- class Version(object):
- number = types.Enum(str, 'v1', 'v2', None)
-
- class MyWS(WSRoot):
- @expose(str)
- @validate(Version)
- def setcplx(self, version):
- pass
-
- r = MyWS(['restjson'])
- app = webtest.TestApp(r.wsgiapp())
- res = app.post_json('/setcplx', params={'version': {'number': 'arf'}},
- expect_errors=True,
- headers={'Accept': 'application/json'})
- self.assertTrue(
- res.json_body['faultstring'].startswith(
- "Invalid input for field/attribute number. Value: 'arf'. \
-Value should be one of:"))
- self.assertIn('v1', res.json_body['faultstring'])
- self.assertIn('v2', res.json_body['faultstring'])
- self.assertIn('None', res.json_body['faultstring'])
- self.assertEqual(res.status_int, 400)
-
- def test_validate_enum_with_wrong_type(self):
- class Version(object):
- number = types.Enum(str, 'v1', 'v2', None)
-
- class MyWS(WSRoot):
- @expose(str)
- @validate(Version)
- def setcplx(self, version):
- pass
-
- r = MyWS(['restjson'])
- app = webtest.TestApp(r.wsgiapp())
- res = app.post_json('/setcplx', params={'version': {'number': 1}},
- expect_errors=True,
- headers={'Accept': 'application/json'})
- self.assertTrue(
- res.json_body['faultstring'].startswith(
- "Invalid input for field/attribute number. Value: '1'. \
-Value should be one of:"))
- self.assertIn('v1', res.json_body['faultstring'])
- self.assertIn('v2', res.json_body['faultstring'])
- self.assertIn('None', res.json_body['faultstring'])
- self.assertEqual(res.status_int, 400)
-
- def test_scan_api(self):
- class NS(object):
- @expose(int)
- @validate(int, int)
- def multiply(self, a, b):
- return a * b
-
- class MyRoot(WSRoot):
- ns = NS()
-
- r = MyRoot()
-
- api = list(scan_api(r))
- assert len(api) == 1
- path, fd, args = api[0]
- assert path == ['ns', 'multiply']
- assert fd._wsme_definition.name == 'multiply'
- assert args == []
-
- def test_scan_subclass(self):
- class MyRoot(WSRoot):
- class SubClass(object):
- pass
-
- r = MyRoot()
- api = list(scan_api(r))
-
- assert len(api) == 0
-
- def test_scan_api_too_deep(self):
- class Loop(object):
- pass
-
- ell = Loop()
- for i in range(0, 21):
- nl = Loop()
- nl.ell = ell
- ell = nl
-
- class MyRoot(WSRoot):
- loop = ell
-
- r = MyRoot()
-
- try:
- list(scan_api(r))
- assert False, "ValueError not raised"
- except ValueError as e:
- assert str(e).startswith("Path is too long")
-
- def test_handle_request(self):
- class MyRoot(WSRoot):
- @expose()
- def touch(self):
- pass
-
- p = DummyProtocol()
- r = MyRoot(protocols=[p])
-
- app = webtest.TestApp(r.wsgiapp())
-
- res = app.get('/')
-
- assert p.lastreq.path == '/'
- assert p.hits == 1
-
- res = app.get('/touch?wsmeproto=dummy')
-
- assert p.lastreq.path == '/touch'
- assert p.hits == 2
-
- class NoPathProto(DummyProtocol):
- def extract_path(self, request):
- return None
-
- p = NoPathProto()
- r = MyRoot(protocols=[p])
-
- app = webtest.TestApp(r.wsgiapp())
-
- res = app.get('/', expect_errors=True)
- print(res.status, res.body)
- assert res.status_int == 400
-
- def test_no_available_protocol(self):
- r = WSRoot()
-
- app = webtest.TestApp(r.wsgiapp())
-
- res = app.get('/', expect_errors=True)
- print(res.status_int)
- assert res.status_int == 406
- print(res.body)
- assert res.body.find(
- b("None of the following protocols can handle this request")) != -1
-
- def test_return_content_type_guess(self):
- class DummierProto(DummyProtocol):
- content_types = ['text/xml', 'text/plain']
-
- r = WSRoot([DummierProto()])
-
- app = webtest.TestApp(r.wsgiapp())
-
- res = app.get('/', expect_errors=True, headers={
- 'Accept': 'text/xml,q=0.8'})
- assert res.status_int == 400
- assert res.content_type == 'text/xml', res.content_type
-
- res = app.get('/', expect_errors=True, headers={
- 'Accept': 'text/plain'})
- assert res.status_int == 400
- assert res.content_type == 'text/plain', res.content_type
-
- def test_double_expose(self):
- try:
- class MyRoot(WSRoot):
- @expose()
- @expose()
- def atest(self):
- pass
- assert False, "A ValueError should have been raised"
- except ValueError:
- pass
-
- def test_multiple_expose(self):
- class MyRoot(WSRoot):
- def multiply(self, a, b):
- return a * b
-
- mul_int = expose(int, int, int, wrap=True)(multiply)
-
- mul_float = expose(
- float, float, float,
- wrap=True)(multiply)
-
- mul_string = expose(
- wsme.types.text, wsme.types.text, int,
- wrap=True)(multiply)
-
- r = MyRoot(['restjson'])
-
- app = webtest.TestApp(r.wsgiapp())
-
- res = app.get('/mul_int?a=2&b=5', headers={
- 'Accept': 'application/json'
- })
-
- self.assertEqual(res.body, b('10'))
-
- res = app.get('/mul_float?a=1.2&b=2.9', headers={
- 'Accept': 'application/json'
- })
-
- self.assertEqual(res.body, b('3.48'))
-
- res = app.get('/mul_string?a=hello&b=2', headers={
- 'Accept': 'application/json'
- })
-
- self.assertEqual(res.body, b('"hellohello"'))
-
- def test_wsattr_mandatory(self):
- class ComplexType(object):
- attr = wsme.types.wsattr(int, mandatory=True)
-
- class MyRoot(WSRoot):
- @expose(int, body=ComplexType)
- @validate(ComplexType)
- def clx(self, a):
- return a.attr
-
- r = MyRoot(['restjson'])
- app = webtest.TestApp(r.wsgiapp())
- res = app.post_json('/clx', params={}, expect_errors=True,
- headers={'Accept': 'application/json'})
- self.assertEqual(res.status_int, 400)
-
- def test_wsattr_readonly(self):
- class ComplexType(object):
- attr = wsme.types.wsattr(int, readonly=True)
-
- class MyRoot(WSRoot):
- @expose(int, body=ComplexType)
- @validate(ComplexType)
- def clx(self, a):
- return a.attr
-
- r = MyRoot(['restjson'])
- app = webtest.TestApp(r.wsgiapp())
- res = app.post_json('/clx', params={'attr': 1005}, expect_errors=True,
- headers={'Accept': 'application/json'})
- self.assertIn('Cannot set read only field.',
- res.json_body['faultstring'])
- self.assertIn('1005', res.json_body['faultstring'])
- self.assertEqual(res.status_int, 400)
-
- def test_wsattr_default(self):
- class ComplexType(object):
- attr = wsme.types.wsattr(wsme.types.Enum(str, 'or', 'and'),
- default='and')
-
- class MyRoot(WSRoot):
- @expose(int)
- @validate(ComplexType)
- def clx(self, a):
- return a.attr
-
- r = MyRoot(['restjson'])
- app = webtest.TestApp(r.wsgiapp())
- res = app.post_json('/clx', params={}, expect_errors=True,
- headers={'Accept': 'application/json'})
- self.assertEqual(res.status_int, 400)
-
- def test_wsproperty_mandatory(self):
- class ComplexType(object):
- def foo(self):
- pass
-
- attr = wsme.types.wsproperty(int, foo, foo, mandatory=True)
-
- class MyRoot(WSRoot):
- @expose(int, body=ComplexType)
- @validate(ComplexType)
- def clx(self, a):
- return a.attr
-
- r = MyRoot(['restjson'])
- app = webtest.TestApp(r.wsgiapp())
- res = app.post_json('/clx', params={}, expect_errors=True,
- headers={'Accept': 'application/json'})
- self.assertEqual(res.status_int, 400)
-
- def test_validate_enum_mandatory(self):
- class Version(object):
- number = wsme.types.wsattr(wsme.types.Enum(str, 'v1', 'v2'),
- mandatory=True)
-
- class MyWS(WSRoot):
- @expose(str)
- @validate(Version)
- def setcplx(self, version):
- pass
-
- r = MyWS(['restjson'])
- app = webtest.TestApp(r.wsgiapp())
- res = app.post_json('/setcplx', params={'version': {}},
- expect_errors=True,
- headers={'Accept': 'application/json'})
- self.assertEqual(res.status_int, 400)
-
-
-class TestFunctionDefinition(unittest.TestCase):
-
- def test_get_arg(self):
- def myfunc(self):
- pass
-
- fd = wsme_api.FunctionDefinition(wsme_api.FunctionDefinition)
- fd.arguments.append(wsme_api.FunctionArgument('a', int, True, None))
-
- assert fd.get_arg('a').datatype is int
- assert fd.get_arg('b') is None
-
-
-class TestFormatException(unittest.TestCase):
-
- def _test_format_exception(self, exception, debug=False):
- fake_exc_info = (None, exception, None)
- return wsme_api.format_exception(fake_exc_info, debug=debug)
-
- def test_format_client_exception(self):
- faultstring = 'boom'
- ret = self._test_format_exception(exc.ClientSideError(faultstring))
- self.assertIsNone(ret['debuginfo'])
- self.assertEqual('Client', ret['faultcode'])
- self.assertEqual(faultstring, ret['faultstring'])
-
- def test_format_client_exception_unicode(self):
- faultstring = u'\xc3\xa3o'
- ret = self._test_format_exception(exc.ClientSideError(faultstring))
- self.assertIsNone(ret['debuginfo'])
- self.assertEqual('Client', ret['faultcode'])
- self.assertEqual(faultstring, ret['faultstring'])
-
- def test_format_server_exception(self):
- faultstring = 'boom'
- ret = self._test_format_exception(Exception(faultstring))
- self.assertIsNone(ret['debuginfo'])
- self.assertEqual('Server', ret['faultcode'])
- self.assertEqual(faultstring, ret['faultstring'])
-
- def test_format_server_exception_unicode(self):
- faultstring = u'\xc3\xa3o'
- ret = self._test_format_exception(Exception(faultstring))
- self.assertIsNone(ret['debuginfo'])
- self.assertEqual('Server', ret['faultcode'])
- self.assertEqual(faultstring, ret['faultstring'])
-
- def test_format_server_exception_debug(self):
- faultstring = 'boom'
- ret = self._test_format_exception(Exception(faultstring), debug=True)
- # assert debuginfo is populated
- self.assertIsNotNone(ret['debuginfo'])
- self.assertEqual('Server', ret['faultcode'])
- self.assertEqual(faultstring, ret['faultstring'])
diff --git a/wsme/tests/test_exc.py b/wsme/tests/test_exc.py
deleted file mode 100644
index ba903fb..0000000
--- a/wsme/tests/test_exc.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# encoding=utf8
-
-from wsme.exc import (ClientSideError, InvalidInput, MissingArgument,
- UnknownArgument)
-from six import u
-
-
-def test_clientside_error():
- e = ClientSideError("Test")
-
- assert e.faultstring == u("Test")
-
-
-def test_unicode_clientside_error():
- e = ClientSideError(u("\u30d5\u30a1\u30b7\u30ea"))
-
- assert e.faultstring == u("\u30d5\u30a1\u30b7\u30ea")
-
-
-def test_invalidinput():
- e = InvalidInput('field', 'badvalue', "error message")
-
- assert e.faultstring == u(
- "Invalid input for field/attribute field. Value: 'badvalue'. "
- "error message"
- ), e.faultstring
-
-
-def test_missingargument():
- e = MissingArgument('argname', "error message")
-
- assert e.faultstring == \
- u('Missing argument: "argname": error message'), e.faultstring
-
-
-def test_unknownargument():
- e = UnknownArgument('argname', "error message")
-
- assert e.faultstring == \
- u('Unknown argument: "argname": error message'), e.faultstring
diff --git a/wsme/tests/test_protocols.py b/wsme/tests/test_protocols.py
deleted file mode 100644
index f4b7270..0000000
--- a/wsme/tests/test_protocols.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# encoding=utf8
-
-import unittest
-
-from wsme import WSRoot
-from wsme.protocol import getprotocol, CallContext, Protocol
-import wsme.protocol
-
-
-class DummyProtocol(Protocol):
- name = 'dummy'
- content_types = ['', None]
-
- def __init__(self):
- self.hits = 0
-
- def accept(self, req):
- return True
-
- def iter_calls(self, req):
- yield CallContext(req)
-
- def extract_path(self, context):
- return ['touch']
-
- def read_arguments(self, context):
- self.lastreq = context.request
- self.hits += 1
- return {}
-
- def encode_result(self, context, result):
- return str(result)
-
- def encode_error(self, context, infos):
- return str(infos)
-
-
-def test_getprotocol():
- try:
- getprotocol('invalid')
- assert False, "ValueError was not raised"
- except ValueError:
- pass
-
-
-class TestProtocols(unittest.TestCase):
- def test_register_protocol(self):
- wsme.protocol.register_protocol(DummyProtocol)
- assert wsme.protocol.registered_protocols['dummy'] == DummyProtocol
-
- r = WSRoot()
- assert len(r.protocols) == 0
-
- r.addprotocol('dummy')
- assert len(r.protocols) == 1
- assert r.protocols[0].__class__ == DummyProtocol
-
- r = WSRoot(['dummy'])
- assert len(r.protocols) == 1
- assert r.protocols[0].__class__ == DummyProtocol
-
- def test_Protocol(self):
- p = wsme.protocol.Protocol()
- assert p.iter_calls(None) is None
- assert p.extract_path(None) is None
- assert p.read_arguments(None) is None
- assert p.encode_result(None, None) is None
- assert p.encode_sample_value(None, None) == ('none', 'N/A')
- assert p.encode_sample_params(None) == ('none', 'N/A')
- assert p.encode_sample_result(None, None) == ('none', 'N/A')
diff --git a/wsme/tests/test_protocols_commons.py b/wsme/tests/test_protocols_commons.py
deleted file mode 100644
index af5aa05..0000000
--- a/wsme/tests/test_protocols_commons.py
+++ /dev/null
@@ -1,159 +0,0 @@
-# encoding=utf8
-
-import datetime
-import unittest
-
-from wsme.api import FunctionArgument, FunctionDefinition
-from wsme.rest.args import from_param, from_params, args_from_args
-from wsme.exc import InvalidInput
-
-from wsme.types import UserType, Unset, ArrayType, DictType, Base
-
-
-class MyBaseType(Base):
- test = str
-
-
-class MyUserType(UserType):
- basetype = str
-
-
-class DictBasedUserType(UserType):
- basetype = DictType(int, int)
-
-
-class TestProtocolsCommons(unittest.TestCase):
- def test_from_param_date(self):
- assert from_param(datetime.date, '2008-02-28') == \
- datetime.date(2008, 2, 28)
-
- def test_from_param_time(self):
- assert from_param(datetime.time, '12:14:56') == \
- datetime.time(12, 14, 56)
-
- def test_from_param_datetime(self):
- assert from_param(datetime.datetime, '2009-12-23T12:14:56') == \
- datetime.datetime(2009, 12, 23, 12, 14, 56)
-
- def test_from_param_usertype(self):
- assert from_param(MyUserType(), 'test') == 'test'
-
- def test_from_params_empty(self):
- assert from_params(str, {}, '', set()) is Unset
-
- def test_from_params_native_array(self):
- class params(dict):
- def getall(self, path):
- return ['1', '2']
- p = params({'a': []})
- assert from_params(ArrayType(int), p, 'a', set()) == [1, 2]
-
- def test_from_params_empty_array(self):
- assert from_params(ArrayType(int), {}, 'a', set()) is Unset
-
- def test_from_params_dict(self):
- value = from_params(
- DictType(int, str),
- {'a[2]': 'a2', 'a[3]': 'a3'},
- 'a',
- set()
- )
- assert value == {2: 'a2', 3: 'a3'}, value
-
- def test_from_params_dict_unset(self):
- assert from_params(DictType(int, str), {}, 'a', set()) is Unset
-
- def test_from_params_usertype(self):
- value = from_params(
- DictBasedUserType(),
- {'a[2]': '2'},
- 'a',
- set()
- )
- self.assertEqual(value, {2: 2})
-
- def test_args_from_args_usertype(self):
-
- class FakeType(UserType):
- name = 'fake-type'
- basetype = int
-
- fake_type = FakeType()
- fd = FunctionDefinition(FunctionDefinition)
- fd.arguments.append(FunctionArgument('fake-arg', fake_type, True, 0))
-
- new_args = args_from_args(fd, [1], {})
- self.assertEqual([1], new_args[0])
-
- # can't convert str to int
- try:
- args_from_args(fd, ['invalid-argument'], {})
- except InvalidInput as e:
- assert fake_type.name in str(e)
- else:
- self.fail('Should have thrown an InvalidInput')
-
- def test_args_from_args_custom_exc(self):
-
- class FakeType(UserType):
- name = 'fake-type'
- basetype = int
-
- def validate(self, value):
- if value < 10:
- raise ValueError('should be greater than 10')
-
- def frombasetype(self, value):
- self.validate(value)
-
- fake_type = FakeType()
- fd = FunctionDefinition(FunctionDefinition)
- fd.arguments.append(FunctionArgument('fake-arg', fake_type, True, 0))
-
- try:
- args_from_args(fd, [9], {})
- except InvalidInput as e:
- assert fake_type.name in str(e)
- assert 'Error: should be greater than 10' in str(e)
- else:
- self.fail('Should have thrown an InvalidInput')
-
- def test_args_from_args_array_type(self):
- fake_type = ArrayType(MyBaseType)
- fd = FunctionDefinition(FunctionDefinition)
- fd.arguments.append(FunctionArgument('fake-arg', fake_type, True, []))
- try:
- args_from_args(fd, [['invalid-argument']], {})
- except InvalidInput as e:
- assert ArrayType.__name__ in str(e)
- else:
- self.fail('Should have thrown an InvalidInput')
-
-
-class ArgTypeConversion(unittest.TestCase):
-
- def test_int_zero(self):
- self.assertEqual(0, from_param(int, 0))
- self.assertEqual(0, from_param(int, '0'))
-
- def test_int_nonzero(self):
- self.assertEqual(1, from_param(int, 1))
- self.assertEqual(1, from_param(int, '1'))
-
- def test_int_none(self):
- self.assertEqual(None, from_param(int, None))
-
- def test_float_zero(self):
- self.assertEqual(0.0, from_param(float, 0))
- self.assertEqual(0.0, from_param(float, 0.0))
- self.assertEqual(0.0, from_param(float, '0'))
- self.assertEqual(0.0, from_param(float, '0.0'))
-
- def test_float_nonzero(self):
- self.assertEqual(1.0, from_param(float, 1))
- self.assertEqual(1.0, from_param(float, 1.0))
- self.assertEqual(1.0, from_param(float, '1'))
- self.assertEqual(1.0, from_param(float, '1.0'))
-
- def test_float_none(self):
- self.assertEqual(None, from_param(float, None))
diff --git a/wsme/tests/test_restjson.py b/wsme/tests/test_restjson.py
deleted file mode 100644
index 7afbb86..0000000
--- a/wsme/tests/test_restjson.py
+++ /dev/null
@@ -1,779 +0,0 @@
-import base64
-import datetime
-import decimal
-
-import wsme.tests.protocol
-
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-
-from wsme.rest.json import fromjson, tojson, parse
-from wsme.utils import parse_isodatetime, parse_isotime, parse_isodate
-from wsme.types import isarray, isdict, isusertype, register_type
-from wsme.types import UserType, ArrayType, DictType
-from wsme.rest import expose, validate
-from wsme.exc import ClientSideError, InvalidInput
-
-
-import six
-from six import b, u
-
-if six.PY3:
- from urllib.parse import urlencode
-else:
- from urllib import urlencode # noqa
-
-
-def prepare_value(value, datatype):
- if isinstance(datatype, list):
- return [prepare_value(item, datatype[0]) for item in value]
- if isinstance(datatype, dict):
- key_type, value_type = list(datatype.items())[0]
- return dict((
- (prepare_value(item[0], key_type),
- prepare_value(item[1], value_type))
- for item in value.items()
- ))
- if datatype in (datetime.date, datetime.time, datetime.datetime):
- return value.isoformat()
- if datatype == decimal.Decimal:
- return str(value)
- if datatype == wsme.types.binary:
- return base64.encodestring(value).decode('ascii')
- if datatype == wsme.types.bytes:
- return value.decode('ascii')
- return value
-
-
-def prepare_result(value, datatype):
- print(value, datatype)
- if value is None:
- return None
- if datatype == wsme.types.binary:
- return base64.decodestring(value.encode('ascii'))
- if isusertype(datatype):
- datatype = datatype.basetype
- if isinstance(datatype, list):
- return [prepare_result(item, datatype[0]) for item in value]
- if isarray(datatype):
- return [prepare_result(item, datatype.item_type) for item in value]
- if isinstance(datatype, dict):
- return dict((
- (prepare_result(item[0], list(datatype.keys())[0]),
- prepare_result(item[1], list(datatype.values())[0]))
- for item in value.items()
- ))
- if isdict(datatype):
- return dict((
- (prepare_result(item[0], datatype.key_type),
- prepare_result(item[1], datatype.value_type))
- for item in value.items()
- ))
- if datatype == datetime.date:
- return parse_isodate(value)
- if datatype == datetime.time:
- return parse_isotime(value)
- if datatype == datetime.datetime:
- return parse_isodatetime(value)
- if hasattr(datatype, '_wsme_attributes'):
- for attr in datatype._wsme_attributes:
- if attr.key not in value:
- continue
- value[attr.key] = prepare_result(value[attr.key], attr.datatype)
- return value
- if datatype == wsme.types.bytes:
- return value.encode('ascii')
- if type(value) != datatype:
- print(type(value), datatype)
- return datatype(value)
- return value
-
-
-class CustomInt(UserType):
- basetype = int
- name = "custom integer"
-
-
-class Obj(wsme.types.Base):
- id = int
- name = wsme.types.text
-
-
-class NestedObj(wsme.types.Base):
- o = Obj
-
-
-class CRUDResult(object):
- data = Obj
- message = wsme.types.text
-
- def __init__(self, data=wsme.types.Unset, message=wsme.types.Unset):
- self.data = data
- self.message = message
-
-
-class MiniCrud(object):
- @expose(CRUDResult, method='PUT')
- @validate(Obj)
- def create(self, data):
- print(repr(data))
- return CRUDResult(data, u('create'))
-
- @expose(CRUDResult, method='GET', ignore_extra_args=True)
- @validate(Obj)
- def read(self, ref):
- print(repr(ref))
- if ref.id == 1:
- ref.name = u('test')
- return CRUDResult(ref, u('read'))
-
- @expose(CRUDResult, method='POST')
- @validate(Obj)
- def update(self, data):
- print(repr(data))
- return CRUDResult(data, u('update'))
-
- @expose(CRUDResult, wsme.types.text, body=Obj)
- def update_with_body(self, msg, data):
- print(repr(data))
- return CRUDResult(data, msg)
-
- @expose(CRUDResult, method='DELETE')
- @validate(Obj)
- def delete(self, ref):
- print(repr(ref))
- if ref.id == 1:
- ref.name = u('test')
- return CRUDResult(ref, u('delete'))
-
-
-wsme.tests.protocol.WSTestRoot.crud = MiniCrud()
-
-
-class TestRestJson(wsme.tests.protocol.RestOnlyProtocolTestCase):
- protocol = 'restjson'
-
- def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False,
- body=None, **kw):
- if body:
- if isinstance(body, tuple):
- body, datatype = body
- else:
- datatype = type(body)
- body = prepare_value(body, datatype)
- content = json.dumps(body)
- else:
- for key in kw:
- if isinstance(kw[key], tuple):
- value, datatype = kw[key]
- else:
- value = kw[key]
- datatype = type(value)
- kw[key] = prepare_value(value, datatype)
- content = json.dumps(kw)
- headers = {
- 'Content-Type': 'application/json',
- }
- if _accept is not None:
- headers["Accept"] = _accept
- res = self.app.post(
- '/' + fpath,
- content,
- headers=headers,
- expect_errors=True)
- print("Received:", res.body)
-
- if _no_result_decode:
- return res
-
- r = json.loads(res.text)
- if res.status_int == 200:
- if _rt and r:
- r = prepare_result(r, _rt)
- return r
- else:
- raise wsme.tests.protocol.CallException(
- r['faultcode'],
- r['faultstring'],
- r.get('debuginfo')
- )
-
- return json.loads(res.text)
-
- def test_fromjson(self):
- assert fromjson(str, None) is None
-
- def test_keyargs(self):
- r = self.app.get('/argtypes/setint.json?value=2')
- print(r)
- assert json.loads(r.text) == 2
-
- nestedarray = 'value[0].inner.aint=54&value[1].inner.aint=55'
- r = self.app.get('/argtypes/setnestedarray.json?' + nestedarray)
- print(r)
- assert json.loads(r.text) == [
- {'inner': {'aint': 54}},
- {'inner': {'aint': 55}}]
-
- def test_form_urlencoded_args(self):
- params = {
- 'value[0].inner.aint': 54,
- 'value[1].inner.aint': 55
- }
- body = urlencode(params)
- r = self.app.post(
- '/argtypes/setnestedarray.json',
- body,
- headers={'Content-Type': 'application/x-www-form-urlencoded'}
- )
- print(r)
-
- assert json.loads(r.text) == [
- {'inner': {'aint': 54}},
- {'inner': {'aint': 55}}]
-
- def test_body_and_params(self):
- r = self.app.post('/argtypes/setint.json?value=2', '{"value": 2}',
- headers={"Content-Type": "application/json"},
- expect_errors=True)
- print(r)
- assert r.status_int == 400
- assert json.loads(r.text)['faultstring'] == \
- "Parameter value was given several times"
-
- def test_inline_body(self):
- params = urlencode({'__body__': '{"value": 4}'})
- r = self.app.get('/argtypes/setint.json?' + params)
- print(r)
- assert json.loads(r.text) == 4
-
- def test_empty_body(self):
- params = urlencode({'__body__': ''})
- r = self.app.get('/returntypes/getint.json?' + params)
- print(r)
- assert json.loads(r.text) == 2
-
- def test_invalid_content_type_body(self):
- r = self.app.post('/argtypes/setint.json', '{"value": 2}',
- headers={"Content-Type": "application/invalid"},
- expect_errors=True)
- print(r)
- assert r.status_int == 415
- assert json.loads(r.text)['faultstring'] == \
- "Unknown mimetype: application/invalid"
-
- def test_invalid_json_body(self):
- r = self.app.post('/argtypes/setint.json', '{"value": 2',
- headers={"Content-Type": "application/json"},
- expect_errors=True)
- print(r)
- assert r.status_int == 400
- assert json.loads(r.text)['faultstring'] == \
- "Request is not in valid JSON format"
-
- def test_unknown_arg(self):
- r = self.app.post('/returntypes/getint.json', '{"a": 2}',
- headers={"Content-Type": "application/json"},
- expect_errors=True)
- print(r)
- assert r.status_int == 400
- assert json.loads(r.text)['faultstring'].startswith(
- "Unknown argument:"
- )
-
- r = self.app.get('/returntypes/getint.json?a=2', expect_errors=True)
- print(r)
- assert r.status_int == 400
- assert json.loads(r.text)['faultstring'].startswith(
- "Unknown argument:"
- )
-
- def test_set_custom_object(self):
- r = self.app.post(
- '/argtypes/setcustomobject',
- '{"value": {"aint": 2, "name": "test"}}',
- headers={"Content-Type": "application/json"}
- )
- self.assertEqual(r.status_int, 200)
- self.assertEqual(r.json, {'aint': 2, 'name': 'test'})
-
- def test_set_extended_int(self):
- r = self.app.post(
- '/argtypes/setextendedint',
- '{"value": 3}',
- headers={"Content-Type": "application/json"}
- )
- self.assertEqual(r.status_int, 200)
- self.assertEqual(r.json, 3)
-
- def test_unset_attrs(self):
- class AType(object):
- attr = int
-
- wsme.types.register_type(AType)
-
- j = tojson(AType, AType())
- assert j == {}
-
- def test_array_tojson(self):
- assert tojson([int], None) is None
- assert tojson([int], []) == []
- assert tojson([str], ['1', '4']) == ['1', '4']
-
- def test_dict_tojson(self):
- assert tojson({int: str}, None) is None
- assert tojson({int: str}, {5: '5'}) == {5: '5'}
-
- def test_None_tojson(self):
- for dt in (datetime.date, datetime.time, datetime.datetime,
- decimal.Decimal):
- assert tojson(dt, None) is None
-
- def test_None_fromjson(self):
- for dt in (str, int, datetime.date, datetime.time, datetime.datetime,
- decimal.Decimal, [int], {int: int}):
- assert fromjson(dt, None) is None
-
- def test_parse_valid_date(self):
- j = parse('{"a": "2011-01-01"}', {'a': datetime.date}, False)
- assert isinstance(j['a'], datetime.date)
-
- def test_invalid_root_dict_fromjson(self):
- try:
- parse('["invalid"]', {'a': ArrayType(str)}, False)
- assert False
- except Exception as e:
- assert isinstance(e, ClientSideError)
- assert e.msg == "Request must be a JSON dict"
-
- def test_invalid_list_fromjson(self):
- jlist = "invalid"
- try:
- parse('{"a": "%s"}' % jlist, {'a': ArrayType(str)}, False)
- assert False
- except Exception as e:
- assert isinstance(e, InvalidInput)
- assert e.fieldname == 'a'
- assert e.value == jlist
- assert e.msg == "Value not a valid list: %s" % jlist
-
- def test_invalid_dict_fromjson(self):
- jdict = "invalid"
- try:
- parse('{"a": "%s"}' % jdict, {'a': DictType(str, str)}, False)
- assert False
- except Exception as e:
- assert isinstance(e, InvalidInput)
- assert e.fieldname == 'a'
- assert e.value == jdict
- assert e.msg == "Value not a valid dict: %s" % jdict
-
- def test_invalid_date_fromjson(self):
- jdate = "2015-01-invalid"
- try:
- parse('{"a": "%s"}' % jdate, {'a': datetime.date}, False)
- assert False
- except Exception as e:
- assert isinstance(e, InvalidInput)
- assert e.fieldname == 'a'
- assert e.value == jdate
- assert e.msg == "'%s' is not a legal date value" % jdate
-
- def test_parse_valid_date_bodyarg(self):
- j = parse('"2011-01-01"', {'a': datetime.date}, True)
- assert isinstance(j['a'], datetime.date)
-
- def test_invalid_date_fromjson_bodyarg(self):
- jdate = "2015-01-invalid"
- try:
- parse('"%s"' % jdate, {'a': datetime.date}, True)
- assert False
- except Exception as e:
- assert isinstance(e, InvalidInput)
- assert e.fieldname == 'a'
- assert e.value == jdate
- assert e.msg == "'%s' is not a legal date value" % jdate
-
- def test_valid_str_to_builtin_fromjson(self):
- types = six.integer_types + (bool, float)
- value = '2'
- for t in types:
- for ba in True, False:
- jd = '%s' if ba else '{"a": %s}'
- i = parse(jd % value, {'a': t}, ba)
- self.assertEqual(
- i, {'a': t(value)},
- "Parsed value does not correspond for %s: "
- "%s != {'a': %s}" % (
- t, repr(i), repr(t(value))
- )
- )
- self.assertIsInstance(i['a'], t)
-
- def test_valid_int_fromjson(self):
- value = 2
- for ba in True, False:
- jd = '%d' if ba else '{"a": %d}'
- i = parse(jd % value, {'a': int}, ba)
- self.assertEqual(i, {'a': 2})
- self.assertIsInstance(i['a'], int)
-
- def test_valid_num_to_float_fromjson(self):
- values = 2, 2.3
- for v in values:
- for ba in True, False:
- jd = '%f' if ba else '{"a": %f}'
- i = parse(jd % v, {'a': float}, ba)
- self.assertEqual(i, {'a': float(v)})
- self.assertIsInstance(i['a'], float)
-
- def test_invalid_str_to_buitin_fromjson(self):
- types = six.integer_types + (float, bool)
- value = '2a'
- for t in types:
- for ba in True, False:
- jd = '"%s"' if ba else '{"a": "%s"}'
- try:
- parse(jd % value, {'a': t}, ba)
- assert False, (
- "Value '%s' should not parse correctly for %s." %
- (value, t)
- )
- except ClientSideError as e:
- self.assertIsInstance(e, InvalidInput)
- self.assertEqual(e.fieldname, 'a')
- self.assertEqual(e.value, value)
-
- def test_ambiguous_to_bool(self):
- amb_values = ('', 'randomstring', '2', '-32', 'not true')
- for value in amb_values:
- for ba in True, False:
- jd = '"%s"' if ba else '{"a": "%s"}'
- try:
- parse(jd % value, {'a': bool}, ba)
- assert False, (
- "Value '%s' should not parse correctly for %s." %
- (value, bool)
- )
- except ClientSideError as e:
- self.assertIsInstance(e, InvalidInput)
- self.assertEqual(e.fieldname, 'a')
- self.assertEqual(e.value, value)
-
- def test_true_strings_to_bool(self):
- true_values = ('true', 't', 'yes', 'y', 'on', '1')
- for value in true_values:
- for ba in True, False:
- jd = '"%s"' if ba else '{"a": "%s"}'
- i = parse(jd % value, {'a': bool}, ba)
- self.assertIsInstance(i['a'], bool)
- self.assertTrue(i['a'])
-
- def test_false_strings_to_bool(self):
- false_values = ('false', 'f', 'no', 'n', 'off', '0')
- for value in false_values:
- for ba in True, False:
- jd = '"%s"' if ba else '{"a": "%s"}'
- i = parse(jd % value, {'a': bool}, ba)
- self.assertIsInstance(i['a'], bool)
- self.assertFalse(i['a'])
-
- def test_true_ints_to_bool(self):
- true_values = (1, 5, -3)
- for value in true_values:
- for ba in True, False:
- jd = '%d' if ba else '{"a": %d}'
- i = parse(jd % value, {'a': bool}, ba)
- self.assertIsInstance(i['a'], bool)
- self.assertTrue(i['a'])
-
- def test_false_ints_to_bool(self):
- value = 0
- for ba in True, False:
- jd = '%d' if ba else '{"a": %d}'
- i = parse(jd % value, {'a': bool}, ba)
- self.assertIsInstance(i['a'], bool)
- self.assertFalse(i['a'])
-
- def test_valid_simple_custom_type_fromjson(self):
- value = 2
- for ba in True, False:
- jd = '"%d"' if ba else '{"a": "%d"}'
- i = parse(jd % value, {'a': CustomInt()}, ba)
- self.assertEqual(i, {'a': 2})
- self.assertIsInstance(i['a'], int)
-
- def test_invalid_simple_custom_type_fromjson(self):
- value = '2b'
- for ba in True, False:
- jd = '"%s"' if ba else '{"a": "%s"}'
- try:
- i = parse(jd % value, {'a': CustomInt()}, ba)
- self.assertEqual(i, {'a': 2})
- except ClientSideError as e:
- self.assertIsInstance(e, InvalidInput)
- self.assertEqual(e.fieldname, 'a')
- self.assertEqual(e.value, value)
- self.assertEqual(
- e.msg,
- "invalid literal for int() with base 10: '%s'" % value
- )
-
- def test_parse_unexpected_attribute(self):
- o = {
- "id": "1",
- "name": "test",
- "other": "unknown",
- "other2": "still unknown",
- }
- for ba in True, False:
- jd = o if ba else {"o": o}
- try:
- parse(json.dumps(jd), {'o': Obj}, ba)
- raise AssertionError("Object should not parse correcty.")
- except wsme.exc.UnknownAttribute as e:
- self.assertEqual(e.attributes, set(['other', 'other2']))
-
- def test_parse_unexpected_nested_attribute(self):
- no = {
- "o": {
- "id": "1",
- "name": "test",
- "other": "unknown",
- },
- }
- for ba in False, True:
- jd = no if ba else {"no": no}
- try:
- parse(json.dumps(jd), {'no': NestedObj}, ba)
- except wsme.exc.UnknownAttribute as e:
- self.assertEqual(e.attributes, set(['other']))
- self.assertEqual(e.fieldname, "no.o")
-
- def test_nest_result(self):
- self.root.protocols[0].nest_result = True
- r = self.app.get('/returntypes/getint.json')
- print(r)
- assert json.loads(r.text) == {"result": 2}
-
- def test_encode_sample_value(self):
- class MyType(object):
- aint = int
- astr = str
-
- register_type(MyType)
-
- v = MyType()
- v.aint = 4
- v.astr = 's'
-
- r = wsme.rest.json.encode_sample_value(MyType, v, True)
- print(r)
- assert r[0] == ('javascript')
- assert r[1] == json.dumps({'aint': 4, 'astr': 's'}, ensure_ascii=False,
- indent=4, sort_keys=True)
-
- def test_bytes_tojson(self):
- assert tojson(wsme.types.bytes, None) is None
- assert tojson(wsme.types.bytes, b('ascii')) == u('ascii')
-
- def test_encode_sample_params(self):
- r = wsme.rest.json.encode_sample_params(
- [('a', int, 2)], True
- )
- assert r[0] == 'javascript', r[0]
- assert r[1] == '''{
- "a": 2
-}''', r[1]
-
- def test_encode_sample_result(self):
- r = wsme.rest.json.encode_sample_result(
- int, 2, True
- )
- assert r[0] == 'javascript', r[0]
- assert r[1] == '''2'''
-
- def test_PUT(self):
- data = {"id": 1, "name": u("test")}
- content = json.dumps(dict(data=data))
- headers = {
- 'Content-Type': 'application/json',
- }
- res = self.app.put(
- '/crud',
- content,
- headers=headers,
- expect_errors=False)
- print("Received:", res.body)
- result = json.loads(res.text)
- print(result)
- assert result['data']['id'] == 1
- assert result['data']['name'] == u("test")
- assert result['message'] == "create"
-
- def test_GET(self):
- headers = {
- 'Accept': 'application/json',
- }
- res = self.app.get(
- '/crud?ref.id=1',
- headers=headers,
- expect_errors=False)
- print("Received:", res.body)
- result = json.loads(res.text)
- print(result)
- assert result['data']['id'] == 1
- assert result['data']['name'] == u("test")
-
- def test_GET_complex_accept(self):
- headers = {
- 'Accept': 'text/html,application/xml;q=0.9,*/*;q=0.8'
- }
- res = self.app.get(
- '/crud?ref.id=1',
- headers=headers,
- expect_errors=False)
- print("Received:", res.body)
- result = json.loads(res.text)
- print(result)
- assert result['data']['id'] == 1
- assert result['data']['name'] == u("test")
-
- def test_GET_complex_choose_xml(self):
- headers = {
- 'Accept': 'text/html,text/xml;q=0.9,*/*;q=0.8'
- }
- res = self.app.get(
- '/crud?ref.id=1',
- headers=headers,
- expect_errors=False)
- print("Received:", res.body)
- assert res.content_type == 'text/xml'
-
- def test_GET_complex_accept_no_match(self):
- headers = {
- 'Accept': 'text/html,application/xml;q=0.9'
- }
- res = self.app.get(
- '/crud?ref.id=1',
- headers=headers,
- status=406)
- print("Received:", res.body)
- assert res.body == b("Unacceptable Accept type: "
- "text/html, application/xml;q=0.9 not in "
- "['application/json', 'text/javascript', "
- "'application/javascript', 'text/xml']")
-
- def test_GET_bad_simple_accept(self):
- headers = {
- 'Accept': 'text/plain',
- }
- res = self.app.get(
- '/crud?ref.id=1',
- headers=headers,
- status=406)
- print("Received:", res.body)
- assert res.body == b("Unacceptable Accept type: text/plain not in "
- "['application/json', 'text/javascript', "
- "'application/javascript', 'text/xml']")
-
- def test_POST(self):
- headers = {
- 'Content-Type': 'application/json',
- }
- res = self.app.post(
- '/crud',
- json.dumps(dict(data=dict(id=1, name=u('test')))),
- headers=headers,
- expect_errors=False)
- print("Received:", res.body)
- result = json.loads(res.text)
- print(result)
- assert result['data']['id'] == 1
- assert result['data']['name'] == u("test")
- assert result['message'] == "update"
-
- def test_POST_bad_content_type(self):
- headers = {
- 'Content-Type': 'text/plain',
- }
- res = self.app.post(
- '/crud',
- json.dumps(dict(data=dict(id=1, name=u('test')))),
- headers=headers,
- status=415)
- print("Received:", res.body)
- assert res.body == b("Unacceptable Content-Type: text/plain not in "
- "['application/json', 'text/javascript', "
- "'application/javascript', 'text/xml']")
-
- def test_DELETE(self):
- res = self.app.delete(
- '/crud.json?ref.id=1',
- expect_errors=False)
- print("Received:", res.body)
- result = json.loads(res.text)
- print(result)
- assert result['data']['id'] == 1
- assert result['data']['name'] == u("test")
- assert result['message'] == "delete"
-
- def test_extra_arguments(self):
- headers = {
- 'Accept': 'application/json',
- }
- res = self.app.get(
- '/crud?ref.id=1&extraarg=foo',
- headers=headers,
- expect_errors=False)
- print("Received:", res.body)
- result = json.loads(res.text)
- print(result)
- assert result['data']['id'] == 1
- assert result['data']['name'] == u("test")
- assert result['message'] == "read"
-
- def test_unexpected_extra_arg(self):
- headers = {
- 'Content-Type': 'application/json',
- }
- data = {"id": 1, "name": "test"}
- content = json.dumps({"data": data, "other": "unexpected"})
- res = self.app.put(
- '/crud',
- content,
- headers=headers,
- expect_errors=True)
- self.assertEqual(res.status_int, 400)
-
- def test_unexpected_extra_attribute(self):
- """Expect a failure if we send an unexpected object attribute."""
- headers = {
- 'Content-Type': 'application/json',
- }
- data = {"id": 1, "name": "test", "other": "unexpected"}
- content = json.dumps({"data": data})
- res = self.app.put(
- '/crud',
- content,
- headers=headers,
- expect_errors=True)
- self.assertEqual(res.status_int, 400)
-
- def test_body_arg(self):
- headers = {
- 'Content-Type': 'application/json',
- }
- res = self.app.post(
- '/crud/update_with_body?msg=hello',
- json.dumps(dict(id=1, name=u('test'))),
- headers=headers,
- expect_errors=False)
- print("Received:", res.body)
- result = json.loads(res.text)
- print(result)
- assert result['data']['id'] == 1
- assert result['data']['name'] == u("test")
- assert result['message'] == "hello"
diff --git a/wsme/tests/test_restxml.py b/wsme/tests/test_restxml.py
deleted file mode 100644
index b677a65..0000000
--- a/wsme/tests/test_restxml.py
+++ /dev/null
@@ -1,211 +0,0 @@
-import decimal
-import datetime
-import base64
-
-from six import u, b
-import six
-
-import wsme.tests.protocol
-from wsme.utils import parse_isodatetime, parse_isodate, parse_isotime
-from wsme.types import isarray, isdict, isusertype, register_type
-
-from wsme.rest.xml import fromxml, toxml
-
-try:
- import xml.etree.ElementTree as et
-except ImportError:
- import cElementTree as et # noqa
-
-
-def dumpxml(key, obj, datatype=None):
- el = et.Element(key)
- if isinstance(obj, tuple):
- obj, datatype = obj
- if isinstance(datatype, list):
- for item in obj:
- el.append(dumpxml('item', item, datatype[0]))
- elif isinstance(datatype, dict):
- key_type, value_type = list(datatype.items())[0]
- for item in obj.items():
- node = et.SubElement(el, 'item')
- node.append(dumpxml('key', item[0], key_type))
- node.append(dumpxml('value', item[1], value_type))
- elif datatype == wsme.types.binary:
- el.text = base64.encodestring(obj).decode('ascii')
- elif isinstance(obj, wsme.types.bytes):
- el.text = obj.decode('ascii')
- elif isinstance(obj, wsme.types.text):
- el.text = obj
- elif type(obj) in (int, float, bool, decimal.Decimal):
- el.text = six.text_type(obj)
- elif type(obj) in (datetime.date, datetime.time, datetime.datetime):
- el.text = obj.isoformat()
- elif isinstance(obj, type(None)):
- el.set('nil', 'true')
- elif hasattr(datatype, '_wsme_attributes'):
- for attr in datatype._wsme_attributes:
- name = attr.name
- if name not in obj:
- continue
- o = obj[name]
- el.append(dumpxml(name, o, attr.datatype))
- elif type(obj) == dict:
- for name, value in obj.items():
- el.append(dumpxml(name, value))
- print(obj, datatype, et.tostring(el))
- return el
-
-
-def loadxml(el, datatype):
- print(el, datatype, len(el))
- if el.get('nil') == 'true':
- return None
- if isinstance(datatype, list):
- return [loadxml(item, datatype[0]) for item in el.findall('item')]
- elif isarray(datatype):
- return [
- loadxml(item, datatype.item_type) for item in el.findall('item')
- ]
- elif isinstance(datatype, dict):
- key_type, value_type = list(datatype.items())[0]
- return dict((
- (loadxml(item.find('key'), key_type),
- loadxml(item.find('value'), value_type))
- for item in el.findall('item')
- ))
- elif isdict(datatype):
- return dict((
- (loadxml(item.find('key'), datatype.key_type),
- loadxml(item.find('value'), datatype.value_type))
- for item in el.findall('item')
- ))
- elif isdict(datatype):
- return dict((
- (loadxml(item.find('key'), datatype.key_type),
- loadxml(item.find('value'), datatype.value_type))
- for item in el.findall('item')
- ))
- elif len(el):
- d = {}
- for attr in datatype._wsme_attributes:
- name = attr.name
- child = el.find(name)
- print(name, attr, child)
- if child is not None:
- d[name] = loadxml(child, attr.datatype)
- print(d)
- return d
- else:
- if datatype == wsme.types.binary:
- return base64.decodestring(el.text.encode('ascii'))
- if isusertype(datatype):
- datatype = datatype.basetype
- if datatype == datetime.date:
- return parse_isodate(el.text)
- if datatype == datetime.time:
- return parse_isotime(el.text)
- if datatype == datetime.datetime:
- return parse_isodatetime(el.text)
- if datatype == wsme.types.text:
- return datatype(el.text if el.text else u(''))
- if datatype == bool:
- return el.text.lower() != 'false'
- if datatype is None:
- return el.text
- if datatype is wsme.types.bytes:
- return el.text.encode('ascii')
- return datatype(el.text)
-
-
-class TestRestXML(wsme.tests.protocol.RestOnlyProtocolTestCase):
- protocol = 'restxml'
-
- def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False,
- body=None, **kw):
- if body:
- el = dumpxml('body', body)
- else:
- el = dumpxml('parameters', kw)
- content = et.tostring(el)
- headers = {
- 'Content-Type': 'text/xml',
- }
- if _accept is not None:
- headers['Accept'] = _accept
- res = self.app.post(
- '/' + fpath,
- content,
- headers=headers,
- expect_errors=True)
- print("Received:", res.body)
-
- if _no_result_decode:
- return res
-
- el = et.fromstring(res.body)
- if el.tag == 'error':
- raise wsme.tests.protocol.CallException(
- el.find('faultcode').text,
- el.find('faultstring').text,
- el.find('debuginfo') is not None and
- el.find('debuginfo').text or None
- )
-
- else:
- return loadxml(et.fromstring(res.body), _rt)
-
- def test_encode_sample_value(self):
- class MyType(object):
- aint = int
- atext = wsme.types.text
-
- register_type(MyType)
-
- value = MyType()
- value.aint = 5
- value.atext = u('test')
-
- language, sample = wsme.rest.xml.encode_sample_value(
- MyType, value, True)
- print(language, sample)
-
- assert language == 'xml'
- assert sample == b("""<value>
- <aint>5</aint>
- <atext>test</atext>
-</value>""")
-
- def test_encode_sample_params(self):
- lang, content = wsme.rest.xml.encode_sample_params(
- [('a', int, 2)], True)
- assert lang == 'xml', lang
- assert content == b('<parameters>\n <a>2</a>\n</parameters>'), content
-
- def test_encode_sample_result(self):
- lang, content = wsme.rest.xml.encode_sample_result(int, 2, True)
- assert lang == 'xml', lang
- assert content == b('<result>2</result>'), content
-
- def test_nil_fromxml(self):
- for dt in (
- str, [int], {int: str}, bool,
- datetime.date, datetime.time, datetime.datetime):
- e = et.Element('value', nil='true')
- assert fromxml(dt, e) is None
-
- def test_nil_toxml(self):
- for dt in (
- wsme.types.bytes,
- [int], {int: str}, bool,
- datetime.date, datetime.time, datetime.datetime):
- x = et.tostring(toxml(dt, 'value', None))
- assert x == b('<value nil="true" />'), x
-
- def test_unset_attrs(self):
- class AType(object):
- someattr = wsme.types.bytes
-
- wsme.types.register_type(AType)
-
- x = et.tostring(toxml(AType, 'value', AType()))
- assert x == b('<value />'), x
diff --git a/wsme/tests/test_root.py b/wsme/tests/test_root.py
deleted file mode 100644
index 8ccb0c1..0000000
--- a/wsme/tests/test_root.py
+++ /dev/null
@@ -1,122 +0,0 @@
-# encoding=utf8
-
-import unittest
-
-from wsme import WSRoot
-import wsme.protocol
-import wsme.rest.protocol
-from wsme.root import default_prepare_response_body
-
-from six import b, u
-from webob import Request
-
-
-class TestRoot(unittest.TestCase):
- def test_default_transaction(self):
- import transaction
- root = WSRoot(transaction=True)
- assert root._transaction is transaction
-
- txn = root.begin()
- txn.abort()
-
- def test_default_prepare_response_body(self):
- default_prepare_response_body(None, [b('a')]) == b('a')
- default_prepare_response_body(None, [b('a'), b('b')]) == b('a\nb')
- default_prepare_response_body(None, [u('a')]) == u('a')
- default_prepare_response_body(None, [u('a'), u('b')]) == u('a\nb')
-
- def test_protocol_selection_error(self):
- class P(wsme.protocol.Protocol):
- name = "test"
-
- def accept(self, r):
- raise Exception('test')
-
- root = WSRoot()
- root.addprotocol(P())
-
- from webob import Request
- req = Request.blank('/test?check=a&check=b&name=Bob')
- res = root._handle_request(req)
- assert res.status_int == 500
- assert res.content_type == 'text/plain'
- assert (res.text ==
- 'Unexpected error while selecting protocol: test'), req.text
-
- def test_protocol_selection_accept_mismatch(self):
- """Verify that we get a 406 error on wrong Accept header."""
- class P(wsme.protocol.Protocol):
- name = "test"
-
- def accept(self, r):
- return False
-
- root = WSRoot()
- root.addprotocol(wsme.rest.protocol.RestProtocol())
- root.addprotocol(P())
-
- req = Request.blank('/test?check=a&check=b&name=Bob')
- req.method = 'GET'
- res = root._handle_request(req)
- assert res.status_int == 406
- assert res.content_type == 'text/plain'
- assert res.text.startswith(
- 'None of the following protocols can handle this request'
- ), req.text
-
- def test_protocol_selection_content_type_mismatch(self):
- """Verify that we get a 415 error on wrong Content-Type header."""
- class P(wsme.protocol.Protocol):
- name = "test"
-
- def accept(self, r):
- return False
-
- root = WSRoot()
- root.addprotocol(wsme.rest.protocol.RestProtocol())
- root.addprotocol(P())
-
- req = Request.blank('/test?check=a&check=b&name=Bob')
- req.method = 'POST'
- req.headers['Content-Type'] = "test/unsupported"
- res = root._handle_request(req)
- assert res.status_int == 415
- assert res.content_type == 'text/plain'
- assert res.text.startswith(
- 'Unacceptable Content-Type: test/unsupported not in'
- ), req.text
-
- def test_protocol_selection_get_method(self):
- class P(wsme.protocol.Protocol):
- name = "test"
-
- def accept(self, r):
- return True
-
- root = WSRoot()
- root.addprotocol(wsme.rest.protocol.RestProtocol())
- root.addprotocol(P())
-
- req = Request.blank('/test?check=a&check=b&name=Bob')
- req.method = 'GET'
- req.headers['Accept'] = 'test/fake'
- p = root._select_protocol(req)
- assert p.name == "test"
-
- def test_protocol_selection_post_method(self):
- class P(wsme.protocol.Protocol):
- name = "test"
-
- def accept(self, r):
- return True
-
- root = WSRoot()
- root.addprotocol(wsme.rest.protocol.RestProtocol())
- root.addprotocol(P())
-
- req = Request.blank('/test?check=a&check=b&name=Bob')
- req.headers['Content-Type'] = 'test/fake'
- req.method = 'POST'
- p = root._select_protocol(req)
- assert p.name == "test"
diff --git a/wsme/tests/test_spore.py b/wsme/tests/test_spore.py
deleted file mode 100644
index d6509f3..0000000
--- a/wsme/tests/test_spore.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import unittest
-
-try:
- import simplejson as json
-except ImportError:
- import json
-
-from wsme.tests.protocol import WSTestRoot
-import wsme.tests.test_restjson
-import wsme.spore
-
-
-class TestSpore(unittest.TestCase):
- def test_spore(self):
- spore = wsme.spore.getdesc(WSTestRoot())
-
- print(spore)
-
- spore = json.loads(spore)
-
- assert len(spore['methods']) == 51, str(len(spore['methods']))
-
- m = spore['methods']['argtypes_setbytesarray']
- assert m['path'] == 'argtypes/setbytesarray', m['path']
- assert m['optional_params'] == ['value']
- assert m['method'] == 'POST'
-
- m = spore['methods']['argtypes_setdecimal']
- assert m['path'] == 'argtypes/setdecimal'
- assert m['required_params'] == ['value']
- assert m['method'] == 'GET'
-
- m = spore['methods']['crud_create']
- assert m['path'] == 'crud'
- assert m['method'] == 'PUT'
- assert m['optional_params'] == ['data']
-
- m = spore['methods']['crud_read']
- assert m['path'] == 'crud'
- assert m['method'] == 'GET'
- assert m['required_params'] == ['ref']
-
- m = spore['methods']['crud_update']
- assert m['path'] == 'crud'
- assert m['method'] == 'POST'
- assert m['optional_params'] == ['data']
-
- m = spore['methods']['crud_delete']
- assert m['path'] == 'crud'
- assert m['method'] == 'DELETE'
- assert m['optional_params'] == ['ref']
diff --git a/wsme/tests/test_types.py b/wsme/tests/test_types.py
deleted file mode 100644
index 4a07130..0000000
--- a/wsme/tests/test_types.py
+++ /dev/null
@@ -1,667 +0,0 @@
-import re
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-import six
-
-from wsme import exc
-from wsme import types
-
-
-def gen_class():
- d = {}
- exec('''class tmp(object): pass''', d)
- return d['tmp']
-
-
-class TestTypes(unittest.TestCase):
- def setUp(self):
- types.registry = types.Registry()
-
- def test_default_usertype(self):
- class MyType(types.UserType):
- basetype = str
-
- My = MyType()
-
- assert My.validate('a') == 'a'
- assert My.tobasetype('a') == 'a'
- assert My.frombasetype('a') == 'a'
-
- def test_unset(self):
- u = types.Unset
-
- assert not u
-
- def test_flat_type(self):
- class Flat(object):
- aint = int
- abytes = six.binary_type
- atext = six.text_type
- afloat = float
-
- types.register_type(Flat)
-
- assert len(Flat._wsme_attributes) == 4
- attrs = Flat._wsme_attributes
- print(attrs)
-
- assert attrs[0].key == 'aint'
- assert attrs[0].name == 'aint'
- assert isinstance(attrs[0], types.wsattr)
- assert attrs[0].datatype == int
- assert attrs[0].mandatory is False
- assert attrs[1].key == 'abytes'
- assert attrs[1].name == 'abytes'
- assert attrs[2].key == 'atext'
- assert attrs[2].name == 'atext'
- assert attrs[3].key == 'afloat'
- assert attrs[3].name == 'afloat'
-
- def test_private_attr(self):
- class WithPrivateAttrs(object):
- _private = 12
-
- types.register_type(WithPrivateAttrs)
-
- assert len(WithPrivateAttrs._wsme_attributes) == 0
-
- def test_attribute_order(self):
- class ForcedOrder(object):
- _wsme_attr_order = ('a2', 'a1', 'a3')
- a1 = int
- a2 = int
- a3 = int
-
- types.register_type(ForcedOrder)
-
- print(ForcedOrder._wsme_attributes)
- assert ForcedOrder._wsme_attributes[0].key == 'a2'
- assert ForcedOrder._wsme_attributes[1].key == 'a1'
- assert ForcedOrder._wsme_attributes[2].key == 'a3'
-
- c = gen_class()
- print(c)
- types.register_type(c)
- del c._wsme_attributes
-
- c.a2 = int
- c.a1 = int
- c.a3 = int
-
- types.register_type(c)
-
- assert c._wsme_attributes[0].key == 'a1', c._wsme_attributes[0].key
- assert c._wsme_attributes[1].key == 'a2'
- assert c._wsme_attributes[2].key == 'a3'
-
- def test_wsproperty(self):
- class WithWSProp(object):
- def __init__(self):
- self._aint = 0
-
- def get_aint(self):
- return self._aint
-
- def set_aint(self, value):
- self._aint = value
-
- aint = types.wsproperty(int, get_aint, set_aint, mandatory=True)
-
- types.register_type(WithWSProp)
-
- print(WithWSProp._wsme_attributes)
- assert len(WithWSProp._wsme_attributes) == 1
- a = WithWSProp._wsme_attributes[0]
- assert a.key == 'aint'
- assert a.datatype == int
- assert a.mandatory
-
- o = WithWSProp()
- o.aint = 12
-
- assert o.aint == 12
-
- def test_nested(self):
- class Inner(object):
- aint = int
-
- class Outer(object):
- inner = Inner
-
- types.register_type(Outer)
-
- assert hasattr(Inner, '_wsme_attributes')
- assert len(Inner._wsme_attributes) == 1
-
- def test_inspect_with_inheritance(self):
- class Parent(object):
- parent_attribute = int
-
- class Child(Parent):
- child_attribute = int
-
- types.register_type(Parent)
- types.register_type(Child)
-
- assert len(Child._wsme_attributes) == 2
-
- def test_selfreftype(self):
- class SelfRefType(object):
- pass
-
- SelfRefType.parent = SelfRefType
-
- types.register_type(SelfRefType)
-
- def test_inspect_with_property(self):
- class AType(object):
- @property
- def test(self):
- return 'test'
-
- types.register_type(AType)
-
- assert len(AType._wsme_attributes) == 0
- assert AType().test == 'test'
-
- def test_enum(self):
- aenum = types.Enum(str, 'v1', 'v2')
- assert aenum.basetype is str
-
- class AType(object):
- a = aenum
-
- types.register_type(AType)
-
- assert AType.a.datatype is aenum
-
- obj = AType()
- obj.a = 'v1'
- assert obj.a == 'v1', repr(obj.a)
-
- self.assertRaisesRegexp(exc.InvalidInput,
- "Invalid input for field/attribute a. \
-Value: 'v3'. Value should be one of: v., v.",
- setattr,
- obj,
- 'a',
- 'v3')
-
- def test_attribute_validation(self):
- class AType(object):
- alist = [int]
- aint = int
-
- types.register_type(AType)
-
- obj = AType()
-
- obj.alist = [1, 2, 3]
- assert obj.alist == [1, 2, 3]
- obj.aint = 5
- assert obj.aint == 5
-
- self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', 12)
- self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', [2, 'a'])
-
- def test_attribute_validation_minimum(self):
- class ATypeInt(object):
- attr = types.IntegerType(minimum=1, maximum=5)
-
- types.register_type(ATypeInt)
-
- obj = ATypeInt()
- obj.attr = 2
-
- # comparison between 'zero' value and intger minimum (1) raises a
- # TypeError which must be wrapped into an InvalidInput exception
- self.assertRaises(exc.InvalidInput, setattr, obj, 'attr', 'zero')
-
- def test_text_attribute_conversion(self):
- class SType(object):
- atext = types.text
- abytes = types.bytes
-
- types.register_type(SType)
-
- obj = SType()
-
- obj.atext = six.b('somebytes')
- assert obj.atext == six.u('somebytes')
- assert isinstance(obj.atext, types.text)
-
- obj.abytes = six.u('sometext')
- assert obj.abytes == six.b('sometext')
- assert isinstance(obj.abytes, types.bytes)
-
- def test_named_attribute(self):
- class ABCDType(object):
- a_list = types.wsattr([int], name='a.list')
- astr = str
-
- types.register_type(ABCDType)
-
- assert len(ABCDType._wsme_attributes) == 2
- attrs = ABCDType._wsme_attributes
-
- assert attrs[0].key == 'a_list', attrs[0].key
- assert attrs[0].name == 'a.list', attrs[0].name
- assert attrs[1].key == 'astr', attrs[1].key
- assert attrs[1].name == 'astr', attrs[1].name
-
- def test_wsattr_del(self):
- class MyType(object):
- a = types.wsattr(int)
-
- types.register_type(MyType)
-
- value = MyType()
-
- value.a = 5
- assert value.a == 5
- del value.a
- assert value.a is types.Unset
-
- def test_validate_dict(self):
- assert types.validate_value({int: str}, {1: '1', 5: '5'})
-
- self.assertRaises(ValueError, types.validate_value,
- {int: str}, [])
-
- assert types.validate_value({int: str}, {'1': '1', 5: '5'})
-
- self.assertRaises(ValueError, types.validate_value,
- {int: str}, {1: 1, 5: '5'})
-
- def test_validate_list_valid(self):
- assert types.validate_value([int], [1, 2])
- assert types.validate_value([int], ['5'])
-
- def test_validate_list_empty(self):
- assert types.validate_value([int], []) == []
-
- def test_validate_list_none(self):
- v = types.ArrayType(int)
- assert v.validate(None) is None
-
- def test_validate_list_invalid_member(self):
- self.assertRaises(ValueError, types.validate_value, [int],
- ['not-a-number'])
-
- def test_validate_list_invalid_type(self):
- self.assertRaises(ValueError, types.validate_value, [int], 1)
-
- def test_validate_float(self):
- self.assertEqual(types.validate_value(float, 1), 1.0)
- self.assertEqual(types.validate_value(float, '1'), 1.0)
- self.assertEqual(types.validate_value(float, 1.1), 1.1)
- self.assertRaises(ValueError, types.validate_value, float, [])
- self.assertRaises(ValueError, types.validate_value, float,
- 'not-a-float')
-
- def test_validate_int(self):
- self.assertEqual(types.validate_value(int, 1), 1)
- self.assertEqual(types.validate_value(int, '1'), 1)
- self.assertEqual(types.validate_value(int, six.u('1')), 1)
- self.assertRaises(ValueError, types.validate_value, int, 1.1)
-
- def test_validate_integer_type(self):
- v = types.IntegerType(minimum=1, maximum=10)
- v.validate(1)
- v.validate(5)
- v.validate(10)
- self.assertRaises(ValueError, v.validate, 0)
- self.assertRaises(ValueError, v.validate, 11)
-
- def test_validate_string_type(self):
- v = types.StringType(min_length=1, max_length=10,
- pattern='^[a-zA-Z0-9]*$')
- v.validate('1')
- v.validate('12345')
- v.validate('1234567890')
- self.assertRaises(ValueError, v.validate, '')
- self.assertRaises(ValueError, v.validate, '12345678901')
-
- # Test a pattern validation
- v.validate('a')
- v.validate('A')
- self.assertRaises(ValueError, v.validate, '_')
-
- def test_validate_string_type_precompile(self):
- precompile = re.compile('^[a-zA-Z0-9]*$')
- v = types.StringType(min_length=1, max_length=10,
- pattern=precompile)
-
- # Test a pattern validation
- v.validate('a')
- v.validate('A')
- self.assertRaises(ValueError, v.validate, '_')
-
- def test_validate_string_type_pattern_exception_message(self):
- regex = '^[a-zA-Z0-9]*$'
- v = types.StringType(pattern=regex)
- try:
- v.validate('_')
- self.assertFail()
- except ValueError as e:
- self.assertIn(regex, str(e))
-
- def test_validate_ipv4_address_type(self):
- v = types.IPv4AddressType()
- self.assertEqual(v.validate('127.0.0.1'), '127.0.0.1')
- self.assertEqual(v.validate('192.168.0.1'), '192.168.0.1')
- self.assertEqual(v.validate(u'8.8.1.1'), u'8.8.1.1')
- self.assertRaises(ValueError, v.validate, '')
- self.assertRaises(ValueError, v.validate, 'foo')
- self.assertRaises(ValueError, v.validate,
- '2001:0db8:bd05:01d2:288a:1fc0:0001:10ee')
- self.assertRaises(ValueError, v.validate, '1.2.3')
-
- def test_validate_ipv6_address_type(self):
- v = types.IPv6AddressType()
- self.assertEqual(v.validate('0:0:0:0:0:0:0:1'),
- '0:0:0:0:0:0:0:1')
- self.assertEqual(v.validate(u'0:0:0:0:0:0:0:1'), u'0:0:0:0:0:0:0:1')
- self.assertEqual(v.validate('2001:0db8:bd05:01d2:288a:1fc0:0001:10ee'),
- '2001:0db8:bd05:01d2:288a:1fc0:0001:10ee')
- self.assertRaises(ValueError, v.validate, '')
- self.assertRaises(ValueError, v.validate, 'foo')
- self.assertRaises(ValueError, v.validate, '192.168.0.1')
- self.assertRaises(ValueError, v.validate, '0:0:0:0:0:0:1')
-
- def test_validate_uuid_type(self):
- v = types.UuidType()
- self.assertEqual(v.validate('6a0a707c-45ef-4758-b533-e55adddba8ce'),
- '6a0a707c-45ef-4758-b533-e55adddba8ce')
- self.assertEqual(v.validate('6a0a707c45ef4758b533e55adddba8ce'),
- '6a0a707c-45ef-4758-b533-e55adddba8ce')
- self.assertRaises(ValueError, v.validate, '')
- self.assertRaises(ValueError, v.validate, 'foo')
- self.assertRaises(ValueError, v.validate,
- '6a0a707c-45ef-4758-b533-e55adddba8ce-a')
-
- def test_register_invalid_array(self):
- self.assertRaises(ValueError, types.register_type, [])
- self.assertRaises(ValueError, types.register_type, [int, str])
- self.assertRaises(AttributeError, types.register_type, [1])
-
- def test_register_invalid_dict(self):
- self.assertRaises(ValueError, types.register_type, {})
- self.assertRaises(ValueError, types.register_type,
- {int: str, str: int})
- self.assertRaises(ValueError, types.register_type,
- {types.Unset: str})
-
- def test_list_attribute_no_auto_register(self):
- class MyType(object):
- aint = int
-
- assert not hasattr(MyType, '_wsme_attributes')
-
- self.assertRaises(TypeError, types.list_attributes, MyType)
-
- assert not hasattr(MyType, '_wsme_attributes')
-
- def test_list_of_complextypes(self):
- class A(object):
- bs = types.wsattr(['B'])
-
- class B(object):
- i = int
-
- types.register_type(A)
- types.register_type(B)
-
- assert A.bs.datatype.item_type is B
-
- def test_cross_referenced_types(self):
- class A(object):
- b = types.wsattr('B')
-
- class B(object):
- a = A
-
- types.register_type(A)
- types.register_type(B)
-
- assert A.b.datatype is B
-
- def test_base(self):
- class B1(types.Base):
- b2 = types.wsattr('B2')
-
- class B2(types.Base):
- b2 = types.wsattr('B2')
-
- assert B1.b2.datatype is B2, repr(B1.b2.datatype)
- assert B2.b2.datatype is B2
-
- def test_base_init(self):
- class C1(types.Base):
- s = six.text_type
-
- c = C1(s=six.u('test'))
- assert c.s == six.u('test')
-
- def test_array_eq(self):
- ell = [types.ArrayType(str)]
- assert types.ArrayType(str) in ell
-
- def test_array_sample(self):
- s = types.ArrayType(str).sample()
- assert isinstance(s, list)
- assert s
- assert s[0] == ''
-
- def test_dict_sample(self):
- s = types.DictType(str, str).sample()
- assert isinstance(s, dict)
- assert s
- assert s == {'': ''}
-
- def test_binary_to_base(self):
- import base64
- assert types.binary.tobasetype(None) is None
- expected = base64.encodestring(six.b('abcdef'))
- assert types.binary.tobasetype(six.b('abcdef')) == expected
-
- def test_binary_from_base(self):
- import base64
- assert types.binary.frombasetype(None) is None
- encoded = base64.encodestring(six.b('abcdef'))
- assert types.binary.frombasetype(encoded) == six.b('abcdef')
-
- def test_wsattr_weakref_datatype(self):
- # If the datatype inside the wsattr ends up a weakref, it
- # should be converted to the real type when accessed again by
- # the property getter.
- import weakref
- a = types.wsattr(int)
- a.datatype = weakref.ref(int)
- assert a.datatype is int
-
- def test_wsattr_list_datatype(self):
- # If the datatype inside the wsattr ends up a list of weakrefs
- # to types, it should be converted to the real types when
- # accessed again by the property getter.
- import weakref
- a = types.wsattr(int)
- a.datatype = [weakref.ref(int)]
- assert isinstance(a.datatype, list)
- assert a.datatype[0] is int
-
- def test_file_get_content_by_reading(self):
- class buffer:
- def read(self):
- return 'abcdef'
- f = types.File(file=buffer())
- assert f.content == 'abcdef'
-
- def test_file_content_overrides_file(self):
- class buffer:
- def read(self):
- return 'from-file'
- f = types.File(content='from-content', file=buffer())
- assert f.content == 'from-content'
-
- def test_file_setting_content_discards_file(self):
- class buffer:
- def read(self):
- return 'from-file'
- f = types.File(file=buffer())
- f.content = 'from-content'
- assert f.content == 'from-content'
-
- def test_file_field_storage(self):
- class buffer:
- def read(self):
- return 'from-file'
-
- class fieldstorage:
- filename = 'static.json'
- file = buffer()
- type = 'application/json'
- f = types.File(fieldstorage=fieldstorage)
- assert f.content == 'from-file'
-
- def test_file_field_storage_value(self):
- class buffer:
- def read(self):
- return 'from-file'
-
- class fieldstorage:
- filename = 'static.json'
- file = None
- type = 'application/json'
- value = 'from-value'
- f = types.File(fieldstorage=fieldstorage)
- assert f.content == 'from-value'
-
- def test_file_property_file(self):
- class buffer:
- def read(self):
- return 'from-file'
- buf = buffer()
- f = types.File(file=buf)
- assert f.file is buf
-
- def test_file_property_content(self):
- class buffer:
- def read(self):
- return 'from-file'
- f = types.File(content=six.b('from-content'))
- assert f.file.read() == six.b('from-content')
-
- def test_unregister(self):
- class TempType(object):
- pass
- types.registry.register(TempType)
- v = types.registry.lookup('TempType')
- self.assertIs(v, TempType)
- types.registry._unregister(TempType)
- after = types.registry.lookup('TempType')
- self.assertIs(after, None)
-
- def test_unregister_twice(self):
- class TempType(object):
- pass
- types.registry.register(TempType)
- v = types.registry.lookup('TempType')
- self.assertIs(v, TempType)
- types.registry._unregister(TempType)
- # Second call should not raise an exception
- types.registry._unregister(TempType)
- after = types.registry.lookup('TempType')
- self.assertIs(after, None)
-
- def test_unregister_array_type(self):
- class TempType(object):
- pass
- t = [TempType]
- types.registry.register(t)
- self.assertNotEqual(types.registry.array_types, set())
- types.registry._unregister(t)
- self.assertEqual(types.registry.array_types, set())
-
- def test_unregister_array_type_twice(self):
- class TempType(object):
- pass
- t = [TempType]
- types.registry.register(t)
- self.assertNotEqual(types.registry.array_types, set())
- types.registry._unregister(t)
- # Second call should not raise an exception
- types.registry._unregister(t)
- self.assertEqual(types.registry.array_types, set())
-
- def test_unregister_dict_type(self):
- class TempType(object):
- pass
- t = {str: TempType}
- types.registry.register(t)
- self.assertNotEqual(types.registry.dict_types, set())
- types.registry._unregister(t)
- self.assertEqual(types.registry.dict_types, set())
-
- def test_unregister_dict_type_twice(self):
- class TempType(object):
- pass
- t = {str: TempType}
- types.registry.register(t)
- self.assertNotEqual(types.registry.dict_types, set())
- types.registry._unregister(t)
- # Second call should not raise an exception
- types.registry._unregister(t)
- self.assertEqual(types.registry.dict_types, set())
-
- def test_reregister(self):
- class TempType(object):
- pass
- types.registry.register(TempType)
- v = types.registry.lookup('TempType')
- self.assertIs(v, TempType)
- types.registry.reregister(TempType)
- after = types.registry.lookup('TempType')
- self.assertIs(after, TempType)
-
- def test_reregister_and_add_attr(self):
- class TempType(object):
- pass
- types.registry.register(TempType)
- attrs = types.list_attributes(TempType)
- self.assertEqual(attrs, [])
- TempType.one = str
- types.registry.reregister(TempType)
- after = types.list_attributes(TempType)
- self.assertNotEqual(after, [])
-
- def test_dynamicbase_add_attributes(self):
- class TempType(types.DynamicBase):
- pass
- types.registry.register(TempType)
- attrs = types.list_attributes(TempType)
- self.assertEqual(attrs, [])
- TempType.add_attributes(one=str)
- after = types.list_attributes(TempType)
- self.assertEqual(len(after), 1)
-
- def test_dynamicbase_add_attributes_second(self):
- class TempType(types.DynamicBase):
- pass
- types.registry.register(TempType)
- attrs = types.list_attributes(TempType)
- self.assertEqual(attrs, [])
- TempType.add_attributes(one=str)
- TempType.add_attributes(two=int)
- after = types.list_attributes(TempType)
- self.assertEqual(len(after), 2)
-
- def test_non_registered_complex_type(self):
- class TempType(types.Base):
- __registry__ = None
-
- self.assertFalse(types.iscomplex(TempType))
- types.registry.register(TempType)
- self.assertTrue(types.iscomplex(TempType))
diff --git a/wsme/tests/test_utils.py b/wsme/tests/test_utils.py
deleted file mode 100644
index f9464c8..0000000
--- a/wsme/tests/test_utils.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import datetime
-import unittest
-import pytz
-
-from wsme import utils
-
-
-class TestUtils(unittest.TestCase):
- def test_parse_isodate(self):
- good_dates = [
- ('2008-02-01', datetime.date(2008, 2, 1)),
- ('2009-01-04', datetime.date(2009, 1, 4)),
- ]
- ill_formatted_dates = [
- '24-12-2004'
- ]
- out_of_range_dates = [
- '0000-00-00',
- '2012-02-30',
- ]
- for s, d in good_dates:
- assert utils.parse_isodate(s) == d
- for s in ill_formatted_dates + out_of_range_dates:
- self.assertRaises(ValueError, utils.parse_isodate, s)
-
- def test_parse_isotime(self):
- good_times = [
- ('12:03:54', datetime.time(12, 3, 54)),
- ('23:59:59.000004', datetime.time(23, 59, 59, 4)),
- ('01:02:03+00:00', datetime.time(1, 2, 3, 0, pytz.UTC)),
- ('01:02:03+23:59', datetime.time(1, 2, 3, 0,
- pytz.FixedOffset(1439))),
- ('01:02:03-23:59', datetime.time(1, 2, 3, 0,
- pytz.FixedOffset(-1439))),
- ]
- ill_formatted_times = [
- '24-12-2004'
- ]
- out_of_range_times = [
- '32:12:00',
- '00:54:60',
- '01:02:03-24:00',
- '01:02:03+24:00',
- ]
- for s, t in good_times:
- assert utils.parse_isotime(s) == t
- for s in ill_formatted_times + out_of_range_times:
- self.assertRaises(ValueError, utils.parse_isotime, s)
-
- def test_parse_isodatetime(self):
- good_datetimes = [
- ('2008-02-12T12:03:54',
- datetime.datetime(2008, 2, 12, 12, 3, 54)),
- ('2012-05-14T23:59:59.000004',
- datetime.datetime(2012, 5, 14, 23, 59, 59, 4)),
- ('1856-07-10T01:02:03+00:00',
- datetime.datetime(1856, 7, 10, 1, 2, 3, 0, pytz.UTC)),
- ('1856-07-10T01:02:03+23:59',
- datetime.datetime(1856, 7, 10, 1, 2, 3, 0,
- pytz.FixedOffset(1439))),
- ('1856-07-10T01:02:03-23:59',
- datetime.datetime(1856, 7, 10, 1, 2, 3, 0,
- pytz.FixedOffset(-1439))),
- ]
- ill_formatted_datetimes = [
- '24-32-2004',
- '1856-07-10+33:00'
- ]
- out_of_range_datetimes = [
- '2008-02-12T32:12:00',
- '2012-13-12T00:54:60',
- ]
- for s, t in good_datetimes:
- assert utils.parse_isodatetime(s) == t
- for s in ill_formatted_datetimes + out_of_range_datetimes:
- self.assertRaises(ValueError, utils.parse_isodatetime, s)
-
- def test_validator_with_valid_code(self):
- valid_code = 404
- self.assertTrue(
- utils.is_valid_code(valid_code),
- "Valid status code not detected"
- )
-
- def test_validator_with_invalid_int_code(self):
- invalid_int_code = 648
- self.assertFalse(
- utils.is_valid_code(invalid_int_code),
- "Invalid status code not detected"
- )
-
- def test_validator_with_invalid_str_code(self):
- invalid_str_code = '404'
- self.assertFalse(
- utils.is_valid_code(invalid_str_code),
- "Invalid status code not detected"
- )
diff --git a/wsme/types.py b/wsme/types.py
deleted file mode 100644
index 0a5fc02..0000000
--- a/wsme/types.py
+++ /dev/null
@@ -1,840 +0,0 @@
-import base64
-import datetime
-import decimal
-import inspect
-import logging
-import netaddr
-import re
-import six
-import sys
-import uuid
-import weakref
-
-from wsme import exc
-
-log = logging.getLogger(__name__)
-
-#: The 'str' (python 2) or 'bytes' (python 3) type.
-#: Its use should be restricted to
-#: pure ascii strings as the protocols will generally not be
-#: be able to send non-unicode strings.
-#: To transmit binary strings, use the :class:`binary` type
-bytes = six.binary_type
-
-#: Unicode string.
-text = six.text_type
-
-
-class ArrayType(object):
- def __init__(self, item_type):
- if iscomplex(item_type):
- self._item_type = weakref.ref(item_type)
- else:
- self._item_type = item_type
-
- def __hash__(self):
- return hash(self.item_type)
-
- def __eq__(self, other):
- return isinstance(other, ArrayType) \
- and self.item_type == other.item_type
-
- def sample(self):
- return [getattr(self.item_type, 'sample', self.item_type)()]
-
- @property
- def item_type(self):
- if isinstance(self._item_type, weakref.ref):
- return self._item_type()
- else:
- return self._item_type
-
- def validate(self, value):
- if value is None:
- return
- if not isinstance(value, list):
- raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
- self.item_type, type(value)
- ))
- return [
- validate_value(self.item_type, item)
- for item in value
- ]
-
-
-class DictType(object):
- def __init__(self, key_type, value_type):
- if key_type not in pod_types:
- raise ValueError("Dictionaries key can only be a pod type")
- self.key_type = key_type
- if iscomplex(value_type):
- self._value_type = weakref.ref(value_type)
- else:
- self._value_type = value_type
-
- def __hash__(self):
- return hash((self.key_type, self.value_type))
-
- def sample(self):
- key = getattr(self.key_type, 'sample', self.key_type)()
- value = getattr(self.value_type, 'sample', self.value_type)()
- return {key: value}
-
- @property
- def value_type(self):
- if isinstance(self._value_type, weakref.ref):
- return self._value_type()
- else:
- return self._value_type
-
- def validate(self, value):
- if not isinstance(value, dict):
- raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
- self.key_type, self.value_type, type(value)
- ))
- return dict((
- (
- validate_value(self.key_type, key),
- validate_value(self.value_type, v)
- ) for key, v in value.items()
- ))
-
-
-class UserType(object):
- basetype = None
- name = None
-
- def validate(self, value):
- return value
-
- def tobasetype(self, value):
- return value
-
- def frombasetype(self, value):
- return value
-
-
-def isusertype(class_):
- return isinstance(class_, UserType)
-
-
-class BinaryType(UserType):
- """
- A user type that use base64 strings to carry binary data.
- """
- basetype = bytes
- name = 'binary'
-
- def tobasetype(self, value):
- if value is None:
- return None
- return base64.encodestring(value)
-
- def frombasetype(self, value):
- if value is None:
- return None
- return base64.decodestring(value)
-
-
-#: The binary almost-native type
-binary = BinaryType()
-
-
-class IntegerType(UserType):
- """
- A simple integer type. Can validate a value range.
-
- :param minimum: Possible minimum value
- :param maximum: Possible maximum value
-
- Example::
-
- Price = IntegerType(minimum=1)
-
- """
- basetype = int
- name = "integer"
-
- def __init__(self, minimum=None, maximum=None):
- self.minimum = minimum
- self.maximum = maximum
-
- @staticmethod
- def frombasetype(value):
- return int(value) if value is not None else None
-
- def validate(self, value):
- if self.minimum is not None and value < self.minimum:
- error = 'Value should be greater or equal to %s' % self.minimum
- raise ValueError(error)
-
- if self.maximum is not None and value > self.maximum:
- error = 'Value should be lower or equal to %s' % self.maximum
- raise ValueError(error)
-
- return value
-
-
-class StringType(UserType):
- """
- A simple string type. Can validate a length and a pattern.
-
- :param min_length: Possible minimum length
- :param max_length: Possible maximum length
- :param pattern: Possible string pattern
-
- Example::
-
- Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$')
-
- """
- basetype = six.string_types
- name = "string"
-
- def __init__(self, min_length=None, max_length=None, pattern=None):
- self.min_length = min_length
- self.max_length = max_length
- if isinstance(pattern, six.string_types):
- self.pattern = re.compile(pattern)
- else:
- self.pattern = pattern
-
- def validate(self, value):
- if not isinstance(value, self.basetype):
- error = 'Value should be string'
- raise ValueError(error)
-
- if self.min_length is not None and len(value) < self.min_length:
- error = 'Value should have a minimum character requirement of %s' \
- % self.min_length
- raise ValueError(error)
-
- if self.max_length is not None and len(value) > self.max_length:
- error = 'Value should have a maximum character requirement of %s' \
- % self.max_length
- raise ValueError(error)
-
- if self.pattern is not None and not self.pattern.search(value):
- error = 'Value should match the pattern %s' % self.pattern.pattern
- raise ValueError(error)
-
- return value
-
-
-class IPv4AddressType(UserType):
- """
- A simple IPv4 type.
- """
- basetype = six.string_types
- name = "ipv4address"
-
- @staticmethod
- def validate(value):
- try:
- netaddr.IPAddress(value, version=4, flags=netaddr.INET_PTON)
- except netaddr.AddrFormatError:
- error = 'Value should be IPv4 format'
- raise ValueError(error)
- else:
- return value
-
-
-class IPv6AddressType(UserType):
- """
- A simple IPv6 type.
-
- This type represents IPv6 addresses in the short format.
- """
- basetype = six.string_types
- name = "ipv6address"
-
- @staticmethod
- def validate(value):
- try:
- netaddr.IPAddress(value, version=6, flags=netaddr.INET_PTON)
- except netaddr.AddrFormatError:
- error = 'Value should be IPv6 format'
- raise ValueError(error)
- else:
- return value
-
-
-class UuidType(UserType):
- """
- A simple UUID type.
-
- This type allows not only UUID having dashes but also UUID not
- having dashes. For example, '6a0a707c-45ef-4758-b533-e55adddba8ce'
- and '6a0a707c45ef4758b533e55adddba8ce' are distinguished as valid.
- """
- basetype = six.string_types
- name = "uuid"
-
- @staticmethod
- def validate(value):
- try:
- return six.text_type((uuid.UUID(value)))
- except (TypeError, ValueError, AttributeError):
- error = 'Value should be UUID format'
- raise ValueError(error)
-
-
-class Enum(UserType):
- """
- A simple enumeration type. Can be based on any non-complex type.
-
- :param basetype: The actual data type
- :param values: A set of possible values
-
- If nullable, 'None' should be added the values set.
-
- Example::
-
- Gender = Enum(str, 'male', 'female')
- Specie = Enum(str, 'cat', 'dog')
-
- """
- def __init__(self, basetype, *values, **kw):
- self.basetype = basetype
- self.values = set(values)
- name = kw.pop('name', None)
- if name is None:
- name = "Enum(%s)" % ', '.join((six.text_type(v) for v in values))
- self.name = name
-
- def validate(self, value):
- if value not in self.values:
- raise ValueError("Value should be one of: %s" %
- ', '.join(map(six.text_type, self.values)))
- return value
-
- def tobasetype(self, value):
- return value
-
- def frombasetype(self, value):
- return value
-
-
-class UnsetType(object):
- if sys.version < '3':
- def __nonzero__(self):
- return False
- else:
- def __bool__(self):
- return False
-
- def __repr__(self):
- return 'Unset'
-
-
-Unset = UnsetType()
-
-#: A special type that corresponds to the host framework request object.
-#: It can only be used in the function parameters, and if so the request object
-#: of the host framework will be passed to the function.
-HostRequest = object()
-
-
-pod_types = six.integer_types + (
- bytes, text, float, bool)
-dt_types = (datetime.date, datetime.time, datetime.datetime)
-extra_types = (binary, decimal.Decimal)
-native_types = pod_types + dt_types + extra_types
-# The types for which we allow promotion to certain numbers.
-_promotable_types = six.integer_types + (text, bytes)
-
-
-def iscomplex(datatype):
- return inspect.isclass(datatype) \
- and '_wsme_attributes' in datatype.__dict__
-
-
-def isarray(datatype):
- return isinstance(datatype, ArrayType)
-
-
-def isdict(datatype):
- return isinstance(datatype, DictType)
-
-
-def validate_value(datatype, value):
- if value in (Unset, None):
- return value
-
- # Try to promote the data type to one of our complex types.
- if isinstance(datatype, list):
- datatype = ArrayType(datatype[0])
- elif isinstance(datatype, dict):
- datatype = DictType(*list(datatype.items())[0])
-
- # If the datatype has its own validator, use that.
- if hasattr(datatype, 'validate'):
- return datatype.validate(value)
-
- # Do type promotion/conversion and data validation for builtin
- # types.
- v_type = type(value)
- if datatype in six.integer_types:
- if v_type in _promotable_types:
- try:
- # Try to turn the value into an int
- value = datatype(value)
- except ValueError:
- # An error is raised at the end of the function
- # when the types don't match.
- pass
- elif datatype is float and v_type in _promotable_types:
- try:
- value = float(value)
- except ValueError:
- # An error is raised at the end of the function
- # when the types don't match.
- pass
- elif datatype is text and isinstance(value, bytes):
- value = value.decode()
- elif datatype is bytes and isinstance(value, text):
- value = value.encode()
-
- if not isinstance(value, datatype):
- raise ValueError(
- "Wrong type. Expected '%s', got '%s'" % (
- datatype, v_type
- ))
- return value
-
-
-class wsproperty(property):
- """
- A specialised :class:`property` to define typed-property on complex types.
- Example::
-
- class MyComplexType(wsme.types.Base):
- def get_aint(self):
- return self._aint
-
- def set_aint(self, value):
- assert avalue < 10 # Dummy input validation
- self._aint = value
-
- aint = wsproperty(int, get_aint, set_aint, mandatory=True)
- """
- def __init__(self, datatype, fget, fset=None,
- mandatory=False, doc=None, name=None):
- property.__init__(self, fget, fset)
- #: The property name in the parent python class
- self.key = None
- #: The attribute name on the public of the api.
- #: Defaults to :attr:`key`
- self.name = name
- #: property data type
- self.datatype = datatype
- #: True if the property is mandatory
- self.mandatory = mandatory
-
-
-class wsattr(object):
- """
- Complex type attribute definition.
-
- Example::
-
- class MyComplexType(wsme.types.Base):
- optionalvalue = int
- mandatoryvalue = wsattr(int, mandatory=True)
- named_value = wsattr(int, name='named.value')
-
- After inspection, the non-wsattr attributes will be replaced, and
- the above class will be equivalent to::
-
- class MyComplexType(wsme.types.Base):
- optionalvalue = wsattr(int)
- mandatoryvalue = wsattr(int, mandatory=True)
-
- """
- def __init__(self, datatype, mandatory=False, name=None, default=Unset,
- readonly=False):
- #: The attribute name in the parent python class.
- #: Set by :func:`inspect_class`
- self.key = None # will be set by class inspection
- #: The attribute name on the public of the api.
- #: Defaults to :attr:`key`
- self.name = name
- self._datatype = (datatype,)
- #: True if the attribute is mandatory
- self.mandatory = mandatory
- #: Default value. The attribute will return this instead
- #: of :data:`Unset` if no value has been set.
- self.default = default
- #: If True value cannot be set from json/xml input data
- self.readonly = readonly
-
- self.complextype = None
-
- def _get_dataholder(self, instance):
- dataholder = getattr(instance, '_wsme_dataholder', None)
- if dataholder is None:
- dataholder = instance._wsme_DataHolderClass()
- instance._wsme_dataholder = dataholder
- return dataholder
-
- def __get__(self, instance, owner):
- if instance is None:
- return self
- return getattr(
- self._get_dataholder(instance),
- self.key,
- self.default
- )
-
- def __set__(self, instance, value):
- try:
- value = validate_value(self.datatype, value)
- except (ValueError, TypeError) as e:
- raise exc.InvalidInput(self.name, value, six.text_type(e))
- dataholder = self._get_dataholder(instance)
- if value is Unset:
- if hasattr(dataholder, self.key):
- delattr(dataholder, self.key)
- else:
- setattr(dataholder, self.key, value)
-
- def __delete__(self, instance):
- self.__set__(instance, Unset)
-
- def _get_datatype(self):
- if isinstance(self._datatype, tuple):
- self._datatype = \
- self.complextype().__registry__.resolve_type(self._datatype[0])
- if isinstance(self._datatype, weakref.ref):
- return self._datatype()
- if isinstance(self._datatype, list):
- return [
- item() if isinstance(item, weakref.ref) else item
- for item in self._datatype
- ]
- return self._datatype
-
- def _set_datatype(self, datatype):
- self._datatype = datatype
-
- #: attribute data type. Can be either an actual type,
- #: or a type name, in which case the actual type will be
- #: determined when needed (generally just before scanning the api).
- datatype = property(_get_datatype, _set_datatype)
-
-
-def iswsattr(attr):
- if inspect.isfunction(attr) or inspect.ismethod(attr):
- return False
- if isinstance(attr, property) and not isinstance(attr, wsproperty):
- return False
- return True
-
-
-def sort_attributes(class_, attributes):
- """Sort a class attributes list.
-
- 3 mechanisms are attempted :
-
- #. Look for a _wsme_attr_order attribute on the class_. This allow
- to define an arbitrary order of the attributes (useful for
- generated types).
-
- #. Access the object source code to find the declaration order.
-
- #. Sort by alphabetically"""
-
- if not len(attributes):
- return
-
- attrs = dict((a.key, a) for a in attributes)
-
- if hasattr(class_, '_wsme_attr_order'):
- names_order = class_._wsme_attr_order
- else:
- names = attrs.keys()
- names_order = []
- try:
- lines = []
- for cls in inspect.getmro(class_):
- if cls is object:
- continue
- lines[len(lines):] = inspect.getsourcelines(cls)[0]
- for line in lines:
- line = line.strip().replace(" ", "")
- if '=' in line:
- aname = line[:line.index('=')]
- if aname in names and aname not in names_order:
- names_order.append(aname)
- if len(names_order) < len(names):
- names_order.extend((
- name for name in names if name not in names_order))
- assert len(names_order) == len(names)
- except (TypeError, IOError):
- names_order = list(names)
- names_order.sort()
-
- attributes[:] = [attrs[name] for name in names_order]
-
-
-def inspect_class(class_):
- """Extract a list of (name, wsattr|wsproperty) for the given class_"""
- attributes = []
- for name, attr in inspect.getmembers(class_, iswsattr):
- if name.startswith('_'):
- continue
- if inspect.isroutine(attr):
- continue
-
- if isinstance(attr, (wsattr, wsproperty)):
- attrdef = attr
- else:
- if attr not in native_types and (
- inspect.isclass(attr) or
- isinstance(attr, (list, dict))):
- register_type(attr)
- attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr)
-
- attrdef.key = name
- if attrdef.name is None:
- attrdef.name = name
- attrdef.complextype = weakref.ref(class_)
- attributes.append(attrdef)
- setattr(class_, name, attrdef)
-
- sort_attributes(class_, attributes)
- return attributes
-
-
-def list_attributes(class_):
- """
- Returns a list of a complex type attributes.
- """
- if not iscomplex(class_):
- raise TypeError("%s is not a registered type")
- return class_._wsme_attributes
-
-
-def make_dataholder(class_):
- # the slots are computed outside the class scope to avoid
- # 'attr' to pullute the class namespace, which leads to weird
- # things if one of the slots is named 'attr'.
- slots = [attr.key for attr in class_._wsme_attributes]
-
- class DataHolder(object):
- __slots__ = slots
-
- DataHolder.__name__ = class_.__name__ + 'DataHolder'
- return DataHolder
-
-
-class Registry(object):
- def __init__(self):
- self._complex_types = []
- self.array_types = set()
- self.dict_types = set()
-
- @property
- def complex_types(self):
- return [t() for t in self._complex_types if t()]
-
- def register(self, class_):
- """
- Make sure a type is registered.
-
- It is automatically called by :class:`expose() <wsme.expose>`
- and :class:`validate() <wsme.validate>`.
- Unless you want to control when the class inspection is done there
- is no need to call it.
- """
- if class_ is None or \
- class_ in native_types or \
- isusertype(class_) or iscomplex(class_) or \
- isarray(class_) or isdict(class_):
- return class_
-
- if isinstance(class_, list):
- if len(class_) != 1:
- raise ValueError("Cannot register type %s" % repr(class_))
- dt = ArrayType(class_[0])
- self.register(dt.item_type)
- self.array_types.add(dt)
- return dt
-
- if isinstance(class_, dict):
- if len(class_) != 1:
- raise ValueError("Cannot register type %s" % repr(class_))
- dt = DictType(*list(class_.items())[0])
- self.register(dt.value_type)
- self.dict_types.add(dt)
- return dt
-
- class_._wsme_attributes = None
- class_._wsme_attributes = inspect_class(class_)
- class_._wsme_DataHolderClass = make_dataholder(class_)
-
- class_.__registry__ = self
- self._complex_types.append(weakref.ref(class_))
- return class_
-
- def reregister(self, class_):
- """Register a type which may already have been registered.
- """
- self._unregister(class_)
- return self.register(class_)
-
- def _unregister(self, class_):
- """Remove a previously registered type.
- """
- # Clear the existing attribute reference so it is rebuilt if
- # the class is registered again later.
- if hasattr(class_, '_wsme_attributes'):
- del class_._wsme_attributes
- # FIXME(dhellmann): This method does not recurse through the
- # types like register() does. Should it?
- if isinstance(class_, list):
- at = ArrayType(class_[0])
- try:
- self.array_types.remove(at)
- except KeyError:
- pass
- elif isinstance(class_, dict):
- key_type, value_type = list(class_.items())[0]
- self.dict_types = set(
- dt for dt in self.dict_types
- if (dt.key_type, dt.value_type) != (key_type, value_type)
- )
- # We can't use remove() here because the items in
- # _complex_types are weakref objects pointing to the classes,
- # so we can't compare with them directly.
- self._complex_types = [
- ct for ct in self._complex_types
- if ct() is not class_
- ]
-
- def lookup(self, typename):
- log.debug('Lookup %s' % typename)
- modname = None
- if '.' in typename:
- modname, typename = typename.rsplit('.', 1)
- for ct in self._complex_types:
- ct = ct()
- if ct is not None and typename == ct.__name__ and (
- modname is None or modname == ct.__module__):
- return ct
-
- def resolve_type(self, type_):
- if isinstance(type_, six.string_types):
- return self.lookup(type_)
- if isinstance(type_, list):
- type_ = ArrayType(type_[0])
- if isinstance(type_, dict):
- type_ = DictType(list(type_.keys())[0], list(type_.values())[0])
- if isinstance(type_, ArrayType):
- type_ = ArrayType(self.resolve_type(type_.item_type))
- self.array_types.add(type_)
- elif isinstance(type_, DictType):
- type_ = DictType(
- type_.key_type,
- self.resolve_type(type_.value_type)
- )
- self.dict_types.add(type_)
- else:
- type_ = self.register(type_)
- return type_
-
-
-# Default type registry
-registry = Registry()
-
-
-def register_type(class_):
- return registry.register(class_)
-
-
-class BaseMeta(type):
- def __new__(cls, name, bases, dct):
- if bases and bases[0] is not object and '__registry__' not in dct:
- dct['__registry__'] = registry
- return type.__new__(cls, name, bases, dct)
-
- def __init__(cls, name, bases, dct):
- if bases and bases[0] is not object and cls.__registry__:
- cls.__registry__.register(cls)
-
-
-class Base(six.with_metaclass(BaseMeta)):
- """Base type for complex types"""
- def __init__(self, **kw):
- for key, value in kw.items():
- if hasattr(self, key):
- setattr(self, key, value)
-
-
-class File(Base):
- """A complex type that represents a file.
-
- In the particular case of protocol accepting form encoded data as
- input, File can be loaded from a form file field.
- """
- #: The file name
- filename = wsattr(text)
-
- #: Mime type of the content
- contenttype = wsattr(text)
-
- def _get_content(self):
- if self._content is None and self._file:
- self._content = self._file.read()
- return self._content
-
- def _set_content(self, value):
- self._content = value
- self._file = None
-
- #: File content
- content = wsproperty(binary, _get_content, _set_content)
-
- def __init__(self, filename=None, file=None, content=None,
- contenttype=None, fieldstorage=None):
- self.filename = filename
- self.contenttype = contenttype
- self._file = file
- self._content = content
-
- if fieldstorage is not None:
- if fieldstorage.file:
- self._file = fieldstorage.file
- self.filename = fieldstorage.filename
- self.contenttype = text(fieldstorage.type)
- else:
- self._content = fieldstorage.value
-
- @property
- def file(self):
- if self._file is None and self._content:
- self._file = six.BytesIO(self._content)
- return self._file
-
-
-class DynamicBase(Base):
- """Base type for complex types for which all attributes are not
- defined when the class is constructed.
-
- This class is meant to be used as a base for types that have
- properties added after the main class is created, such as by
- loading plugins.
-
- """
-
- @classmethod
- def add_attributes(cls, **attrs):
- """Add more attributes
-
- The arguments should be valid Python attribute names
- associated with a type for the new attribute.
-
- """
- for n, t in attrs.items():
- setattr(cls, n, t)
- cls.__registry__.reregister(cls)
diff --git a/wsme/utils.py b/wsme/utils.py
deleted file mode 100644
index e52b0ef..0000000
--- a/wsme/utils.py
+++ /dev/null
@@ -1,118 +0,0 @@
-import decimal
-import datetime
-import pytz
-import re
-from six.moves import builtins, http_client
-
-try:
- import dateutil.parser
-except ImportError:
- dateutil = None # noqa
-
-date_re = r'(?P<year>-?\d{4,})-(?P<month>\d{2})-(?P<day>\d{2})'
-time_re = r'(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})' + \
- r'(\.(?P<sec_frac>\d+))?'
-tz_re = r'((?P<tz_sign>[+-])(?P<tz_hour>\d{2}):(?P<tz_min>\d{2}))' + \
- r'|(?P<tz_z>Z)'
-
-datetime_re = re.compile(
- '%sT%s(%s)?' % (date_re, time_re, tz_re))
-date_re = re.compile(date_re)
-time_re = re.compile('%s(%s)?' % (time_re, tz_re))
-
-
-if hasattr(builtins, '_'):
- _ = builtins._
-else:
- def _(s):
- return s
-
-
-def parse_isodate(value):
- m = date_re.match(value)
- if m is None:
- raise ValueError("'%s' is not a legal date value" % (value))
- try:
- return datetime.date(
- int(m.group('year')),
- int(m.group('month')),
- int(m.group('day')))
- except ValueError:
- raise ValueError("'%s' is a out-of-range date" % (value))
-
-
-def parse_isotime(value):
- m = time_re.match(value)
- if m is None:
- raise ValueError("'%s' is not a legal time value" % (value))
- try:
- ms = 0
- if m.group('sec_frac') is not None:
- f = decimal.Decimal('0.' + m.group('sec_frac'))
- f = str(f.quantize(decimal.Decimal('0.000001')))
- ms = int(f[2:])
- tz = _parse_tzparts(m.groupdict())
- return datetime.time(
- int(m.group('hour')),
- int(m.group('min')),
- int(m.group('sec')),
- ms,
- tz)
- except ValueError:
- raise ValueError("'%s' is a out-of-range time" % (value))
-
-
-def parse_isodatetime(value):
- if dateutil:
- return dateutil.parser.parse(value)
- m = datetime_re.match(value)
- if m is None:
- raise ValueError("'%s' is not a legal datetime value" % (value))
- try:
- ms = 0
- if m.group('sec_frac') is not None:
- f = decimal.Decimal('0.' + m.group('sec_frac'))
- f = f.quantize(decimal.Decimal('0.000001'))
- ms = int(str(f)[2:])
- tz = _parse_tzparts(m.groupdict())
- return datetime.datetime(
- int(m.group('year')),
- int(m.group('month')),
- int(m.group('day')),
- int(m.group('hour')),
- int(m.group('min')),
- int(m.group('sec')),
- ms,
- tz)
- except ValueError:
- raise ValueError("'%s' is a out-of-range datetime" % (value))
-
-
-def _parse_tzparts(parts):
- if 'tz_z' in parts and parts['tz_z'] == 'Z':
- return pytz.UTC
- if 'tz_min' not in parts or not parts['tz_min']:
- return None
-
- tz_minute_offset = (int(parts['tz_hour']) * 60 + int(parts['tz_min']))
- tz_multiplier = -1 if parts['tz_sign'] == '-' else 1
-
- return pytz.FixedOffset(tz_multiplier * tz_minute_offset)
-
-
-def is_valid_code(code_value):
- """
- This function checks if incoming value in http response codes range.
- """
- return code_value in http_client.responses
-
-
-def is_client_error(code):
- """ Checks client error code (RFC 2616)."""
- return 400 <= code < 500
-
-
-try:
- from collections import OrderedDict
-except ImportError:
- from ordereddict import OrderedDict # noqa