diff options
Diffstat (limited to 'wsmeext')
-rw-r--r-- | wsmeext/__init__.py | 2 | ||||
-rw-r--r-- | wsmeext/cornice.py | 168 | ||||
-rw-r--r-- | wsmeext/extdirect/__init__.py | 1 | ||||
-rw-r--r-- | wsmeext/extdirect/datastore.py | 121 | ||||
-rw-r--r-- | wsmeext/extdirect/protocol.py | 450 | ||||
-rw-r--r-- | wsmeext/extdirect/sadatastore.py | 19 | ||||
-rw-r--r-- | wsmeext/flask.py | 108 | ||||
-rw-r--r-- | wsmeext/pecan.py | 142 | ||||
-rw-r--r-- | wsmeext/soap/__init__.py | 5 | ||||
-rw-r--r-- | wsmeext/soap/protocol.py | 478 | ||||
-rw-r--r-- | wsmeext/soap/simplegeneric.py | 107 | ||||
-rw-r--r-- | wsmeext/soap/wsdl.py | 297 | ||||
-rw-r--r-- | wsmeext/sphinxext.py | 600 | ||||
-rw-r--r-- | wsmeext/sqlalchemy/__init__.py | 0 | ||||
-rw-r--r-- | wsmeext/sqlalchemy/controllers.py | 97 | ||||
-rw-r--r-- | wsmeext/sqlalchemy/types.py | 200 | ||||
-rw-r--r-- | wsmeext/tests/__init__.py | 0 | ||||
-rw-r--r-- | wsmeext/tests/test_extdirect.py | 243 | ||||
-rw-r--r-- | wsmeext/tests/test_soap.py | 423 | ||||
-rw-r--r-- | wsmeext/tests/test_sqlalchemy_controllers.py | 223 | ||||
-rw-r--r-- | wsmeext/tests/test_sqlalchemy_types.py | 72 | ||||
-rw-r--r-- | wsmeext/tg1.py | 173 | ||||
-rw-r--r-- | wsmeext/tg11.py | 40 | ||||
-rw-r--r-- | wsmeext/tg15.py | 20 |
24 files changed, 0 insertions, 3989 deletions
diff --git a/wsmeext/__init__.py b/wsmeext/__init__.py deleted file mode 100644 index ece379c..0000000 --- a/wsmeext/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -import pkg_resources -pkg_resources.declare_namespace(__name__) diff --git a/wsmeext/cornice.py b/wsmeext/cornice.py deleted file mode 100644 index 6c81386..0000000 --- a/wsmeext/cornice.py +++ /dev/null @@ -1,168 +0,0 @@ -""" -WSME for cornice - - -Activate it:: - - config.include('wsme.cornice') - - -And use it:: - - @hello.get() - @wsexpose(Message, wsme.types.text) - def get_hello(who=u'World'): - return Message(text='Hello %s' % who) -""" -from __future__ import absolute_import - -import inspect -import sys - -import wsme -from wsme.rest import json as restjson -from wsme.rest import xml as restxml -import wsme.runtime -import wsme.api -import functools - -from wsme.rest.args import ( - args_from_args, args_from_params, args_from_body, combine_args -) - - -class WSMEJsonRenderer(object): - def __init__(self, info): - pass - - def __call__(self, data, context): - response = context['request'].response - response.content_type = 'application/json' - if 'faultcode' in data: - if 'orig_code' in data: - response.status_code = data['orig_code'] - elif data['faultcode'] == 'Client': - response.status_code = 400 - else: - response.status_code = 500 - return restjson.encode_error(None, data) - obj = data['result'] - if isinstance(obj, wsme.api.Response): - response.status_code = obj.status_code - if obj.error: - return restjson.encode_error(None, obj.error) - obj = obj.obj - return restjson.encode_result(obj, data['datatype']) - - -class WSMEXmlRenderer(object): - def __init__(self, info): - pass - - def __call__(self, data, context): - response = context['request'].response - if 'faultcode' in data: - if data['faultcode'] == 'Client': - response.status_code = 400 - else: - response.status_code = 500 - return restxml.encode_error(None, data) - response.content_type = 'text/xml' - return restxml.encode_result(data['result'], data['datatype']) - - -def get_outputformat(request): - df = None - if 'Accept' in request.headers: - if 'application/json' in request.headers['Accept']: - df = 'json' - elif 'text/xml' in request.headers['Accept']: - df = 'xml' - if df is None and 'Content-Type' in request.headers: - if 'application/json' in request.headers['Content-Type']: - df = 'json' - elif 'text/xml' in request.headers['Content-Type']: - df = 'xml' - return df if df else 'json' - - -def signature(*args, **kwargs): - sig = wsme.signature(*args, **kwargs) - - def decorate(f): - args = inspect.getargspec(f)[0] - with_self = args[0] == 'self' if args else False - f = sig(f) - funcdef = wsme.api.FunctionDefinition.get(f) - funcdef.resolve_types(wsme.types.registry) - - @functools.wraps(f) - def callfunction(*args): - if with_self: - if len(args) == 1: - self = args[0] - request = self.request - elif len(args) == 2: - self, request = args - else: - raise ValueError("Cannot do anything with these arguments") - else: - request = args[0] - request.override_renderer = 'wsme' + get_outputformat(request) - try: - args, kwargs = combine_args(funcdef, ( - args_from_args(funcdef, (), request.matchdict), - args_from_params(funcdef, request.params), - args_from_body(funcdef, request.body, request.content_type) - )) - wsme.runtime.check_arguments(funcdef, args, kwargs) - if funcdef.pass_request: - kwargs[funcdef.pass_request] = request - if with_self: - args.insert(0, self) - - result = f(*args, **kwargs) - return { - 'datatype': funcdef.return_type, - 'result': result - } - except Exception: - try: - exception_info = sys.exc_info() - orig_exception = exception_info[1] - orig_code = getattr(orig_exception, 'code', None) - data = wsme.api.format_exception(exception_info) - if orig_code is not None: - data['orig_code'] = orig_code - return data - finally: - del exception_info - - callfunction.wsme_func = f - return callfunction - return decorate - - -def scan_api(root=None): - from cornice.service import get_services - for service in get_services(): - for method, func, options in service.definitions: - wsme_func = getattr(func, 'wsme_func') - basepath = service.path.split('/') - if basepath and not basepath[0]: - del basepath[0] - if wsme_func: - yield ( - basepath + [method.lower()], - wsme_func._wsme_definition - ) - - -def includeme(config): - import pyramid.wsgi - wsroot = wsme.WSRoot(scan_api=scan_api, webpath='/ws') - wsroot.addprotocol('extdirect') - config.add_renderer('wsmejson', WSMEJsonRenderer) - config.add_renderer('wsmexml', WSMEXmlRenderer) - config.add_route('wsme', '/ws/*path') - config.add_view(pyramid.wsgi.wsgiapp(wsroot.wsgiapp()), route_name='wsme') diff --git a/wsmeext/extdirect/__init__.py b/wsmeext/extdirect/__init__.py deleted file mode 100644 index ff14bd5..0000000 --- a/wsmeext/extdirect/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from wsmeext.extdirect.protocol import ExtDirectProtocol # noqa diff --git a/wsmeext/extdirect/datastore.py b/wsmeext/extdirect/datastore.py deleted file mode 100644 index 287e3a7..0000000 --- a/wsmeext/extdirect/datastore.py +++ /dev/null @@ -1,121 +0,0 @@ -import wsme -import wsme.types - -try: - import simplejson as json -except ImportError: - import json - - -class ReadResultBase(wsme.types.Base): - total = int - success = bool - message = wsme.types.text - - -def make_readresult(datatype): - ReadResult = type( - datatype.__name__ + 'ReadResult', - (ReadResultBase,), { - 'data': [datatype] - } - ) - return ReadResult - - -class DataStoreControllerMeta(type): - def __init__(cls, name, bases, dct): - if cls.__datatype__ is None: - return - if getattr(cls, '__readresulttype__', None) is None: - cls.__readresulttype__ = make_readresult(cls.__datatype__) - - cls.create = wsme.expose( - cls.__readresulttype__, - extdirect_params_notation='positional')(cls.create) - cls.create = wsme.validate(cls.__datatype__)(cls.create) - - cls.read = wsme.expose( - cls.__readresulttype__, - extdirect_params_notation='named')(cls.read) - cls.read = wsme.validate(str, str, int, int, int)(cls.read) - - cls.update = wsme.expose( - cls.__readresulttype__, - extdirect_params_notation='positional')(cls.update) - cls.update = wsme.validate(cls.__datatype__)(cls.update) - - cls.destroy = wsme.expose( - cls.__readresulttype__, - extdirect_params_notation='positional')(cls.destroy) - cls.destroy = wsme.validate(cls.__idtype__)(cls.destroy) - - -class DataStoreControllerMixin(object): - __datatype__ = None - __idtype__ = int - - __readresulttype__ = None - - def create(self, obj): - pass - - def read(self, query=None, sort=None, page=None, start=None, limit=None): - pass - - def update(self, obj): - pass - - def destroy(self, obj_id): - pass - - def model(self): - tpl = """ -Ext.define('%(appns)s.model.%(classname)s', { - extend: 'Ext.data.Model', - fields: %(fields)s, - - proxy: { - type: 'direct', - api: { - create: %(appns)s.%(controllerns)s.create, - read: %(appns)s.%(controllerns)s.read, - update: %(appns)s.%(controllerns)s.update, - destroy: %(appns)s.%(controllerns)s.destroy - }, - reader: { - root: 'data' - } - } -}); - """ - fields = [ - attr.name for attr in self.__datatype__._wsme_attributes - ] - d = { - 'appns': 'Demo', - 'controllerns': 'stores.' + self.__datatype__.__name__.lower(), - 'classname': self.__datatype__.__name__, - 'fields': json.dumps(fields) - } - return tpl % d - - def store(self): - tpl = """ -Ext.define('%(appns)s.store.%(classname)s', { - extend: 'Ext.data.Store', - model: '%(appns)s.model.%(classname)s' -}); -""" - d = { - 'appns': 'Demo', - 'classname': self.__datatype__.__name__, - } - - return tpl % d - - -DataStoreController = DataStoreControllerMeta( - 'DataStoreController', - (DataStoreControllerMixin,), {} -) diff --git a/wsmeext/extdirect/protocol.py b/wsmeext/extdirect/protocol.py deleted file mode 100644 index 23793b4..0000000 --- a/wsmeext/extdirect/protocol.py +++ /dev/null @@ -1,450 +0,0 @@ -import datetime -import decimal - -from simplegeneric import generic - -from wsme.exc import ClientSideError -from wsme.protocol import CallContext, Protocol, expose -from wsme.utils import parse_isodate, parse_isodatetime, parse_isotime -from wsme.rest.args import from_params -from wsme.types import iscomplex, isusertype, list_attributes, Unset -import wsme.types - -try: - import simplejson as json -except ImportError: - import json # noqa - -from six import u - - -class APIDefinitionGenerator(object): - tpl = """\ -Ext.ns("%(rootns)s"); - -if (!%(rootns)s.wsroot) { - %(rootns)s.wsroot = "%(webpath)s. -} - -%(descriptors)s - -Ext.syncRequire(['Ext.direct.*'], function() { - %(providers)s -}); -""" - descriptor_tpl = """\ -Ext.ns("%(fullns)s"); - -%(fullns)s.Descriptor = { - "url": %(rootns)s.wsroot + "extdirect/router/%(ns)s", - "namespace": "%(fullns)s", - "type": "remoting", - "actions": %(actions)s - "enableBuffer": true -}; -""" - provider_tpl = """\ - Ext.direct.Manager.addProvider(%(fullns)s.Descriptor); -""" - - def __init__(self): - pass - - def render(self, rootns, webpath, namespaces, fullns): - descriptors = u('') - for ns in sorted(namespaces): - descriptors += self.descriptor_tpl % { - 'ns': ns, - 'rootns': rootns, - 'fullns': fullns(ns), - 'actions': '\n'.join(( - ' ' * 4 + line - for line - in json.dumps(namespaces[ns], indent=4).split('\n') - )) - } - - providers = u('') - for ns in sorted(namespaces): - providers += self.provider_tpl % { - 'fullns': fullns(ns) - } - - r = self.tpl % { - 'rootns': rootns, - 'webpath': webpath, - 'descriptors': descriptors, - 'providers': providers, - } - return r - - -@generic -def fromjson(datatype, value): - if value is None: - return None - if iscomplex(datatype): - newvalue = datatype() - for attrdef in list_attributes(datatype): - if attrdef.name in value: - setattr(newvalue, attrdef.key, - fromjson(attrdef.datatype, value[attrdef.name])) - value = newvalue - elif isusertype(datatype): - value = datatype.frombasetype(fromjson(datatype.basetype, value)) - return value - - -@generic -def tojson(datatype, value): - if value is None: - return value - if iscomplex(datatype): - d = {} - for attrdef in list_attributes(datatype): - attrvalue = getattr(value, attrdef.key) - if attrvalue is not Unset: - d[attrdef.name] = tojson(attrdef.datatype, attrvalue) - value = d - elif isusertype(datatype): - value = tojson(datatype.basetype, datatype.tobasetype(value)) - return value - - -@fromjson.when_type(wsme.types.ArrayType) -def array_fromjson(datatype, value): - return [fromjson(datatype.item_type, item) for item in value] - - -@tojson.when_type(wsme.types.ArrayType) -def array_tojson(datatype, value): - if value is None: - return value - return [tojson(datatype.item_type, item) for item in value] - - -@fromjson.when_type(wsme.types.DictType) -def dict_fromjson(datatype, value): - if value is None: - return value - return dict(( - (fromjson(datatype.key_type, key), - fromjson(datatype.value_type, value)) - for key, value in value.items() - )) - - -@tojson.when_type(wsme.types.DictType) -def dict_tojson(datatype, value): - if value is None: - return value - return dict(( - (tojson(datatype.key_type, key), - tojson(datatype.value_type, value)) - for key, value in value.items() - )) - - -@tojson.when_object(wsme.types.bytes) -def bytes_tojson(datatype, value): - if value is None: - return value - return value.decode('ascii') - - -# raw strings -@fromjson.when_object(wsme.types.bytes) -def bytes_fromjson(datatype, value): - if value is not None: - value = value.encode('ascii') - return value - - -# unicode strings - -@fromjson.when_object(wsme.types.text) -def text_fromjson(datatype, value): - if isinstance(value, wsme.types.bytes): - return value.decode('utf-8') - return value - - -# datetime.time - -@fromjson.when_object(datetime.time) -def time_fromjson(datatype, value): - if value is None or value == '': - return None - return parse_isotime(value) - - -@tojson.when_object(datetime.time) -def time_tojson(datatype, value): - if value is None: - return value - return value.isoformat() - - -# datetime.date - -@fromjson.when_object(datetime.date) -def date_fromjson(datatype, value): - if value is None or value == '': - return None - return parse_isodate(value) - - -@tojson.when_object(datetime.date) -def date_tojson(datatype, value): - if value is None: - return value - return value.isoformat() - - -# datetime.datetime - -@fromjson.when_object(datetime.datetime) -def datetime_fromjson(datatype, value): - if value is None or value == '': - return None - return parse_isodatetime(value) - - -@tojson.when_object(datetime.datetime) -def datetime_tojson(datatype, value): - if value is None: - return value - return value.isoformat() - - -# decimal.Decimal - -@fromjson.when_object(decimal.Decimal) -def decimal_fromjson(datatype, value): - if value is None: - return value - return decimal.Decimal(value) - - -@tojson.when_object(decimal.Decimal) -def decimal_tojson(datatype, value): - if value is None: - return value - return str(value) - - -class ExtCallContext(CallContext): - def __init__(self, request, namespace, calldata): - super(ExtCallContext, self).__init__(request) - self.namespace = namespace - - self.tid = calldata['tid'] - self.action = calldata['action'] - self.method = calldata['method'] - self.params = calldata['data'] - - -class FormExtCallContext(CallContext): - def __init__(self, request, namespace): - super(FormExtCallContext, self).__init__(request) - self.namespace = namespace - - self.tid = request.params['extTID'] - self.action = request.params['extAction'] - self.method = request.params['extMethod'] - self.params = [] - - -class ExtDirectProtocol(Protocol): - """ - ExtDirect protocol. - - For more detail on the protocol, see - http://www.sencha.com/products/extjs/extdirect. - - .. autoattribute:: name - .. autoattribute:: content_types - """ - name = 'extdirect' - displayname = 'ExtDirect' - content_types = ['application/json', 'text/javascript'] - - def __init__(self, namespace='', params_notation='named', nsfolder=None): - self.namespace = namespace - self.appns, self.apins = namespace.rsplit('.', 2) \ - if '.' in namespace else (namespace, '') - self.default_params_notation = params_notation - self.appnsfolder = nsfolder - - @property - def api_alias(self): - if self.appnsfolder: - alias = '/%s/%s.js' % ( - self.appnsfolder, - self.apins.replace('.', '/')) - return alias - - def accept(self, req): - path = req.path - assert path.startswith(self.root._webpath) - path = path[len(self.root._webpath):] - - return ( - path == self.api_alias or - path == "/extdirect/api" or - path.startswith("/extdirect/router") - ) - - def iter_calls(self, req): - path = req.path - - assert path.startswith(self.root._webpath) - path = path[len(self.root._webpath):].strip() - - assert path.startswith('/extdirect/router'), path - path = path[17:].strip('/') - - if path: - namespace = path.split('.') - else: - namespace = [] - - if 'extType' in req.params: - req.wsme_extdirect_batchcall = False - yield FormExtCallContext(req, namespace) - else: - data = json.loads(req.body.decode('utf8')) - req.wsme_extdirect_batchcall = isinstance(data, list) - if not req.wsme_extdirect_batchcall: - data = [data] - req.callcount = len(data) - - for call in data: - yield ExtCallContext(req, namespace, call) - - def extract_path(self, context): - path = list(context.namespace) - - if context.action: - path.append(context.action) - - path.append(context.method) - - return path - - def read_std_arguments(self, context): - funcdef = context.funcdef - notation = funcdef.extra_options.get('extdirect_params_notation', - self.default_params_notation) - args = context.params - if notation == 'positional': - kw = dict( - (argdef.name, fromjson(argdef.datatype, arg)) - for argdef, arg in zip(funcdef.arguments, args) - ) - elif notation == 'named': - if len(args) == 0: - args = [{}] - elif len(args) > 1: - raise ClientSideError( - "Named arguments: takes a single object argument") - args = args[0] - kw = dict( - (argdef.name, fromjson(argdef.datatype, args[argdef.name])) - for argdef in funcdef.arguments if argdef.name in args - ) - else: - raise ValueError("Invalid notation: %s" % notation) - return kw - - def read_form_arguments(self, context): - kw = {} - for argdef in context.funcdef.arguments: - value = from_params(argdef.datatype, context.request.params, - argdef.name, set()) - if value is not Unset: - kw[argdef.name] = value - return kw - - def read_arguments(self, context): - if isinstance(context, ExtCallContext): - kwargs = self.read_std_arguments(context) - elif isinstance(context, FormExtCallContext): - kwargs = self.read_form_arguments(context) - wsme.runtime.check_arguments(context.funcdef, (), kwargs) - return kwargs - - def encode_result(self, context, result): - return json.dumps({ - 'type': 'rpc', - 'tid': context.tid, - 'action': context.action, - 'method': context.method, - 'result': tojson(context.funcdef.return_type, result) - }) - - def encode_error(self, context, infos): - return json.dumps({ - 'type': 'exception', - 'tid': context.tid, - 'action': context.action, - 'method': context.method, - 'message': '%(faultcode)s: %(faultstring)s' % infos, - 'where': infos['debuginfo']}) - - def prepare_response_body(self, request, results): - r = ",\n".join(results) - if request.wsme_extdirect_batchcall: - return "[\n%s\n]" % r - else: - return r - - def get_response_status(self, request): - return 200 - - def get_response_contenttype(self, request): - return "text/javascript" - - def fullns(self, ns): - return ns and '%s.%s' % (self.namespace, ns) or self.namespace - - @expose('/extdirect/api', "text/javascript") - @expose('${api_alias}', "text/javascript") - def api(self): - namespaces = {} - for path, funcdef in self.root.getapi(): - if len(path) > 1: - namespace = '.'.join(path[:-2]) - action = path[-2] - else: - namespace = '' - action = '' - if namespace not in namespaces: - namespaces[namespace] = {} - if action not in namespaces[namespace]: - namespaces[namespace][action] = [] - notation = funcdef.extra_options.get('extdirect_params_notation', - self.default_params_notation) - method = { - 'name': funcdef.name} - - if funcdef.extra_options.get('extdirect_formhandler', False): - method['formHandler'] = True - method['len'] = 1 if notation == 'named' \ - else len(funcdef.arguments) - namespaces[namespace][action].append(method) - webpath = self.root._webpath - if webpath and not webpath.endswith('/'): - webpath += '/' - return APIDefinitionGenerator().render( - namespaces=namespaces, - webpath=webpath, - rootns=self.namespace, - fullns=self.fullns, - ) - - def encode_sample_value(self, datatype, value, format=False): - r = tojson(datatype, value) - content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0, - sort_keys=format) - return ('javascript', content) diff --git a/wsmeext/extdirect/sadatastore.py b/wsmeext/extdirect/sadatastore.py deleted file mode 100644 index 44d79cb..0000000 --- a/wsmeext/extdirect/sadatastore.py +++ /dev/null @@ -1,19 +0,0 @@ -from wsmeext.extdirect import datastore - - -class SADataStoreController(datastore.DataStoreController): - __dbsession__ = None - __datatype__ = None - - def read(self, query=None, sort=None, page=None, start=None, limit=None): - q = self.__dbsession__.query(self.__datatype__.__saclass__) - total = q.count() - if start is not None and limit is not None: - q = q.slice(start, limit) - return self.__readresulttype__( - data=[ - self.__datatype__(o) for o in q - ], - success=True, - total=total - ) diff --git a/wsmeext/flask.py b/wsmeext/flask.py deleted file mode 100644 index 4d9e446..0000000 --- a/wsmeext/flask.py +++ /dev/null @@ -1,108 +0,0 @@ -from __future__ import absolute_import - -import functools -import logging -import sys -import inspect - -import wsme -import wsme.api -import wsme.rest.json -import wsme.rest.xml -import wsme.rest.args -from wsme.utils import is_valid_code - -import flask - -log = logging.getLogger(__name__) - - -TYPES = { - 'application/json': wsme.rest.json, - 'application/xml': wsme.rest.xml, - 'text/xml': wsme.rest.xml -} - - -def get_dataformat(): - if 'Accept' in flask.request.headers: - for t in TYPES: - if t in flask.request.headers['Accept']: - return TYPES[t] - - # Look for the wanted data format in the request. - req_dataformat = getattr(flask.request, 'response_type', None) - if req_dataformat in TYPES: - return TYPES[req_dataformat] - - log.info('''Could not determine what format is wanted by the - caller, falling back to json''') - return wsme.rest.json - - -def signature(*args, **kw): - sig = wsme.signature(*args, **kw) - - def decorator(f): - args = inspect.getargspec(f)[0] - ismethod = args and args[0] == 'self' - sig(f) - funcdef = wsme.api.FunctionDefinition.get(f) - funcdef.resolve_types(wsme.types.registry) - - @functools.wraps(f) - def wrapper(*args, **kwargs): - if ismethod: - self, args = args[0], args[1:] - args, kwargs = wsme.rest.args.get_args( - funcdef, args, kwargs, - flask.request.args, flask.request.form, - flask.request.data, - flask.request.mimetype - ) - - if funcdef.pass_request: - kwargs[funcdef.pass_request] = flask.request - - dataformat = get_dataformat() - - try: - if ismethod: - args = [self] + list(args) - result = f(*args, **kwargs) - - # NOTE: Support setting of status_code with default 20 - status_code = funcdef.status_code - if isinstance(result, wsme.api.Response): - status_code = result.status_code - result = result.obj - - res = flask.make_response( - dataformat.encode_result( - result, - funcdef.return_type - ) - ) - res.mimetype = dataformat.content_type - res.status_code = status_code - except Exception: - try: - exception_info = sys.exc_info() or None - orig_exception = exception_info[1] - orig_code = getattr(orig_exception, 'code', None) - data = wsme.api.format_exception(exception_info) - finally: - del exception_info - - res = flask.make_response(dataformat.encode_error(None, data)) - if orig_code and is_valid_code(orig_code): - res.status_code = orig_code - elif data['faultcode'].lower() == 'client': - res.status_code = 400 - else: - res.status_code = 500 - return res - - wrapper.wsme_func = f - return wrapper - return decorator diff --git a/wsmeext/pecan.py b/wsmeext/pecan.py deleted file mode 100644 index 9d63dde..0000000 --- a/wsmeext/pecan.py +++ /dev/null @@ -1,142 +0,0 @@ -from __future__ import absolute_import - -import functools -import inspect -import sys - -import wsme -import wsme.rest.args -import wsme.rest.json -import wsme.rest.xml - -import pecan - -from wsme.utils import is_valid_code - - -class JSonRenderer(object): - @staticmethod - def __init__(path, extra_vars): - pass - - @staticmethod - def render(template_path, namespace): - if 'faultcode' in namespace: - return wsme.rest.json.encode_error(None, namespace) - return wsme.rest.json.encode_result( - namespace['result'], - namespace['datatype'] - ) - - -class XMLRenderer(object): - @staticmethod - def __init__(path, extra_vars): - pass - - @staticmethod - def render(template_path, namespace): - if 'faultcode' in namespace: - return wsme.rest.xml.encode_error(None, namespace) - return wsme.rest.xml.encode_result( - namespace['result'], - namespace['datatype'] - ) - - -pecan.templating._builtin_renderers['wsmejson'] = JSonRenderer -pecan.templating._builtin_renderers['wsmexml'] = XMLRenderer - -pecan_json_decorate = pecan.expose( - template='wsmejson:', - content_type='application/json', - generic=False) -pecan_xml_decorate = pecan.expose( - template='wsmexml:', - content_type='application/xml', - generic=False -) -pecan_text_xml_decorate = pecan.expose( - template='wsmexml:', - content_type='text/xml', - generic=False -) - - -def wsexpose(*args, **kwargs): - sig = wsme.signature(*args, **kwargs) - - def decorate(f): - sig(f) - funcdef = wsme.api.FunctionDefinition.get(f) - funcdef.resolve_types(wsme.types.registry) - - @functools.wraps(f) - def callfunction(self, *args, **kwargs): - return_type = funcdef.return_type - - try: - args, kwargs = wsme.rest.args.get_args( - funcdef, args, kwargs, pecan.request.params, None, - pecan.request.body, pecan.request.content_type - ) - if funcdef.pass_request: - kwargs[funcdef.pass_request] = pecan.request - result = f(self, *args, **kwargs) - - # NOTE: Support setting of status_code with default 201 - pecan.response.status = funcdef.status_code - if isinstance(result, wsme.api.Response): - pecan.response.status = result.status_code - - # NOTE(lucasagomes): If the return code is 204 - # (No Response) we have to make sure that we are not - # returning anything in the body response and the - # content-length is 0 - if result.status_code == 204: - return_type = None - elif not isinstance(result.return_type, - wsme.types.UnsetType): - return_type = result.return_type - - result = result.obj - - except Exception: - try: - exception_info = sys.exc_info() - orig_exception = exception_info[1] - orig_code = getattr(orig_exception, 'code', None) - data = wsme.api.format_exception( - exception_info, - pecan.conf.get('wsme', {}).get('debug', False) - ) - finally: - del exception_info - - if orig_code and is_valid_code(orig_code): - pecan.response.status = orig_code - else: - pecan.response.status = 500 - - return data - - if return_type is None: - pecan.request.pecan['content_type'] = None - pecan.response.content_type = None - return '' - - return dict( - datatype=return_type, - result=result - ) - - if 'xml' in funcdef.rest_content_types: - pecan_xml_decorate(callfunction) - pecan_text_xml_decorate(callfunction) - if 'json' in funcdef.rest_content_types: - pecan_json_decorate(callfunction) - pecan.util._cfg(callfunction)['argspec'] = inspect.getargspec(f) - callfunction._wsme_definition = funcdef - return callfunction - - return decorate diff --git a/wsmeext/soap/__init__.py b/wsmeext/soap/__init__.py deleted file mode 100644 index 7a237ba..0000000 --- a/wsmeext/soap/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from __future__ import absolute_import - -from wsmeext.soap.protocol import SoapProtocol - -__all__ = ['SoapProtocol'] diff --git a/wsmeext/soap/protocol.py b/wsmeext/soap/protocol.py deleted file mode 100644 index 0d87af8..0000000 --- a/wsmeext/soap/protocol.py +++ /dev/null @@ -1,478 +0,0 @@ -""" -A SOAP implementation for wsme. -Parts of the code were taken from the tgwebservices soap implmentation. -""" -from __future__ import absolute_import - -import pkg_resources -import datetime -import decimal -import base64 -import logging - -import six - -from wsmeext.soap.simplegeneric import generic -from wsmeext.soap.wsdl import WSDLGenerator - -try: - from lxml import etree as ET - use_lxml = True -except ImportError: - from xml.etree import cElementTree as ET # noqa - use_lxml = False - -from wsme.protocol import CallContext, Protocol, expose - -import wsme.types -import wsme.runtime - -from wsme import exc -from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime - -log = logging.getLogger(__name__) - -xsd_ns = 'http://www.w3.org/2001/XMLSchema' -xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance' -soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/' - -if not use_lxml: - ET.register_namespace('soap', soapenv_ns) - -type_qn = '{%s}type' % xsi_ns -nil_qn = '{%s}nil' % xsi_ns - -Envelope_qn = '{%s}Envelope' % soapenv_ns -Body_qn = '{%s}Body' % soapenv_ns -Fault_qn = '{%s}Fault' % soapenv_ns -faultcode_qn = '{%s}faultcode' % soapenv_ns -faultstring_qn = '{%s}faultstring' % soapenv_ns -detail_qn = '{%s}detail' % soapenv_ns - - -type_registry = { - wsme.types.bytes: 'xs:string', - wsme.types.text: 'xs:string', - int: 'xs:int', - float: "xs:float", - bool: "xs:boolean", - datetime.datetime: "xs:dateTime", - datetime.date: "xs:date", - datetime.time: "xs:time", - decimal.Decimal: "xs:decimal", - wsme.types.binary: "xs:base64Binary", -} - -if not six.PY3: - type_registry[long] = "xs:long" # noqa - -array_registry = { - wsme.types.text: "String_Array", - wsme.types.bytes: "String_Array", - int: "Int_Array", - float: "Float_Array", - bool: "Boolean_Array", -} - -if not six.PY3: - array_registry[long] = "Long_Array" # noqa - - -def soap_array(datatype, ns): - if datatype.item_type in array_registry: - name = array_registry[datatype.item_type] - else: - name = soap_type(datatype.item_type, False) + '_Array' - if ns: - name = 'types:' + name - return name - - -def soap_type(datatype, ns): - name = None - if wsme.types.isarray(datatype): - return soap_array(datatype, ns) - if wsme.types.isdict(datatype): - return None - if datatype in type_registry: - stype = type_registry[datatype] - if not ns: - stype = stype[3:] - return stype - if wsme.types.iscomplex(datatype): - name = datatype.__name__ - if name and ns: - name = 'types:' + name - return name - if wsme.types.isusertype(datatype): - return soap_type(datatype.basetype, ns) - - -def soap_fname(path, funcdef): - return "".join([path[0]] + [i.capitalize() for i in path[1:]]) - - -class SoapEncoder(object): - def __init__(self, types_ns): - self.types_ns = types_ns - - def make_soap_element(self, datatype, tag, value, xsitype=None): - el = ET.Element(tag) - if value is None: - el.set(nil_qn, 'true') - elif xsitype is not None: - el.set(type_qn, xsitype) - el.text = value - elif wsme.types.isusertype(datatype): - return self.tosoap(datatype.basetype, tag, - datatype.tobasetype(value)) - elif wsme.types.iscomplex(datatype): - el.set(type_qn, 'types:%s' % (datatype.__name__)) - for attrdef in wsme.types.list_attributes(datatype): - attrvalue = getattr(value, attrdef.key) - if attrvalue is not wsme.types.Unset: - el.append(self.tosoap( - attrdef.datatype, - '{%s}%s' % (self.types_ns, attrdef.name), - attrvalue - )) - else: - el.set(type_qn, type_registry.get(datatype)) - if not isinstance(value, wsme.types.text): - value = wsme.types.text(value) - el.text = value - return el - - @generic - def tosoap(self, datatype, tag, value): - """Converts a value into xml Element objects for inclusion in the SOAP - response output (after adding the type to the type_registry). - - If a non-complex user specific type is to be used in the api, - a specific toxml should be added:: - - from wsme.protocol.soap import tosoap, make_soap_element, \ - type_registry - - class MySpecialType(object): - pass - - type_registry[MySpecialType] = 'xs:MySpecialType' - - @tosoap.when_object(MySpecialType) - def myspecialtype_tosoap(datatype, tag, value): - return make_soap_element(datatype, tag, str(value)) - """ - return self.make_soap_element(datatype, tag, value) - - @tosoap.when_type(wsme.types.ArrayType) - def array_tosoap(self, datatype, tag, value): - el = ET.Element(tag) - el.set(type_qn, soap_array(datatype, self.types_ns)) - if value is None: - el.set(nil_qn, 'true') - elif len(value) == 0: - el.append(ET.Element('item')) - else: - for item in value: - el.append(self.tosoap(datatype.item_type, 'item', item)) - return el - - @tosoap.when_object(bool) - def bool_tosoap(self, datatype, tag, value): - return self.make_soap_element( - datatype, - tag, - 'true' if value is True else 'false' if value is False else None - ) - - @tosoap.when_object(wsme.types.bytes) - def bytes_tosoap(self, datatype, tag, value): - log.debug('(bytes_tosoap, %s, %s, %s, %s)', datatype, - tag, value, type(value)) - if isinstance(value, wsme.types.bytes): - value = value.decode('ascii') - return self.make_soap_element(datatype, tag, value) - - @tosoap.when_object(datetime.datetime) - def datetime_tosoap(self, datatype, tag, value): - return self.make_soap_element( - datatype, - tag, - value is not None and value.isoformat() or None - ) - - @tosoap.when_object(wsme.types.binary) - def binary_tosoap(self, datatype, tag, value): - log.debug("(%s, %s, %s)", datatype, tag, value) - value = base64.encodestring(value) if value is not None else None - if six.PY3: - value = value.decode('ascii') - return self.make_soap_element( - datatype.basetype, tag, value, 'xs:base64Binary' - ) - - @tosoap.when_object(None) - def None_tosoap(self, datatype, tag, value): - return self.make_soap_element(datatype, tag, None) - - -@generic -def fromsoap(datatype, el, ns): - """ - A generic converter from soap elements to python datatype. - - If a non-complex user specific type is to be used in the api, - a specific fromsoap should be added. - """ - if el.get(nil_qn) == 'true': - return None - if datatype in type_registry: - value = datatype(el.text) - elif wsme.types.isusertype(datatype): - value = datatype.frombasetype( - fromsoap(datatype.basetype, el, ns)) - else: - value = datatype() - for attr in wsme.types.list_attributes(datatype): - child = el.find('{%s}%s' % (ns['type'], attr.name)) - if child is not None: - setattr(value, attr.key, fromsoap(attr.datatype, child, ns)) - return value - - -@fromsoap.when_type(wsme.types.ArrayType) -def array_fromsoap(datatype, el, ns): - if len(el) == 1: - if datatype.item_type \ - not in wsme.types.pod_types + wsme.types.dt_types \ - and len(el[0]) == 0: - return [] - return [fromsoap(datatype.item_type, child, ns) for child in el] - - -@fromsoap.when_object(wsme.types.bytes) -def bytes_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:string'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return el.text.encode('ascii') if el.text else six.b('') - - -@fromsoap.when_object(wsme.types.text) -def text_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:string'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return datatype(el.text if el.text else '') - - -@fromsoap.when_object(bool) -def bool_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:boolean'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return el.text.lower() != 'false' - - -@fromsoap.when_object(datetime.date) -def date_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:date'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return parse_isodate(el.text) - - -@fromsoap.when_object(datetime.time) -def time_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:time'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return parse_isotime(el.text) - - -@fromsoap.when_object(datetime.datetime) -def datetime_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:dateTime'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return parse_isodatetime(el.text) - - -@fromsoap.when_object(wsme.types.binary) -def binary_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:base64Binary'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return base64.decodestring(el.text.encode('ascii')) - - -class SoapProtocol(Protocol): - """ - SOAP protocol. - - .. autoattribute:: name - .. autoattribute:: content_types - """ - name = 'soap' - displayname = 'SOAP' - content_types = ['application/soap+xml'] - - ns = { - "soap": "http://www.w3.org/2001/12/soap-envelope", - "soapenv": "http://schemas.xmlsoap.org/soap/envelope/", - "soapenc": "http://schemas.xmlsoap.org/soap/encoding/", - } - - def __init__(self, tns=None, typenamespace=None, baseURL=None, - servicename='MyApp'): - self.tns = tns - self.typenamespace = typenamespace - self.servicename = servicename - self.baseURL = baseURL - self._name_mapping = {} - - self.encoder = SoapEncoder(typenamespace) - - def get_name_mapping(self, service=None): - if service not in self._name_mapping: - self._name_mapping[service] = dict( - (soap_fname(path, f), path) - for path, f in self.root.getapi() - if service is None or (path and path[0] == service) - ) - return self._name_mapping[service] - - def accept(self, req): - for ct in self.content_types: - if req.headers['Content-Type'].startswith(ct): - return True - if req.headers.get("Soapaction"): - return True - return False - - def iter_calls(self, request): - yield CallContext(request) - - def extract_path(self, context): - request = context.request - el = ET.fromstring(request.body) - body = el.find('{%(soapenv)s}Body' % self.ns) - # Extract the service name from the tns - message = list(body)[0] - fname = message.tag - if fname.startswith('{%s}' % self.typenamespace): - fname = fname[len(self.typenamespace) + 2:] - mapping = self.get_name_mapping() - if fname not in mapping: - raise exc.UnknownFunction(fname) - path = mapping[fname] - context.soap_message = message - return path - return None - - def read_arguments(self, context): - kw = {} - if not hasattr(context, 'soap_message'): - return kw - msg = context.soap_message - for param in msg: - # FIX for python2.6 (only for lxml) - if use_lxml and isinstance(param, ET._Comment): - continue - name = param.tag[len(self.typenamespace) + 2:] - arg = context.funcdef.get_arg(name) - value = fromsoap(arg.datatype, param, { - 'type': self.typenamespace, - }) - kw[name] = value - wsme.runtime.check_arguments(context.funcdef, (), kw) - return kw - - def soap_response(self, path, funcdef, result): - r = ET.Element('{%s}%sResponse' % ( - self.typenamespace, soap_fname(path, funcdef) - )) - log.debug('(soap_response, %s, %s)', funcdef.return_type, result) - r.append(self.encoder.tosoap( - funcdef.return_type, '{%s}result' % self.typenamespace, result - )) - return r - - def encode_result(self, context, result): - log.debug('(encode_result, %s)', result) - if use_lxml: - envelope = ET.Element( - Envelope_qn, - nsmap={'xs': xsd_ns, 'types': self.typenamespace} - ) - else: - envelope = ET.Element(Envelope_qn, { - 'xmlns:xs': xsd_ns, - 'xmlns:types': self.typenamespace - }) - body = ET.SubElement(envelope, Body_qn) - body.append(self.soap_response(context.path, context.funcdef, result)) - s = ET.tostring(envelope) - return s - - def get_template(self, name): - return pkg_resources.resource_string( - __name__, '%s.html' % name) - - def encode_error(self, context, infos): - envelope = ET.Element(Envelope_qn) - body = ET.SubElement(envelope, Body_qn) - fault = ET.SubElement(body, Fault_qn) - ET.SubElement(fault, faultcode_qn).text = infos['faultcode'] - ET.SubElement(fault, faultstring_qn).text = infos['faultstring'] - if 'debuginfo' in infos: - ET.SubElement(fault, detail_qn).text = infos['debuginfo'] - s = ET.tostring(envelope) - return s - - @expose('/api.wsdl', 'text/xml') - def api_wsdl(self, service=None): - if service is None: - servicename = self.servicename - else: - servicename = self.servicename + service.capitalize() - return WSDLGenerator( - tns=self.tns, - types_ns=self.typenamespace, - soapenc=self.ns['soapenc'], - service_name=servicename, - complex_types=self.root.__registry__.complex_types, - funclist=self.root.getapi(), - arrays=self.root.__registry__.array_types, - baseURL=self.baseURL, - soap_array=soap_array, - soap_type=soap_type, - soap_fname=soap_fname, - ).generate(True) - - def encode_sample_value(self, datatype, value, format=False): - r = self.encoder.make_soap_element(datatype, 'value', value) - if format: - xml_indent(r) - return ('xml', six.text_type(r)) - - -def xml_indent(elem, level=0): - i = "\n" + level * " " - if len(elem): - if not elem.text or not elem.text.strip(): - elem.text = i + " " - for e in elem: - xml_indent(e, level + 1) - if not e.tail or not e.tail.strip(): - e.tail = i - if level and (not elem.tail or not elem.tail.strip()): - elem.tail = i diff --git a/wsmeext/soap/simplegeneric.py b/wsmeext/soap/simplegeneric.py deleted file mode 100644 index 97c169b..0000000 --- a/wsmeext/soap/simplegeneric.py +++ /dev/null @@ -1,107 +0,0 @@ -import inspect - -__all__ = ["generic"] -try: - from types import ClassType, InstanceType - classtypes = type, ClassType -except ImportError: - classtypes = type - InstanceType = None - - -def generic(func, argpos=None): - """Create a simple generic function""" - - if argpos is None: - if hasattr(func, 'argpos'): - argpos = func.argpos - else: - argnames = inspect.getargspec(func)[0] - if argnames and argnames[0] == 'self': - argpos = 1 - else: - argpos = 0 - - _sentinel = object() - - def _by_class(*args, **kw): - cls = args[argpos].__class__ - for t in type(cls.__name__, (cls, object), {}).__mro__: - f = _gbt(t, _sentinel) - if f is not _sentinel: - return f(*args, **kw) - else: - return func(*args, **kw) - - _by_type = {object: func, InstanceType: _by_class} - _gbt = _by_type.get - - def when_type(*types): - """Decorator to add a method that will be called for the given types""" - for t in types: - if not isinstance(t, classtypes): - raise TypeError( - "%r is not a type or class" % (t,) - ) - - def decorate(f): - for t in types: - if _by_type.setdefault(t, f) is not f: - raise TypeError( - "%r already has method for type %r" % (func, t) - ) - return f - return decorate - - _by_object = {} - _gbo = _by_object.get - - def when_object(*obs): - """Decorator to add a method to be called for the given object(s)""" - def decorate(f): - for o in obs: - if _by_object.setdefault(id(o), (o, f))[1] is not f: - raise TypeError( - "%r already has method for object %r" % (func, o) - ) - return f - return decorate - - def dispatch(*args, **kw): - f = _gbo(id(args[argpos]), _sentinel) - if f is _sentinel: - for t in type(args[argpos]).__mro__: - f = _gbt(t, _sentinel) - if f is not _sentinel: - return f(*args, **kw) - else: - return func(*args, **kw) - else: - return f[1](*args, **kw) - - dispatch.__name__ = func.__name__ - dispatch.__dict__ = func.__dict__.copy() - dispatch.__doc__ = func.__doc__ - dispatch.__module__ = func.__module__ - - dispatch.when_type = when_type - dispatch.when_object = when_object - dispatch.default = func - dispatch.has_object = lambda o: id(o) in _by_object - dispatch.has_type = lambda t: t in _by_type - dispatch.argpos = argpos - return dispatch - - -def test_suite(): - import doctest - return doctest.DocFileSuite( - 'README.txt', - optionflags=doctest.ELLIPSIS | doctest.REPORT_ONLY_FIRST_FAILURE, - ) - - -if __name__ == '__main__': - import unittest - r = unittest.TextTestRunner() - r.run(test_suite()) diff --git a/wsmeext/soap/wsdl.py b/wsmeext/soap/wsdl.py deleted file mode 100644 index b60aff4..0000000 --- a/wsmeext/soap/wsdl.py +++ /dev/null @@ -1,297 +0,0 @@ -import six -import wsme.types - -try: - from lxml import etree as ET - use_lxml = True -except ImportError: - from xml.etree import cElementTree as ET # noqa - use_lxml = False - - -def xml_tostring(el, pretty_print=False): - if use_lxml: - return ET.tostring(el, pretty_print=pretty_print) - return ET.tostring(el) - - -class NS(object): - def __init__(self, url): - self.url = url - - def __call__(self, name): - return self.qn(name) - - def __str__(self): - return self.url - - def qn(self, name): - return '{%s}%s' % (self.url, name) - - -wsdl_ns = NS("http://schemas.xmlsoap.org/wsdl/") -soap_ns = NS("http://schemas.xmlsoap.org/wsdl/soap/") -xs_ns = NS("http://www.w3.org/2001/XMLSchema") -soapenc_ns = NS("http://schemas.xmlsoap.org/soap/encoding/") - - -class WSDLGenerator(object): - def __init__( - self, - tns, - types_ns, - soapenc, - service_name, - complex_types, - funclist, - arrays, - baseURL, - soap_array, - soap_type, - soap_fname): - - self.tns = NS(tns) - self.types_ns = NS(types_ns) - self.soapenc = soapenc - self.service_name = service_name - self.complex_types = complex_types - self.funclist = funclist - self.arrays = arrays - self.baseURL = baseURL or '' - self.soap_array = soap_array - self.soap_fname = soap_fname - self.soap_type = soap_type - - def gen_complex_type(self, cls): - complexType = ET.Element(xs_ns('complexType')) - complexType.set('name', cls.__name__) - sequence = ET.SubElement(complexType, xs_ns('sequence')) - for attrdef in wsme.types.list_attributes(cls): - soap_type = self.soap_type(attrdef.datatype, str(self.types_ns)) - if soap_type is None: - continue - element = ET.SubElement(sequence, xs_ns('element')) - element.set('name', attrdef.name) - element.set('type', soap_type) - element.set('minOccurs', '1' if attrdef.mandatory else '0') - element.set('maxOccurs', '1') - return complexType - - def gen_array(self, array): - complexType = ET.Element(xs_ns('complexType')) - complexType.set('name', self.soap_array(array, False)) - ET.SubElement( - ET.SubElement(complexType, xs_ns('sequence')), - xs_ns('element'), - name='item', - maxOccurs='unbounded', - nillable='true', - type=self.soap_type(array.item_type, self.types_ns) - ) - return complexType - - def gen_function_types(self, path, funcdef): - args_el = ET.Element( - xs_ns('element'), - name=self.soap_fname(path, funcdef) - ) - - sequence = ET.SubElement( - ET.SubElement(args_el, xs_ns('complexType')), - xs_ns('sequence') - ) - - for farg in funcdef.arguments: - t = self.soap_type(farg.datatype, True) - if t is None: - continue - element = ET.SubElement( - sequence, xs_ns('element'), - name=farg.name, - type=self.soap_type(farg.datatype, True) - ) - if not farg.mandatory: - element.set('minOccurs', '0') - - response_el = ET.Element( - xs_ns('element'), - name=self.soap_fname(path, funcdef) + 'Response' - ) - element = ET.SubElement( - ET.SubElement( - ET.SubElement( - response_el, - xs_ns('complexType') - ), - xs_ns('sequence') - ), - xs_ns('element'), - name='result' - ) - return_soap_type = self.soap_type(funcdef.return_type, True) - if return_soap_type is not None: - element.set('type', return_soap_type) - - return args_el, response_el - - def gen_types(self): - types = ET.Element(wsdl_ns('types')) - schema = ET.SubElement(types, xs_ns('schema')) - schema.set('elementFormDefault', 'qualified') - schema.set('targetNamespace', str(self.types_ns)) - for cls in self.complex_types: - schema.append(self.gen_complex_type(cls)) - for array in self.arrays: - schema.append(self.gen_array(array)) - for path, funcdef in self.funclist: - schema.extend(self.gen_function_types(path, funcdef)) - return types - - def gen_functions(self): - messages = [] - - binding = ET.Element( - wsdl_ns('binding'), - name='%s_Binding' % self.service_name, - type='tns:%s_PortType' % self.service_name - ) - ET.SubElement( - binding, - soap_ns('binding'), - style='document', - transport='http://schemas.xmlsoap.org/soap/http' - ) - - portType = ET.Element( - wsdl_ns('portType'), - name='%s_PortType' % self.service_name - ) - - for path, funcdef in self.funclist: - soap_fname = self.soap_fname(path, funcdef) - - # message - req_message = ET.Element( - wsdl_ns('message'), - name=soap_fname + 'Request', - xmlns=str(self.types_ns) - ) - ET.SubElement( - req_message, - wsdl_ns('part'), - name='parameters', - element='types:%s' % soap_fname - ) - messages.append(req_message) - - res_message = ET.Element( - wsdl_ns('message'), - name=soap_fname + 'Response', - xmlns=str(self.types_ns) - ) - ET.SubElement( - res_message, - wsdl_ns('part'), - name='parameters', - element='types:%sResponse' % soap_fname - ) - messages.append(res_message) - - # portType/operation - operation = ET.SubElement( - portType, - wsdl_ns('operation'), - name=soap_fname - ) - if funcdef.doc: - ET.SubElement( - operation, - wsdl_ns('documentation') - ).text = funcdef.doc - ET.SubElement( - operation, wsdl_ns('input'), - message='tns:%sRequest' % soap_fname - ) - ET.SubElement( - operation, wsdl_ns('output'), - message='tns:%sResponse' % soap_fname - ) - - # binding/operation - operation = ET.SubElement( - binding, - wsdl_ns('operation'), - name=soap_fname - ) - ET.SubElement( - operation, - soap_ns('operation'), - soapAction=soap_fname - ) - ET.SubElement( - ET.SubElement( - operation, - wsdl_ns('input') - ), - soap_ns('body'), - use='literal' - ) - ET.SubElement( - ET.SubElement( - operation, - wsdl_ns('output') - ), - soap_ns('body'), - use='literal' - ) - - return messages + [portType, binding] - - def gen_service(self): - service = ET.Element(wsdl_ns('service'), name=self.service_name) - ET.SubElement( - service, - wsdl_ns('documentation') - ).text = six.u('WSDL File for %s') % self.service_name - ET.SubElement( - ET.SubElement( - service, - wsdl_ns('port'), - binding='tns:%s_Binding' % self.service_name, - name='%s_PortType' % self.service_name - ), - soap_ns('address'), - location=self.baseURL - ) - - return service - - def gen_definitions(self): - attrib = { - 'name': self.service_name, - 'targetNamespace': str(self.tns) - } - if use_lxml: - definitions = ET.Element( - wsdl_ns('definitions'), - attrib=attrib, - nsmap={ - 'xs': str(xs_ns), - 'soap': str(soap_ns), - 'types': str(self.types_ns), - 'tns': str(self.tns) - } - ) - else: - definitions = ET.Element(wsdl_ns('definitions'), **attrib) - definitions.set('xmlns:types', str(self.types_ns)) - definitions.set('xmlns:tns', str(self.tns)) - - definitions.set('name', self.service_name) - definitions.append(self.gen_types()) - definitions.extend(self.gen_functions()) - definitions.append(self.gen_service()) - return definitions - - def generate(self, format=False): - return xml_tostring(self.gen_definitions(), pretty_print=format) diff --git a/wsmeext/sphinxext.py b/wsmeext/sphinxext.py deleted file mode 100644 index 7c45a0f..0000000 --- a/wsmeext/sphinxext.py +++ /dev/null @@ -1,600 +0,0 @@ -import inspect -import re -import sys - -import six - -from sphinx import addnodes -from sphinx.ext import autodoc -from sphinx.domains.python import PyClasslike, PyClassmember -from sphinx.domains import Domain, ObjType -from sphinx.directives import ObjectDescription -from sphinx.util.docfields import Field -from sphinx.util.nodes import make_refnode - -from sphinx.roles import XRefRole -from sphinx.locale import l_, _ - -from docutils.parsers.rst import Directive -from docutils.parsers.rst import directives - -import wsme -import wsme.types -import wsme.rest.json -import wsme.rest.xml - -field_re = re.compile(r':(?P<field>\w+)(\s+(?P<name>\w+))?:') - - -def datatypename(datatype): - if isinstance(datatype, wsme.types.UserType): - return datatype.name - if isinstance(datatype, wsme.types.DictType): - return 'dict(%s: %s)' % (datatypename(datatype.key_type), - datatypename(datatype.value_type)) - if isinstance(datatype, wsme.types.ArrayType): - return 'list(%s)' % datatypename(datatype.item_type) - return datatype.__name__ - - -def make_sample_object(datatype): - if datatype is wsme.types.bytes: - return six.b('samplestring') - if datatype is wsme.types.text: - return u'sample unicode' - if datatype is int: - return 5 - sample_obj = getattr(datatype, 'sample', datatype)() - return sample_obj - - -def get_protocols(names): - names = list(names) - protocols = [] - if 'rest' in names: - names.remove('rest') - protocols.extend('restjson', 'restxml') - if 'restjson' in names: - names.remove('restjson') - protocols.append(('Json', wsme.rest.json)) - if 'restxml' in names: - names.remove('restxml') - protocols.append(('XML', wsme.rest.xml)) - for name in names: - p = wsme.protocol.getprotocol(name) - protocols.append((p.displayname or p.name, p)) - return protocols - - -class SampleType(object): - """A Sample Type""" - - #: A Int - aint = int - - def __init__(self, aint=None): - if aint: - self.aint = aint - - @classmethod - def sample(cls): - return cls(10) - - -class SampleService(wsme.WSRoot): - @wsme.expose(SampleType) - @wsme.validate(SampleType, int, str) - def change_aint(data, aint, dummy='useless'): - """ - :param aint: The new value - - :return: The data object with its aint field value changed. - """ - data.aint = aint - return data - - -def getroot(env, force=False): - root = env.temp_data.get('wsme:root') - if not force and root: - return root - rootpath = env.temp_data.get('wsme:rootpath', env.app.config.wsme_root) - - if rootpath is None: - return None - - modname, classname = rootpath.rsplit('.', 1) - __import__(modname) - module = sys.modules[modname] - root = getattr(module, classname) - env.temp_data['wsme:root'] = root - return root - - -def scan_services(service, path=[]): - has_functions = False - for name in dir(service): - if name.startswith('_'): - continue - a = getattr(service, name) - if inspect.ismethod(a): - if hasattr(a, '_wsme_definition'): - has_functions = True - if inspect.isclass(a): - continue - if len(path) > wsme.rest.APIPATH_MAXLEN: - raise ValueError("Path is too long: " + str(path)) - for value in scan_services(a, path + [name]): - yield value - if has_functions: - yield service, path - - -def find_service_path(env, service): - root = getroot(env) - if service == root: - return [] - for s, path in scan_services(root): - if s == service: - return path - return None - - -class TypeDirective(PyClasslike): - def get_index_text(self, modname, name_cls): - return _('%s (webservice type)') % name_cls[0] - - def add_target_and_index(self, name_cls, sig, signode): - ret = super(TypeDirective, self).add_target_and_index( - name_cls, sig, signode - ) - name = name_cls[0] - types = self.env.domaindata['wsme']['types'] - if name in types: - self.state_machine.reporter.warning( - 'duplicate type description of %s ' % name) - types[name] = self.env.docname - return ret - - -class AttributeDirective(PyClassmember): - doc_field_types = [ - Field('datatype', label=l_('Type'), has_arg=False, - names=('type', 'datatype')) - ] - - -def check_samples_slot(value): - """Validate the samples_slot option to the TypeDocumenter. - - Valid positions are 'before-docstring' and - 'after-docstring'. Using the explicit 'none' disables sample - output. The default is after-docstring. - """ - if not value: - return 'after-docstring' - val = directives.choice( - value, - ('none', # do not include - 'before-docstring', # show samples then docstring - 'after-docstring', # show docstring then samples - )) - return val - - -class TypeDocumenter(autodoc.ClassDocumenter): - objtype = 'type' - directivetype = 'type' - domain = 'wsme' - - required_arguments = 1 - default_samples_slot = 'after-docstring' - - option_spec = dict( - autodoc.ClassDocumenter.option_spec, - **{'protocols': lambda l: [v.strip() for v in l.split(',')], - 'samples-slot': check_samples_slot, - }) - - @staticmethod - def can_document_member(member, membername, isattr, parent): - # we don't want to be automaticaly used - # TODO check if the member is registered an an exposed type - return False - - def format_name(self): - return self.object.__name__ - - def format_signature(self): - return u'' - - def add_directive_header(self, sig): - super(TypeDocumenter, self).add_directive_header(sig) - # remove the :module: option that was added by ClassDocumenter - result_len = len(self.directive.result) - for index, item in zip(reversed(range(result_len)), - reversed(self.directive.result)): - if ':module:' in item: - self.directive.result.pop(index) - - def import_object(self): - if super(TypeDocumenter, self).import_object(): - wsme.types.register_type(self.object) - return True - else: - return False - - def add_content(self, more_content, no_docstring=False): - # Check where to include the samples - samples_slot = self.options.samples_slot or self.default_samples_slot - - def add_docstring(): - super(TypeDocumenter, self).add_content( - more_content, no_docstring) - - def add_samples(): - protocols = get_protocols( - self.options.protocols or self.env.app.config.wsme_protocols - ) - content = [] - if protocols: - sample_obj = make_sample_object(self.object) - content.extend([ - l_(u'Data samples:'), - u'', - u'.. cssclass:: toggle', - u'' - ]) - for name, protocol in protocols: - language, sample = protocol.encode_sample_value( - self.object, sample_obj, format=True) - content.extend([ - name, - u' .. code-block:: ' + language, - u'', - ]) - content.extend( - u' ' * 8 + line - for line in six.text_type(sample).split('\n')) - for line in content: - self.add_line(line, u'<wsmeext.sphinxext') - - self.add_line(u'', '<wsmeext.sphinxext>') - - if samples_slot == 'after-docstring': - add_docstring() - add_samples() - elif samples_slot == 'before-docstring': - add_samples() - add_docstring() - else: - add_docstring() - - -class AttributeDocumenter(autodoc.AttributeDocumenter): - datatype = None - domain = 'wsme' - - @staticmethod - def can_document_member(member, membername, isattr, parent): - return isinstance(parent, TypeDocumenter) - - def import_object(self): - success = super(AttributeDocumenter, self).import_object() - if success: - self.datatype = self.object.datatype - return success - - def add_content(self, more_content, no_docstring=False): - self.add_line( - u':type: %s' % datatypename(self.datatype), - '<wsmeext.sphinxext>' - ) - self.add_line(u'', '<wsmeext.sphinxext>') - super(AttributeDocumenter, self).add_content( - more_content, no_docstring) - - def add_directive_header(self, sig): - super(AttributeDocumenter, self).add_directive_header(sig) - - -class RootDirective(Directive): - """ - This directive is to tell what class is the Webservice root - """ - has_content = False - required_arguments = 1 - optional_arguments = 0 - final_argument_whitespace = False - option_spec = { - 'webpath': directives.unchanged - } - - def run(self): - env = self.state.document.settings.env - rootpath = self.arguments[0].strip() - env.temp_data['wsme:rootpath'] = rootpath - if 'wsme:root' in env.temp_data: - del env.temp_data['wsme:root'] - if 'webpath' in self.options: - env.temp_data['wsme:webpath'] = self.options['webpath'] - return [] - - -class ServiceDirective(ObjectDescription): - name = 'service' - - optional_arguments = 1 - - def handle_signature(self, sig, signode): - path = sig.split('/') - - namespace = '/'.join(path[:-1]) - if namespace and not namespace.endswith('/'): - namespace += '/' - - servicename = path[-1] - - if not namespace and not servicename: - servicename = '/' - - signode += addnodes.desc_annotation('service ', 'service ') - - if namespace: - signode += addnodes.desc_addname(namespace, namespace) - - signode += addnodes.desc_name(servicename, servicename) - - return sig - - -class ServiceDocumenter(autodoc.ClassDocumenter): - domain = 'wsme' - objtype = 'service' - directivetype = 'service' - - def add_directive_header(self, sig): - super(ServiceDocumenter, self).add_directive_header(sig) - # remove the :module: option that was added by ClassDocumenter - result_len = len(self.directive.result) - for index, item in zip(reversed(range(result_len)), - reversed(self.directive.result)): - if ':module:' in item: - self.directive.result.pop(index) - - def format_signature(self): - return u'' - - def format_name(self): - path = find_service_path(self.env, self.object) - if path is None: - return - return '/' + '/'.join(path) - - -class FunctionDirective(PyClassmember): - name = 'function' - objtype = 'function' - - def get_signature_prefix(self, sig): - return 'function ' - - -def document_function(funcdef, docstrings=None, protocols=['restjson']): - """A helper function to complete a function documentation with return and - parameter types""" - # If the function doesn't have a docstring, add an empty list - # so the default behaviors below work correctly. - if not docstrings: - docstrings = [[]] - found_params = set() - - for si, docstring in enumerate(docstrings): - for i, line in enumerate(docstring): - m = field_re.match(line) - if m and m.group('field') == 'param': - found_params.add(m.group('name')) - - next_param_pos = (0, 0) - - for arg in funcdef.arguments: - content = [ - u':type %s: :wsme:type:`%s`' % ( - arg.name, datatypename(arg.datatype)) - ] - if arg.name not in found_params: - content.insert(0, u':param %s: ' % (arg.name)) - pos = next_param_pos - else: - for si, docstring in enumerate(docstrings): - for i, line in enumerate(docstring): - m = field_re.match(line) - if m and m.group('field') == 'param' \ - and m.group('name') == arg.name: - pos = (si, i + 1) - break - docstring = docstrings[pos[0]] - docstring[pos[1]:pos[1]] = content - next_param_pos = (pos[0], pos[1] + len(content)) - - if funcdef.return_type: - content = [ - u':rtype: %s' % datatypename(funcdef.return_type) - ] - pos = None - for si, docstring in enumerate(docstrings): - for i, line in enumerate(docstring): - m = field_re.match(line) - if m and m.group('field') == 'return': - pos = (si, i + 1) - break - else: - pos = next_param_pos - docstring = docstrings[pos[0]] - docstring[pos[1]:pos[1]] = content - - codesamples = [] - - if protocols: - params = [] - for arg in funcdef.arguments: - params.append(( - arg.name, - arg.datatype, - make_sample_object(arg.datatype) - )) - codesamples.extend([ - u':%s:' % l_(u'Parameters samples'), - u' .. cssclass:: toggle', - u'' - ]) - for name, protocol in protocols: - language, sample = protocol.encode_sample_params( - params, format=True) - codesamples.extend([ - u' ' * 4 + name, - u' .. code-block:: ' + language, - u'', - ]) - codesamples.extend(( - u' ' * 12 + line - for line in six.text_type(sample).split('\n') - )) - - if funcdef.return_type: - codesamples.extend([ - u':%s:' % l_(u'Return samples'), - u' .. cssclass:: toggle', - u'' - ]) - sample_obj = make_sample_object(funcdef.return_type) - for name, protocol in protocols: - language, sample = protocol.encode_sample_result( - funcdef.return_type, sample_obj, format=True) - codesamples.extend([ - u' ' * 4 + name, - u' .. code-block:: ' + language, - u'', - ]) - codesamples.extend(( - u' ' * 12 + line - for line in six.text_type(sample).split('\n') - )) - - docstrings[0:0] = [codesamples] - return docstrings - - -class FunctionDocumenter(autodoc.MethodDocumenter): - domain = 'wsme' - directivetype = 'function' - objtype = 'function' - priority = 1 - - option_spec = { - 'path': directives.unchanged, - 'method': directives.unchanged - } - - @staticmethod - def can_document_member(member, membername, isattr, parent): - return (isinstance(parent, ServiceDocumenter) and - wsme.api.iswsmefunction(member)) - - def import_object(self): - ret = super(FunctionDocumenter, self).import_object() - self.directivetype = 'function' - self.wsme_fd = wsme.api.FunctionDefinition.get(self.object) - self.retann = datatypename(self.wsme_fd.return_type) - return ret - - def format_args(self): - args = [arg.name for arg in self.wsme_fd.arguments] - defaults = [ - arg.default - for arg in self.wsme_fd.arguments if not arg.mandatory - ] - return inspect.formatargspec(args, defaults=defaults) - - def get_doc(self, encoding=None): - """Inject the type and param fields into the docstrings so that the - user can add its own param fields to document the parameters""" - docstrings = super(FunctionDocumenter, self).get_doc(encoding) - - protocols = get_protocols( - self.options.protocols or self.env.app.config.wsme_protocols - ) - - return document_function( - self.wsme_fd, docstrings, protocols - ) - - def add_content(self, more_content, no_docstring=False): - super(FunctionDocumenter, self).add_content(more_content, no_docstring) - - def format_name(self): - return self.wsme_fd.name - - def add_directive_header(self, sig): - super(FunctionDocumenter, self).add_directive_header(sig) - # remove the :module: option that was added by ClassDocumenter - result_len = len(self.directive.result) - for index, item in zip(reversed(range(result_len)), - reversed(self.directive.result)): - if ':module:' in item: - self.directive.result.pop(index) - - -class WSMEDomain(Domain): - name = 'wsme' - label = 'WSME' - - object_types = { - 'type': ObjType(l_('type'), 'type', 'obj'), - 'service': ObjType(l_('service'), 'service', 'obj') - } - - directives = { - 'type': TypeDirective, - 'attribute': AttributeDirective, - 'service': ServiceDirective, - 'root': RootDirective, - 'function': FunctionDirective, - } - - roles = { - 'type': XRefRole() - } - - initial_data = { - 'types': {}, # fullname -> docname - } - - def clear_doc(self, docname): - keys = list(self.data['types'].keys()) - for key in keys: - value = self.data['types'][key] - if value == docname: - del self.data['types'][key] - - def resolve_xref(self, env, fromdocname, builder, - type, target, node, contnode): - if target not in self.data['types']: - return None - todocname = self.data['types'][target] - return make_refnode( - builder, fromdocname, todocname, target, contnode, target) - - -def setup(app): - app.add_domain(WSMEDomain) - app.add_autodocumenter(TypeDocumenter) - app.add_autodocumenter(AttributeDocumenter) - app.add_autodocumenter(ServiceDocumenter) - app.add_autodocumenter(FunctionDocumenter) - - app.add_config_value('wsme_root', None, 'env') - app.add_config_value('wsme_webpath', '/', 'env') - app.add_config_value('wsme_protocols', ['restjson', 'restxml'], 'env') - app.add_javascript('toggle.js') - app.add_stylesheet('toggle.css') diff --git a/wsmeext/sqlalchemy/__init__.py b/wsmeext/sqlalchemy/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/wsmeext/sqlalchemy/__init__.py +++ /dev/null diff --git a/wsmeext/sqlalchemy/controllers.py b/wsmeext/sqlalchemy/controllers.py deleted file mode 100644 index 504a46a..0000000 --- a/wsmeext/sqlalchemy/controllers.py +++ /dev/null @@ -1,97 +0,0 @@ -from wsme.rest import expose, validate -import wsme.types - -from wsmeext.sqlalchemy.types import SQLAlchemyRegistry - - -class CRUDControllerMeta(type): - def __init__(cls, name, bases, dct): - if cls.__saclass__ is not None: - if cls.__registry__ is None: - cls.__registry__ = wsme.types.registry - if cls.__wstype__ is None: - cls.__wstype__ = cls.__registry__.resolve_type( - SQLAlchemyRegistry.get( - cls.__registry__).getdatatype(cls.__saclass__)) - - cls.create = expose( - cls.__wstype__, - method='PUT', - wrap=True - )(cls.create) - cls.create = validate(cls.__wstype__)(cls.create) - - cls.read = expose( - cls.__wstype__, - method='GET', - wrap=True - )(cls.read) - cls.read = validate(cls.__wstype__)(cls.read) - - cls.update = expose( - cls.__wstype__, - method='POST', - wrap=True - )(cls.update) - cls.update = validate(cls.__wstype__)(cls.update) - - cls.delete = expose( - method='DELETE', - wrap=True - )(cls.delete) - cls.delete = validate(cls.__wstype__)(cls.delete) - - super(CRUDControllerMeta, cls).__init__(name, bases, dct) - - -class CRUDControllerBase(object): - __registry__ = None - __saclass__ = None - __wstype__ = None - __dbsession__ = None - - def _create_one(self, data): - obj = self.__saclass__() - data.to_instance(obj) - self.__dbsession__.add(obj) - return obj - - def _get_one(self, ref): - q = self.__dbsession__.query(self.__saclass__) - q = q.filter(ref.get_ref_criterion()) - return q.one() - - def _update_one(self, data): - obj = self._get_one(data) - if obj is None: - raise ValueError("No match for data=%s" % data) - data.to_instance(obj) - return obj - - def _delete(self, ref): - obj = self._get_one(ref) - self.__dbsession__.delete(obj) - - def create(self, data): - obj = self._create_one(data) - self.__dbsession__.flush() - return self.__wstype__(obj) - - def read(self, ref): - obj = self._get_one(ref) - return self.__wstype__(obj) - - def update(self, data): - obj = self._update_one(data) - self.__dbsession__.flush() - return self.__wstype__(obj) - - def delete(self, ref): - self._delete(ref) - self.__dbsession__.flush() - return None - - -CRUDController = CRUDControllerMeta( - 'CRUDController', (CRUDControllerBase,), {} -) diff --git a/wsmeext/sqlalchemy/types.py b/wsmeext/sqlalchemy/types.py deleted file mode 100644 index 414bf52..0000000 --- a/wsmeext/sqlalchemy/types.py +++ /dev/null @@ -1,200 +0,0 @@ -import datetime -import decimal -import logging - -import six - -from sqlalchemy.orm import class_mapper -from sqlalchemy.orm.properties import ColumnProperty, RelationProperty - -import sqlalchemy.types - -import wsme.types - -log = logging.getLogger(__name__) - - -class SQLAlchemyRegistry(object): - @classmethod - def get(cls, registry): - if not hasattr(registry, 'sqlalchemy'): - registry.sqlalchemy = cls() - return registry.sqlalchemy - - def __init__(self): - self.types = {} - self.satypeclasses = { - sqlalchemy.types.Integer: int, - sqlalchemy.types.Boolean: bool, - sqlalchemy.types.Float: float, - sqlalchemy.types.Numeric: decimal.Decimal, - sqlalchemy.types.Date: datetime.date, - sqlalchemy.types.Time: datetime.time, - sqlalchemy.types.DateTime: datetime.datetime, - sqlalchemy.types.String: wsme.types.text, - sqlalchemy.types.Unicode: wsme.types.text, - } - - def getdatatype(self, sadatatype): - if sadatatype.__class__ in self.satypeclasses: - return self.satypeclasses[sadatatype.__class__] - elif sadatatype in self.types: - return self.types[sadatatype] - else: - return sadatatype.__name__ - - -def register_saclass(registry, saclass, typename=None): - """Associate a webservice type name to a SQLAlchemy mapped class. - The default typename if the saclass name itself. - """ - if typename is None: - typename = saclass.__name__ - - SQLAlchemyRegistry.get(registry).types[saclass] = typename - - -class wsattr(wsme.types.wsattr): - def __init__(self, datatype, saproperty=None, **kw): - super(wsattr, self).__init__(datatype, **kw) - self.saname = saproperty.key - self.saproperty = saproperty - self.isrelation = isinstance(saproperty, RelationProperty) - - -def make_wsattr(registry, saproperty): - datatype = None - if isinstance(saproperty, ColumnProperty): - if len(saproperty.columns) > 1: - log.warning("Cannot handle multi-column ColumnProperty") - return None - datatype = SQLAlchemyRegistry.get(registry).getdatatype( - saproperty.columns[0].type) - elif isinstance(saproperty, RelationProperty): - other_saclass = saproperty.mapper.class_ - datatype = SQLAlchemyRegistry.get(registry).getdatatype(other_saclass) - if saproperty.uselist: - datatype = [datatype] - else: - log.warning("Don't know how to handle %s attributes" % - saproperty.__class__) - - if datatype: - return wsattr(datatype, saproperty) - - -class BaseMeta(wsme.types.BaseMeta): - def __new__(cls, name, bases, dct): - if '__registry__' not in dct: - dct['__registry__'] = wsme.types.registry - return type.__new__(cls, name, bases, dct) - - def __init__(cls, name, bases, dct): - saclass = getattr(cls, '__saclass__', None) - if saclass: - mapper = class_mapper(saclass) - cls._pkey_attrs = [] - cls._ref_attrs = [] - for prop in mapper.iterate_properties: - key = prop.key - if hasattr(cls, key): - continue - if key.startswith('_'): - continue - attr = make_wsattr(cls.__registry__, prop) - if attr is not None: - setattr(cls, key, attr) - - if attr and isinstance(prop, ColumnProperty) and \ - prop.columns[0] in mapper.primary_key: - cls._pkey_attrs.append(attr) - cls._ref_attrs.append(attr) - - register_saclass(cls.__registry__, cls.__saclass__, cls.__name__) - super(BaseMeta, cls).__init__(name, bases, dct) - - -class Base(six.with_metaclass(BaseMeta, wsme.types.Base)): - def __init__(self, instance=None, keyonly=False, attrs=None, eagerload=[]): - if instance: - self.from_instance(instance, keyonly, attrs, eagerload) - - def from_instance(self, instance, keyonly=False, attrs=None, eagerload=[]): - if keyonly: - attrs = self._pkey_attrs + self._ref_attrs - for attr in self._wsme_attributes: - if not isinstance(attr, wsattr): - continue - if attrs and not attr.isrelation and attr.name not in attrs: - continue - if attr.isrelation and attr.name not in eagerload: - continue - value = getattr(instance, attr.saname) - if attr.isrelation: - attr_keyonly = attr.name not in eagerload - attr_attrs = None - attr_eagerload = [] - if not attr_keyonly: - attr_attrs = [ - aname[len(attr.name) + 1:] - for aname in attrs - if aname.startswith(attr.name + '.') - ] - attr_eagerload = [ - aname[len(attr.name) + 1:] - for aname in eagerload - if aname.startswith(attr.name + '.') - ] - if attr.saproperty.uselist: - value = [ - attr.datatype.item_type( - o, - keyonly=attr_keyonly, - attrs=attr_attrs, - eagerload=attr_eagerload - ) - for o in value - ] - else: - value = attr.datatype( - value, - keyonly=attr_keyonly, - attrs=attr_attrs, - eagerload=attr_eagerload - ) - attr.__set__(self, value) - - def to_instance(self, instance): - for attr in self._wsme_attributes: - if isinstance(attr, wsattr): - value = attr.__get__(self, self.__class__) - if value is not wsme.types.Unset: - setattr(instance, attr.saname, value) - - def get_ref_criterion(self): - """Returns a criterion that match a database object - having the pkey/ref attribute values of this webservice object""" - criterions = [] - for attr in self._pkey_attrs + self._ref_attrs: - value = attr.__get__(self, self.__class__) - if value is not wsme.types.Unset: - criterions.append(attr.saproperty == value) - - -def generate_types(*classes, **kw): - registry = kw.pop('registry', wsme.types.registry) - prefix = kw.pop('prefix', '') - postfix = kw.pop('postfix', '') - makename = kw.pop('makename', lambda s: prefix + s + postfix) - - newtypes = {} - for c in classes: - if isinstance(c, list): - newtypes.update(generate_types(c)) - else: - name = makename(c.__name__) - newtypes[name] = BaseMeta(name, (Base, ), { - '__saclass__': c, - '__registry__': registry - }) - return newtypes diff --git a/wsmeext/tests/__init__.py b/wsmeext/tests/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/wsmeext/tests/__init__.py +++ /dev/null diff --git a/wsmeext/tests/test_extdirect.py b/wsmeext/tests/test_extdirect.py deleted file mode 100644 index 4c5bea8..0000000 --- a/wsmeext/tests/test_extdirect.py +++ /dev/null @@ -1,243 +0,0 @@ -import base64 -import datetime -import decimal - -try: - import simplejson as json -except ImportError: - import json # noqa - -import wsme.tests.protocol -from wsme.utils import parse_isodatetime, parse_isodate, parse_isotime -from wsme.types import isarray, isdict, isusertype - -import six - -if six.PY3: - from urllib.parse import urlencode -else: - from urllib import urlencode # noqa - - -def encode_arg(value): - if isinstance(value, tuple): - value, datatype = value - else: - datatype = type(value) - - if isinstance(datatype, list): - value = [encode_arg((item, datatype[0])) for item in value] - elif isinstance(datatype, dict): - key_type, value_type = list(datatype.items())[0] - value = dict(( - (encode_arg((key, key_type)), - encode_arg((value, value_type))) - for key, value in value.items() - )) - elif datatype in (datetime.date, datetime.time, datetime.datetime): - value = value.isoformat() - elif datatype == wsme.types.binary: - value = base64.encodestring(value).decode('ascii') - elif datatype == wsme.types.bytes: - value = value.decode('ascii') - elif datatype == decimal.Decimal: - value = str(value) - return value - - -def decode_result(value, datatype): - if value is None: - return None - if datatype == wsme.types.binary: - value = base64.decodestring(value.encode('ascii')) - return value - if isusertype(datatype): - datatype = datatype.basetype - if isinstance(datatype, list): - value = [decode_result(item, datatype[0]) for item in value] - elif isarray(datatype): - value = [decode_result(item, datatype.item_type) for item in value] - elif isinstance(datatype, dict): - key_type, value_type = list(datatype.items())[0] - value = dict(( - (decode_result(key, key_type), - decode_result(value, value_type)) - for key, value in value.items() - )) - elif isdict(datatype): - key_type, value_type = datatype.key_type, datatype.value_type - value = dict(( - (decode_result(key, key_type), - decode_result(value, value_type)) - for key, value in value.items() - )) - elif datatype == datetime.time: - value = parse_isotime(value) - elif datatype == datetime.date: - value = parse_isodate(value) - elif datatype == datetime.datetime: - value = parse_isodatetime(value) - elif hasattr(datatype, '_wsme_attributes'): - for attr in datatype._wsme_attributes: - if attr.key not in value: - continue - value[attr.key] = decode_result(value[attr.key], attr.datatype) - elif datatype == decimal.Decimal: - value = decimal.Decimal(value) - elif datatype == wsme.types.bytes: - value = value.encode('ascii') - elif datatype is not None and type(value) != datatype: - value = datatype(value) - return value - - -class TestExtDirectProtocol(wsme.tests.protocol.ProtocolTestCase): - protocol = 'extdirect' - protocol_options = { - 'namespace': 'MyNS.api', - 'nsfolder': 'app' - } - - def call(self, fname, _rt=None, _no_result_decode=False, _accept=None, - **kw): - path = fname.split('/') - try: - func, funcdef, args = self.root._lookup_function(path) - arguments = funcdef.arguments - except Exception: - arguments = [] - if len(path) == 1: - ns, action, fname = '', '', path[0] - elif len(path) == 2: - ns, action, fname = '', path[0], path[1] - else: - ns, action, fname = '.'.join(path[:-2]), path[-2], path[-1] - print(kw) - - args = [ - dict( - (arg.name, encode_arg(kw[arg.name])) - for arg in arguments if arg.name in kw - ) - ] - print("args =", args) - data = json.dumps({ - 'type': 'rpc', - 'tid': 0, - 'action': action, - 'method': fname, - 'data': args, - }) - print(data) - headers = {'Content-Type': 'application/json'} - if _accept: - headers['Accept'] = _accept - res = self.app.post('/extdirect/router/%s' % ns, data, headers=headers, - expect_errors=True) - - print(res.body) - - if _no_result_decode: - return res - - data = json.loads(res.text) - if data['type'] == 'rpc': - r = data['result'] - return decode_result(r, _rt) - elif data['type'] == 'exception': - faultcode, faultstring = data['message'].split(': ', 1) - debuginfo = data.get('where') - raise wsme.tests.protocol.CallException( - faultcode, faultstring, debuginfo) - - def test_api_alias(self): - assert self.root._get_protocol('extdirect').api_alias == '/app/api.js' - - def test_get_api(self): - res = self.app.get('/app/api.js') - print(res.body) - assert res.body - - def test_positional(self): - self.root._get_protocol('extdirect').default_params_notation = \ - 'positional' - - data = json.dumps({ - 'type': 'rpc', - 'tid': 0, - 'action': 'misc', - 'method': 'multiply', - 'data': [2, 5], - }) - headers = {'Content-Type': 'application/json'} - res = self.app.post('/extdirect/router', data, headers=headers) - - print(res.body) - - data = json.loads(res.text) - assert data['type'] == 'rpc' - r = data['result'] - assert r == 10 - - def test_batchcall(self): - data = json.dumps([{ - 'type': 'rpc', - 'tid': 1, - 'action': 'argtypes', - 'method': 'setdate', - 'data': [{'value': '2011-04-06'}], - }, { - 'type': 'rpc', - 'tid': 2, - 'action': 'returntypes', - 'method': 'getbytes', - 'data': [] - }]) - print(data) - headers = {'Content-Type': 'application/json'} - res = self.app.post('/extdirect/router', data, headers=headers) - - print(res.body) - - rdata = json.loads(res.text) - - assert len(rdata) == 2 - - assert rdata[0]['tid'] == 1 - assert rdata[0]['result'] == '2011-04-06' - assert rdata[1]['tid'] == 2 - assert rdata[1]['result'] == 'astring' - - def test_form_call(self): - params = { - 'value[0].inner.aint': 54, - 'value[1].inner.aint': 55, - 'extType': 'rpc', - 'extTID': 1, - 'extAction': 'argtypes', - 'extMethod': 'setnestedarray', - } - - body = urlencode(params) - r = self.app.post( - '/extdirect/router', - body, - headers={'Content-Type': 'application/x-www-form-urlencoded'} - ) - print(r) - - assert json.loads(r.text) == { - "tid": "1", - "action": "argtypes", - "type": "rpc", - "method": "setnestedarray", - "result": [{ - "inner": { - "aint": 54 - } - }, { - "inner": { - "aint": 55 - } - }] - } diff --git a/wsmeext/tests/test_soap.py b/wsmeext/tests/test_soap.py deleted file mode 100644 index bc17696..0000000 --- a/wsmeext/tests/test_soap.py +++ /dev/null @@ -1,423 +0,0 @@ -import decimal -import datetime -import base64 - -import six - -import wsme.tests.protocol - -try: - import xml.etree.ElementTree as et -except ImportError: - import cElementTree as et # noqa - -import suds.cache -import suds.client -import suds.transport - -import wsme.utils - - -class XDecimal(suds.xsd.sxbuiltin.XBuiltin): - def translate(self, value, topython=True): - if topython: - if isinstance(value, six.string_types) and len(value): - return decimal.Decimal(value) - else: - if isinstance(value, (decimal.Decimal, int, float)): - return str(value) - return value - - -suds.xsd.sxbuiltin.Factory.tags['decimal'] = XDecimal - - -class WebtestSudsTransport(suds.transport.Transport): - def __init__(self, app): - suds.transport.Transport.__init__(self) - self.app = app - - def open(self, request): - res = self.app.get(request.url, headers=request.headers) - return six.BytesIO(res.body) - - def send(self, request): - res = self.app.post( - request.url, - request.message, - headers=dict(( - (key, str(value)) for key, value in request.headers.items() - )), - expect_errors=True - ) - return suds.transport.Reply( - res.status_int, - dict(res.headers), - res.body - ) - - -class SudsCache(suds.cache.Cache): - def __init__(self): - self.d = {} - - def get(self, id): - return self.d.get(id) - - def getf(self, id): - b = self.get(id) - if b is not None: - return six.StringIO(self.get(id)) - - def put(self, id, bfr): - self.d[id] = bfr - - def putf(self, id, fp): - self.put(id, fp.read()) - - def purge(self, id): - try: - del self.d[id] - except KeyError: - pass - - def clear(self, id): - self.d = {} - - -sudscache = SudsCache() - -tns = "http://foo.bar.baz/soap/" -typenamespace = "http://foo.bar.baz/types/" - -soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/' -xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance' -body_qn = '{%s}Body' % soapenv_ns -fault_qn = '{%s}Fault' % soapenv_ns -faultcode_qn = '{%s}faultcode' % soapenv_ns -faultstring_qn = '{%s}faultstring' % soapenv_ns -faultdetail_qn = '{%s}detail' % soapenv_ns -type_qn = '{%s}type' % xsi_ns -nil_qn = '{%s}nil' % xsi_ns - - -def build_soap_message(method, params=""): - message = """<?xml version="1.0"?> -<soap:Envelope -xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" -xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" -soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"> - - <soap:Body xmlns="%(typenamespace)s"> - <%(method)s> - %(params)s - </%(method)s> - </soap:Body> - -</soap:Envelope> -""" % dict(method=method, params=params, typenamespace=typenamespace) - return message - - -python_types = { - int: ('xs:int', str), - float: ('xs:float', str), - bool: ('xs:boolean', str), - wsme.types.bytes: ( - 'xs:string', - lambda x: x.decode('ascii') if isinstance(x, wsme.types.bytes) else x - ), - wsme.types.text: ('xs:string', wsme.types.text), - wsme.types.binary: ( - 'xs:base64Binary', - lambda x: base64.encodestring(x).decode('ascii') - ), - decimal.Decimal: ('xs:decimal', str), - datetime.date: ('xs:date', datetime.date.isoformat), - datetime.time: ('xs:time', datetime.time.isoformat), - datetime.datetime: ('xs:dateTime', datetime.datetime.isoformat), -} - -array_types = { - wsme.types.bytes: "String_Array", - wsme.types.text: "String_Array", - int: "Int_Array", - float: "Float_Array", - bool: "Boolean_Array", - datetime.datetime: "dateTime_Array" -} - -if not six.PY3: - array_types[long] = "Long_Array" # noqa - - -def tosoap(tag, value): - el = et.Element(tag) - if isinstance(value, tuple): - value, datatype = value - else: - datatype = type(value) - if value is None: - el.set('xsi:nil', 'true') - return el - if datatype in python_types: - stype, conv = python_types[datatype] - el.text = conv(value) - el.set('xsi:type', stype) - el.text = str(value) - return el - - -def tosuds(client, value): - if value is None: - return None - if isinstance(value, tuple): - value, datatype = value - else: - datatype = type(value) - if value is None: - return None - if isinstance(datatype, list): - if datatype[0] in array_types: - tname = array_types[datatype[0]] - else: - tname = datatype[0].__name__ + '_Array' - o = client.factory.create('types:' + tname) - o.item = [tosuds(client, (item, datatype[0])) for item in value] - return o - elif datatype in python_types: - return python_types[datatype][1](value) - else: - o = client.factory.create('types:' + datatype.__name__) - - for attr in datatype._wsme_attributes: - if attr.name in value: - setattr( - o, attr.name, - tosuds(client, (value[attr.name], attr.datatype)) - ) - return o - - -def read_bool(value): - return value == 'true' - - -soap_types = { - 'xs:string': wsme.types.text, - 'xs:int': int, - 'xs:long': int if six.PY3 else long, # noqa - 'xs:float': float, - 'xs:decimal': decimal.Decimal, - 'xs:boolean': read_bool, - 'xs:date': wsme.utils.parse_isodate, - 'xs:time': wsme.utils.parse_isotime, - 'xs:dateTime': wsme.utils.parse_isodatetime, - 'xs:base64Binary': base64.decodestring, -} - - -def fromsoap(el): - if el.get(nil_qn) == 'true': - return None - t = el.get(type_qn) - if t == 'xs:string': - return wsme.types.text(el.text if el.text else '') - if t in soap_types: - return soap_types[t](el.text) - elif t and t.endswith('_Array'): - return [fromsoap(i) for i in el] - else: - d = {} - for child in el: - name = child.tag - assert name.startswith('{%s}' % typenamespace), name - name = name[len(typenamespace) + 2:] - d[name] = fromsoap(child) - return d - - -def tobytes(value): - if isinstance(value, wsme.types.text): - value = value.encode() - return value - - -def tobin(value): - value = base64.decodestring(value.encode()) - return value - - -fromsuds_types = { - wsme.types.binary: tobin, - wsme.types.bytes: tobytes, - decimal.Decimal: decimal.Decimal, -} - - -def fromsuds(dt, value): - if value is None: - return None - if isinstance(dt, list): - return [fromsuds(dt[0], item) for item in value.item] - if wsme.types.isarray(dt): - return [fromsuds(dt.item_type, item) for item in value.item] - if wsme.types.isusertype(dt) and dt not in fromsuds_types: - dt = dt.basetype - if dt in fromsuds_types: - print(dt, value) - value = fromsuds_types[dt](value) - print(value) - return value - if wsme.types.iscomplex(dt): - d = {} - for attrdef in dt._wsme_attributes: - if not hasattr(value, attrdef.name): - continue - d[attrdef.name] = fromsuds( - attrdef.datatype, getattr(value, attrdef.name) - ) - return d - return value - - -class TestSOAP(wsme.tests.protocol.ProtocolTestCase): - protocol = 'soap' - protocol_options = dict(tns=tns, typenamespace=typenamespace) - ws_path = '/' - _sudsclient = None - - def setUp(self): - wsme.tests.protocol.ProtocolTestCase.setUp(self) - - def test_simple_call(self): - message = build_soap_message('touch') - print(message) - res = self.app.post( - self.ws_path, - message, - headers={"Content-Type": "application/soap+xml; charset=utf-8"}, - expect_errors=True - ) - print(res.body) - assert res.status.startswith('200') - - def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False, - **kw): - - if _no_result_decode or _accept or self._testMethodName in ( - 'test_missing_argument', 'test_invalid_path', 'test_settext_empty', - 'test_settext_none' - ): - return self.raw_call(fpath, _rt, _accept, _no_result_decode, **kw) - - path = fpath.strip('/').split('/') - methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]]) - - m = getattr(self.sudsclient.service, methodname) - kw = dict(( - (key, tosuds(self.sudsclient, value)) for key, value in kw.items() - )) - print(kw) - try: - return fromsuds(_rt, m(**kw)) - except suds.WebFault as exc: - raise wsme.tests.protocol.CallException( - exc.fault.faultcode, - exc.fault.faultstring, - getattr(exc.fault, 'detail', None) or None - ) - - def raw_call(self, fpath, _rt=None, _accept=None, _no_result_decode=False, - **kw): - path = fpath.strip('/').split('/') - methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]]) - # get the actual definition so we can build the adequate request - if kw: - el = et.Element('parameters') - for key, value in kw.items(): - el.append(tosoap(key, value)) - - params = six.b("\n").join(et.tostring(el) for el in el) - else: - params = "" - methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]]) - message = build_soap_message(methodname, params) - print(message) - headers = {"Content-Type": "application/soap+xml; charset=utf-8"} - if _accept is not None: - headers['Accept'] = _accept - res = self.app.post( - self.ws_path, - message, - headers=headers, - expect_errors=True - ) - print("Status: ", res.status, "Received:", res.body) - - if _no_result_decode: - return res - - el = et.fromstring(res.body) - body = el.find(body_qn) - print(body) - - if res.status_int == 200: - response_tag = '{%s}%sResponse' % (typenamespace, methodname) - r = body.find(response_tag) - result = r.find('{%s}result' % typenamespace) - print("Result element: ", result) - return fromsoap(result) - elif res.status_int == 400: - fault = body.find(fault_qn) - raise wsme.tests.protocol.CallException( - fault.find(faultcode_qn).text, - fault.find(faultstring_qn).text, - "") - - elif res.status_int == 500: - fault = body.find(fault_qn) - raise wsme.tests.protocol.CallException( - fault.find(faultcode_qn).text, - fault.find(faultstring_qn).text, - fault.find(faultdetail_qn) is not None and - fault.find(faultdetail_qn).text or None) - - @property - def sudsclient(self): - if self._sudsclient is None: - self._sudsclient = suds.client.Client( - self.ws_path + 'api.wsdl', - transport=WebtestSudsTransport(self.app), - cache=sudscache - ) - return self._sudsclient - - def test_wsdl(self): - c = self.sudsclient - assert c.wsdl.tns[1] == tns, c.wsdl.tns - - sd = c.sd[0] - - assert len(sd.ports) == 1 - port, methods = sd.ports[0] - self.assertEqual(len(methods), 51) - - methods = dict(methods) - - assert 'returntypesGettext' in methods - print(methods) - - assert methods['argtypesSettime'][0][0] == 'value' - - def test_return_nesteddict(self): - pass - - def test_setnesteddict(self): - pass - - def test_return_objectdictattribute(self): - pass - - def test_setnested_nullobj(self): - pass # TODO write a soap adapted version of this test. diff --git a/wsmeext/tests/test_sqlalchemy_controllers.py b/wsmeext/tests/test_sqlalchemy_controllers.py deleted file mode 100644 index 1956788..0000000 --- a/wsmeext/tests/test_sqlalchemy_controllers.py +++ /dev/null @@ -1,223 +0,0 @@ -import datetime - -try: - import json -except ImportError: - import simplejson as json - -from webtest import TestApp - -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy import Column, Integer, Unicode, Date, ForeignKey -from sqlalchemy.orm import relation - -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker, scoped_session - -from wsme import WSRoot -import wsme.types - -from wsmeext.sqlalchemy.types import generate_types -from wsmeext.sqlalchemy.controllers import CRUDController - -from six import u - -engine = create_engine('sqlite:///') -DBSession = scoped_session(sessionmaker(autocommit=False, autoflush=False, - bind=engine)) -DBBase = declarative_base() - -registry = wsme.types.Registry() - - -class DBPerson(DBBase): - __tablename__ = 'person' - - id = Column(Integer, primary_key=True) - name = Column(Unicode(50)) - birthdate = Column(Date) - - addresses = relation('DBAddress') - - -class DBAddress(DBBase): - __tablename__ = 'address' - - id = Column(Integer, primary_key=True) - - _person_id = Column('person_id', ForeignKey(DBPerson.id)) - - street = Column(Unicode(50)) - city = Column(Unicode(50)) - - person = relation(DBPerson) - - -globals().update( - generate_types(DBPerson, DBAddress, makename=lambda s: s[2:], - registry=registry)) - - -class PersonController(CRUDController): - __saclass__ = DBPerson - __dbsession__ = DBSession - __registry__ = registry - - -class AddressController(CRUDController): - __saclass__ = DBAddress - __dbsession__ = DBSession - __registry__ = registry - - -class Root(WSRoot): - __registry__ = registry - - person = PersonController() - address = AddressController() - - -class TestCRUDController(): - def setUp(self): - DBBase.metadata.create_all(DBSession.bind) - - self.root = Root() - self.root.getapi() - self.root.addprotocol('restjson') - - self.app = TestApp(self.root.wsgiapp()) - - def tearDown(self): - DBBase.metadata.drop_all(DBSession.bind) - - def test_create(self): - data = dict(data=dict( - name=u('Pierre-Joseph'), - birthdate=u('1809-01-15') - )) - r = self.app.post('/person/create', json.dumps(data), - headers={'Content-Type': 'application/json'}) - r = json.loads(r.text) - print(r) - assert r['name'] == u('Pierre-Joseph') - assert r['birthdate'] == u('1809-01-15') - - def test_PUT(self): - data = dict(data=dict( - name=u('Pierre-Joseph'), - birthdate=u('1809-01-15') - )) - r = self.app.put('/person', json.dumps(data), - headers={'Content-Type': 'application/json'}) - r = json.loads(r.text) - print(r) - assert r['name'] == u('Pierre-Joseph') - assert r['birthdate'] == u('1809-01-15') - - def test_read(self): - p = DBPerson( - name=u('Pierre-Joseph'), - birthdate=datetime.date(1809, 1, 15)) - DBSession.add(p) - DBSession.flush() - pid = p.id - r = self.app.post('/person/read', '{"ref": {"id": %s}}' % pid, - headers={'Content-Type': 'application/json'}) - r = json.loads(r.text) - print(r) - assert r['name'] == u('Pierre-Joseph') - assert r['birthdate'] == u('1809-01-15') - - def test_GET(self): - p = DBPerson( - name=u('Pierre-Joseph'), - birthdate=datetime.date(1809, 1, 15)) - DBSession.add(p) - DBSession.flush() - pid = p.id - r = self.app.get('/person?ref.id=%s' % pid, - headers={'Accept': 'application/json'}) - r = json.loads(r.text) - print(r) - assert r['name'] == u('Pierre-Joseph') - assert r['birthdate'] == u('1809-01-15') - - def test_GET_bad_accept(self): - p = DBPerson( - name=u('Pierre-Joseph'), - birthdate=datetime.date(1809, 1, 15)) - DBSession.add(p) - DBSession.flush() - pid = p.id - r = self.app.get('/person?ref.id=%s' % pid, - headers={'Accept': 'text/plain'}, - status=406) - assert r.text == ("Unacceptable Accept type: text/plain not in " - "['application/json', 'text/javascript', " - "'application/javascript', 'text/xml']") - - def test_update(self): - p = DBPerson( - name=u('Pierre-Joseph'), - birthdate=datetime.date(1809, 1, 15)) - DBSession.add(p) - DBSession.flush() - pid = p.id - data = { - "id": pid, - "name": u('Pierre-Joseph Proudon') - } - r = self.app.post('/person/update', json.dumps(dict(data=data)), - headers={'Content-Type': 'application/json'}) - r = json.loads(r.text) - print(r) - assert r['name'] == u('Pierre-Joseph Proudon') - assert r['birthdate'] == u('1809-01-15') - - def test_POST(self): - p = DBPerson( - name=u('Pierre-Joseph'), - birthdate=datetime.date(1809, 1, 15)) - DBSession.add(p) - DBSession.flush() - pid = p.id - data = { - "id": pid, - "name": u('Pierre-Joseph Proudon') - } - r = self.app.post('/person', json.dumps(dict(data=data)), - headers={'Content-Type': 'application/json'}) - r = json.loads(r.text) - print(r) - assert r['name'] == u('Pierre-Joseph Proudon') - assert r['birthdate'] == u('1809-01-15') - - def test_delete(self): - p = DBPerson( - name=u('Pierre-Joseph'), - birthdate=datetime.date(1809, 1, 15)) - DBSession.add(p) - DBSession.flush() - pid = p.id - r = self.app.post('/person/delete', json.dumps( - dict(ref=dict(id=pid))), - headers={ - 'Content-Type': 'application/json' - }) - print(r) - assert DBSession.query(DBPerson).get(pid) is None - - def test_DELETE(self): - p = DBPerson( - name=u('Pierre-Joseph'), - birthdate=datetime.date(1809, 1, 15)) - DBSession.add(p) - DBSession.flush() - pid = p.id - r = self.app.delete('/person?ref.id=%s' % pid, - headers={'Content-Type': 'application/json'}) - print(r) - assert DBSession.query(DBPerson).get(pid) is None - - def test_nothing(self): - pass diff --git a/wsmeext/tests/test_sqlalchemy_types.py b/wsmeext/tests/test_sqlalchemy_types.py deleted file mode 100644 index 8512015..0000000 --- a/wsmeext/tests/test_sqlalchemy_types.py +++ /dev/null @@ -1,72 +0,0 @@ -import datetime - -import wsmeext.sqlalchemy.types - -from wsme.types import text, Unset, isarray - -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy import Column, Integer, String, Date, ForeignKey -from sqlalchemy.orm import relation - -from six import u - -SABase = declarative_base() - - -class SomeClass(SABase): - __tablename__ = 'some_table' - id = Column(Integer, primary_key=True) - name = Column(String(50)) - - adate = Column(Date) - - -def test_complextype(): - class AType(wsmeext.sqlalchemy.types.Base): - __saclass__ = SomeClass - - assert AType.id.datatype is int - assert AType.name.datatype is text - assert AType.adate.datatype is datetime.date - - a = AType() - s = SomeClass(name=u('aname'), adate=datetime.date(2012, 6, 26)) - assert s.name == u('aname') - - a.from_instance(s) - assert a.name == u('aname') - assert a.adate == datetime.date(2012, 6, 26) - - a.name = u('test') - del a.adate - assert a.adate is Unset - - a.to_instance(s) - assert s.name == u('test') - assert s.adate == datetime.date(2012, 6, 26) - - -def test_generate(): - class A(SABase): - __tablename__ = 'a' - id = Column(Integer, primary_key=True) - name = Column(String(50)) - - _b_id = Column(ForeignKey('b.id')) - - b = relation('B') - - class B(SABase): - __tablename__ = 'b' - id = Column(Integer, primary_key=True) - name = Column(String(50)) - - alist = relation(A) - - newtypes = wsmeext.sqlalchemy.types.generate_types(A, B) - - assert newtypes['A'].id.datatype is int - assert newtypes['A'].b.datatype is newtypes['B'] - assert newtypes['B'].id.datatype is int - assert isarray(newtypes['B'].alist.datatype) - assert newtypes['B'].alist.datatype.item_type is newtypes['A'] diff --git a/wsmeext/tg1.py b/wsmeext/tg1.py deleted file mode 100644 index b978166..0000000 --- a/wsmeext/tg1.py +++ /dev/null @@ -1,173 +0,0 @@ -try: - import json -except ImportError: - import simplejson as json # noqa - -import functools -import sys - -import cherrypy -import webob -from turbogears import expose, util -import turbogears.view - -from wsme.rest import validate as wsvalidate -import wsme.api -import wsme.rest -import wsme.rest.args -import wsme.rest.json -from wsme.utils import is_valid_code - -import inspect - -APIPATH_MAXLEN = 50 - -__all__ = ['wsexpose', 'wsvalidate'] - - -def wsexpose(*args, **kwargs): - tg_json_expose = expose( - 'wsmejson:', - accept_format='application/json', - content_type='application/json', - tg_format='json' - ) - tg_altjson_expose = expose( - 'wsmejson:', - accept_format='text/javascript', - content_type='application/json' - ) - tg_xml_expose = expose( - 'wsmexml:', - accept_format='text/xml', - content_type='text/xml', - tg_format='xml' - ) - sig = wsme.signature(*args, **kwargs) - - def decorate(f): - sig(f) - funcdef = wsme.api.FunctionDefinition.get(f) - - @functools.wraps(f) - def callfunction(self, *args, **kwargs): - args, kwargs = wsme.rest.args.get_args( - funcdef, args, kwargs, - cherrypy.request.params, None, - cherrypy.request.body, - cherrypy.request.headers['Content-Type'] - ) - if funcdef.pass_request: - kwargs[funcdef.pass_request] = cherrypy.request - try: - result = f(self, *args, **kwargs) - except Exception: - try: - exception_info = sys.exc_info() - orig_exception = exception_info[1] - if isinstance(orig_exception, cherrypy.HTTPError): - orig_code = getattr(orig_exception, 'status', None) - else: - orig_code = getattr(orig_exception, 'code', None) - data = wsme.api.format_exception(exception_info) - finally: - del exception_info - - cherrypy.response.status = 500 - if data['faultcode'] == 'client': - cherrypy.response.status = 400 - elif orig_code and is_valid_code(orig_code): - cherrypy.response.status = orig_code - - accept = cherrypy.request.headers.get('Accept', "").lower() - accept = util.simplify_http_accept_header(accept) - - decorators = {'text/xml': wsme.rest.xml.encode_error} - return decorators.get( - accept, - wsme.rest.json.encode_error - )(None, data) - - return dict( - datatype=funcdef.return_type, - result=result - ) - - callfunction = tg_xml_expose(callfunction) - callfunction = tg_altjson_expose(callfunction) - callfunction = tg_json_expose(callfunction) - callfunction._wsme_original_function = f - return callfunction - - return decorate - - -class AutoJSONTemplate(object): - def __init__(self, extra_vars_func=None, options=None): - pass - - def render(self, info, format="json", fragment=False, template=None): - "Renders the template to a string using the provided info." - return wsme.rest.json.encode_result( - info['result'], info['datatype'] - ) - - def get_content_type(self, user_agent): - return "application/json" - - -class AutoXMLTemplate(object): - def __init__(self, extra_vars_func=None, options=None): - pass - - def render(self, info, format="json", fragment=False, template=None): - "Renders the template to a string using the provided info." - return wsme.rest.xml.encode_result( - info['result'], info['datatype'] - ) - - def get_content_type(self, user_agent): - return "text/xml" - - -turbogears.view.engines['wsmejson'] = AutoJSONTemplate(turbogears.view.stdvars) -turbogears.view.engines['wsmexml'] = AutoXMLTemplate(turbogears.view.stdvars) - - -class Controller(object): - def __init__(self, wsroot): - self._wsroot = wsroot - - @expose() - def default(self, *args, **kw): - req = webob.Request(cherrypy.request.wsgi_environ) - res = self._wsroot._handle_request(req) - cherrypy.response.header_list = res.headerlist - cherrypy.response.status = res.status - return res.body - - -def _scan_api(controller, path=[], objects=[]): - """ - Recursively iterate a controller api entries. - """ - for name in dir(controller): - if name.startswith('_'): - continue - a = getattr(controller, name) - if a in objects: - continue - if inspect.ismethod(a): - if wsme.api.iswsmefunction(a): - yield path + [name], a._wsme_original_function, [controller] - elif inspect.isclass(a): - continue - else: - if len(path) > APIPATH_MAXLEN: - raise ValueError("Path is too long: " + str(path)) - for i in _scan_api(a, path + [name], objects + [a]): - yield i - - -def scan_api(root=None): - return _scan_api(cherrypy.root) diff --git a/wsmeext/tg11.py b/wsmeext/tg11.py deleted file mode 100644 index 80ec50c..0000000 --- a/wsmeext/tg11.py +++ /dev/null @@ -1,40 +0,0 @@ -from turbogears import config -import cherrypy -from cherrypy.filters.basefilter import BaseFilter -from turbogears.startup import call_on_startup, call_on_shutdown -from wsmeext.tg1 import wsexpose, wsvalidate -import wsmeext.tg1 - -__all__ = ['adapt', 'wsexpose', 'wsvalidate'] - - -class WSMECherrypyFilter(BaseFilter): - def __init__(self, controller): - self.controller = controller - self.webpath = None - - def on_start_resource(self): - path = cherrypy.request.path - if path.startswith(self.controller._wsroot._webpath): - cherrypy.request.processRequestBody = False - - -def adapt(wsroot): - wsroot._scan_api = wsmeext.tg1.scan_api - controller = wsmeext.tg1.Controller(wsroot) - filter_ = WSMECherrypyFilter(controller) - - def install_filter(): - filter_.webpath = config.get('server.webpath') or '' - controller._wsroot._webpath = \ - filter_.webpath + controller._wsroot._webpath - cherrypy.root._cp_filters.append(filter_) - - def uninstall_filter(): - cherrypy.root._cp_filters.remove(filter_) - controller._wsroot._webpath = \ - controller._wsroot._webpath[len(filter_.webpath):] - - call_on_startup.append(install_filter) - call_on_shutdown.insert(0, uninstall_filter) - return controller diff --git a/wsmeext/tg15.py b/wsmeext/tg15.py deleted file mode 100644 index 4b8a9b1..0000000 --- a/wsmeext/tg15.py +++ /dev/null @@ -1,20 +0,0 @@ -import cherrypy - -from wsmeext.tg1 import wsexpose, wsvalidate -import wsmeext.tg1 - - -__all__ = ['adapt', 'wsexpose', 'wsvalidate'] - - -def scan_api(root=None): - for baseurl, instance in cherrypy.tree.apps.items(): - path = [token for token in baseurl.split('/') if token] - for i in wsmeext.tg1._scan_api(instance.root, path): - yield i - - -def adapt(wsroot): - wsroot._scan_api = scan_api - controller = wsmeext.tg1.Controller(wsroot) - return controller |