diff options
author | Jim Rollenhagen <jim@jimrollenhagen.com> | 2019-09-26 09:43:27 -0400 |
---|---|---|
committer | Jim Rollenhagen <jim@jimrollenhagen.com> | 2019-09-26 09:43:27 -0400 |
commit | e9c6edfe510f4ed407f8d2d84b4b931a382b48b3 (patch) | |
tree | 94bbd6a34bcf09e99f7ae1be88b19960192d6adb /wsme | |
parent | 1d73d6e50411ebc45fb96a6ed3c63ca91a500323 (diff) | |
download | wsme-master.tar.gz |
Diffstat (limited to 'wsme')
-rw-r--r-- | wsme/__init__.py | 10 | ||||
-rw-r--r-- | wsme/api.py | 237 | ||||
-rw-r--r-- | wsme/exc.py | 92 | ||||
-rw-r--r-- | wsme/protocol.py | 147 | ||||
-rw-r--r-- | wsme/rest/__init__.py | 78 | ||||
-rw-r--r-- | wsme/rest/args.py | 310 | ||||
-rw-r--r-- | wsme/rest/json.py | 328 | ||||
-rw-r--r-- | wsme/rest/protocol.py | 133 | ||||
-rw-r--r-- | wsme/rest/xml.py | 298 | ||||
-rw-r--r-- | wsme/root.py | 372 | ||||
-rw-r--r-- | wsme/runtime.py | 9 | ||||
-rw-r--r-- | wsme/spore.py | 64 | ||||
-rw-r--r-- | wsme/tests/__init__.py | 0 | ||||
-rw-r--r-- | wsme/tests/protocol.py | 720 | ||||
-rw-r--r-- | wsme/tests/test_api.py | 419 | ||||
-rw-r--r-- | wsme/tests/test_exc.py | 40 | ||||
-rw-r--r-- | wsme/tests/test_protocols.py | 70 | ||||
-rw-r--r-- | wsme/tests/test_protocols_commons.py | 159 | ||||
-rw-r--r-- | wsme/tests/test_restjson.py | 779 | ||||
-rw-r--r-- | wsme/tests/test_restxml.py | 211 | ||||
-rw-r--r-- | wsme/tests/test_root.py | 122 | ||||
-rw-r--r-- | wsme/tests/test_spore.py | 51 | ||||
-rw-r--r-- | wsme/tests/test_types.py | 667 | ||||
-rw-r--r-- | wsme/tests/test_utils.py | 97 | ||||
-rw-r--r-- | wsme/types.py | 840 | ||||
-rw-r--r-- | wsme/utils.py | 118 |
26 files changed, 0 insertions, 6371 deletions
diff --git a/wsme/__init__.py b/wsme/__init__.py deleted file mode 100644 index 35109ec..0000000 --- a/wsme/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from wsme.api import signature -from wsme.rest import expose, validate -from wsme.root import WSRoot -from wsme.types import wsattr, wsproperty, Unset - -__all__ = [ - 'expose', 'validate', 'signature', - 'WSRoot', - 'wsattr', 'wsproperty', 'Unset' -] diff --git a/wsme/api.py b/wsme/api.py deleted file mode 100644 index 67263a0..0000000 --- a/wsme/api.py +++ /dev/null @@ -1,237 +0,0 @@ -import traceback -import functools -import inspect -import logging -import six - -import wsme.exc -import wsme.types - -from wsme import utils - -log = logging.getLogger(__name__) - - -def iswsmefunction(f): - return hasattr(f, '_wsme_definition') - - -def wrapfunc(f): - @functools.wraps(f) - def wrapper(*args, **kwargs): - return f(*args, **kwargs) - wrapper._wsme_original_func = f - return wrapper - - -def getargspec(f): - f = getattr(f, '_wsme_original_func', f) - return inspect.getargspec(f) - - -class FunctionArgument(object): - """ - An argument definition of an api entry - """ - def __init__(self, name, datatype, mandatory, default): - #: argument name - self.name = name - - #: Data type - self.datatype = datatype - - #: True if the argument is mandatory - self.mandatory = mandatory - - #: Default value if argument is omitted - self.default = default - - def resolve_type(self, registry): - self.datatype = registry.resolve_type(self.datatype) - - -class FunctionDefinition(object): - """ - An api entry definition - """ - def __init__(self, func): - #: Function name - self.name = func.__name__ - - #: Function documentation - self.doc = func.__doc__ - - #: Return type - self.return_type = None - - #: The function arguments (list of :class:`FunctionArgument`) - self.arguments = [] - - #: If the body carry the datas of a single argument, its type - self.body_type = None - - #: Status code - self.status_code = 200 - - #: True if extra arguments should be ignored, NOT inserted in - #: the kwargs of the function and not raise UnknownArgument - #: exceptions - self.ignore_extra_args = False - - #: name of the function argument to pass the host request object. - #: Should be set by using the :class:`wsme.types.HostRequest` type - #: in the function @\ :function:`signature` - self.pass_request = False - - #: Dictionnary of protocol-specific options. - self.extra_options = None - - @staticmethod - def get(func): - """ - Returns the :class:`FunctionDefinition` of a method. - """ - if not hasattr(func, '_wsme_definition'): - fd = FunctionDefinition(func) - func._wsme_definition = fd - - return func._wsme_definition - - def get_arg(self, name): - """ - Returns a :class:`FunctionArgument` from its name - """ - for arg in self.arguments: - if arg.name == name: - return arg - return None - - def resolve_types(self, registry): - self.return_type = registry.resolve_type(self.return_type) - self.body_type = registry.resolve_type(self.body_type) - for arg in self.arguments: - arg.resolve_type(registry) - - def set_options(self, body=None, ignore_extra_args=False, status_code=200, - rest_content_types=('json', 'xml'), **extra_options): - self.body_type = body - self.status_code = status_code - self.ignore_extra_args = ignore_extra_args - self.rest_content_types = rest_content_types - self.extra_options = extra_options - - def set_arg_types(self, argspec, arg_types): - args, varargs, keywords, defaults = argspec - if args[0] == 'self': - args = args[1:] - arg_types = list(arg_types) - if self.body_type is not None: - arg_types.append(self.body_type) - for i, argname in enumerate(args): - datatype = arg_types[i] - mandatory = defaults is None or i < (len(args) - len(defaults)) - default = None - if not mandatory: - default = defaults[i - (len(args) - len(defaults))] - if datatype is wsme.types.HostRequest: - self.pass_request = argname - else: - self.arguments.append(FunctionArgument(argname, datatype, - mandatory, default)) - - -class signature(object): - - """Decorator that specify the argument types of an exposed function. - - :param return_type: Type of the value returned by the function - :param argN: Type of the Nth argument - :param body: If the function takes a final argument that is supposed to be - the request body by itself, its type. - :param status_code: HTTP return status code of the function. - :param ignore_extra_args: Allow extra/unknow arguments (default to False) - - Most of the time this decorator is not supposed to be used directly, - unless you are not using WSME on top of another framework. - - If an adapter is used, it will provide either a specialised version of this - decororator, either a new decorator named @wsexpose that takes the same - parameters (it will in addition expose the function, hence its name). - """ - - def __init__(self, *types, **options): - self.return_type = types[0] if types else None - self.arg_types = [] - if len(types) > 1: - self.arg_types.extend(types[1:]) - if 'body' in options: - self.arg_types.append(options['body']) - self.wrap = options.pop('wrap', False) - self.options = options - - def __call__(self, func): - argspec = getargspec(func) - if self.wrap: - func = wrapfunc(func) - fd = FunctionDefinition.get(func) - if fd.extra_options is not None: - raise ValueError("This function is already exposed") - fd.return_type = self.return_type - fd.set_options(**self.options) - if self.arg_types: - fd.set_arg_types(argspec, self.arg_types) - return func - - -sig = signature - - -class Response(object): - """ - Object to hold the "response" from a view function - """ - def __init__(self, obj, status_code=None, error=None, - return_type=wsme.types.Unset): - #: Store the result object from the view - self.obj = obj - - #: Store an optional status_code - self.status_code = status_code - - #: Return error details - #: Must be a dictionnary with the following keys: faultcode, - #: faultstring and an optional debuginfo - self.error = error - - #: Return type - #: Type of the value returned by the function - #: If the return type is wsme.types.Unset it will be ignored - #: and the default return type will prevail. - self.return_type = return_type - - -def format_exception(excinfo, debug=False): - """Extract informations that can be sent to the client.""" - error = excinfo[1] - code = getattr(error, 'code', None) - if code and utils.is_valid_code(code) and utils.is_client_error(code): - faultstring = (error.faultstring if hasattr(error, 'faultstring') - else six.text_type(error)) - r = dict(faultcode="Client", - faultstring=faultstring) - log.debug("Client-side error: %s" % r['faultstring']) - r['debuginfo'] = None - return r - else: - faultstring = six.text_type(error) - debuginfo = "\n".join(traceback.format_exception(*excinfo)) - - log.error('Server-side error: "%s". Detail: \n%s' % ( - faultstring, debuginfo)) - - r = dict(faultcode="Server", faultstring=faultstring) - if debug: - r['debuginfo'] = debuginfo - else: - r['debuginfo'] = None - return r diff --git a/wsme/exc.py b/wsme/exc.py deleted file mode 100644 index 81dcbf8..0000000 --- a/wsme/exc.py +++ /dev/null @@ -1,92 +0,0 @@ -import six - -from wsme.utils import _ - - -class ClientSideError(RuntimeError): - def __init__(self, msg=None, status_code=400): - self.msg = msg - self.code = status_code - super(ClientSideError, self).__init__(self.faultstring) - - @property - def faultstring(self): - if self.msg is None: - return str(self) - elif isinstance(self.msg, six.text_type): - return self.msg - else: - return six.u(self.msg) - - -class InvalidInput(ClientSideError): - def __init__(self, fieldname, value, msg=''): - self.fieldname = fieldname - self.value = value - super(InvalidInput, self).__init__(msg) - - @property - def faultstring(self): - return _(six.u( - "Invalid input for field/attribute %s. Value: '%s'. %s") - ) % (self.fieldname, self.value, self.msg) - - -class MissingArgument(ClientSideError): - def __init__(self, argname, msg=''): - self.argname = argname - super(MissingArgument, self).__init__(msg) - - @property - def faultstring(self): - return _(six.u('Missing argument: "%s"%s')) % ( - self.argname, self.msg and ": " + self.msg or "") - - -class UnknownArgument(ClientSideError): - def __init__(self, argname, msg=''): - self.argname = argname - super(UnknownArgument, self).__init__(msg) - - @property - def faultstring(self): - return _(six.u('Unknown argument: "%s"%s')) % ( - self.argname, self.msg and ": " + self.msg or "") - - -class UnknownFunction(ClientSideError): - def __init__(self, name): - self.name = name - super(UnknownFunction, self).__init__() - - @property - def faultstring(self): - return _(six.u("Unknown function name: %s")) % (self.name) - - -class UnknownAttribute(ClientSideError): - def __init__(self, fieldname, attributes, msg=''): - self.fieldname = fieldname - self.attributes = attributes - self.msg = msg - super(UnknownAttribute, self).__init__(self.msg) - - @property - def faultstring(self): - error = _("Unknown attribute for argument %(argn)s: %(attrs)s") - if len(self.attributes) > 1: - error = _("Unknown attributes for argument %(argn)s: %(attrs)s") - str_attrs = ", ".join(self.attributes) - return error % {'argn': self.fieldname, 'attrs': str_attrs} - - def add_fieldname(self, name): - """Add a fieldname to concatenate the full name. - - Add a fieldname so that the whole hierarchy is displayed. Successive - calls to this method will prepend ``name`` to the hierarchy of names. - """ - if self.fieldname is not None: - self.fieldname = "{}.{}".format(name, self.fieldname) - else: - self.fieldname = name - super(UnknownAttribute, self).__init__(self.msg) diff --git a/wsme/protocol.py b/wsme/protocol.py deleted file mode 100644 index b0107ab..0000000 --- a/wsme/protocol.py +++ /dev/null @@ -1,147 +0,0 @@ -import weakref - -import pkg_resources - -from wsme.exc import ClientSideError - - -__all__ = [ - 'CallContext', - - 'register_protocol', 'getprotocol', -] - -registered_protocols = {} - - -def _cfg(f): - cfg = getattr(f, '_cfg', None) - if cfg is None: - f._cfg = cfg = {} - return cfg - - -class expose(object): - def __init__(self, path, content_type): - self.path = path - self.content_type = content_type - - def __call__(self, func): - func.exposed = True - cfg = _cfg(func) - cfg['content-type'] = self.content_type - cfg.setdefault('paths', []).append(self.path) - return func - - -class CallContext(object): - def __init__(self, request): - self._request = weakref.ref(request) - self.path = None - - self.func = None - self.funcdef = None - - @property - def request(self): - return self._request() - - -class ObjectDict(object): - def __init__(self, obj): - self.obj = obj - - def __getitem__(self, name): - return getattr(self.obj, name) - - -class Protocol(object): - name = None - displayname = None - content_types = [] - - def resolve_path(self, path): - if '$' in path: - from string import Template - s = Template(path) - path = s.substitute(ObjectDict(self)) - return path - - def iter_routes(self): - for attrname in dir(self): - attr = getattr(self, attrname) - if getattr(attr, 'exposed', False): - for path in _cfg(attr)['paths']: - yield self.resolve_path(path), attr - - def accept(self, request): - return request.headers.get('Content-Type') in self.content_types - - def iter_calls(self, request): - pass - - def extract_path(self, context): - pass - - def read_arguments(self, context): - pass - - def encode_result(self, context, result): - pass - - def encode_sample_value(self, datatype, value, format=False): - return ('none', 'N/A') - - def encode_sample_params(self, params, format=False): - return ('none', 'N/A') - - def encode_sample_result(self, datatype, value, format=False): - return ('none', 'N/A') - - -def register_protocol(protocol): - registered_protocols[protocol.name] = protocol - - -def getprotocol(name, **options): - protocol_class = registered_protocols.get(name) - if protocol_class is None: - for entry_point in pkg_resources.iter_entry_points( - 'wsme.protocols', name): - if entry_point.name == name: - protocol_class = entry_point.load() - if protocol_class is None: - raise ValueError("Cannot find protocol '%s'" % name) - registered_protocols[name] = protocol_class - return protocol_class(**options) - - -def media_type_accept(request, content_types): - """Validate media types against request.method. - - When request.method is GET or HEAD compare with the Accept header. - When request.method is POST, PUT or PATCH compare with the Content-Type - header. - When request.method is DELETE media type is irrelevant, so return True. - """ - if request.method in ['GET', 'HEAD']: - if request.accept: - if request.accept.best_match(content_types): - return True - error_message = ('Unacceptable Accept type: %s not in %s' - % (request.accept, content_types)) - raise ClientSideError(error_message, status_code=406) - elif request.method in ['PUT', 'POST', 'PATCH']: - content_type = request.headers.get('Content-Type') - if content_type: - for ct in content_types: - if request.headers.get('Content-Type', '').startswith(ct): - return True - error_message = ('Unacceptable Content-Type: %s not in %s' - % (content_type, content_types)) - raise ClientSideError(error_message, status_code=415) - else: - raise ClientSideError('missing Content-Type header') - elif request.method in ['DELETE']: - return True - return False diff --git a/wsme/rest/__init__.py b/wsme/rest/__init__.py deleted file mode 100644 index f35d6a9..0000000 --- a/wsme/rest/__init__.py +++ /dev/null @@ -1,78 +0,0 @@ -import inspect -import wsme.api - -APIPATH_MAXLEN = 20 - - -class expose(object): - def __init__(self, *args, **kwargs): - self.signature = wsme.api.signature(*args, **kwargs) - - def __call__(self, func): - return self.signature(func) - - @classmethod - def with_method(cls, method, *args, **kwargs): - kwargs['method'] = method - return cls(*args, **kwargs) - - @classmethod - def get(cls, *args, **kwargs): - return cls.with_method('GET', *args, **kwargs) - - @classmethod - def post(cls, *args, **kwargs): - return cls.with_method('POST', *args, **kwargs) - - @classmethod - def put(cls, *args, **kwargs): - return cls.with_method('PUT', *args, **kwargs) - - @classmethod - def delete(cls, *args, **kwargs): - return cls.with_method('DELETE', *args, **kwargs) - - -class validate(object): - """ - Decorator that define the arguments types of a function. - - - Example:: - - class MyController(object): - @expose(str) - @validate(datetime.date, datetime.time) - def format(self, d, t): - return d.isoformat() + ' ' + t.isoformat() - """ - def __init__(self, *param_types): - self.param_types = param_types - - def __call__(self, func): - argspec = wsme.api.getargspec(func) - fd = wsme.api.FunctionDefinition.get(func) - fd.set_arg_types(argspec, self.param_types) - return func - - -def scan_api(controller, path=[], objects=[]): - """ - Recursively iterate a controller api entries. - """ - for name in dir(controller): - if name.startswith('_'): - continue - a = getattr(controller, name) - if a in objects: - continue - if inspect.ismethod(a): - if wsme.api.iswsmefunction(a): - yield path + [name], a, [] - elif inspect.isclass(a): - continue - else: - if len(path) > APIPATH_MAXLEN: - raise ValueError("Path is too long: " + str(path)) - for i in scan_api(a, path + [name], objects + [a]): - yield i diff --git a/wsme/rest/args.py b/wsme/rest/args.py deleted file mode 100644 index 9dc16c7..0000000 --- a/wsme/rest/args.py +++ /dev/null @@ -1,310 +0,0 @@ -import cgi -import datetime -import re - -from simplegeneric import generic - -from wsme.exc import ClientSideError, UnknownArgument, InvalidInput - -from wsme.types import iscomplex, list_attributes, Unset -from wsme.types import UserType, ArrayType, DictType, File -from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime -import wsme.runtime - -from six import moves - -ARRAY_MAX_SIZE = 1000 - - -@generic -def from_param(datatype, value): - return datatype(value) if value is not None else None - - -@from_param.when_object(datetime.date) -def date_from_param(datatype, value): - return parse_isodate(value) if value else None - - -@from_param.when_object(datetime.time) -def time_from_param(datatype, value): - return parse_isotime(value) if value else None - - -@from_param.when_object(datetime.datetime) -def datetime_from_param(datatype, value): - return parse_isodatetime(value) if value else None - - -@from_param.when_object(File) -def filetype_from_param(datatype, value): - if isinstance(value, cgi.FieldStorage): - return File(fieldstorage=value) - return File(content=value) - - -@from_param.when_type(UserType) -def usertype_from_param(datatype, value): - return datatype.frombasetype( - from_param(datatype.basetype, value)) - - -@from_param.when_type(ArrayType) -def array_from_param(datatype, value): - if value is None: - return value - return [ - from_param(datatype.item_type, item) - for item in value - ] - - -@generic -def from_params(datatype, params, path, hit_paths): - if iscomplex(datatype) and datatype is not File: - objfound = False - for key in params: - if key.startswith(path + '.'): - objfound = True - break - if objfound: - r = datatype() - for attrdef in list_attributes(datatype): - value = from_params( - attrdef.datatype, - params, '%s.%s' % (path, attrdef.key), hit_paths - ) - if value is not Unset: - setattr(r, attrdef.key, value) - return r - else: - if path in params: - hit_paths.add(path) - return from_param(datatype, params[path]) - return Unset - - -@from_params.when_type(ArrayType) -def array_from_params(datatype, params, path, hit_paths): - if hasattr(params, 'getall'): - # webob multidict - def getall(params, path): - return params.getall(path) - elif hasattr(params, 'getlist'): - # werkzeug multidict - def getall(params, path): # noqa - return params.getlist(path) - if path in params: - hit_paths.add(path) - return [ - from_param(datatype.item_type, value) - for value in getall(params, path)] - - if iscomplex(datatype.item_type): - attributes = set() - r = re.compile(r'^%s\.(?P<attrname>[^\.])' % re.escape(path)) - for p in params.keys(): - m = r.match(p) - if m: - attributes.add(m.group('attrname')) - if attributes: - value = [] - for attrdef in list_attributes(datatype.item_type): - attrpath = '%s.%s' % (path, attrdef.key) - hit_paths.add(attrpath) - attrvalues = getall(params, attrpath) - if len(value) < len(attrvalues): - value[-1:] = [ - datatype.item_type() - for i in moves.range(len(attrvalues) - len(value)) - ] - for i, attrvalue in enumerate(attrvalues): - setattr( - value[i], - attrdef.key, - from_param(attrdef.datatype, attrvalue) - ) - return value - - indexes = set() - r = re.compile(r'^%s\[(?P<index>\d+)\]' % re.escape(path)) - - for p in params.keys(): - m = r.match(p) - if m: - indexes.add(int(m.group('index'))) - - if not indexes: - return Unset - - indexes = list(indexes) - indexes.sort() - - return [from_params(datatype.item_type, params, - '%s[%s]' % (path, index), hit_paths) - for index in indexes] - - -@from_params.when_type(DictType) -def dict_from_params(datatype, params, path, hit_paths): - - keys = set() - r = re.compile(r'^%s\[(?P<key>[a-zA-Z0-9_\.]+)\]' % re.escape(path)) - - for p in params.keys(): - m = r.match(p) - if m: - keys.add(from_param(datatype.key_type, m.group('key'))) - - if not keys: - return Unset - - return dict(( - (key, from_params(datatype.value_type, - params, '%s[%s]' % (path, key), hit_paths)) - for key in keys)) - - -@from_params.when_type(UserType) -def usertype_from_params(datatype, params, path, hit_paths): - value = from_params(datatype.basetype, params, path, hit_paths) - if value is not Unset: - return datatype.frombasetype(value) - return Unset - - -def args_from_args(funcdef, args, kwargs): - newargs = [] - for argdef, arg in zip(funcdef.arguments[:len(args)], args): - try: - newargs.append(from_param(argdef.datatype, arg)) - except Exception as e: - if isinstance(argdef.datatype, UserType): - datatype_name = argdef.datatype.name - elif isinstance(argdef.datatype, type): - datatype_name = argdef.datatype.__name__ - else: - datatype_name = argdef.datatype.__class__.__name__ - raise InvalidInput( - argdef.name, - arg, - "unable to convert to %(datatype)s. Error: %(error)s" % { - 'datatype': datatype_name, 'error': e}) - newkwargs = {} - for argname, value in kwargs.items(): - newkwargs[argname] = from_param( - funcdef.get_arg(argname).datatype, value - ) - return newargs, newkwargs - - -def args_from_params(funcdef, params): - kw = {} - hit_paths = set() - for argdef in funcdef.arguments: - value = from_params( - argdef.datatype, params, argdef.name, hit_paths) - if value is not Unset: - kw[argdef.name] = value - paths = set(params.keys()) - unknown_paths = paths - hit_paths - if '__body__' in unknown_paths: - unknown_paths.remove('__body__') - if not funcdef.ignore_extra_args and unknown_paths: - raise UnknownArgument(', '.join(unknown_paths)) - return [], kw - - -def args_from_body(funcdef, body, mimetype): - from wsme.rest import json as restjson - from wsme.rest import xml as restxml - - if funcdef.body_type is not None: - datatypes = {funcdef.arguments[-1].name: funcdef.body_type} - else: - datatypes = dict(((a.name, a.datatype) for a in funcdef.arguments)) - - if not body: - return (), {} - if mimetype == "application/x-www-form-urlencoded": - # the parameters should have been parsed in params - return (), {} - elif mimetype in restjson.accept_content_types: - dataformat = restjson - elif mimetype in restxml.accept_content_types: - dataformat = restxml - else: - raise ClientSideError("Unknown mimetype: %s" % mimetype, - status_code=415) - - try: - kw = dataformat.parse( - body, datatypes, bodyarg=funcdef.body_type is not None - ) - except UnknownArgument: - if not funcdef.ignore_extra_args: - raise - kw = {} - - return (), kw - - -def combine_args(funcdef, akw, allow_override=False): - newargs, newkwargs = [], {} - for args, kwargs in akw: - for i, arg in enumerate(args): - n = funcdef.arguments[i].name - if not allow_override and n in newkwargs: - raise ClientSideError( - "Parameter %s was given several times" % n) - newkwargs[n] = arg - for name, value in kwargs.items(): - n = str(name) - if not allow_override and n in newkwargs: - raise ClientSideError( - "Parameter %s was given several times" % n) - newkwargs[n] = value - return newargs, newkwargs - - -def get_args(funcdef, args, kwargs, params, form, body, mimetype): - """Combine arguments from : - * the host framework args and kwargs - * the request params - * the request body - - Note that the host framework args and kwargs can be overridden - by arguments from params of body - """ - # get the body from params if not given directly - if not body and '__body__' in params: - body = params['__body__'] - - # extract args from the host args and kwargs - from_args = args_from_args(funcdef, args, kwargs) - - # extract args from the request parameters - from_params = args_from_params(funcdef, params) - - # extract args from the form parameters - if form: - from_form_params = args_from_params(funcdef, form) - else: - from_form_params = (), {} - - # extract args from the request body - from_body = args_from_body(funcdef, body, mimetype) - - # combine params and body arguments - from_params_and_body = combine_args( - funcdef, - (from_params, from_form_params, from_body) - ) - - args, kwargs = combine_args( - funcdef, - (from_args, from_params_and_body), - allow_override=True - ) - wsme.runtime.check_arguments(funcdef, args, kwargs) - return args, kwargs diff --git a/wsme/rest/json.py b/wsme/rest/json.py deleted file mode 100644 index 48bd082..0000000 --- a/wsme/rest/json.py +++ /dev/null @@ -1,328 +0,0 @@ -"""REST+Json protocol implementation.""" -from __future__ import absolute_import -import datetime -import decimal - -import six - -from simplegeneric import generic - -import wsme.exc -import wsme.types -from wsme.types import Unset -import wsme.utils - - -try: - import simplejson as json -except ImportError: - import json # noqa - - -content_type = 'application/json' -accept_content_types = [ - content_type, - 'text/javascript', - 'application/javascript' -] -ENUM_TRUE = ('true', 't', 'yes', 'y', 'on', '1') -ENUM_FALSE = ('false', 'f', 'no', 'n', 'off', '0') - - -@generic -def tojson(datatype, value): - """ - A generic converter from python to jsonify-able datatypes. - - If a non-complex user specific type is to be used in the api, - a specific tojson should be added:: - - from wsme.protocol.restjson import tojson - - myspecialtype = object() - - @tojson.when_object(myspecialtype) - def myspecialtype_tojson(datatype, value): - return str(value) - """ - if value is None: - return None - if wsme.types.iscomplex(datatype): - d = dict() - for attr in wsme.types.list_attributes(datatype): - attr_value = getattr(value, attr.key) - if attr_value is not Unset: - d[attr.name] = tojson(attr.datatype, attr_value) - return d - elif wsme.types.isusertype(datatype): - return tojson(datatype.basetype, datatype.tobasetype(value)) - return value - - -@tojson.when_object(wsme.types.bytes) -def bytes_tojson(datatype, value): - if value is None: - return None - return value.decode('ascii') - - -@tojson.when_type(wsme.types.ArrayType) -def array_tojson(datatype, value): - if value is None: - return None - return [tojson(datatype.item_type, item) for item in value] - - -@tojson.when_type(wsme.types.DictType) -def dict_tojson(datatype, value): - if value is None: - return None - return dict(( - (tojson(datatype.key_type, item[0]), - tojson(datatype.value_type, item[1])) - for item in value.items() - )) - - -@tojson.when_object(decimal.Decimal) -def decimal_tojson(datatype, value): - if value is None: - return None - return str(value) - - -@tojson.when_object(datetime.date) -def date_tojson(datatype, value): - if value is None: - return None - return value.isoformat() - - -@tojson.when_object(datetime.time) -def time_tojson(datatype, value): - if value is None: - return None - return value.isoformat() - - -@tojson.when_object(datetime.datetime) -def datetime_tojson(datatype, value): - if value is None: - return None - return value.isoformat() - - -@generic -def fromjson(datatype, value): - """A generic converter from json base types to python datatype. - - If a non-complex user specific type is to be used in the api, - a specific fromjson should be added:: - - from wsme.protocol.restjson import fromjson - - class MySpecialType(object): - pass - - @fromjson.when_object(MySpecialType) - def myspecialtype_fromjson(datatype, value): - return MySpecialType(value) - """ - if value is None: - return None - if wsme.types.iscomplex(datatype): - obj = datatype() - attributes = wsme.types.list_attributes(datatype) - - # Here we check that all the attributes in the value are also defined - # in our type definition, otherwise we raise an Error. - v_keys = set(value.keys()) - a_keys = set(adef.name for adef in attributes) - if not v_keys <= a_keys: - raise wsme.exc.UnknownAttribute(None, v_keys - a_keys) - - for attrdef in attributes: - if attrdef.name in value: - try: - val_fromjson = fromjson(attrdef.datatype, - value[attrdef.name]) - except wsme.exc.UnknownAttribute as e: - e.add_fieldname(attrdef.name) - raise - if getattr(attrdef, 'readonly', False): - raise wsme.exc.InvalidInput(attrdef.name, val_fromjson, - "Cannot set read only field.") - setattr(obj, attrdef.key, val_fromjson) - elif attrdef.mandatory: - raise wsme.exc.InvalidInput(attrdef.name, None, - "Mandatory field missing.") - - return wsme.types.validate_value(datatype, obj) - elif wsme.types.isusertype(datatype): - value = datatype.frombasetype( - fromjson(datatype.basetype, value)) - return value - - -@fromjson.when_type(wsme.types.ArrayType) -def array_fromjson(datatype, value): - if value is None: - return None - if not isinstance(value, list): - raise ValueError("Value not a valid list: %s" % value) - return [fromjson(datatype.item_type, item) for item in value] - - -@fromjson.when_type(wsme.types.DictType) -def dict_fromjson(datatype, value): - if value is None: - return None - if not isinstance(value, dict): - raise ValueError("Value not a valid dict: %s" % value) - return dict(( - (fromjson(datatype.key_type, item[0]), - fromjson(datatype.value_type, item[1])) - for item in value.items())) - - -@fromjson.when_object(six.binary_type) -def str_fromjson(datatype, value): - if (isinstance(value, six.string_types) or - isinstance(value, six.integer_types) or - isinstance(value, float)): - return six.text_type(value).encode('utf8') - - -@fromjson.when_object(wsme.types.text) -def text_fromjson(datatype, value): - if value is not None and isinstance(value, wsme.types.bytes): - return wsme.types.text(value) - return value - - -@fromjson.when_object(*six.integer_types + (float,)) -def numeric_fromjson(datatype, value): - """Convert string object to built-in types int, long or float.""" - if value is None: - return None - return datatype(value) - - -@fromjson.when_object(bool) -def bool_fromjson(datatype, value): - """Convert to bool, restricting strings to just unambiguous values.""" - if value is None: - return None - if isinstance(value, six.integer_types + (bool,)): - return bool(value) - if value in ENUM_TRUE: - return True - if value in ENUM_FALSE: - return False - raise ValueError("Value not an unambiguous boolean: %s" % value) - - -@fromjson.when_object(decimal.Decimal) -def decimal_fromjson(datatype, value): - if value is None: - return None - return decimal.Decimal(value) - - -@fromjson.when_object(datetime.date) -def date_fromjson(datatype, value): - if value is None: - return None - return wsme.utils.parse_isodate(value) - - -@fromjson.when_object(datetime.time) -def time_fromjson(datatype, value): - if value is None: - return None - return wsme.utils.parse_isotime(value) - - -@fromjson.when_object(datetime.datetime) -def datetime_fromjson(datatype, value): - if value is None: - return None - return wsme.utils.parse_isodatetime(value) - - -def parse(s, datatypes, bodyarg, encoding='utf8'): - jload = json.load - if not hasattr(s, 'read'): - if six.PY3 and isinstance(s, six.binary_type): - s = s.decode(encoding) - jload = json.loads - try: - jdata = jload(s) - except ValueError: - raise wsme.exc.ClientSideError("Request is not in valid JSON format") - if bodyarg: - argname = list(datatypes.keys())[0] - try: - kw = {argname: fromjson(datatypes[argname], jdata)} - except ValueError as e: - raise wsme.exc.InvalidInput(argname, jdata, e.args[0]) - except wsme.exc.UnknownAttribute as e: - # We only know the fieldname at this level, not in the - # called function. We fill in this information here. - e.add_fieldname(argname) - raise - else: - kw = {} - extra_args = [] - if not isinstance(jdata, dict): - raise wsme.exc.ClientSideError("Request must be a JSON dict") - for key in jdata: - if key not in datatypes: - extra_args.append(key) - else: - try: - kw[key] = fromjson(datatypes[key], jdata[key]) - except ValueError as e: - raise wsme.exc.InvalidInput(key, jdata[key], e.args[0]) - except wsme.exc.UnknownAttribute as e: - # We only know the fieldname at this level, not in the - # called function. We fill in this information here. - e.add_fieldname(key) - raise - if extra_args: - raise wsme.exc.UnknownArgument(', '.join(extra_args)) - return kw - - -def encode_result(value, datatype, **options): - jsondata = tojson(datatype, value) - if options.get('nest_result', False): - jsondata = {options.get('nested_result_attrname', 'result'): jsondata} - return json.dumps(jsondata) - - -def encode_error(context, errordetail): - return json.dumps(errordetail) - - -def encode_sample_value(datatype, value, format=False): - r = tojson(datatype, value) - content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0, - sort_keys=format) - return ('javascript', content) - - -def encode_sample_params(params, format=False): - kw = {} - for name, datatype, value in params: - kw[name] = tojson(datatype, value) - content = json.dumps(kw, ensure_ascii=False, indent=4 if format else 0, - sort_keys=format) - return ('javascript', content) - - -def encode_sample_result(datatype, value, format=False): - r = tojson(datatype, value) - content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0, - sort_keys=format) - return ('javascript', content) diff --git a/wsme/rest/protocol.py b/wsme/rest/protocol.py deleted file mode 100644 index 5201ccf..0000000 --- a/wsme/rest/protocol.py +++ /dev/null @@ -1,133 +0,0 @@ -import os.path -import logging - -from wsme.utils import OrderedDict -from wsme.protocol import CallContext, Protocol, media_type_accept - -import wsme.rest -import wsme.rest.args -import wsme.runtime - -log = logging.getLogger(__name__) - - -class RestProtocol(Protocol): - name = 'rest' - displayname = 'REST' - dataformats = ['json', 'xml'] - content_types = ['application/json', 'text/xml'] - - def __init__(self, dataformats=None): - if dataformats is None: - dataformats = RestProtocol.dataformats - - self.dataformats = OrderedDict() - self.content_types = [] - - for dataformat in dataformats: - __import__('wsme.rest.' + dataformat) - dfmod = getattr(wsme.rest, dataformat) - self.dataformats[dataformat] = dfmod - self.content_types.extend(dfmod.accept_content_types) - - def accept(self, request): - for dataformat in self.dataformats: - if request.path.endswith('.' + dataformat): - return True - return media_type_accept(request, self.content_types) - - def iter_calls(self, request): - context = CallContext(request) - context.outformat = None - ext = os.path.splitext(request.path.split('/')[-1])[1] - inmime = request.content_type - outmime = request.accept.best_match(self.content_types) - - outformat = None - informat = None - for dfname, df in self.dataformats.items(): - if ext == '.' + dfname: - outformat = df - if not inmime: - informat = df - - if outformat is None and request.accept: - for dfname, df in self.dataformats.items(): - if outmime in df.accept_content_types: - outformat = df - if not inmime: - informat = df - - if outformat is None: - for dfname, df in self.dataformats.items(): - if inmime == df.content_type: - outformat = df - - context.outformat = outformat - context.outformat_options = { - 'nest_result': getattr(self, 'nest_result', False) - } - if not inmime and informat: - inmime = informat.content_type - log.debug("Inferred input type: %s" % inmime) - context.inmime = inmime - yield context - - def extract_path(self, context): - path = context.request.path - assert path.startswith(self.root._webpath) - path = path[len(self.root._webpath):] - path = path.strip('/').split('/') - - for dataformat in self.dataformats: - if path[-1].endswith('.' + dataformat): - path[-1] = path[-1][:-len(dataformat) - 1] - - # Check if the path is actually a function, and if not - # see if the http method make a difference - # TODO Re-think the function lookup phases. Here we are - # doing the job that will be done in a later phase, which - # is sub-optimal - for p, fdef in self.root.getapi(): - if p == path: - return path - - # No function at this path. Now check for function that have - # this path as a prefix, and declared an http method - for p, fdef in self.root.getapi(): - if len(p) == len(path) + 1 and p[:len(path)] == path and \ - fdef.extra_options.get('method') == context.request.method: - return p - - return path - - def read_arguments(self, context): - request = context.request - funcdef = context.funcdef - - body = None - if request.content_length not in (None, 0, '0'): - body = request.body - if not body and '__body__' in request.params: - body = request.params['__body__'] - - args, kwargs = wsme.rest.args.combine_args( - funcdef, - (wsme.rest.args.args_from_params(funcdef, request.params), - wsme.rest.args.args_from_body(funcdef, body, context.inmime)) - ) - wsme.runtime.check_arguments(funcdef, args, kwargs) - return kwargs - - def encode_result(self, context, result): - out = context.outformat.encode_result( - result, context.funcdef.return_type, - **context.outformat_options - ) - return out - - def encode_error(self, context, errordetail): - out = context.outformat.encode_error( - context, errordetail - ) - return out diff --git a/wsme/rest/xml.py b/wsme/rest/xml.py deleted file mode 100644 index 286afa7..0000000 --- a/wsme/rest/xml.py +++ /dev/null @@ -1,298 +0,0 @@ -from __future__ import absolute_import - -import datetime - -import six - -import xml.etree.ElementTree as et - -from simplegeneric import generic - -import wsme.types -from wsme.exc import UnknownArgument, InvalidInput - -import re - -content_type = 'text/xml' -accept_content_types = [ - content_type, -] - -time_re = re.compile(r'(?P<h>[0-2][0-9]):(?P<m>[0-5][0-9]):(?P<s>[0-6][0-9])') - - -def xml_indent(elem, level=0): - i = "\n" + level * " " - if len(elem): - if not elem.text or not elem.text.strip(): - elem.text = i + " " - for e in elem: - xml_indent(e, level + 1) - if not e.tail or not e.tail.strip(): - e.tail = i - if level and (not elem.tail or not elem.tail.strip()): - elem.tail = i - - -@generic -def toxml(datatype, key, value): - """ - A generic converter from python to xml elements. - - If a non-complex user specific type is to be used in the api, - a specific toxml should be added:: - - from wsme.protocol.restxml import toxml - - myspecialtype = object() - - @toxml.when_object(myspecialtype) - def myspecialtype_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = str(value) - return el - """ - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - if wsme.types.isusertype(datatype): - return toxml(datatype.basetype, - key, datatype.tobasetype(value)) - elif wsme.types.iscomplex(datatype): - for attrdef in datatype._wsme_attributes: - attrvalue = getattr(value, attrdef.key) - if attrvalue is not wsme.types.Unset: - el.append(toxml(attrdef.datatype, attrdef.name, - attrvalue)) - else: - el.text = six.text_type(value) - return el - - -@generic -def fromxml(datatype, element): - """ - A generic converter from xml elements to python datatype. - - If a non-complex user specific type is to be used in the api, - a specific fromxml should be added:: - - from wsme.protocol.restxml import fromxml - - class MySpecialType(object): - pass - - @fromxml.when_object(MySpecialType) - def myspecialtype_fromxml(datatype, element): - if element.get('nil', False): - return None - return MySpecialType(element.text) - """ - if element.get('nil', False): - return None - if wsme.types.isusertype(datatype): - return datatype.frombasetype(fromxml(datatype.basetype, element)) - if wsme.types.iscomplex(datatype): - obj = datatype() - for attrdef in wsme.types.list_attributes(datatype): - sub = element.find(attrdef.name) - if sub is not None: - val_fromxml = fromxml(attrdef.datatype, sub) - if getattr(attrdef, 'readonly', False): - raise InvalidInput(attrdef.name, val_fromxml, - "Cannot set read only field.") - setattr(obj, attrdef.key, val_fromxml) - elif attrdef.mandatory: - raise InvalidInput(attrdef.name, None, - "Mandatory field missing.") - return wsme.types.validate_value(datatype, obj) - if datatype is wsme.types.bytes: - return element.text.encode('ascii') - return datatype(element.text) - - -@toxml.when_type(wsme.types.ArrayType) -def array_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - for item in value: - el.append(toxml(datatype.item_type, 'item', item)) - return el - - -@toxml.when_type(wsme.types.DictType) -def dict_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - for item in value.items(): - key = toxml(datatype.key_type, 'key', item[0]) - value = toxml(datatype.value_type, 'value', item[1]) - node = et.Element('item') - node.append(key) - node.append(value) - el.append(node) - return el - - -@toxml.when_object(wsme.types.bytes) -def bytes_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = value.decode('ascii') - return el - - -@toxml.when_object(bool) -def bool_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = value and 'true' or 'false' - return el - - -@toxml.when_object(datetime.date) -def date_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = value.isoformat() - return el - - -@toxml.when_object(datetime.datetime) -def datetime_toxml(datatype, key, value): - el = et.Element(key) - if value is None: - el.set('nil', 'true') - else: - el.text = value.isoformat() - return el - - -@fromxml.when_type(wsme.types.ArrayType) -def array_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return [ - fromxml(datatype.item_type, item) - for item in element.findall('item') - ] - - -@fromxml.when_object(bool) -def bool_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return element.text.lower() != 'false' - - -@fromxml.when_type(wsme.types.DictType) -def dict_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return dict(( - (fromxml(datatype.key_type, item.find('key')), - fromxml(datatype.value_type, item.find('value'))) - for item in element.findall('item'))) - - -@fromxml.when_object(wsme.types.text) -def unicode_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return wsme.types.text(element.text) if element.text else six.u('') - - -@fromxml.when_object(datetime.date) -def date_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return wsme.utils.parse_isodate(element.text) - - -@fromxml.when_object(datetime.time) -def time_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return wsme.utils.parse_isotime(element.text) - - -@fromxml.when_object(datetime.datetime) -def datetime_fromxml(datatype, element): - if element.get('nil') == 'true': - return None - return wsme.utils.parse_isodatetime(element.text) - - -def parse(s, datatypes, bodyarg): - if hasattr(s, 'read'): - tree = et.parse(s) - else: - tree = et.fromstring(s) - if bodyarg: - name = list(datatypes.keys())[0] - return {name: fromxml(datatypes[name], tree)} - else: - kw = {} - extra_args = [] - for sub in tree: - if sub.tag not in datatypes: - extra_args.append(sub.tag) - kw[sub.tag] = fromxml(datatypes[sub.tag], sub) - if extra_args: - raise UnknownArgument(', '.join(extra_args)) - return kw - - -def encode_result(value, datatype, **options): - return et.tostring(toxml( - datatype, options.get('nested_result_attrname', 'result'), value - )) - - -def encode_error(context, errordetail): - el = et.Element('error') - et.SubElement(el, 'faultcode').text = errordetail['faultcode'] - et.SubElement(el, 'faultstring').text = errordetail['faultstring'] - if 'debuginfo' in errordetail: - et.SubElement(el, 'debuginfo').text = errordetail['debuginfo'] - return et.tostring(el) - - -def encode_sample_value(datatype, value, format=False): - r = toxml(datatype, 'value', value) - if format: - xml_indent(r) - content = et.tostring(r) - return ('xml', content) - - -def encode_sample_params(params, format=False): - node = et.Element('parameters') - for name, datatype, value in params: - node.append(toxml(datatype, name, value)) - if format: - xml_indent(node) - content = et.tostring(node) - return ('xml', content) - - -def encode_sample_result(datatype, value, format=False): - r = toxml(datatype, 'result', value) - if format: - xml_indent(r) - content = et.tostring(r) - return ('xml', content) diff --git a/wsme/root.py b/wsme/root.py deleted file mode 100644 index e71f1f7..0000000 --- a/wsme/root.py +++ /dev/null @@ -1,372 +0,0 @@ -import logging -import sys -import weakref - -from six import u, b -import six - -import webob - -from wsme.exc import ClientSideError, UnknownFunction -from wsme.protocol import getprotocol -from wsme.rest import scan_api -from wsme import spore -import wsme.api -import wsme.types - -log = logging.getLogger(__name__) - -html_body = u(""" -<html> -<head> - <style type='text/css'> - %(css)s - </style> -</head> -<body> -%(content)s -</body> -</html> -""") - - -def default_prepare_response_body(request, results): - r = None - sep = None - for value in results: - if sep is None: - if isinstance(value, six.text_type): - sep = u('\n') - r = u('') - else: - sep = b('\n') - r = b('') - else: - r += sep - r += value - return r - - -class DummyTransaction: - def commit(self): - pass - - def abort(self): - pass - - -class WSRoot(object): - """ - Root controller for webservices. - - :param protocols: A list of protocols to enable (see :meth:`addprotocol`) - :param webpath: The web path where the webservice is published. - - :type transaction: A `transaction - <http://pypi.python.org/pypi/transaction>`_-like - object or ``True``. - :param transaction: If specified, a transaction will be created and - handled on a per-call base. - - This option *can* be enabled along with `repoze.tm2 - <http://pypi.python.org/pypi/repoze.tm2>`_ - (it will only make it void). - - If ``True``, the default :mod:`transaction` - module will be imported and used. - - """ - __registry__ = wsme.types.registry - - def __init__(self, protocols=[], webpath='', transaction=None, - scan_api=scan_api): - self._debug = True - self._webpath = webpath - self.protocols = [] - self._scan_api = scan_api - - self._transaction = transaction - if self._transaction is True: - import transaction - self._transaction = transaction - - for protocol in protocols: - self.addprotocol(protocol) - - self._api = None - - def wsgiapp(self): - """Returns a wsgi application""" - from webob.dec import wsgify - return wsgify(self._handle_request) - - def begin(self): - if self._transaction: - return self._transaction.begin() - else: - return DummyTransaction() - - def addprotocol(self, protocol, **options): - """ - Enable a new protocol on the controller. - - :param protocol: A registered protocol name or an instance - of a protocol. - """ - if isinstance(protocol, str): - protocol = getprotocol(protocol, **options) - self.protocols.append(protocol) - protocol.root = weakref.proxy(self) - - def getapi(self): - """ - Returns the api description. - - :rtype: list of (path, :class:`FunctionDefinition`) - """ - if self._api is None: - self._api = [ - (path, f, f._wsme_definition, args) - for path, f, args in self._scan_api(self) - ] - for path, f, fdef, args in self._api: - fdef.resolve_types(self.__registry__) - return [ - (path, fdef) - for path, f, fdef, args in self._api - ] - - def _get_protocol(self, name): - for protocol in self.protocols: - if protocol.name == name: - return protocol - - def _select_protocol(self, request): - log.debug("Selecting a protocol for the following request :\n" - "headers: %s\nbody: %s", request.headers.items(), - request.content_length and ( - request.content_length > 512 and - request.body[:512] or - request.body) or '') - protocol = None - error = ClientSideError(status_code=406) - path = str(request.path) - assert path.startswith(self._webpath) - path = path[len(self._webpath) + 1:] - if 'wsmeproto' in request.params: - return self._get_protocol(request.params['wsmeproto']) - else: - - for p in self.protocols: - try: - if p.accept(request): - protocol = p - break - except ClientSideError as e: - error = e - # If we could not select a protocol, we raise the last exception - # that we got, or the default one. - if not protocol: - raise error - return protocol - - def _do_call(self, protocol, context): - request = context.request - request.calls.append(context) - try: - if context.path is None: - context.path = protocol.extract_path(context) - - if context.path is None: - raise ClientSideError(u( - 'The %s protocol was unable to extract a function ' - 'path from the request') % protocol.name) - - context.func, context.funcdef, args = \ - self._lookup_function(context.path) - kw = protocol.read_arguments(context) - args = list(args) - - txn = self.begin() - try: - result = context.func(*args, **kw) - txn.commit() - except Exception: - txn.abort() - raise - - else: - # TODO make sure result type == a._wsme_definition.return_type - return protocol.encode_result(context, result) - - except Exception as e: - infos = wsme.api.format_exception(sys.exc_info(), self._debug) - if isinstance(e, ClientSideError): - request.client_errorcount += 1 - request.client_last_status_code = e.code - else: - request.server_errorcount += 1 - return protocol.encode_error(context, infos) - - def find_route(self, path): - for p in self.protocols: - for routepath, func in p.iter_routes(): - if path.startswith(routepath): - return routepath, func - return None, None - - def _handle_request(self, request): - res = webob.Response() - res_content_type = None - - path = request.path - if path.startswith(self._webpath): - path = path[len(self._webpath):] - routepath, func = self.find_route(path) - if routepath: - content = func() - if isinstance(content, six.text_type): - res.text = content - elif isinstance(content, six.binary_type): - res.body = content - res.content_type = func._cfg['content-type'] - return res - - if request.path == self._webpath + '/api.spore': - res.body = spore.getdesc(self, request.host_url) - res.content_type = 'application/json' - return res - - try: - msg = None - error_status = 500 - protocol = self._select_protocol(request) - except ClientSideError as e: - error_status = e.code - msg = e.faultstring - protocol = None - except Exception as e: - msg = ("Unexpected error while selecting protocol: %s" % str(e)) - log.exception(msg) - protocol = None - error_status = 500 - - if protocol is None: - if not msg: - msg = ("None of the following protocols can handle this " - "request : %s" % ','.join(( - p.name for p in self.protocols))) - res.status = error_status - res.content_type = 'text/plain' - try: - res.text = u(msg) - except TypeError: - res.text = msg - log.error(msg) - return res - - request.calls = [] - request.client_errorcount = 0 - request.client_last_status_code = None - request.server_errorcount = 0 - - try: - - context = None - - if hasattr(protocol, 'prepare_response_body'): - prepare_response_body = protocol.prepare_response_body - else: - prepare_response_body = default_prepare_response_body - - body = prepare_response_body(request, ( - self._do_call(protocol, context) - for context in protocol.iter_calls(request))) - - if isinstance(body, six.text_type): - res.text = body - else: - res.body = body - - if len(request.calls) == 1: - if hasattr(protocol, 'get_response_status'): - res.status = protocol.get_response_status(request) - else: - if request.client_errorcount == 1: - res.status = request.client_last_status_code - elif request.client_errorcount: - res.status = 400 - elif request.server_errorcount: - res.status = 500 - else: - res.status = 200 - else: - res.status = protocol.get_response_status(request) - res_content_type = protocol.get_response_contenttype(request) - except ClientSideError as e: - request.server_errorcount += 1 - res.status = e.code - res.text = e.faultstring - except Exception: - infos = wsme.api.format_exception(sys.exc_info(), self._debug) - request.server_errorcount += 1 - res.text = protocol.encode_error(context, infos) - res.status = 500 - - if res_content_type is None: - # Attempt to correctly guess what content-type we should return. - ctypes = [ct for ct in protocol.content_types if ct] - if ctypes: - res_content_type = request.accept.best_match(ctypes) - - # If not we will attempt to convert the body to an accepted - # output format. - if res_content_type is None: - if "text/html" in request.accept: - res.text = self._html_format(res.body, protocol.content_types) - res_content_type = "text/html" - - # TODO should we consider the encoding asked by - # the web browser ? - res.headers['Content-Type'] = "%s; charset=UTF-8" % res_content_type - - return res - - def _lookup_function(self, path): - if not self._api: - self.getapi() - - for fpath, f, fdef, args in self._api: - if path == fpath: - return f, fdef, args - raise UnknownFunction('/'.join(path)) - - def _html_format(self, content, content_types): - try: - from pygments import highlight - from pygments.lexers import get_lexer_for_mimetype - from pygments.formatters import HtmlFormatter - - lexer = None - for ct in content_types: - try: - lexer = get_lexer_for_mimetype(ct) - break - except Exception: - pass - - if lexer is None: - raise ValueError("No lexer found") - formatter = HtmlFormatter() - return html_body % dict( - css=formatter.get_style_defs(), - content=highlight(content, lexer, formatter).encode('utf8')) - except Exception as e: - log.warning( - "Could not pygment the content because of the following " - "error :\n%s" % e) - return html_body % dict( - css='', - content=u('<pre>%s</pre>') % - content.replace(b('>'), b('>')) - .replace(b('<'), b('<'))) diff --git a/wsme/runtime.py b/wsme/runtime.py deleted file mode 100644 index e114d13..0000000 --- a/wsme/runtime.py +++ /dev/null @@ -1,9 +0,0 @@ -from wsme.exc import MissingArgument - - -def check_arguments(funcdef, args, kw): - """Check if some arguments are missing""" - assert len(args) == 0 - for arg in funcdef.arguments: - if arg.mandatory and arg.name not in kw: - raise MissingArgument(arg.name) diff --git a/wsme/spore.py b/wsme/spore.py deleted file mode 100644 index 8de0ed2..0000000 --- a/wsme/spore.py +++ /dev/null @@ -1,64 +0,0 @@ -from wsme import types - -try: - import simplejson as json -except ImportError: - import json # noqa - - -def getdesc(root, host_url=''): - methods = {} - - for path, funcdef in root.getapi(): - method = funcdef.extra_options.get('method', None) - name = '_'.join(path) - if method is not None: - path = path[:-1] - else: - method = 'GET' - for argdef in funcdef.arguments: - if types.iscomplex(argdef.datatype) \ - or types.isarray(argdef.datatype) \ - or types.isdict(argdef.datatype): - method = 'POST' - break - - required_params = [] - optional_params = [] - for argdef in funcdef.arguments: - if method == 'GET' and argdef.mandatory: - required_params.append(argdef.name) - else: - optional_params.append(argdef.name) - - methods[name] = { - 'method': method, - 'path': '/'.join(path) - } - if required_params: - methods[name]['required_params'] = required_params - if optional_params: - methods[name]['optional_params'] = optional_params - if funcdef.doc: - methods[name]['documentation'] = funcdef.doc - - formats = [] - for p in root.protocols: - if p.name == 'restxml': - formats.append('xml') - if p.name == 'restjson': - formats.append('json') - - api = { - 'base_url': host_url + root._webpath, - 'version': '0.1', - 'name': getattr(root, 'name', 'name'), - 'authority': '', - 'formats': [ - 'json', - 'xml' - ], - 'methods': methods - } - - return json.dumps(api, indent=4) diff --git a/wsme/tests/__init__.py b/wsme/tests/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/wsme/tests/__init__.py +++ /dev/null diff --git a/wsme/tests/protocol.py b/wsme/tests/protocol.py deleted file mode 100644 index 1c41f8c..0000000 --- a/wsme/tests/protocol.py +++ /dev/null @@ -1,720 +0,0 @@ -# coding=utf-8 - -import unittest -import warnings -import datetime -import decimal -import six - -from six import u, b - -from webtest import TestApp - -from wsme import WSRoot, Unset -from wsme import expose, validate -import wsme.types -import wsme.utils - -warnings.filterwarnings('ignore', module='webob.dec') - -binarysample = b('\x00\xff\x43') - -try: - 1 / 0 -except ZeroDivisionError as e: - zerodivisionerrormsg = str(e) - - -class CallException(RuntimeError): - def __init__(self, faultcode, faultstring, debuginfo): - self.faultcode = faultcode - self.faultstring = faultstring - self.debuginfo = debuginfo - - def __str__(self): - return 'faultcode=%s, faultstring=%s, debuginfo=%s' % ( - self.faultcode, self.faultstring, self.debuginfo - ) - - -myenumtype = wsme.types.Enum(wsme.types.bytes, 'v1', 'v2') - - -class NestedInner(object): - aint = int - - def __init__(self, aint=None): - self.aint = aint - - -class NestedOuter(object): - inner = NestedInner - inner_array = wsme.types.wsattr([NestedInner]) - inner_dict = {wsme.types.text: NestedInner} - - def __init__(self): - self.inner = NestedInner(0) - - -class NamedAttrsObject(object): - def __init__(self, v1=Unset, v2=Unset): - self.attr_1 = v1 - self.attr_2 = v2 - - attr_1 = wsme.types.wsattr(int, name='attr.1') - attr_2 = wsme.types.wsattr(int, name='attr.2') - - -class CustomObject(object): - aint = int - name = wsme.types.text - - -class ExtendedInt(wsme.types.UserType): - basetype = int - name = "Extended integer" - - -class NestedInnerApi(object): - @expose(bool) - def deepfunction(self): - return True - - -class NestedOuterApi(object): - inner = NestedInnerApi() - - -class ReturnTypes(object): - @expose(wsme.types.bytes) - def getbytes(self): - return b("astring") - - @expose(wsme.types.text) - def gettext(self): - return u('\xe3\x81\xae') - - @expose(int) - def getint(self): - return 2 - - @expose(float) - def getfloat(self): - return 3.14159265 - - @expose(decimal.Decimal) - def getdecimal(self): - return decimal.Decimal('3.14159265') - - @expose(datetime.date) - def getdate(self): - return datetime.date(1994, 1, 26) - - @expose(bool) - def getbooltrue(self): - return True - - @expose(bool) - def getboolfalse(self): - return False - - @expose(datetime.time) - def gettime(self): - return datetime.time(12, 0, 0) - - @expose(datetime.datetime) - def getdatetime(self): - return datetime.datetime(1994, 1, 26, 12, 0, 0) - - @expose(wsme.types.binary) - def getbinary(self): - return binarysample - - @expose(NestedOuter) - def getnested(self): - n = NestedOuter() - return n - - @expose([wsme.types.bytes]) - def getbytesarray(self): - return [b("A"), b("B"), b("C")] - - @expose([NestedOuter]) - def getnestedarray(self): - return [NestedOuter(), NestedOuter()] - - @expose({wsme.types.bytes: NestedOuter}) - def getnesteddict(self): - return {b('a'): NestedOuter(), b('b'): NestedOuter()} - - @expose(NestedOuter) - def getobjectarrayattribute(self): - obj = NestedOuter() - obj.inner_array = [NestedInner(12), NestedInner(13)] - return obj - - @expose(NestedOuter) - def getobjectdictattribute(self): - obj = NestedOuter() - obj.inner_dict = { - '12': NestedInner(12), - '13': NestedInner(13) - } - return obj - - @expose(myenumtype) - def getenum(self): - return b('v2') - - @expose(NamedAttrsObject) - def getnamedattrsobj(self): - return NamedAttrsObject(5, 6) - - -class ArgTypes(object): - def assertEqual(self, a, b): - if not (a == b): - raise AssertionError('%s != %s' % (a, b)) - - def assertIsInstance(self, value, v_type): - assert isinstance(value, v_type), ("%s is not instance of type %s" % - (value, v_type)) - - @expose(wsme.types.bytes) - @validate(wsme.types.bytes) - def setbytes(self, value): - print(repr(value)) - self.assertEqual(type(value), wsme.types.bytes) - return value - - @expose(wsme.types.text) - @validate(wsme.types.text) - def settext(self, value): - print(repr(value)) - self.assertEqual(type(value), wsme.types.text) - return value - - @expose(wsme.types.text) - @validate(wsme.types.text) - def settextnone(self, value): - print(repr(value)) - self.assertEqual(type(value), type(None)) - return value - - @expose(bool) - @validate(bool) - def setbool(self, value): - print(repr(value)) - self.assertEqual(type(value), bool) - return value - - @expose(int) - @validate(int) - def setint(self, value): - print(repr(value)) - self.assertEqual(type(value), int) - return value - - @expose(float) - @validate(float) - def setfloat(self, value): - print(repr(value)) - self.assertEqual(type(value), float) - return value - - @expose(decimal.Decimal) - @validate(decimal.Decimal) - def setdecimal(self, value): - print(repr(value)) - self.assertEqual(type(value), decimal.Decimal) - return value - - @expose(datetime.date) - @validate(datetime.date) - def setdate(self, value): - print(repr(value)) - self.assertEqual(type(value), datetime.date) - return value - - @expose(datetime.time) - @validate(datetime.time) - def settime(self, value): - print(repr(value)) - self.assertEqual(type(value), datetime.time) - return value - - @expose(datetime.datetime) - @validate(datetime.datetime) - def setdatetime(self, value): - print(repr(value)) - self.assertEqual(type(value), datetime.datetime) - return value - - @expose(wsme.types.binary) - @validate(wsme.types.binary) - def setbinary(self, value): - print(repr(value)) - self.assertEqual(type(value), six.binary_type) - return value - - @expose([wsme.types.bytes]) - @validate([wsme.types.bytes]) - def setbytesarray(self, value): - print(repr(value)) - self.assertEqual(type(value), list) - self.assertEqual(type(value[0]), wsme.types.bytes) - return value - - @expose([wsme.types.text]) - @validate([wsme.types.text]) - def settextarray(self, value): - print(repr(value)) - self.assertEqual(type(value), list) - self.assertEqual(type(value[0]), wsme.types.text) - return value - - @expose([datetime.datetime]) - @validate([datetime.datetime]) - def setdatetimearray(self, value): - print(repr(value)) - self.assertEqual(type(value), list) - self.assertEqual(type(value[0]), datetime.datetime) - return value - - @expose(NestedOuter) - @validate(NestedOuter) - def setnested(self, value): - print(repr(value)) - self.assertEqual(type(value), NestedOuter) - return value - - @expose([NestedOuter]) - @validate([NestedOuter]) - def setnestedarray(self, value): - print(repr(value)) - self.assertEqual(type(value), list) - self.assertEqual(type(value[0]), NestedOuter) - return value - - @expose({wsme.types.bytes: NestedOuter}) - @validate({wsme.types.bytes: NestedOuter}) - def setnesteddict(self, value): - print(repr(value)) - self.assertEqual(type(value), dict) - self.assertEqual(type(list(value.keys())[0]), wsme.types.bytes) - self.assertEqual(type(list(value.values())[0]), NestedOuter) - return value - - @expose(myenumtype) - @validate(myenumtype) - def setenum(self, value): - print(value) - self.assertEqual(type(value), wsme.types.bytes) - return value - - @expose(NamedAttrsObject) - @validate(NamedAttrsObject) - def setnamedattrsobj(self, value): - print(value) - self.assertEqual(type(value), NamedAttrsObject) - self.assertEqual(value.attr_1, 10) - self.assertEqual(value.attr_2, 20) - return value - - @expose(CustomObject) - @validate(CustomObject) - def setcustomobject(self, value): - self.assertIsInstance(value, CustomObject) - self.assertIsInstance(value.name, wsme.types.text) - self.assertIsInstance(value.aint, int) - return value - - @expose(ExtendedInt()) - @validate(ExtendedInt()) - def setextendedint(self, value): - self.assertEqual(isinstance(value, ExtendedInt.basetype), True) - return value - - -class BodyTypes(object): - def assertEqual(self, a, b): - if not (a == b): - raise AssertionError('%s != %s' % (a, b)) - - @expose(int, body={wsme.types.text: int}) - @validate(int) - def setdict(self, body): - print(body) - self.assertEqual(type(body), dict) - self.assertEqual(type(body['test']), int) - self.assertEqual(body['test'], 10) - return body['test'] - - @expose(int, body=[int]) - @validate(int) - def setlist(self, body): - print(body) - self.assertEqual(type(body), list) - self.assertEqual(type(body[0]), int) - self.assertEqual(body[0], 10) - return body[0] - - -class WithErrors(object): - @expose() - def divide_by_zero(self): - 1 / 0 - - -class MiscFunctions(object): - @expose(int) - @validate(int, int) - def multiply(self, a, b): - return a * b - - -class WSTestRoot(WSRoot): - argtypes = ArgTypes() - returntypes = ReturnTypes() - bodytypes = BodyTypes() - witherrors = WithErrors() - nested = NestedOuterApi() - misc = MiscFunctions() - - def reset(self): - self._touched = False - - @expose() - def touch(self): - self._touched = True - - -class ProtocolTestCase(unittest.TestCase): - protocol_options = {} - - def assertTypedEquals(self, a, b, convert): - if isinstance(a, six.string_types): - a = convert(a) - if isinstance(b, six.string_types): - b = convert(b) - self.assertEqual(a, b) - - def assertDateEquals(self, a, b): - self.assertTypedEquals(a, b, wsme.utils.parse_isodate) - - def assertTimeEquals(self, a, b): - self.assertTypedEquals(a, b, wsme.utils.parse_isotime) - - def assertDateTimeEquals(self, a, b): - self.assertTypedEquals(a, b, wsme.utils.parse_isodatetime) - - def assertIntEquals(self, a, b): - self.assertTypedEquals(a, b, int) - - def assertFloatEquals(self, a, b): - self.assertTypedEquals(a, b, float) - - def assertDecimalEquals(self, a, b): - self.assertTypedEquals(a, b, decimal.Decimal) - - def setUp(self): - if self.__class__.__name__ != 'ProtocolTestCase': - self.root = WSTestRoot() - self.root.getapi() - self.root.addprotocol(self.protocol, **self.protocol_options) - - self.app = TestApp(self.root.wsgiapp()) - - def test_invalid_path(self): - try: - res = self.call('invalid_function') - print(res) - assert "No error raised" - except CallException as e: - self.assertEqual(e.faultcode, 'Client') - self.assertEqual(e.faultstring.lower(), - u('unknown function name: invalid_function')) - - def test_serverside_error(self): - try: - res = self.call('witherrors/divide_by_zero') - print(res) - assert "No error raised" - except CallException as e: - self.assertEqual(e.faultcode, 'Server') - self.assertEqual(e.faultstring, zerodivisionerrormsg) - assert e.debuginfo is not None - - def test_serverside_error_nodebug(self): - self.root._debug = False - try: - res = self.call('witherrors/divide_by_zero') - print(res) - assert "No error raised" - except CallException as e: - self.assertEqual(e.faultcode, 'Server') - self.assertEqual(e.faultstring, zerodivisionerrormsg) - assert e.debuginfo is None - - def test_touch(self): - r = self.call('touch') - assert r is None, r - - def test_return_bytes(self): - r = self.call('returntypes/getbytes', _rt=wsme.types.bytes) - self.assertEqual(r, b('astring')) - - def test_return_text(self): - r = self.call('returntypes/gettext', _rt=wsme.types.text) - self.assertEqual(r, u('\xe3\x81\xae')) - - def test_return_int(self): - r = self.call('returntypes/getint') - self.assertIntEquals(r, 2) - - def test_return_float(self): - r = self.call('returntypes/getfloat') - self.assertFloatEquals(r, 3.14159265) - - def test_return_decimal(self): - r = self.call('returntypes/getdecimal') - self.assertDecimalEquals(r, '3.14159265') - - def test_return_bool_true(self): - r = self.call('returntypes/getbooltrue', _rt=bool) - assert r - - def test_return_bool_false(self): - r = self.call('returntypes/getboolfalse', _rt=bool) - assert not r - - def test_return_date(self): - r = self.call('returntypes/getdate') - self.assertDateEquals(r, datetime.date(1994, 1, 26)) - - def test_return_time(self): - r = self.call('returntypes/gettime') - self.assertTimeEquals(r, datetime.time(12)) - - def test_return_datetime(self): - r = self.call('returntypes/getdatetime') - self.assertDateTimeEquals(r, datetime.datetime(1994, 1, 26, 12)) - - def test_return_binary(self): - r = self.call('returntypes/getbinary', _rt=wsme.types.binary) - self.assertEqual(r, binarysample) - - def test_return_nested(self): - r = self.call('returntypes/getnested', _rt=NestedOuter) - self.assertEqual(r, {'inner': {'aint': 0}}) - - def test_return_bytesarray(self): - r = self.call('returntypes/getbytesarray', _rt=[six.binary_type]) - self.assertEqual(r, [b('A'), b('B'), b('C')]) - - def test_return_nestedarray(self): - r = self.call('returntypes/getnestedarray', _rt=[NestedOuter]) - self.assertEqual(r, [{'inner': {'aint': 0}}, {'inner': {'aint': 0}}]) - - def test_return_nesteddict(self): - r = self.call('returntypes/getnesteddict', - _rt={wsme.types.bytes: NestedOuter}) - self.assertEqual(r, { - b('a'): {'inner': {'aint': 0}}, - b('b'): {'inner': {'aint': 0}} - }) - - def test_return_objectarrayattribute(self): - r = self.call('returntypes/getobjectarrayattribute', _rt=NestedOuter) - self.assertEqual(r, { - 'inner': {'aint': 0}, - 'inner_array': [{'aint': 12}, {'aint': 13}] - }) - - def test_return_objectdictattribute(self): - r = self.call('returntypes/getobjectdictattribute', _rt=NestedOuter) - self.assertEqual(r, { - 'inner': {'aint': 0}, - 'inner_dict': { - '12': {'aint': 12}, - '13': {'aint': 13} - } - }) - - def test_return_enum(self): - r = self.call('returntypes/getenum', _rt=myenumtype) - self.assertEqual(r, b('v2'), r) - - def test_return_namedattrsobj(self): - r = self.call('returntypes/getnamedattrsobj', _rt=NamedAttrsObject) - self.assertEqual(r, {'attr.1': 5, 'attr.2': 6}) - - def test_setbytes(self): - assert self.call('argtypes/setbytes', value=b('astring'), - _rt=wsme.types.bytes) == b('astring') - - def test_settext(self): - assert self.call('argtypes/settext', value=u('\xe3\x81\xae'), - _rt=wsme.types.text) == u('\xe3\x81\xae') - - def test_settext_empty(self): - assert self.call('argtypes/settext', value=u(''), - _rt=wsme.types.text) == u('') - - def test_settext_none(self): - self.assertEqual( - None, - self.call('argtypes/settextnone', value=None, _rt=wsme.types.text) - ) - - def test_setint(self): - r = self.call('argtypes/setint', value=3, _rt=int) - self.assertEqual(r, 3) - - def test_setfloat(self): - assert self.call('argtypes/setfloat', value=3.54, - _rt=float) == 3.54 - - def test_setbool_true(self): - r = self.call('argtypes/setbool', value=True, _rt=bool) - assert r - - def test_setbool_false(self): - r = self.call('argtypes/setbool', value=False, _rt=bool) - assert not r - - def test_setdecimal(self): - value = decimal.Decimal('3.14') - assert self.call('argtypes/setdecimal', value=value, - _rt=decimal.Decimal) == value - - def test_setdate(self): - value = datetime.date(2008, 4, 6) - r = self.call('argtypes/setdate', value=value, - _rt=datetime.date) - self.assertEqual(r, value) - - def test_settime(self): - value = datetime.time(12, 12, 15) - r = self.call('argtypes/settime', value=value, - _rt=datetime.time) - self.assertEqual(r, datetime.time(12, 12, 15)) - - def test_setdatetime(self): - value = datetime.datetime(2008, 4, 6, 12, 12, 15) - r = self.call('argtypes/setdatetime', value=value, - _rt=datetime.datetime) - self.assertEqual(r, datetime.datetime(2008, 4, 6, 12, 12, 15)) - - def test_setbinary(self): - value = binarysample - r = self.call('argtypes/setbinary', value=(value, wsme.types.binary), - _rt=wsme.types.binary) == value - print(r) - - def test_setnested(self): - value = {'inner': {'aint': 54}} - r = self.call('argtypes/setnested', - value=(value, NestedOuter), - _rt=NestedOuter) - self.assertEqual(r, value) - - def test_setnested_nullobj(self): - value = {'inner': None} - r = self.call( - 'argtypes/setnested', - value=(value, NestedOuter), - _rt=NestedOuter - ) - self.assertEqual(r, value) - - def test_setbytesarray(self): - value = [b("1"), b("2"), b("three")] - r = self.call('argtypes/setbytesarray', - value=(value, [wsme.types.bytes]), - _rt=[wsme.types.bytes]) - self.assertEqual(r, value) - - def test_settextarray(self): - value = [u("1")] - r = self.call('argtypes/settextarray', - value=(value, [wsme.types.text]), - _rt=[wsme.types.text]) - self.assertEqual(r, value) - - def test_setdatetimearray(self): - value = [ - datetime.datetime(2008, 3, 6, 12, 12, 15), - datetime.datetime(2008, 4, 6, 2, 12, 15), - ] - r = self.call('argtypes/setdatetimearray', - value=(value, [datetime.datetime]), - _rt=[datetime.datetime]) - self.assertEqual(r, value) - - def test_setnestedarray(self): - value = [ - {'inner': {'aint': 54}}, - {'inner': {'aint': 55}}, - ] - r = self.call('argtypes/setnestedarray', - value=(value, [NestedOuter]), - _rt=[NestedOuter]) - self.assertEqual(r, value) - - def test_setnesteddict(self): - value = { - b('o1'): {'inner': {'aint': 54}}, - b('o2'): {'inner': {'aint': 55}}, - } - r = self.call('argtypes/setnesteddict', - value=(value, {six.binary_type: NestedOuter}), - _rt={six.binary_type: NestedOuter}) - print(r) - self.assertEqual(r, value) - - def test_setenum(self): - value = b('v1') - r = self.call('argtypes/setenum', value=value, - _rt=myenumtype) - self.assertEqual(r, value) - - def test_setnamedattrsobj(self): - value = {'attr.1': 10, 'attr.2': 20} - r = self.call('argtypes/setnamedattrsobj', - value=(value, NamedAttrsObject), - _rt=NamedAttrsObject) - self.assertEqual(r, value) - - def test_nested_api(self): - r = self.call('nested/inner/deepfunction', _rt=bool) - assert r is True - - def test_missing_argument(self): - try: - r = self.call('argtypes/setdatetime') - print(r) - assert "No error raised" - except CallException as e: - self.assertEqual(e.faultcode, 'Client') - self.assertEqual(e.faultstring, u('Missing argument: "value"')) - - def test_misc_multiply(self): - self.assertEqual(self.call('misc/multiply', a=5, b=2, _rt=int), 10) - - def test_html_format(self): - res = self.call('argtypes/setdatetime', _accept="text/html", - _no_result_decode=True) - self.assertEqual(res.content_type, 'text/html') - - -class RestOnlyProtocolTestCase(ProtocolTestCase): - def test_body_list(self): - r = self.call('bodytypes/setlist', body=([10], [int]), _rt=int) - self.assertEqual(r, 10) - - def test_body_dict(self): - r = self.call('bodytypes/setdict', - body=({'test': 10}, {wsme.types.text: int}), - _rt=int) - self.assertEqual(r, 10) diff --git a/wsme/tests/test_api.py b/wsme/tests/test_api.py deleted file mode 100644 index fc1e157..0000000 --- a/wsme/tests/test_api.py +++ /dev/null @@ -1,419 +0,0 @@ -# encoding=utf8 - -from six import b - -try: - import unittest2 as unittest -except ImportError: - import unittest -import webtest - -from wsme import WSRoot, expose, validate -from wsme.rest import scan_api -from wsme import types -from wsme import exc -import wsme.api as wsme_api -import wsme.types - -from wsme.tests.test_protocols import DummyProtocol - - -class TestController(unittest.TestCase): - def test_expose(self): - class MyWS(WSRoot): - @expose(int) - def getint(self): - return 1 - - assert MyWS.getint._wsme_definition.return_type == int - - def test_validate(self): - class ComplexType(object): - attr = int - - class MyWS(object): - @expose(int) - @validate(int, int, int) - def add(self, a, b, c=0): - return a + b + c - - @expose(bool) - @validate(ComplexType) - def setcplx(self, obj): - pass - - MyWS.add._wsme_definition.resolve_types(wsme.types.registry) - MyWS.setcplx._wsme_definition.resolve_types(wsme.types.registry) - args = MyWS.add._wsme_definition.arguments - - assert args[0].name == 'a' - assert args[0].datatype == int - assert args[0].mandatory - assert args[0].default is None - - assert args[1].name == 'b' - assert args[1].datatype == int - assert args[1].mandatory - assert args[1].default is None - - assert args[2].name == 'c' - assert args[2].datatype == int - assert not args[2].mandatory - assert args[2].default == 0 - - assert types.iscomplex(ComplexType) - - def test_validate_enum_with_none(self): - class Version(object): - number = types.Enum(str, 'v1', 'v2', None) - - class MyWS(WSRoot): - @expose(str) - @validate(Version) - def setcplx(self, version): - pass - - r = MyWS(['restjson']) - app = webtest.TestApp(r.wsgiapp()) - res = app.post_json('/setcplx', params={'version': {'number': 'arf'}}, - expect_errors=True, - headers={'Accept': 'application/json'}) - self.assertTrue( - res.json_body['faultstring'].startswith( - "Invalid input for field/attribute number. Value: 'arf'. \ -Value should be one of:")) - self.assertIn('v1', res.json_body['faultstring']) - self.assertIn('v2', res.json_body['faultstring']) - self.assertIn('None', res.json_body['faultstring']) - self.assertEqual(res.status_int, 400) - - def test_validate_enum_with_wrong_type(self): - class Version(object): - number = types.Enum(str, 'v1', 'v2', None) - - class MyWS(WSRoot): - @expose(str) - @validate(Version) - def setcplx(self, version): - pass - - r = MyWS(['restjson']) - app = webtest.TestApp(r.wsgiapp()) - res = app.post_json('/setcplx', params={'version': {'number': 1}}, - expect_errors=True, - headers={'Accept': 'application/json'}) - self.assertTrue( - res.json_body['faultstring'].startswith( - "Invalid input for field/attribute number. Value: '1'. \ -Value should be one of:")) - self.assertIn('v1', res.json_body['faultstring']) - self.assertIn('v2', res.json_body['faultstring']) - self.assertIn('None', res.json_body['faultstring']) - self.assertEqual(res.status_int, 400) - - def test_scan_api(self): - class NS(object): - @expose(int) - @validate(int, int) - def multiply(self, a, b): - return a * b - - class MyRoot(WSRoot): - ns = NS() - - r = MyRoot() - - api = list(scan_api(r)) - assert len(api) == 1 - path, fd, args = api[0] - assert path == ['ns', 'multiply'] - assert fd._wsme_definition.name == 'multiply' - assert args == [] - - def test_scan_subclass(self): - class MyRoot(WSRoot): - class SubClass(object): - pass - - r = MyRoot() - api = list(scan_api(r)) - - assert len(api) == 0 - - def test_scan_api_too_deep(self): - class Loop(object): - pass - - ell = Loop() - for i in range(0, 21): - nl = Loop() - nl.ell = ell - ell = nl - - class MyRoot(WSRoot): - loop = ell - - r = MyRoot() - - try: - list(scan_api(r)) - assert False, "ValueError not raised" - except ValueError as e: - assert str(e).startswith("Path is too long") - - def test_handle_request(self): - class MyRoot(WSRoot): - @expose() - def touch(self): - pass - - p = DummyProtocol() - r = MyRoot(protocols=[p]) - - app = webtest.TestApp(r.wsgiapp()) - - res = app.get('/') - - assert p.lastreq.path == '/' - assert p.hits == 1 - - res = app.get('/touch?wsmeproto=dummy') - - assert p.lastreq.path == '/touch' - assert p.hits == 2 - - class NoPathProto(DummyProtocol): - def extract_path(self, request): - return None - - p = NoPathProto() - r = MyRoot(protocols=[p]) - - app = webtest.TestApp(r.wsgiapp()) - - res = app.get('/', expect_errors=True) - print(res.status, res.body) - assert res.status_int == 400 - - def test_no_available_protocol(self): - r = WSRoot() - - app = webtest.TestApp(r.wsgiapp()) - - res = app.get('/', expect_errors=True) - print(res.status_int) - assert res.status_int == 406 - print(res.body) - assert res.body.find( - b("None of the following protocols can handle this request")) != -1 - - def test_return_content_type_guess(self): - class DummierProto(DummyProtocol): - content_types = ['text/xml', 'text/plain'] - - r = WSRoot([DummierProto()]) - - app = webtest.TestApp(r.wsgiapp()) - - res = app.get('/', expect_errors=True, headers={ - 'Accept': 'text/xml,q=0.8'}) - assert res.status_int == 400 - assert res.content_type == 'text/xml', res.content_type - - res = app.get('/', expect_errors=True, headers={ - 'Accept': 'text/plain'}) - assert res.status_int == 400 - assert res.content_type == 'text/plain', res.content_type - - def test_double_expose(self): - try: - class MyRoot(WSRoot): - @expose() - @expose() - def atest(self): - pass - assert False, "A ValueError should have been raised" - except ValueError: - pass - - def test_multiple_expose(self): - class MyRoot(WSRoot): - def multiply(self, a, b): - return a * b - - mul_int = expose(int, int, int, wrap=True)(multiply) - - mul_float = expose( - float, float, float, - wrap=True)(multiply) - - mul_string = expose( - wsme.types.text, wsme.types.text, int, - wrap=True)(multiply) - - r = MyRoot(['restjson']) - - app = webtest.TestApp(r.wsgiapp()) - - res = app.get('/mul_int?a=2&b=5', headers={ - 'Accept': 'application/json' - }) - - self.assertEqual(res.body, b('10')) - - res = app.get('/mul_float?a=1.2&b=2.9', headers={ - 'Accept': 'application/json' - }) - - self.assertEqual(res.body, b('3.48')) - - res = app.get('/mul_string?a=hello&b=2', headers={ - 'Accept': 'application/json' - }) - - self.assertEqual(res.body, b('"hellohello"')) - - def test_wsattr_mandatory(self): - class ComplexType(object): - attr = wsme.types.wsattr(int, mandatory=True) - - class MyRoot(WSRoot): - @expose(int, body=ComplexType) - @validate(ComplexType) - def clx(self, a): - return a.attr - - r = MyRoot(['restjson']) - app = webtest.TestApp(r.wsgiapp()) - res = app.post_json('/clx', params={}, expect_errors=True, - headers={'Accept': 'application/json'}) - self.assertEqual(res.status_int, 400) - - def test_wsattr_readonly(self): - class ComplexType(object): - attr = wsme.types.wsattr(int, readonly=True) - - class MyRoot(WSRoot): - @expose(int, body=ComplexType) - @validate(ComplexType) - def clx(self, a): - return a.attr - - r = MyRoot(['restjson']) - app = webtest.TestApp(r.wsgiapp()) - res = app.post_json('/clx', params={'attr': 1005}, expect_errors=True, - headers={'Accept': 'application/json'}) - self.assertIn('Cannot set read only field.', - res.json_body['faultstring']) - self.assertIn('1005', res.json_body['faultstring']) - self.assertEqual(res.status_int, 400) - - def test_wsattr_default(self): - class ComplexType(object): - attr = wsme.types.wsattr(wsme.types.Enum(str, 'or', 'and'), - default='and') - - class MyRoot(WSRoot): - @expose(int) - @validate(ComplexType) - def clx(self, a): - return a.attr - - r = MyRoot(['restjson']) - app = webtest.TestApp(r.wsgiapp()) - res = app.post_json('/clx', params={}, expect_errors=True, - headers={'Accept': 'application/json'}) - self.assertEqual(res.status_int, 400) - - def test_wsproperty_mandatory(self): - class ComplexType(object): - def foo(self): - pass - - attr = wsme.types.wsproperty(int, foo, foo, mandatory=True) - - class MyRoot(WSRoot): - @expose(int, body=ComplexType) - @validate(ComplexType) - def clx(self, a): - return a.attr - - r = MyRoot(['restjson']) - app = webtest.TestApp(r.wsgiapp()) - res = app.post_json('/clx', params={}, expect_errors=True, - headers={'Accept': 'application/json'}) - self.assertEqual(res.status_int, 400) - - def test_validate_enum_mandatory(self): - class Version(object): - number = wsme.types.wsattr(wsme.types.Enum(str, 'v1', 'v2'), - mandatory=True) - - class MyWS(WSRoot): - @expose(str) - @validate(Version) - def setcplx(self, version): - pass - - r = MyWS(['restjson']) - app = webtest.TestApp(r.wsgiapp()) - res = app.post_json('/setcplx', params={'version': {}}, - expect_errors=True, - headers={'Accept': 'application/json'}) - self.assertEqual(res.status_int, 400) - - -class TestFunctionDefinition(unittest.TestCase): - - def test_get_arg(self): - def myfunc(self): - pass - - fd = wsme_api.FunctionDefinition(wsme_api.FunctionDefinition) - fd.arguments.append(wsme_api.FunctionArgument('a', int, True, None)) - - assert fd.get_arg('a').datatype is int - assert fd.get_arg('b') is None - - -class TestFormatException(unittest.TestCase): - - def _test_format_exception(self, exception, debug=False): - fake_exc_info = (None, exception, None) - return wsme_api.format_exception(fake_exc_info, debug=debug) - - def test_format_client_exception(self): - faultstring = 'boom' - ret = self._test_format_exception(exc.ClientSideError(faultstring)) - self.assertIsNone(ret['debuginfo']) - self.assertEqual('Client', ret['faultcode']) - self.assertEqual(faultstring, ret['faultstring']) - - def test_format_client_exception_unicode(self): - faultstring = u'\xc3\xa3o' - ret = self._test_format_exception(exc.ClientSideError(faultstring)) - self.assertIsNone(ret['debuginfo']) - self.assertEqual('Client', ret['faultcode']) - self.assertEqual(faultstring, ret['faultstring']) - - def test_format_server_exception(self): - faultstring = 'boom' - ret = self._test_format_exception(Exception(faultstring)) - self.assertIsNone(ret['debuginfo']) - self.assertEqual('Server', ret['faultcode']) - self.assertEqual(faultstring, ret['faultstring']) - - def test_format_server_exception_unicode(self): - faultstring = u'\xc3\xa3o' - ret = self._test_format_exception(Exception(faultstring)) - self.assertIsNone(ret['debuginfo']) - self.assertEqual('Server', ret['faultcode']) - self.assertEqual(faultstring, ret['faultstring']) - - def test_format_server_exception_debug(self): - faultstring = 'boom' - ret = self._test_format_exception(Exception(faultstring), debug=True) - # assert debuginfo is populated - self.assertIsNotNone(ret['debuginfo']) - self.assertEqual('Server', ret['faultcode']) - self.assertEqual(faultstring, ret['faultstring']) diff --git a/wsme/tests/test_exc.py b/wsme/tests/test_exc.py deleted file mode 100644 index ba903fb..0000000 --- a/wsme/tests/test_exc.py +++ /dev/null @@ -1,40 +0,0 @@ -# encoding=utf8 - -from wsme.exc import (ClientSideError, InvalidInput, MissingArgument, - UnknownArgument) -from six import u - - -def test_clientside_error(): - e = ClientSideError("Test") - - assert e.faultstring == u("Test") - - -def test_unicode_clientside_error(): - e = ClientSideError(u("\u30d5\u30a1\u30b7\u30ea")) - - assert e.faultstring == u("\u30d5\u30a1\u30b7\u30ea") - - -def test_invalidinput(): - e = InvalidInput('field', 'badvalue', "error message") - - assert e.faultstring == u( - "Invalid input for field/attribute field. Value: 'badvalue'. " - "error message" - ), e.faultstring - - -def test_missingargument(): - e = MissingArgument('argname', "error message") - - assert e.faultstring == \ - u('Missing argument: "argname": error message'), e.faultstring - - -def test_unknownargument(): - e = UnknownArgument('argname', "error message") - - assert e.faultstring == \ - u('Unknown argument: "argname": error message'), e.faultstring diff --git a/wsme/tests/test_protocols.py b/wsme/tests/test_protocols.py deleted file mode 100644 index f4b7270..0000000 --- a/wsme/tests/test_protocols.py +++ /dev/null @@ -1,70 +0,0 @@ -# encoding=utf8 - -import unittest - -from wsme import WSRoot -from wsme.protocol import getprotocol, CallContext, Protocol -import wsme.protocol - - -class DummyProtocol(Protocol): - name = 'dummy' - content_types = ['', None] - - def __init__(self): - self.hits = 0 - - def accept(self, req): - return True - - def iter_calls(self, req): - yield CallContext(req) - - def extract_path(self, context): - return ['touch'] - - def read_arguments(self, context): - self.lastreq = context.request - self.hits += 1 - return {} - - def encode_result(self, context, result): - return str(result) - - def encode_error(self, context, infos): - return str(infos) - - -def test_getprotocol(): - try: - getprotocol('invalid') - assert False, "ValueError was not raised" - except ValueError: - pass - - -class TestProtocols(unittest.TestCase): - def test_register_protocol(self): - wsme.protocol.register_protocol(DummyProtocol) - assert wsme.protocol.registered_protocols['dummy'] == DummyProtocol - - r = WSRoot() - assert len(r.protocols) == 0 - - r.addprotocol('dummy') - assert len(r.protocols) == 1 - assert r.protocols[0].__class__ == DummyProtocol - - r = WSRoot(['dummy']) - assert len(r.protocols) == 1 - assert r.protocols[0].__class__ == DummyProtocol - - def test_Protocol(self): - p = wsme.protocol.Protocol() - assert p.iter_calls(None) is None - assert p.extract_path(None) is None - assert p.read_arguments(None) is None - assert p.encode_result(None, None) is None - assert p.encode_sample_value(None, None) == ('none', 'N/A') - assert p.encode_sample_params(None) == ('none', 'N/A') - assert p.encode_sample_result(None, None) == ('none', 'N/A') diff --git a/wsme/tests/test_protocols_commons.py b/wsme/tests/test_protocols_commons.py deleted file mode 100644 index af5aa05..0000000 --- a/wsme/tests/test_protocols_commons.py +++ /dev/null @@ -1,159 +0,0 @@ -# encoding=utf8 - -import datetime -import unittest - -from wsme.api import FunctionArgument, FunctionDefinition -from wsme.rest.args import from_param, from_params, args_from_args -from wsme.exc import InvalidInput - -from wsme.types import UserType, Unset, ArrayType, DictType, Base - - -class MyBaseType(Base): - test = str - - -class MyUserType(UserType): - basetype = str - - -class DictBasedUserType(UserType): - basetype = DictType(int, int) - - -class TestProtocolsCommons(unittest.TestCase): - def test_from_param_date(self): - assert from_param(datetime.date, '2008-02-28') == \ - datetime.date(2008, 2, 28) - - def test_from_param_time(self): - assert from_param(datetime.time, '12:14:56') == \ - datetime.time(12, 14, 56) - - def test_from_param_datetime(self): - assert from_param(datetime.datetime, '2009-12-23T12:14:56') == \ - datetime.datetime(2009, 12, 23, 12, 14, 56) - - def test_from_param_usertype(self): - assert from_param(MyUserType(), 'test') == 'test' - - def test_from_params_empty(self): - assert from_params(str, {}, '', set()) is Unset - - def test_from_params_native_array(self): - class params(dict): - def getall(self, path): - return ['1', '2'] - p = params({'a': []}) - assert from_params(ArrayType(int), p, 'a', set()) == [1, 2] - - def test_from_params_empty_array(self): - assert from_params(ArrayType(int), {}, 'a', set()) is Unset - - def test_from_params_dict(self): - value = from_params( - DictType(int, str), - {'a[2]': 'a2', 'a[3]': 'a3'}, - 'a', - set() - ) - assert value == {2: 'a2', 3: 'a3'}, value - - def test_from_params_dict_unset(self): - assert from_params(DictType(int, str), {}, 'a', set()) is Unset - - def test_from_params_usertype(self): - value = from_params( - DictBasedUserType(), - {'a[2]': '2'}, - 'a', - set() - ) - self.assertEqual(value, {2: 2}) - - def test_args_from_args_usertype(self): - - class FakeType(UserType): - name = 'fake-type' - basetype = int - - fake_type = FakeType() - fd = FunctionDefinition(FunctionDefinition) - fd.arguments.append(FunctionArgument('fake-arg', fake_type, True, 0)) - - new_args = args_from_args(fd, [1], {}) - self.assertEqual([1], new_args[0]) - - # can't convert str to int - try: - args_from_args(fd, ['invalid-argument'], {}) - except InvalidInput as e: - assert fake_type.name in str(e) - else: - self.fail('Should have thrown an InvalidInput') - - def test_args_from_args_custom_exc(self): - - class FakeType(UserType): - name = 'fake-type' - basetype = int - - def validate(self, value): - if value < 10: - raise ValueError('should be greater than 10') - - def frombasetype(self, value): - self.validate(value) - - fake_type = FakeType() - fd = FunctionDefinition(FunctionDefinition) - fd.arguments.append(FunctionArgument('fake-arg', fake_type, True, 0)) - - try: - args_from_args(fd, [9], {}) - except InvalidInput as e: - assert fake_type.name in str(e) - assert 'Error: should be greater than 10' in str(e) - else: - self.fail('Should have thrown an InvalidInput') - - def test_args_from_args_array_type(self): - fake_type = ArrayType(MyBaseType) - fd = FunctionDefinition(FunctionDefinition) - fd.arguments.append(FunctionArgument('fake-arg', fake_type, True, [])) - try: - args_from_args(fd, [['invalid-argument']], {}) - except InvalidInput as e: - assert ArrayType.__name__ in str(e) - else: - self.fail('Should have thrown an InvalidInput') - - -class ArgTypeConversion(unittest.TestCase): - - def test_int_zero(self): - self.assertEqual(0, from_param(int, 0)) - self.assertEqual(0, from_param(int, '0')) - - def test_int_nonzero(self): - self.assertEqual(1, from_param(int, 1)) - self.assertEqual(1, from_param(int, '1')) - - def test_int_none(self): - self.assertEqual(None, from_param(int, None)) - - def test_float_zero(self): - self.assertEqual(0.0, from_param(float, 0)) - self.assertEqual(0.0, from_param(float, 0.0)) - self.assertEqual(0.0, from_param(float, '0')) - self.assertEqual(0.0, from_param(float, '0.0')) - - def test_float_nonzero(self): - self.assertEqual(1.0, from_param(float, 1)) - self.assertEqual(1.0, from_param(float, 1.0)) - self.assertEqual(1.0, from_param(float, '1')) - self.assertEqual(1.0, from_param(float, '1.0')) - - def test_float_none(self): - self.assertEqual(None, from_param(float, None)) diff --git a/wsme/tests/test_restjson.py b/wsme/tests/test_restjson.py deleted file mode 100644 index 7afbb86..0000000 --- a/wsme/tests/test_restjson.py +++ /dev/null @@ -1,779 +0,0 @@ -import base64 -import datetime -import decimal - -import wsme.tests.protocol - -try: - import simplejson as json -except ImportError: - import json # noqa - -from wsme.rest.json import fromjson, tojson, parse -from wsme.utils import parse_isodatetime, parse_isotime, parse_isodate -from wsme.types import isarray, isdict, isusertype, register_type -from wsme.types import UserType, ArrayType, DictType -from wsme.rest import expose, validate -from wsme.exc import ClientSideError, InvalidInput - - -import six -from six import b, u - -if six.PY3: - from urllib.parse import urlencode -else: - from urllib import urlencode # noqa - - -def prepare_value(value, datatype): - if isinstance(datatype, list): - return [prepare_value(item, datatype[0]) for item in value] - if isinstance(datatype, dict): - key_type, value_type = list(datatype.items())[0] - return dict(( - (prepare_value(item[0], key_type), - prepare_value(item[1], value_type)) - for item in value.items() - )) - if datatype in (datetime.date, datetime.time, datetime.datetime): - return value.isoformat() - if datatype == decimal.Decimal: - return str(value) - if datatype == wsme.types.binary: - return base64.encodestring(value).decode('ascii') - if datatype == wsme.types.bytes: - return value.decode('ascii') - return value - - -def prepare_result(value, datatype): - print(value, datatype) - if value is None: - return None - if datatype == wsme.types.binary: - return base64.decodestring(value.encode('ascii')) - if isusertype(datatype): - datatype = datatype.basetype - if isinstance(datatype, list): - return [prepare_result(item, datatype[0]) for item in value] - if isarray(datatype): - return [prepare_result(item, datatype.item_type) for item in value] - if isinstance(datatype, dict): - return dict(( - (prepare_result(item[0], list(datatype.keys())[0]), - prepare_result(item[1], list(datatype.values())[0])) - for item in value.items() - )) - if isdict(datatype): - return dict(( - (prepare_result(item[0], datatype.key_type), - prepare_result(item[1], datatype.value_type)) - for item in value.items() - )) - if datatype == datetime.date: - return parse_isodate(value) - if datatype == datetime.time: - return parse_isotime(value) - if datatype == datetime.datetime: - return parse_isodatetime(value) - if hasattr(datatype, '_wsme_attributes'): - for attr in datatype._wsme_attributes: - if attr.key not in value: - continue - value[attr.key] = prepare_result(value[attr.key], attr.datatype) - return value - if datatype == wsme.types.bytes: - return value.encode('ascii') - if type(value) != datatype: - print(type(value), datatype) - return datatype(value) - return value - - -class CustomInt(UserType): - basetype = int - name = "custom integer" - - -class Obj(wsme.types.Base): - id = int - name = wsme.types.text - - -class NestedObj(wsme.types.Base): - o = Obj - - -class CRUDResult(object): - data = Obj - message = wsme.types.text - - def __init__(self, data=wsme.types.Unset, message=wsme.types.Unset): - self.data = data - self.message = message - - -class MiniCrud(object): - @expose(CRUDResult, method='PUT') - @validate(Obj) - def create(self, data): - print(repr(data)) - return CRUDResult(data, u('create')) - - @expose(CRUDResult, method='GET', ignore_extra_args=True) - @validate(Obj) - def read(self, ref): - print(repr(ref)) - if ref.id == 1: - ref.name = u('test') - return CRUDResult(ref, u('read')) - - @expose(CRUDResult, method='POST') - @validate(Obj) - def update(self, data): - print(repr(data)) - return CRUDResult(data, u('update')) - - @expose(CRUDResult, wsme.types.text, body=Obj) - def update_with_body(self, msg, data): - print(repr(data)) - return CRUDResult(data, msg) - - @expose(CRUDResult, method='DELETE') - @validate(Obj) - def delete(self, ref): - print(repr(ref)) - if ref.id == 1: - ref.name = u('test') - return CRUDResult(ref, u('delete')) - - -wsme.tests.protocol.WSTestRoot.crud = MiniCrud() - - -class TestRestJson(wsme.tests.protocol.RestOnlyProtocolTestCase): - protocol = 'restjson' - - def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False, - body=None, **kw): - if body: - if isinstance(body, tuple): - body, datatype = body - else: - datatype = type(body) - body = prepare_value(body, datatype) - content = json.dumps(body) - else: - for key in kw: - if isinstance(kw[key], tuple): - value, datatype = kw[key] - else: - value = kw[key] - datatype = type(value) - kw[key] = prepare_value(value, datatype) - content = json.dumps(kw) - headers = { - 'Content-Type': 'application/json', - } - if _accept is not None: - headers["Accept"] = _accept - res = self.app.post( - '/' + fpath, - content, - headers=headers, - expect_errors=True) - print("Received:", res.body) - - if _no_result_decode: - return res - - r = json.loads(res.text) - if res.status_int == 200: - if _rt and r: - r = prepare_result(r, _rt) - return r - else: - raise wsme.tests.protocol.CallException( - r['faultcode'], - r['faultstring'], - r.get('debuginfo') - ) - - return json.loads(res.text) - - def test_fromjson(self): - assert fromjson(str, None) is None - - def test_keyargs(self): - r = self.app.get('/argtypes/setint.json?value=2') - print(r) - assert json.loads(r.text) == 2 - - nestedarray = 'value[0].inner.aint=54&value[1].inner.aint=55' - r = self.app.get('/argtypes/setnestedarray.json?' + nestedarray) - print(r) - assert json.loads(r.text) == [ - {'inner': {'aint': 54}}, - {'inner': {'aint': 55}}] - - def test_form_urlencoded_args(self): - params = { - 'value[0].inner.aint': 54, - 'value[1].inner.aint': 55 - } - body = urlencode(params) - r = self.app.post( - '/argtypes/setnestedarray.json', - body, - headers={'Content-Type': 'application/x-www-form-urlencoded'} - ) - print(r) - - assert json.loads(r.text) == [ - {'inner': {'aint': 54}}, - {'inner': {'aint': 55}}] - - def test_body_and_params(self): - r = self.app.post('/argtypes/setint.json?value=2', '{"value": 2}', - headers={"Content-Type": "application/json"}, - expect_errors=True) - print(r) - assert r.status_int == 400 - assert json.loads(r.text)['faultstring'] == \ - "Parameter value was given several times" - - def test_inline_body(self): - params = urlencode({'__body__': '{"value": 4}'}) - r = self.app.get('/argtypes/setint.json?' + params) - print(r) - assert json.loads(r.text) == 4 - - def test_empty_body(self): - params = urlencode({'__body__': ''}) - r = self.app.get('/returntypes/getint.json?' + params) - print(r) - assert json.loads(r.text) == 2 - - def test_invalid_content_type_body(self): - r = self.app.post('/argtypes/setint.json', '{"value": 2}', - headers={"Content-Type": "application/invalid"}, - expect_errors=True) - print(r) - assert r.status_int == 415 - assert json.loads(r.text)['faultstring'] == \ - "Unknown mimetype: application/invalid" - - def test_invalid_json_body(self): - r = self.app.post('/argtypes/setint.json', '{"value": 2', - headers={"Content-Type": "application/json"}, - expect_errors=True) - print(r) - assert r.status_int == 400 - assert json.loads(r.text)['faultstring'] == \ - "Request is not in valid JSON format" - - def test_unknown_arg(self): - r = self.app.post('/returntypes/getint.json', '{"a": 2}', - headers={"Content-Type": "application/json"}, - expect_errors=True) - print(r) - assert r.status_int == 400 - assert json.loads(r.text)['faultstring'].startswith( - "Unknown argument:" - ) - - r = self.app.get('/returntypes/getint.json?a=2', expect_errors=True) - print(r) - assert r.status_int == 400 - assert json.loads(r.text)['faultstring'].startswith( - "Unknown argument:" - ) - - def test_set_custom_object(self): - r = self.app.post( - '/argtypes/setcustomobject', - '{"value": {"aint": 2, "name": "test"}}', - headers={"Content-Type": "application/json"} - ) - self.assertEqual(r.status_int, 200) - self.assertEqual(r.json, {'aint': 2, 'name': 'test'}) - - def test_set_extended_int(self): - r = self.app.post( - '/argtypes/setextendedint', - '{"value": 3}', - headers={"Content-Type": "application/json"} - ) - self.assertEqual(r.status_int, 200) - self.assertEqual(r.json, 3) - - def test_unset_attrs(self): - class AType(object): - attr = int - - wsme.types.register_type(AType) - - j = tojson(AType, AType()) - assert j == {} - - def test_array_tojson(self): - assert tojson([int], None) is None - assert tojson([int], []) == [] - assert tojson([str], ['1', '4']) == ['1', '4'] - - def test_dict_tojson(self): - assert tojson({int: str}, None) is None - assert tojson({int: str}, {5: '5'}) == {5: '5'} - - def test_None_tojson(self): - for dt in (datetime.date, datetime.time, datetime.datetime, - decimal.Decimal): - assert tojson(dt, None) is None - - def test_None_fromjson(self): - for dt in (str, int, datetime.date, datetime.time, datetime.datetime, - decimal.Decimal, [int], {int: int}): - assert fromjson(dt, None) is None - - def test_parse_valid_date(self): - j = parse('{"a": "2011-01-01"}', {'a': datetime.date}, False) - assert isinstance(j['a'], datetime.date) - - def test_invalid_root_dict_fromjson(self): - try: - parse('["invalid"]', {'a': ArrayType(str)}, False) - assert False - except Exception as e: - assert isinstance(e, ClientSideError) - assert e.msg == "Request must be a JSON dict" - - def test_invalid_list_fromjson(self): - jlist = "invalid" - try: - parse('{"a": "%s"}' % jlist, {'a': ArrayType(str)}, False) - assert False - except Exception as e: - assert isinstance(e, InvalidInput) - assert e.fieldname == 'a' - assert e.value == jlist - assert e.msg == "Value not a valid list: %s" % jlist - - def test_invalid_dict_fromjson(self): - jdict = "invalid" - try: - parse('{"a": "%s"}' % jdict, {'a': DictType(str, str)}, False) - assert False - except Exception as e: - assert isinstance(e, InvalidInput) - assert e.fieldname == 'a' - assert e.value == jdict - assert e.msg == "Value not a valid dict: %s" % jdict - - def test_invalid_date_fromjson(self): - jdate = "2015-01-invalid" - try: - parse('{"a": "%s"}' % jdate, {'a': datetime.date}, False) - assert False - except Exception as e: - assert isinstance(e, InvalidInput) - assert e.fieldname == 'a' - assert e.value == jdate - assert e.msg == "'%s' is not a legal date value" % jdate - - def test_parse_valid_date_bodyarg(self): - j = parse('"2011-01-01"', {'a': datetime.date}, True) - assert isinstance(j['a'], datetime.date) - - def test_invalid_date_fromjson_bodyarg(self): - jdate = "2015-01-invalid" - try: - parse('"%s"' % jdate, {'a': datetime.date}, True) - assert False - except Exception as e: - assert isinstance(e, InvalidInput) - assert e.fieldname == 'a' - assert e.value == jdate - assert e.msg == "'%s' is not a legal date value" % jdate - - def test_valid_str_to_builtin_fromjson(self): - types = six.integer_types + (bool, float) - value = '2' - for t in types: - for ba in True, False: - jd = '%s' if ba else '{"a": %s}' - i = parse(jd % value, {'a': t}, ba) - self.assertEqual( - i, {'a': t(value)}, - "Parsed value does not correspond for %s: " - "%s != {'a': %s}" % ( - t, repr(i), repr(t(value)) - ) - ) - self.assertIsInstance(i['a'], t) - - def test_valid_int_fromjson(self): - value = 2 - for ba in True, False: - jd = '%d' if ba else '{"a": %d}' - i = parse(jd % value, {'a': int}, ba) - self.assertEqual(i, {'a': 2}) - self.assertIsInstance(i['a'], int) - - def test_valid_num_to_float_fromjson(self): - values = 2, 2.3 - for v in values: - for ba in True, False: - jd = '%f' if ba else '{"a": %f}' - i = parse(jd % v, {'a': float}, ba) - self.assertEqual(i, {'a': float(v)}) - self.assertIsInstance(i['a'], float) - - def test_invalid_str_to_buitin_fromjson(self): - types = six.integer_types + (float, bool) - value = '2a' - for t in types: - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - try: - parse(jd % value, {'a': t}, ba) - assert False, ( - "Value '%s' should not parse correctly for %s." % - (value, t) - ) - except ClientSideError as e: - self.assertIsInstance(e, InvalidInput) - self.assertEqual(e.fieldname, 'a') - self.assertEqual(e.value, value) - - def test_ambiguous_to_bool(self): - amb_values = ('', 'randomstring', '2', '-32', 'not true') - for value in amb_values: - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - try: - parse(jd % value, {'a': bool}, ba) - assert False, ( - "Value '%s' should not parse correctly for %s." % - (value, bool) - ) - except ClientSideError as e: - self.assertIsInstance(e, InvalidInput) - self.assertEqual(e.fieldname, 'a') - self.assertEqual(e.value, value) - - def test_true_strings_to_bool(self): - true_values = ('true', 't', 'yes', 'y', 'on', '1') - for value in true_values: - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - i = parse(jd % value, {'a': bool}, ba) - self.assertIsInstance(i['a'], bool) - self.assertTrue(i['a']) - - def test_false_strings_to_bool(self): - false_values = ('false', 'f', 'no', 'n', 'off', '0') - for value in false_values: - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - i = parse(jd % value, {'a': bool}, ba) - self.assertIsInstance(i['a'], bool) - self.assertFalse(i['a']) - - def test_true_ints_to_bool(self): - true_values = (1, 5, -3) - for value in true_values: - for ba in True, False: - jd = '%d' if ba else '{"a": %d}' - i = parse(jd % value, {'a': bool}, ba) - self.assertIsInstance(i['a'], bool) - self.assertTrue(i['a']) - - def test_false_ints_to_bool(self): - value = 0 - for ba in True, False: - jd = '%d' if ba else '{"a": %d}' - i = parse(jd % value, {'a': bool}, ba) - self.assertIsInstance(i['a'], bool) - self.assertFalse(i['a']) - - def test_valid_simple_custom_type_fromjson(self): - value = 2 - for ba in True, False: - jd = '"%d"' if ba else '{"a": "%d"}' - i = parse(jd % value, {'a': CustomInt()}, ba) - self.assertEqual(i, {'a': 2}) - self.assertIsInstance(i['a'], int) - - def test_invalid_simple_custom_type_fromjson(self): - value = '2b' - for ba in True, False: - jd = '"%s"' if ba else '{"a": "%s"}' - try: - i = parse(jd % value, {'a': CustomInt()}, ba) - self.assertEqual(i, {'a': 2}) - except ClientSideError as e: - self.assertIsInstance(e, InvalidInput) - self.assertEqual(e.fieldname, 'a') - self.assertEqual(e.value, value) - self.assertEqual( - e.msg, - "invalid literal for int() with base 10: '%s'" % value - ) - - def test_parse_unexpected_attribute(self): - o = { - "id": "1", - "name": "test", - "other": "unknown", - "other2": "still unknown", - } - for ba in True, False: - jd = o if ba else {"o": o} - try: - parse(json.dumps(jd), {'o': Obj}, ba) - raise AssertionError("Object should not parse correcty.") - except wsme.exc.UnknownAttribute as e: - self.assertEqual(e.attributes, set(['other', 'other2'])) - - def test_parse_unexpected_nested_attribute(self): - no = { - "o": { - "id": "1", - "name": "test", - "other": "unknown", - }, - } - for ba in False, True: - jd = no if ba else {"no": no} - try: - parse(json.dumps(jd), {'no': NestedObj}, ba) - except wsme.exc.UnknownAttribute as e: - self.assertEqual(e.attributes, set(['other'])) - self.assertEqual(e.fieldname, "no.o") - - def test_nest_result(self): - self.root.protocols[0].nest_result = True - r = self.app.get('/returntypes/getint.json') - print(r) - assert json.loads(r.text) == {"result": 2} - - def test_encode_sample_value(self): - class MyType(object): - aint = int - astr = str - - register_type(MyType) - - v = MyType() - v.aint = 4 - v.astr = 's' - - r = wsme.rest.json.encode_sample_value(MyType, v, True) - print(r) - assert r[0] == ('javascript') - assert r[1] == json.dumps({'aint': 4, 'astr': 's'}, ensure_ascii=False, - indent=4, sort_keys=True) - - def test_bytes_tojson(self): - assert tojson(wsme.types.bytes, None) is None - assert tojson(wsme.types.bytes, b('ascii')) == u('ascii') - - def test_encode_sample_params(self): - r = wsme.rest.json.encode_sample_params( - [('a', int, 2)], True - ) - assert r[0] == 'javascript', r[0] - assert r[1] == '''{ - "a": 2 -}''', r[1] - - def test_encode_sample_result(self): - r = wsme.rest.json.encode_sample_result( - int, 2, True - ) - assert r[0] == 'javascript', r[0] - assert r[1] == '''2''' - - def test_PUT(self): - data = {"id": 1, "name": u("test")} - content = json.dumps(dict(data=data)) - headers = { - 'Content-Type': 'application/json', - } - res = self.app.put( - '/crud', - content, - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "create" - - def test_GET(self): - headers = { - 'Accept': 'application/json', - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - - def test_GET_complex_accept(self): - headers = { - 'Accept': 'text/html,application/xml;q=0.9,*/*;q=0.8' - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - - def test_GET_complex_choose_xml(self): - headers = { - 'Accept': 'text/html,text/xml;q=0.9,*/*;q=0.8' - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - expect_errors=False) - print("Received:", res.body) - assert res.content_type == 'text/xml' - - def test_GET_complex_accept_no_match(self): - headers = { - 'Accept': 'text/html,application/xml;q=0.9' - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - status=406) - print("Received:", res.body) - assert res.body == b("Unacceptable Accept type: " - "text/html, application/xml;q=0.9 not in " - "['application/json', 'text/javascript', " - "'application/javascript', 'text/xml']") - - def test_GET_bad_simple_accept(self): - headers = { - 'Accept': 'text/plain', - } - res = self.app.get( - '/crud?ref.id=1', - headers=headers, - status=406) - print("Received:", res.body) - assert res.body == b("Unacceptable Accept type: text/plain not in " - "['application/json', 'text/javascript', " - "'application/javascript', 'text/xml']") - - def test_POST(self): - headers = { - 'Content-Type': 'application/json', - } - res = self.app.post( - '/crud', - json.dumps(dict(data=dict(id=1, name=u('test')))), - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "update" - - def test_POST_bad_content_type(self): - headers = { - 'Content-Type': 'text/plain', - } - res = self.app.post( - '/crud', - json.dumps(dict(data=dict(id=1, name=u('test')))), - headers=headers, - status=415) - print("Received:", res.body) - assert res.body == b("Unacceptable Content-Type: text/plain not in " - "['application/json', 'text/javascript', " - "'application/javascript', 'text/xml']") - - def test_DELETE(self): - res = self.app.delete( - '/crud.json?ref.id=1', - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "delete" - - def test_extra_arguments(self): - headers = { - 'Accept': 'application/json', - } - res = self.app.get( - '/crud?ref.id=1&extraarg=foo', - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "read" - - def test_unexpected_extra_arg(self): - headers = { - 'Content-Type': 'application/json', - } - data = {"id": 1, "name": "test"} - content = json.dumps({"data": data, "other": "unexpected"}) - res = self.app.put( - '/crud', - content, - headers=headers, - expect_errors=True) - self.assertEqual(res.status_int, 400) - - def test_unexpected_extra_attribute(self): - """Expect a failure if we send an unexpected object attribute.""" - headers = { - 'Content-Type': 'application/json', - } - data = {"id": 1, "name": "test", "other": "unexpected"} - content = json.dumps({"data": data}) - res = self.app.put( - '/crud', - content, - headers=headers, - expect_errors=True) - self.assertEqual(res.status_int, 400) - - def test_body_arg(self): - headers = { - 'Content-Type': 'application/json', - } - res = self.app.post( - '/crud/update_with_body?msg=hello', - json.dumps(dict(id=1, name=u('test'))), - headers=headers, - expect_errors=False) - print("Received:", res.body) - result = json.loads(res.text) - print(result) - assert result['data']['id'] == 1 - assert result['data']['name'] == u("test") - assert result['message'] == "hello" diff --git a/wsme/tests/test_restxml.py b/wsme/tests/test_restxml.py deleted file mode 100644 index b677a65..0000000 --- a/wsme/tests/test_restxml.py +++ /dev/null @@ -1,211 +0,0 @@ -import decimal -import datetime -import base64 - -from six import u, b -import six - -import wsme.tests.protocol -from wsme.utils import parse_isodatetime, parse_isodate, parse_isotime -from wsme.types import isarray, isdict, isusertype, register_type - -from wsme.rest.xml import fromxml, toxml - -try: - import xml.etree.ElementTree as et -except ImportError: - import cElementTree as et # noqa - - -def dumpxml(key, obj, datatype=None): - el = et.Element(key) - if isinstance(obj, tuple): - obj, datatype = obj - if isinstance(datatype, list): - for item in obj: - el.append(dumpxml('item', item, datatype[0])) - elif isinstance(datatype, dict): - key_type, value_type = list(datatype.items())[0] - for item in obj.items(): - node = et.SubElement(el, 'item') - node.append(dumpxml('key', item[0], key_type)) - node.append(dumpxml('value', item[1], value_type)) - elif datatype == wsme.types.binary: - el.text = base64.encodestring(obj).decode('ascii') - elif isinstance(obj, wsme.types.bytes): - el.text = obj.decode('ascii') - elif isinstance(obj, wsme.types.text): - el.text = obj - elif type(obj) in (int, float, bool, decimal.Decimal): - el.text = six.text_type(obj) - elif type(obj) in (datetime.date, datetime.time, datetime.datetime): - el.text = obj.isoformat() - elif isinstance(obj, type(None)): - el.set('nil', 'true') - elif hasattr(datatype, '_wsme_attributes'): - for attr in datatype._wsme_attributes: - name = attr.name - if name not in obj: - continue - o = obj[name] - el.append(dumpxml(name, o, attr.datatype)) - elif type(obj) == dict: - for name, value in obj.items(): - el.append(dumpxml(name, value)) - print(obj, datatype, et.tostring(el)) - return el - - -def loadxml(el, datatype): - print(el, datatype, len(el)) - if el.get('nil') == 'true': - return None - if isinstance(datatype, list): - return [loadxml(item, datatype[0]) for item in el.findall('item')] - elif isarray(datatype): - return [ - loadxml(item, datatype.item_type) for item in el.findall('item') - ] - elif isinstance(datatype, dict): - key_type, value_type = list(datatype.items())[0] - return dict(( - (loadxml(item.find('key'), key_type), - loadxml(item.find('value'), value_type)) - for item in el.findall('item') - )) - elif isdict(datatype): - return dict(( - (loadxml(item.find('key'), datatype.key_type), - loadxml(item.find('value'), datatype.value_type)) - for item in el.findall('item') - )) - elif isdict(datatype): - return dict(( - (loadxml(item.find('key'), datatype.key_type), - loadxml(item.find('value'), datatype.value_type)) - for item in el.findall('item') - )) - elif len(el): - d = {} - for attr in datatype._wsme_attributes: - name = attr.name - child = el.find(name) - print(name, attr, child) - if child is not None: - d[name] = loadxml(child, attr.datatype) - print(d) - return d - else: - if datatype == wsme.types.binary: - return base64.decodestring(el.text.encode('ascii')) - if isusertype(datatype): - datatype = datatype.basetype - if datatype == datetime.date: - return parse_isodate(el.text) - if datatype == datetime.time: - return parse_isotime(el.text) - if datatype == datetime.datetime: - return parse_isodatetime(el.text) - if datatype == wsme.types.text: - return datatype(el.text if el.text else u('')) - if datatype == bool: - return el.text.lower() != 'false' - if datatype is None: - return el.text - if datatype is wsme.types.bytes: - return el.text.encode('ascii') - return datatype(el.text) - - -class TestRestXML(wsme.tests.protocol.RestOnlyProtocolTestCase): - protocol = 'restxml' - - def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False, - body=None, **kw): - if body: - el = dumpxml('body', body) - else: - el = dumpxml('parameters', kw) - content = et.tostring(el) - headers = { - 'Content-Type': 'text/xml', - } - if _accept is not None: - headers['Accept'] = _accept - res = self.app.post( - '/' + fpath, - content, - headers=headers, - expect_errors=True) - print("Received:", res.body) - - if _no_result_decode: - return res - - el = et.fromstring(res.body) - if el.tag == 'error': - raise wsme.tests.protocol.CallException( - el.find('faultcode').text, - el.find('faultstring').text, - el.find('debuginfo') is not None and - el.find('debuginfo').text or None - ) - - else: - return loadxml(et.fromstring(res.body), _rt) - - def test_encode_sample_value(self): - class MyType(object): - aint = int - atext = wsme.types.text - - register_type(MyType) - - value = MyType() - value.aint = 5 - value.atext = u('test') - - language, sample = wsme.rest.xml.encode_sample_value( - MyType, value, True) - print(language, sample) - - assert language == 'xml' - assert sample == b("""<value> - <aint>5</aint> - <atext>test</atext> -</value>""") - - def test_encode_sample_params(self): - lang, content = wsme.rest.xml.encode_sample_params( - [('a', int, 2)], True) - assert lang == 'xml', lang - assert content == b('<parameters>\n <a>2</a>\n</parameters>'), content - - def test_encode_sample_result(self): - lang, content = wsme.rest.xml.encode_sample_result(int, 2, True) - assert lang == 'xml', lang - assert content == b('<result>2</result>'), content - - def test_nil_fromxml(self): - for dt in ( - str, [int], {int: str}, bool, - datetime.date, datetime.time, datetime.datetime): - e = et.Element('value', nil='true') - assert fromxml(dt, e) is None - - def test_nil_toxml(self): - for dt in ( - wsme.types.bytes, - [int], {int: str}, bool, - datetime.date, datetime.time, datetime.datetime): - x = et.tostring(toxml(dt, 'value', None)) - assert x == b('<value nil="true" />'), x - - def test_unset_attrs(self): - class AType(object): - someattr = wsme.types.bytes - - wsme.types.register_type(AType) - - x = et.tostring(toxml(AType, 'value', AType())) - assert x == b('<value />'), x diff --git a/wsme/tests/test_root.py b/wsme/tests/test_root.py deleted file mode 100644 index 8ccb0c1..0000000 --- a/wsme/tests/test_root.py +++ /dev/null @@ -1,122 +0,0 @@ -# encoding=utf8 - -import unittest - -from wsme import WSRoot -import wsme.protocol -import wsme.rest.protocol -from wsme.root import default_prepare_response_body - -from six import b, u -from webob import Request - - -class TestRoot(unittest.TestCase): - def test_default_transaction(self): - import transaction - root = WSRoot(transaction=True) - assert root._transaction is transaction - - txn = root.begin() - txn.abort() - - def test_default_prepare_response_body(self): - default_prepare_response_body(None, [b('a')]) == b('a') - default_prepare_response_body(None, [b('a'), b('b')]) == b('a\nb') - default_prepare_response_body(None, [u('a')]) == u('a') - default_prepare_response_body(None, [u('a'), u('b')]) == u('a\nb') - - def test_protocol_selection_error(self): - class P(wsme.protocol.Protocol): - name = "test" - - def accept(self, r): - raise Exception('test') - - root = WSRoot() - root.addprotocol(P()) - - from webob import Request - req = Request.blank('/test?check=a&check=b&name=Bob') - res = root._handle_request(req) - assert res.status_int == 500 - assert res.content_type == 'text/plain' - assert (res.text == - 'Unexpected error while selecting protocol: test'), req.text - - def test_protocol_selection_accept_mismatch(self): - """Verify that we get a 406 error on wrong Accept header.""" - class P(wsme.protocol.Protocol): - name = "test" - - def accept(self, r): - return False - - root = WSRoot() - root.addprotocol(wsme.rest.protocol.RestProtocol()) - root.addprotocol(P()) - - req = Request.blank('/test?check=a&check=b&name=Bob') - req.method = 'GET' - res = root._handle_request(req) - assert res.status_int == 406 - assert res.content_type == 'text/plain' - assert res.text.startswith( - 'None of the following protocols can handle this request' - ), req.text - - def test_protocol_selection_content_type_mismatch(self): - """Verify that we get a 415 error on wrong Content-Type header.""" - class P(wsme.protocol.Protocol): - name = "test" - - def accept(self, r): - return False - - root = WSRoot() - root.addprotocol(wsme.rest.protocol.RestProtocol()) - root.addprotocol(P()) - - req = Request.blank('/test?check=a&check=b&name=Bob') - req.method = 'POST' - req.headers['Content-Type'] = "test/unsupported" - res = root._handle_request(req) - assert res.status_int == 415 - assert res.content_type == 'text/plain' - assert res.text.startswith( - 'Unacceptable Content-Type: test/unsupported not in' - ), req.text - - def test_protocol_selection_get_method(self): - class P(wsme.protocol.Protocol): - name = "test" - - def accept(self, r): - return True - - root = WSRoot() - root.addprotocol(wsme.rest.protocol.RestProtocol()) - root.addprotocol(P()) - - req = Request.blank('/test?check=a&check=b&name=Bob') - req.method = 'GET' - req.headers['Accept'] = 'test/fake' - p = root._select_protocol(req) - assert p.name == "test" - - def test_protocol_selection_post_method(self): - class P(wsme.protocol.Protocol): - name = "test" - - def accept(self, r): - return True - - root = WSRoot() - root.addprotocol(wsme.rest.protocol.RestProtocol()) - root.addprotocol(P()) - - req = Request.blank('/test?check=a&check=b&name=Bob') - req.headers['Content-Type'] = 'test/fake' - req.method = 'POST' - p = root._select_protocol(req) - assert p.name == "test" diff --git a/wsme/tests/test_spore.py b/wsme/tests/test_spore.py deleted file mode 100644 index d6509f3..0000000 --- a/wsme/tests/test_spore.py +++ /dev/null @@ -1,51 +0,0 @@ -import unittest - -try: - import simplejson as json -except ImportError: - import json - -from wsme.tests.protocol import WSTestRoot -import wsme.tests.test_restjson -import wsme.spore - - -class TestSpore(unittest.TestCase): - def test_spore(self): - spore = wsme.spore.getdesc(WSTestRoot()) - - print(spore) - - spore = json.loads(spore) - - assert len(spore['methods']) == 51, str(len(spore['methods'])) - - m = spore['methods']['argtypes_setbytesarray'] - assert m['path'] == 'argtypes/setbytesarray', m['path'] - assert m['optional_params'] == ['value'] - assert m['method'] == 'POST' - - m = spore['methods']['argtypes_setdecimal'] - assert m['path'] == 'argtypes/setdecimal' - assert m['required_params'] == ['value'] - assert m['method'] == 'GET' - - m = spore['methods']['crud_create'] - assert m['path'] == 'crud' - assert m['method'] == 'PUT' - assert m['optional_params'] == ['data'] - - m = spore['methods']['crud_read'] - assert m['path'] == 'crud' - assert m['method'] == 'GET' - assert m['required_params'] == ['ref'] - - m = spore['methods']['crud_update'] - assert m['path'] == 'crud' - assert m['method'] == 'POST' - assert m['optional_params'] == ['data'] - - m = spore['methods']['crud_delete'] - assert m['path'] == 'crud' - assert m['method'] == 'DELETE' - assert m['optional_params'] == ['ref'] diff --git a/wsme/tests/test_types.py b/wsme/tests/test_types.py deleted file mode 100644 index 4a07130..0000000 --- a/wsme/tests/test_types.py +++ /dev/null @@ -1,667 +0,0 @@ -import re -try: - import unittest2 as unittest -except ImportError: - import unittest -import six - -from wsme import exc -from wsme import types - - -def gen_class(): - d = {} - exec('''class tmp(object): pass''', d) - return d['tmp'] - - -class TestTypes(unittest.TestCase): - def setUp(self): - types.registry = types.Registry() - - def test_default_usertype(self): - class MyType(types.UserType): - basetype = str - - My = MyType() - - assert My.validate('a') == 'a' - assert My.tobasetype('a') == 'a' - assert My.frombasetype('a') == 'a' - - def test_unset(self): - u = types.Unset - - assert not u - - def test_flat_type(self): - class Flat(object): - aint = int - abytes = six.binary_type - atext = six.text_type - afloat = float - - types.register_type(Flat) - - assert len(Flat._wsme_attributes) == 4 - attrs = Flat._wsme_attributes - print(attrs) - - assert attrs[0].key == 'aint' - assert attrs[0].name == 'aint' - assert isinstance(attrs[0], types.wsattr) - assert attrs[0].datatype == int - assert attrs[0].mandatory is False - assert attrs[1].key == 'abytes' - assert attrs[1].name == 'abytes' - assert attrs[2].key == 'atext' - assert attrs[2].name == 'atext' - assert attrs[3].key == 'afloat' - assert attrs[3].name == 'afloat' - - def test_private_attr(self): - class WithPrivateAttrs(object): - _private = 12 - - types.register_type(WithPrivateAttrs) - - assert len(WithPrivateAttrs._wsme_attributes) == 0 - - def test_attribute_order(self): - class ForcedOrder(object): - _wsme_attr_order = ('a2', 'a1', 'a3') - a1 = int - a2 = int - a3 = int - - types.register_type(ForcedOrder) - - print(ForcedOrder._wsme_attributes) - assert ForcedOrder._wsme_attributes[0].key == 'a2' - assert ForcedOrder._wsme_attributes[1].key == 'a1' - assert ForcedOrder._wsme_attributes[2].key == 'a3' - - c = gen_class() - print(c) - types.register_type(c) - del c._wsme_attributes - - c.a2 = int - c.a1 = int - c.a3 = int - - types.register_type(c) - - assert c._wsme_attributes[0].key == 'a1', c._wsme_attributes[0].key - assert c._wsme_attributes[1].key == 'a2' - assert c._wsme_attributes[2].key == 'a3' - - def test_wsproperty(self): - class WithWSProp(object): - def __init__(self): - self._aint = 0 - - def get_aint(self): - return self._aint - - def set_aint(self, value): - self._aint = value - - aint = types.wsproperty(int, get_aint, set_aint, mandatory=True) - - types.register_type(WithWSProp) - - print(WithWSProp._wsme_attributes) - assert len(WithWSProp._wsme_attributes) == 1 - a = WithWSProp._wsme_attributes[0] - assert a.key == 'aint' - assert a.datatype == int - assert a.mandatory - - o = WithWSProp() - o.aint = 12 - - assert o.aint == 12 - - def test_nested(self): - class Inner(object): - aint = int - - class Outer(object): - inner = Inner - - types.register_type(Outer) - - assert hasattr(Inner, '_wsme_attributes') - assert len(Inner._wsme_attributes) == 1 - - def test_inspect_with_inheritance(self): - class Parent(object): - parent_attribute = int - - class Child(Parent): - child_attribute = int - - types.register_type(Parent) - types.register_type(Child) - - assert len(Child._wsme_attributes) == 2 - - def test_selfreftype(self): - class SelfRefType(object): - pass - - SelfRefType.parent = SelfRefType - - types.register_type(SelfRefType) - - def test_inspect_with_property(self): - class AType(object): - @property - def test(self): - return 'test' - - types.register_type(AType) - - assert len(AType._wsme_attributes) == 0 - assert AType().test == 'test' - - def test_enum(self): - aenum = types.Enum(str, 'v1', 'v2') - assert aenum.basetype is str - - class AType(object): - a = aenum - - types.register_type(AType) - - assert AType.a.datatype is aenum - - obj = AType() - obj.a = 'v1' - assert obj.a == 'v1', repr(obj.a) - - self.assertRaisesRegexp(exc.InvalidInput, - "Invalid input for field/attribute a. \ -Value: 'v3'. Value should be one of: v., v.", - setattr, - obj, - 'a', - 'v3') - - def test_attribute_validation(self): - class AType(object): - alist = [int] - aint = int - - types.register_type(AType) - - obj = AType() - - obj.alist = [1, 2, 3] - assert obj.alist == [1, 2, 3] - obj.aint = 5 - assert obj.aint == 5 - - self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', 12) - self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', [2, 'a']) - - def test_attribute_validation_minimum(self): - class ATypeInt(object): - attr = types.IntegerType(minimum=1, maximum=5) - - types.register_type(ATypeInt) - - obj = ATypeInt() - obj.attr = 2 - - # comparison between 'zero' value and intger minimum (1) raises a - # TypeError which must be wrapped into an InvalidInput exception - self.assertRaises(exc.InvalidInput, setattr, obj, 'attr', 'zero') - - def test_text_attribute_conversion(self): - class SType(object): - atext = types.text - abytes = types.bytes - - types.register_type(SType) - - obj = SType() - - obj.atext = six.b('somebytes') - assert obj.atext == six.u('somebytes') - assert isinstance(obj.atext, types.text) - - obj.abytes = six.u('sometext') - assert obj.abytes == six.b('sometext') - assert isinstance(obj.abytes, types.bytes) - - def test_named_attribute(self): - class ABCDType(object): - a_list = types.wsattr([int], name='a.list') - astr = str - - types.register_type(ABCDType) - - assert len(ABCDType._wsme_attributes) == 2 - attrs = ABCDType._wsme_attributes - - assert attrs[0].key == 'a_list', attrs[0].key - assert attrs[0].name == 'a.list', attrs[0].name - assert attrs[1].key == 'astr', attrs[1].key - assert attrs[1].name == 'astr', attrs[1].name - - def test_wsattr_del(self): - class MyType(object): - a = types.wsattr(int) - - types.register_type(MyType) - - value = MyType() - - value.a = 5 - assert value.a == 5 - del value.a - assert value.a is types.Unset - - def test_validate_dict(self): - assert types.validate_value({int: str}, {1: '1', 5: '5'}) - - self.assertRaises(ValueError, types.validate_value, - {int: str}, []) - - assert types.validate_value({int: str}, {'1': '1', 5: '5'}) - - self.assertRaises(ValueError, types.validate_value, - {int: str}, {1: 1, 5: '5'}) - - def test_validate_list_valid(self): - assert types.validate_value([int], [1, 2]) - assert types.validate_value([int], ['5']) - - def test_validate_list_empty(self): - assert types.validate_value([int], []) == [] - - def test_validate_list_none(self): - v = types.ArrayType(int) - assert v.validate(None) is None - - def test_validate_list_invalid_member(self): - self.assertRaises(ValueError, types.validate_value, [int], - ['not-a-number']) - - def test_validate_list_invalid_type(self): - self.assertRaises(ValueError, types.validate_value, [int], 1) - - def test_validate_float(self): - self.assertEqual(types.validate_value(float, 1), 1.0) - self.assertEqual(types.validate_value(float, '1'), 1.0) - self.assertEqual(types.validate_value(float, 1.1), 1.1) - self.assertRaises(ValueError, types.validate_value, float, []) - self.assertRaises(ValueError, types.validate_value, float, - 'not-a-float') - - def test_validate_int(self): - self.assertEqual(types.validate_value(int, 1), 1) - self.assertEqual(types.validate_value(int, '1'), 1) - self.assertEqual(types.validate_value(int, six.u('1')), 1) - self.assertRaises(ValueError, types.validate_value, int, 1.1) - - def test_validate_integer_type(self): - v = types.IntegerType(minimum=1, maximum=10) - v.validate(1) - v.validate(5) - v.validate(10) - self.assertRaises(ValueError, v.validate, 0) - self.assertRaises(ValueError, v.validate, 11) - - def test_validate_string_type(self): - v = types.StringType(min_length=1, max_length=10, - pattern='^[a-zA-Z0-9]*$') - v.validate('1') - v.validate('12345') - v.validate('1234567890') - self.assertRaises(ValueError, v.validate, '') - self.assertRaises(ValueError, v.validate, '12345678901') - - # Test a pattern validation - v.validate('a') - v.validate('A') - self.assertRaises(ValueError, v.validate, '_') - - def test_validate_string_type_precompile(self): - precompile = re.compile('^[a-zA-Z0-9]*$') - v = types.StringType(min_length=1, max_length=10, - pattern=precompile) - - # Test a pattern validation - v.validate('a') - v.validate('A') - self.assertRaises(ValueError, v.validate, '_') - - def test_validate_string_type_pattern_exception_message(self): - regex = '^[a-zA-Z0-9]*$' - v = types.StringType(pattern=regex) - try: - v.validate('_') - self.assertFail() - except ValueError as e: - self.assertIn(regex, str(e)) - - def test_validate_ipv4_address_type(self): - v = types.IPv4AddressType() - self.assertEqual(v.validate('127.0.0.1'), '127.0.0.1') - self.assertEqual(v.validate('192.168.0.1'), '192.168.0.1') - self.assertEqual(v.validate(u'8.8.1.1'), u'8.8.1.1') - self.assertRaises(ValueError, v.validate, '') - self.assertRaises(ValueError, v.validate, 'foo') - self.assertRaises(ValueError, v.validate, - '2001:0db8:bd05:01d2:288a:1fc0:0001:10ee') - self.assertRaises(ValueError, v.validate, '1.2.3') - - def test_validate_ipv6_address_type(self): - v = types.IPv6AddressType() - self.assertEqual(v.validate('0:0:0:0:0:0:0:1'), - '0:0:0:0:0:0:0:1') - self.assertEqual(v.validate(u'0:0:0:0:0:0:0:1'), u'0:0:0:0:0:0:0:1') - self.assertEqual(v.validate('2001:0db8:bd05:01d2:288a:1fc0:0001:10ee'), - '2001:0db8:bd05:01d2:288a:1fc0:0001:10ee') - self.assertRaises(ValueError, v.validate, '') - self.assertRaises(ValueError, v.validate, 'foo') - self.assertRaises(ValueError, v.validate, '192.168.0.1') - self.assertRaises(ValueError, v.validate, '0:0:0:0:0:0:1') - - def test_validate_uuid_type(self): - v = types.UuidType() - self.assertEqual(v.validate('6a0a707c-45ef-4758-b533-e55adddba8ce'), - '6a0a707c-45ef-4758-b533-e55adddba8ce') - self.assertEqual(v.validate('6a0a707c45ef4758b533e55adddba8ce'), - '6a0a707c-45ef-4758-b533-e55adddba8ce') - self.assertRaises(ValueError, v.validate, '') - self.assertRaises(ValueError, v.validate, 'foo') - self.assertRaises(ValueError, v.validate, - '6a0a707c-45ef-4758-b533-e55adddba8ce-a') - - def test_register_invalid_array(self): - self.assertRaises(ValueError, types.register_type, []) - self.assertRaises(ValueError, types.register_type, [int, str]) - self.assertRaises(AttributeError, types.register_type, [1]) - - def test_register_invalid_dict(self): - self.assertRaises(ValueError, types.register_type, {}) - self.assertRaises(ValueError, types.register_type, - {int: str, str: int}) - self.assertRaises(ValueError, types.register_type, - {types.Unset: str}) - - def test_list_attribute_no_auto_register(self): - class MyType(object): - aint = int - - assert not hasattr(MyType, '_wsme_attributes') - - self.assertRaises(TypeError, types.list_attributes, MyType) - - assert not hasattr(MyType, '_wsme_attributes') - - def test_list_of_complextypes(self): - class A(object): - bs = types.wsattr(['B']) - - class B(object): - i = int - - types.register_type(A) - types.register_type(B) - - assert A.bs.datatype.item_type is B - - def test_cross_referenced_types(self): - class A(object): - b = types.wsattr('B') - - class B(object): - a = A - - types.register_type(A) - types.register_type(B) - - assert A.b.datatype is B - - def test_base(self): - class B1(types.Base): - b2 = types.wsattr('B2') - - class B2(types.Base): - b2 = types.wsattr('B2') - - assert B1.b2.datatype is B2, repr(B1.b2.datatype) - assert B2.b2.datatype is B2 - - def test_base_init(self): - class C1(types.Base): - s = six.text_type - - c = C1(s=six.u('test')) - assert c.s == six.u('test') - - def test_array_eq(self): - ell = [types.ArrayType(str)] - assert types.ArrayType(str) in ell - - def test_array_sample(self): - s = types.ArrayType(str).sample() - assert isinstance(s, list) - assert s - assert s[0] == '' - - def test_dict_sample(self): - s = types.DictType(str, str).sample() - assert isinstance(s, dict) - assert s - assert s == {'': ''} - - def test_binary_to_base(self): - import base64 - assert types.binary.tobasetype(None) is None - expected = base64.encodestring(six.b('abcdef')) - assert types.binary.tobasetype(six.b('abcdef')) == expected - - def test_binary_from_base(self): - import base64 - assert types.binary.frombasetype(None) is None - encoded = base64.encodestring(six.b('abcdef')) - assert types.binary.frombasetype(encoded) == six.b('abcdef') - - def test_wsattr_weakref_datatype(self): - # If the datatype inside the wsattr ends up a weakref, it - # should be converted to the real type when accessed again by - # the property getter. - import weakref - a = types.wsattr(int) - a.datatype = weakref.ref(int) - assert a.datatype is int - - def test_wsattr_list_datatype(self): - # If the datatype inside the wsattr ends up a list of weakrefs - # to types, it should be converted to the real types when - # accessed again by the property getter. - import weakref - a = types.wsattr(int) - a.datatype = [weakref.ref(int)] - assert isinstance(a.datatype, list) - assert a.datatype[0] is int - - def test_file_get_content_by_reading(self): - class buffer: - def read(self): - return 'abcdef' - f = types.File(file=buffer()) - assert f.content == 'abcdef' - - def test_file_content_overrides_file(self): - class buffer: - def read(self): - return 'from-file' - f = types.File(content='from-content', file=buffer()) - assert f.content == 'from-content' - - def test_file_setting_content_discards_file(self): - class buffer: - def read(self): - return 'from-file' - f = types.File(file=buffer()) - f.content = 'from-content' - assert f.content == 'from-content' - - def test_file_field_storage(self): - class buffer: - def read(self): - return 'from-file' - - class fieldstorage: - filename = 'static.json' - file = buffer() - type = 'application/json' - f = types.File(fieldstorage=fieldstorage) - assert f.content == 'from-file' - - def test_file_field_storage_value(self): - class buffer: - def read(self): - return 'from-file' - - class fieldstorage: - filename = 'static.json' - file = None - type = 'application/json' - value = 'from-value' - f = types.File(fieldstorage=fieldstorage) - assert f.content == 'from-value' - - def test_file_property_file(self): - class buffer: - def read(self): - return 'from-file' - buf = buffer() - f = types.File(file=buf) - assert f.file is buf - - def test_file_property_content(self): - class buffer: - def read(self): - return 'from-file' - f = types.File(content=six.b('from-content')) - assert f.file.read() == six.b('from-content') - - def test_unregister(self): - class TempType(object): - pass - types.registry.register(TempType) - v = types.registry.lookup('TempType') - self.assertIs(v, TempType) - types.registry._unregister(TempType) - after = types.registry.lookup('TempType') - self.assertIs(after, None) - - def test_unregister_twice(self): - class TempType(object): - pass - types.registry.register(TempType) - v = types.registry.lookup('TempType') - self.assertIs(v, TempType) - types.registry._unregister(TempType) - # Second call should not raise an exception - types.registry._unregister(TempType) - after = types.registry.lookup('TempType') - self.assertIs(after, None) - - def test_unregister_array_type(self): - class TempType(object): - pass - t = [TempType] - types.registry.register(t) - self.assertNotEqual(types.registry.array_types, set()) - types.registry._unregister(t) - self.assertEqual(types.registry.array_types, set()) - - def test_unregister_array_type_twice(self): - class TempType(object): - pass - t = [TempType] - types.registry.register(t) - self.assertNotEqual(types.registry.array_types, set()) - types.registry._unregister(t) - # Second call should not raise an exception - types.registry._unregister(t) - self.assertEqual(types.registry.array_types, set()) - - def test_unregister_dict_type(self): - class TempType(object): - pass - t = {str: TempType} - types.registry.register(t) - self.assertNotEqual(types.registry.dict_types, set()) - types.registry._unregister(t) - self.assertEqual(types.registry.dict_types, set()) - - def test_unregister_dict_type_twice(self): - class TempType(object): - pass - t = {str: TempType} - types.registry.register(t) - self.assertNotEqual(types.registry.dict_types, set()) - types.registry._unregister(t) - # Second call should not raise an exception - types.registry._unregister(t) - self.assertEqual(types.registry.dict_types, set()) - - def test_reregister(self): - class TempType(object): - pass - types.registry.register(TempType) - v = types.registry.lookup('TempType') - self.assertIs(v, TempType) - types.registry.reregister(TempType) - after = types.registry.lookup('TempType') - self.assertIs(after, TempType) - - def test_reregister_and_add_attr(self): - class TempType(object): - pass - types.registry.register(TempType) - attrs = types.list_attributes(TempType) - self.assertEqual(attrs, []) - TempType.one = str - types.registry.reregister(TempType) - after = types.list_attributes(TempType) - self.assertNotEqual(after, []) - - def test_dynamicbase_add_attributes(self): - class TempType(types.DynamicBase): - pass - types.registry.register(TempType) - attrs = types.list_attributes(TempType) - self.assertEqual(attrs, []) - TempType.add_attributes(one=str) - after = types.list_attributes(TempType) - self.assertEqual(len(after), 1) - - def test_dynamicbase_add_attributes_second(self): - class TempType(types.DynamicBase): - pass - types.registry.register(TempType) - attrs = types.list_attributes(TempType) - self.assertEqual(attrs, []) - TempType.add_attributes(one=str) - TempType.add_attributes(two=int) - after = types.list_attributes(TempType) - self.assertEqual(len(after), 2) - - def test_non_registered_complex_type(self): - class TempType(types.Base): - __registry__ = None - - self.assertFalse(types.iscomplex(TempType)) - types.registry.register(TempType) - self.assertTrue(types.iscomplex(TempType)) diff --git a/wsme/tests/test_utils.py b/wsme/tests/test_utils.py deleted file mode 100644 index f9464c8..0000000 --- a/wsme/tests/test_utils.py +++ /dev/null @@ -1,97 +0,0 @@ -import datetime -import unittest -import pytz - -from wsme import utils - - -class TestUtils(unittest.TestCase): - def test_parse_isodate(self): - good_dates = [ - ('2008-02-01', datetime.date(2008, 2, 1)), - ('2009-01-04', datetime.date(2009, 1, 4)), - ] - ill_formatted_dates = [ - '24-12-2004' - ] - out_of_range_dates = [ - '0000-00-00', - '2012-02-30', - ] - for s, d in good_dates: - assert utils.parse_isodate(s) == d - for s in ill_formatted_dates + out_of_range_dates: - self.assertRaises(ValueError, utils.parse_isodate, s) - - def test_parse_isotime(self): - good_times = [ - ('12:03:54', datetime.time(12, 3, 54)), - ('23:59:59.000004', datetime.time(23, 59, 59, 4)), - ('01:02:03+00:00', datetime.time(1, 2, 3, 0, pytz.UTC)), - ('01:02:03+23:59', datetime.time(1, 2, 3, 0, - pytz.FixedOffset(1439))), - ('01:02:03-23:59', datetime.time(1, 2, 3, 0, - pytz.FixedOffset(-1439))), - ] - ill_formatted_times = [ - '24-12-2004' - ] - out_of_range_times = [ - '32:12:00', - '00:54:60', - '01:02:03-24:00', - '01:02:03+24:00', - ] - for s, t in good_times: - assert utils.parse_isotime(s) == t - for s in ill_formatted_times + out_of_range_times: - self.assertRaises(ValueError, utils.parse_isotime, s) - - def test_parse_isodatetime(self): - good_datetimes = [ - ('2008-02-12T12:03:54', - datetime.datetime(2008, 2, 12, 12, 3, 54)), - ('2012-05-14T23:59:59.000004', - datetime.datetime(2012, 5, 14, 23, 59, 59, 4)), - ('1856-07-10T01:02:03+00:00', - datetime.datetime(1856, 7, 10, 1, 2, 3, 0, pytz.UTC)), - ('1856-07-10T01:02:03+23:59', - datetime.datetime(1856, 7, 10, 1, 2, 3, 0, - pytz.FixedOffset(1439))), - ('1856-07-10T01:02:03-23:59', - datetime.datetime(1856, 7, 10, 1, 2, 3, 0, - pytz.FixedOffset(-1439))), - ] - ill_formatted_datetimes = [ - '24-32-2004', - '1856-07-10+33:00' - ] - out_of_range_datetimes = [ - '2008-02-12T32:12:00', - '2012-13-12T00:54:60', - ] - for s, t in good_datetimes: - assert utils.parse_isodatetime(s) == t - for s in ill_formatted_datetimes + out_of_range_datetimes: - self.assertRaises(ValueError, utils.parse_isodatetime, s) - - def test_validator_with_valid_code(self): - valid_code = 404 - self.assertTrue( - utils.is_valid_code(valid_code), - "Valid status code not detected" - ) - - def test_validator_with_invalid_int_code(self): - invalid_int_code = 648 - self.assertFalse( - utils.is_valid_code(invalid_int_code), - "Invalid status code not detected" - ) - - def test_validator_with_invalid_str_code(self): - invalid_str_code = '404' - self.assertFalse( - utils.is_valid_code(invalid_str_code), - "Invalid status code not detected" - ) diff --git a/wsme/types.py b/wsme/types.py deleted file mode 100644 index 0a5fc02..0000000 --- a/wsme/types.py +++ /dev/null @@ -1,840 +0,0 @@ -import base64 -import datetime -import decimal -import inspect -import logging -import netaddr -import re -import six -import sys -import uuid -import weakref - -from wsme import exc - -log = logging.getLogger(__name__) - -#: The 'str' (python 2) or 'bytes' (python 3) type. -#: Its use should be restricted to -#: pure ascii strings as the protocols will generally not be -#: be able to send non-unicode strings. -#: To transmit binary strings, use the :class:`binary` type -bytes = six.binary_type - -#: Unicode string. -text = six.text_type - - -class ArrayType(object): - def __init__(self, item_type): - if iscomplex(item_type): - self._item_type = weakref.ref(item_type) - else: - self._item_type = item_type - - def __hash__(self): - return hash(self.item_type) - - def __eq__(self, other): - return isinstance(other, ArrayType) \ - and self.item_type == other.item_type - - def sample(self): - return [getattr(self.item_type, 'sample', self.item_type)()] - - @property - def item_type(self): - if isinstance(self._item_type, weakref.ref): - return self._item_type() - else: - return self._item_type - - def validate(self, value): - if value is None: - return - if not isinstance(value, list): - raise ValueError("Wrong type. Expected '[%s]', got '%s'" % ( - self.item_type, type(value) - )) - return [ - validate_value(self.item_type, item) - for item in value - ] - - -class DictType(object): - def __init__(self, key_type, value_type): - if key_type not in pod_types: - raise ValueError("Dictionaries key can only be a pod type") - self.key_type = key_type - if iscomplex(value_type): - self._value_type = weakref.ref(value_type) - else: - self._value_type = value_type - - def __hash__(self): - return hash((self.key_type, self.value_type)) - - def sample(self): - key = getattr(self.key_type, 'sample', self.key_type)() - value = getattr(self.value_type, 'sample', self.value_type)() - return {key: value} - - @property - def value_type(self): - if isinstance(self._value_type, weakref.ref): - return self._value_type() - else: - return self._value_type - - def validate(self, value): - if not isinstance(value, dict): - raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % ( - self.key_type, self.value_type, type(value) - )) - return dict(( - ( - validate_value(self.key_type, key), - validate_value(self.value_type, v) - ) for key, v in value.items() - )) - - -class UserType(object): - basetype = None - name = None - - def validate(self, value): - return value - - def tobasetype(self, value): - return value - - def frombasetype(self, value): - return value - - -def isusertype(class_): - return isinstance(class_, UserType) - - -class BinaryType(UserType): - """ - A user type that use base64 strings to carry binary data. - """ - basetype = bytes - name = 'binary' - - def tobasetype(self, value): - if value is None: - return None - return base64.encodestring(value) - - def frombasetype(self, value): - if value is None: - return None - return base64.decodestring(value) - - -#: The binary almost-native type -binary = BinaryType() - - -class IntegerType(UserType): - """ - A simple integer type. Can validate a value range. - - :param minimum: Possible minimum value - :param maximum: Possible maximum value - - Example:: - - Price = IntegerType(minimum=1) - - """ - basetype = int - name = "integer" - - def __init__(self, minimum=None, maximum=None): - self.minimum = minimum - self.maximum = maximum - - @staticmethod - def frombasetype(value): - return int(value) if value is not None else None - - def validate(self, value): - if self.minimum is not None and value < self.minimum: - error = 'Value should be greater or equal to %s' % self.minimum - raise ValueError(error) - - if self.maximum is not None and value > self.maximum: - error = 'Value should be lower or equal to %s' % self.maximum - raise ValueError(error) - - return value - - -class StringType(UserType): - """ - A simple string type. Can validate a length and a pattern. - - :param min_length: Possible minimum length - :param max_length: Possible maximum length - :param pattern: Possible string pattern - - Example:: - - Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$') - - """ - basetype = six.string_types - name = "string" - - def __init__(self, min_length=None, max_length=None, pattern=None): - self.min_length = min_length - self.max_length = max_length - if isinstance(pattern, six.string_types): - self.pattern = re.compile(pattern) - else: - self.pattern = pattern - - def validate(self, value): - if not isinstance(value, self.basetype): - error = 'Value should be string' - raise ValueError(error) - - if self.min_length is not None and len(value) < self.min_length: - error = 'Value should have a minimum character requirement of %s' \ - % self.min_length - raise ValueError(error) - - if self.max_length is not None and len(value) > self.max_length: - error = 'Value should have a maximum character requirement of %s' \ - % self.max_length - raise ValueError(error) - - if self.pattern is not None and not self.pattern.search(value): - error = 'Value should match the pattern %s' % self.pattern.pattern - raise ValueError(error) - - return value - - -class IPv4AddressType(UserType): - """ - A simple IPv4 type. - """ - basetype = six.string_types - name = "ipv4address" - - @staticmethod - def validate(value): - try: - netaddr.IPAddress(value, version=4, flags=netaddr.INET_PTON) - except netaddr.AddrFormatError: - error = 'Value should be IPv4 format' - raise ValueError(error) - else: - return value - - -class IPv6AddressType(UserType): - """ - A simple IPv6 type. - - This type represents IPv6 addresses in the short format. - """ - basetype = six.string_types - name = "ipv6address" - - @staticmethod - def validate(value): - try: - netaddr.IPAddress(value, version=6, flags=netaddr.INET_PTON) - except netaddr.AddrFormatError: - error = 'Value should be IPv6 format' - raise ValueError(error) - else: - return value - - -class UuidType(UserType): - """ - A simple UUID type. - - This type allows not only UUID having dashes but also UUID not - having dashes. For example, '6a0a707c-45ef-4758-b533-e55adddba8ce' - and '6a0a707c45ef4758b533e55adddba8ce' are distinguished as valid. - """ - basetype = six.string_types - name = "uuid" - - @staticmethod - def validate(value): - try: - return six.text_type((uuid.UUID(value))) - except (TypeError, ValueError, AttributeError): - error = 'Value should be UUID format' - raise ValueError(error) - - -class Enum(UserType): - """ - A simple enumeration type. Can be based on any non-complex type. - - :param basetype: The actual data type - :param values: A set of possible values - - If nullable, 'None' should be added the values set. - - Example:: - - Gender = Enum(str, 'male', 'female') - Specie = Enum(str, 'cat', 'dog') - - """ - def __init__(self, basetype, *values, **kw): - self.basetype = basetype - self.values = set(values) - name = kw.pop('name', None) - if name is None: - name = "Enum(%s)" % ', '.join((six.text_type(v) for v in values)) - self.name = name - - def validate(self, value): - if value not in self.values: - raise ValueError("Value should be one of: %s" % - ', '.join(map(six.text_type, self.values))) - return value - - def tobasetype(self, value): - return value - - def frombasetype(self, value): - return value - - -class UnsetType(object): - if sys.version < '3': - def __nonzero__(self): - return False - else: - def __bool__(self): - return False - - def __repr__(self): - return 'Unset' - - -Unset = UnsetType() - -#: A special type that corresponds to the host framework request object. -#: It can only be used in the function parameters, and if so the request object -#: of the host framework will be passed to the function. -HostRequest = object() - - -pod_types = six.integer_types + ( - bytes, text, float, bool) -dt_types = (datetime.date, datetime.time, datetime.datetime) -extra_types = (binary, decimal.Decimal) -native_types = pod_types + dt_types + extra_types -# The types for which we allow promotion to certain numbers. -_promotable_types = six.integer_types + (text, bytes) - - -def iscomplex(datatype): - return inspect.isclass(datatype) \ - and '_wsme_attributes' in datatype.__dict__ - - -def isarray(datatype): - return isinstance(datatype, ArrayType) - - -def isdict(datatype): - return isinstance(datatype, DictType) - - -def validate_value(datatype, value): - if value in (Unset, None): - return value - - # Try to promote the data type to one of our complex types. - if isinstance(datatype, list): - datatype = ArrayType(datatype[0]) - elif isinstance(datatype, dict): - datatype = DictType(*list(datatype.items())[0]) - - # If the datatype has its own validator, use that. - if hasattr(datatype, 'validate'): - return datatype.validate(value) - - # Do type promotion/conversion and data validation for builtin - # types. - v_type = type(value) - if datatype in six.integer_types: - if v_type in _promotable_types: - try: - # Try to turn the value into an int - value = datatype(value) - except ValueError: - # An error is raised at the end of the function - # when the types don't match. - pass - elif datatype is float and v_type in _promotable_types: - try: - value = float(value) - except ValueError: - # An error is raised at the end of the function - # when the types don't match. - pass - elif datatype is text and isinstance(value, bytes): - value = value.decode() - elif datatype is bytes and isinstance(value, text): - value = value.encode() - - if not isinstance(value, datatype): - raise ValueError( - "Wrong type. Expected '%s', got '%s'" % ( - datatype, v_type - )) - return value - - -class wsproperty(property): - """ - A specialised :class:`property` to define typed-property on complex types. - Example:: - - class MyComplexType(wsme.types.Base): - def get_aint(self): - return self._aint - - def set_aint(self, value): - assert avalue < 10 # Dummy input validation - self._aint = value - - aint = wsproperty(int, get_aint, set_aint, mandatory=True) - """ - def __init__(self, datatype, fget, fset=None, - mandatory=False, doc=None, name=None): - property.__init__(self, fget, fset) - #: The property name in the parent python class - self.key = None - #: The attribute name on the public of the api. - #: Defaults to :attr:`key` - self.name = name - #: property data type - self.datatype = datatype - #: True if the property is mandatory - self.mandatory = mandatory - - -class wsattr(object): - """ - Complex type attribute definition. - - Example:: - - class MyComplexType(wsme.types.Base): - optionalvalue = int - mandatoryvalue = wsattr(int, mandatory=True) - named_value = wsattr(int, name='named.value') - - After inspection, the non-wsattr attributes will be replaced, and - the above class will be equivalent to:: - - class MyComplexType(wsme.types.Base): - optionalvalue = wsattr(int) - mandatoryvalue = wsattr(int, mandatory=True) - - """ - def __init__(self, datatype, mandatory=False, name=None, default=Unset, - readonly=False): - #: The attribute name in the parent python class. - #: Set by :func:`inspect_class` - self.key = None # will be set by class inspection - #: The attribute name on the public of the api. - #: Defaults to :attr:`key` - self.name = name - self._datatype = (datatype,) - #: True if the attribute is mandatory - self.mandatory = mandatory - #: Default value. The attribute will return this instead - #: of :data:`Unset` if no value has been set. - self.default = default - #: If True value cannot be set from json/xml input data - self.readonly = readonly - - self.complextype = None - - def _get_dataholder(self, instance): - dataholder = getattr(instance, '_wsme_dataholder', None) - if dataholder is None: - dataholder = instance._wsme_DataHolderClass() - instance._wsme_dataholder = dataholder - return dataholder - - def __get__(self, instance, owner): - if instance is None: - return self - return getattr( - self._get_dataholder(instance), - self.key, - self.default - ) - - def __set__(self, instance, value): - try: - value = validate_value(self.datatype, value) - except (ValueError, TypeError) as e: - raise exc.InvalidInput(self.name, value, six.text_type(e)) - dataholder = self._get_dataholder(instance) - if value is Unset: - if hasattr(dataholder, self.key): - delattr(dataholder, self.key) - else: - setattr(dataholder, self.key, value) - - def __delete__(self, instance): - self.__set__(instance, Unset) - - def _get_datatype(self): - if isinstance(self._datatype, tuple): - self._datatype = \ - self.complextype().__registry__.resolve_type(self._datatype[0]) - if isinstance(self._datatype, weakref.ref): - return self._datatype() - if isinstance(self._datatype, list): - return [ - item() if isinstance(item, weakref.ref) else item - for item in self._datatype - ] - return self._datatype - - def _set_datatype(self, datatype): - self._datatype = datatype - - #: attribute data type. Can be either an actual type, - #: or a type name, in which case the actual type will be - #: determined when needed (generally just before scanning the api). - datatype = property(_get_datatype, _set_datatype) - - -def iswsattr(attr): - if inspect.isfunction(attr) or inspect.ismethod(attr): - return False - if isinstance(attr, property) and not isinstance(attr, wsproperty): - return False - return True - - -def sort_attributes(class_, attributes): - """Sort a class attributes list. - - 3 mechanisms are attempted : - - #. Look for a _wsme_attr_order attribute on the class_. This allow - to define an arbitrary order of the attributes (useful for - generated types). - - #. Access the object source code to find the declaration order. - - #. Sort by alphabetically""" - - if not len(attributes): - return - - attrs = dict((a.key, a) for a in attributes) - - if hasattr(class_, '_wsme_attr_order'): - names_order = class_._wsme_attr_order - else: - names = attrs.keys() - names_order = [] - try: - lines = [] - for cls in inspect.getmro(class_): - if cls is object: - continue - lines[len(lines):] = inspect.getsourcelines(cls)[0] - for line in lines: - line = line.strip().replace(" ", "") - if '=' in line: - aname = line[:line.index('=')] - if aname in names and aname not in names_order: - names_order.append(aname) - if len(names_order) < len(names): - names_order.extend(( - name for name in names if name not in names_order)) - assert len(names_order) == len(names) - except (TypeError, IOError): - names_order = list(names) - names_order.sort() - - attributes[:] = [attrs[name] for name in names_order] - - -def inspect_class(class_): - """Extract a list of (name, wsattr|wsproperty) for the given class_""" - attributes = [] - for name, attr in inspect.getmembers(class_, iswsattr): - if name.startswith('_'): - continue - if inspect.isroutine(attr): - continue - - if isinstance(attr, (wsattr, wsproperty)): - attrdef = attr - else: - if attr not in native_types and ( - inspect.isclass(attr) or - isinstance(attr, (list, dict))): - register_type(attr) - attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr) - - attrdef.key = name - if attrdef.name is None: - attrdef.name = name - attrdef.complextype = weakref.ref(class_) - attributes.append(attrdef) - setattr(class_, name, attrdef) - - sort_attributes(class_, attributes) - return attributes - - -def list_attributes(class_): - """ - Returns a list of a complex type attributes. - """ - if not iscomplex(class_): - raise TypeError("%s is not a registered type") - return class_._wsme_attributes - - -def make_dataholder(class_): - # the slots are computed outside the class scope to avoid - # 'attr' to pullute the class namespace, which leads to weird - # things if one of the slots is named 'attr'. - slots = [attr.key for attr in class_._wsme_attributes] - - class DataHolder(object): - __slots__ = slots - - DataHolder.__name__ = class_.__name__ + 'DataHolder' - return DataHolder - - -class Registry(object): - def __init__(self): - self._complex_types = [] - self.array_types = set() - self.dict_types = set() - - @property - def complex_types(self): - return [t() for t in self._complex_types if t()] - - def register(self, class_): - """ - Make sure a type is registered. - - It is automatically called by :class:`expose() <wsme.expose>` - and :class:`validate() <wsme.validate>`. - Unless you want to control when the class inspection is done there - is no need to call it. - """ - if class_ is None or \ - class_ in native_types or \ - isusertype(class_) or iscomplex(class_) or \ - isarray(class_) or isdict(class_): - return class_ - - if isinstance(class_, list): - if len(class_) != 1: - raise ValueError("Cannot register type %s" % repr(class_)) - dt = ArrayType(class_[0]) - self.register(dt.item_type) - self.array_types.add(dt) - return dt - - if isinstance(class_, dict): - if len(class_) != 1: - raise ValueError("Cannot register type %s" % repr(class_)) - dt = DictType(*list(class_.items())[0]) - self.register(dt.value_type) - self.dict_types.add(dt) - return dt - - class_._wsme_attributes = None - class_._wsme_attributes = inspect_class(class_) - class_._wsme_DataHolderClass = make_dataholder(class_) - - class_.__registry__ = self - self._complex_types.append(weakref.ref(class_)) - return class_ - - def reregister(self, class_): - """Register a type which may already have been registered. - """ - self._unregister(class_) - return self.register(class_) - - def _unregister(self, class_): - """Remove a previously registered type. - """ - # Clear the existing attribute reference so it is rebuilt if - # the class is registered again later. - if hasattr(class_, '_wsme_attributes'): - del class_._wsme_attributes - # FIXME(dhellmann): This method does not recurse through the - # types like register() does. Should it? - if isinstance(class_, list): - at = ArrayType(class_[0]) - try: - self.array_types.remove(at) - except KeyError: - pass - elif isinstance(class_, dict): - key_type, value_type = list(class_.items())[0] - self.dict_types = set( - dt for dt in self.dict_types - if (dt.key_type, dt.value_type) != (key_type, value_type) - ) - # We can't use remove() here because the items in - # _complex_types are weakref objects pointing to the classes, - # so we can't compare with them directly. - self._complex_types = [ - ct for ct in self._complex_types - if ct() is not class_ - ] - - def lookup(self, typename): - log.debug('Lookup %s' % typename) - modname = None - if '.' in typename: - modname, typename = typename.rsplit('.', 1) - for ct in self._complex_types: - ct = ct() - if ct is not None and typename == ct.__name__ and ( - modname is None or modname == ct.__module__): - return ct - - def resolve_type(self, type_): - if isinstance(type_, six.string_types): - return self.lookup(type_) - if isinstance(type_, list): - type_ = ArrayType(type_[0]) - if isinstance(type_, dict): - type_ = DictType(list(type_.keys())[0], list(type_.values())[0]) - if isinstance(type_, ArrayType): - type_ = ArrayType(self.resolve_type(type_.item_type)) - self.array_types.add(type_) - elif isinstance(type_, DictType): - type_ = DictType( - type_.key_type, - self.resolve_type(type_.value_type) - ) - self.dict_types.add(type_) - else: - type_ = self.register(type_) - return type_ - - -# Default type registry -registry = Registry() - - -def register_type(class_): - return registry.register(class_) - - -class BaseMeta(type): - def __new__(cls, name, bases, dct): - if bases and bases[0] is not object and '__registry__' not in dct: - dct['__registry__'] = registry - return type.__new__(cls, name, bases, dct) - - def __init__(cls, name, bases, dct): - if bases and bases[0] is not object and cls.__registry__: - cls.__registry__.register(cls) - - -class Base(six.with_metaclass(BaseMeta)): - """Base type for complex types""" - def __init__(self, **kw): - for key, value in kw.items(): - if hasattr(self, key): - setattr(self, key, value) - - -class File(Base): - """A complex type that represents a file. - - In the particular case of protocol accepting form encoded data as - input, File can be loaded from a form file field. - """ - #: The file name - filename = wsattr(text) - - #: Mime type of the content - contenttype = wsattr(text) - - def _get_content(self): - if self._content is None and self._file: - self._content = self._file.read() - return self._content - - def _set_content(self, value): - self._content = value - self._file = None - - #: File content - content = wsproperty(binary, _get_content, _set_content) - - def __init__(self, filename=None, file=None, content=None, - contenttype=None, fieldstorage=None): - self.filename = filename - self.contenttype = contenttype - self._file = file - self._content = content - - if fieldstorage is not None: - if fieldstorage.file: - self._file = fieldstorage.file - self.filename = fieldstorage.filename - self.contenttype = text(fieldstorage.type) - else: - self._content = fieldstorage.value - - @property - def file(self): - if self._file is None and self._content: - self._file = six.BytesIO(self._content) - return self._file - - -class DynamicBase(Base): - """Base type for complex types for which all attributes are not - defined when the class is constructed. - - This class is meant to be used as a base for types that have - properties added after the main class is created, such as by - loading plugins. - - """ - - @classmethod - def add_attributes(cls, **attrs): - """Add more attributes - - The arguments should be valid Python attribute names - associated with a type for the new attribute. - - """ - for n, t in attrs.items(): - setattr(cls, n, t) - cls.__registry__.reregister(cls) diff --git a/wsme/utils.py b/wsme/utils.py deleted file mode 100644 index e52b0ef..0000000 --- a/wsme/utils.py +++ /dev/null @@ -1,118 +0,0 @@ -import decimal -import datetime -import pytz -import re -from six.moves import builtins, http_client - -try: - import dateutil.parser -except ImportError: - dateutil = None # noqa - -date_re = r'(?P<year>-?\d{4,})-(?P<month>\d{2})-(?P<day>\d{2})' -time_re = r'(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})' + \ - r'(\.(?P<sec_frac>\d+))?' -tz_re = r'((?P<tz_sign>[+-])(?P<tz_hour>\d{2}):(?P<tz_min>\d{2}))' + \ - r'|(?P<tz_z>Z)' - -datetime_re = re.compile( - '%sT%s(%s)?' % (date_re, time_re, tz_re)) -date_re = re.compile(date_re) -time_re = re.compile('%s(%s)?' % (time_re, tz_re)) - - -if hasattr(builtins, '_'): - _ = builtins._ -else: - def _(s): - return s - - -def parse_isodate(value): - m = date_re.match(value) - if m is None: - raise ValueError("'%s' is not a legal date value" % (value)) - try: - return datetime.date( - int(m.group('year')), - int(m.group('month')), - int(m.group('day'))) - except ValueError: - raise ValueError("'%s' is a out-of-range date" % (value)) - - -def parse_isotime(value): - m = time_re.match(value) - if m is None: - raise ValueError("'%s' is not a legal time value" % (value)) - try: - ms = 0 - if m.group('sec_frac') is not None: - f = decimal.Decimal('0.' + m.group('sec_frac')) - f = str(f.quantize(decimal.Decimal('0.000001'))) - ms = int(f[2:]) - tz = _parse_tzparts(m.groupdict()) - return datetime.time( - int(m.group('hour')), - int(m.group('min')), - int(m.group('sec')), - ms, - tz) - except ValueError: - raise ValueError("'%s' is a out-of-range time" % (value)) - - -def parse_isodatetime(value): - if dateutil: - return dateutil.parser.parse(value) - m = datetime_re.match(value) - if m is None: - raise ValueError("'%s' is not a legal datetime value" % (value)) - try: - ms = 0 - if m.group('sec_frac') is not None: - f = decimal.Decimal('0.' + m.group('sec_frac')) - f = f.quantize(decimal.Decimal('0.000001')) - ms = int(str(f)[2:]) - tz = _parse_tzparts(m.groupdict()) - return datetime.datetime( - int(m.group('year')), - int(m.group('month')), - int(m.group('day')), - int(m.group('hour')), - int(m.group('min')), - int(m.group('sec')), - ms, - tz) - except ValueError: - raise ValueError("'%s' is a out-of-range datetime" % (value)) - - -def _parse_tzparts(parts): - if 'tz_z' in parts and parts['tz_z'] == 'Z': - return pytz.UTC - if 'tz_min' not in parts or not parts['tz_min']: - return None - - tz_minute_offset = (int(parts['tz_hour']) * 60 + int(parts['tz_min'])) - tz_multiplier = -1 if parts['tz_sign'] == '-' else 1 - - return pytz.FixedOffset(tz_multiplier * tz_minute_offset) - - -def is_valid_code(code_value): - """ - This function checks if incoming value in http response codes range. - """ - return code_value in http_client.responses - - -def is_client_error(code): - """ Checks client error code (RFC 2616).""" - return 400 <= code < 500 - - -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict # noqa |