summaryrefslogtreecommitdiff
path: root/wsmeext
diff options
context:
space:
mode:
Diffstat (limited to 'wsmeext')
-rw-r--r--wsmeext/__init__.py2
-rw-r--r--wsmeext/cornice.py168
-rw-r--r--wsmeext/extdirect/__init__.py1
-rw-r--r--wsmeext/extdirect/datastore.py121
-rw-r--r--wsmeext/extdirect/protocol.py450
-rw-r--r--wsmeext/extdirect/sadatastore.py19
-rw-r--r--wsmeext/flask.py108
-rw-r--r--wsmeext/pecan.py142
-rw-r--r--wsmeext/soap/__init__.py5
-rw-r--r--wsmeext/soap/protocol.py478
-rw-r--r--wsmeext/soap/simplegeneric.py107
-rw-r--r--wsmeext/soap/wsdl.py297
-rw-r--r--wsmeext/sphinxext.py600
-rw-r--r--wsmeext/sqlalchemy/__init__.py0
-rw-r--r--wsmeext/sqlalchemy/controllers.py97
-rw-r--r--wsmeext/sqlalchemy/types.py200
-rw-r--r--wsmeext/tests/__init__.py0
-rw-r--r--wsmeext/tests/test_extdirect.py243
-rw-r--r--wsmeext/tests/test_soap.py423
-rw-r--r--wsmeext/tests/test_sqlalchemy_controllers.py223
-rw-r--r--wsmeext/tests/test_sqlalchemy_types.py72
-rw-r--r--wsmeext/tg1.py173
-rw-r--r--wsmeext/tg11.py40
-rw-r--r--wsmeext/tg15.py20
24 files changed, 0 insertions, 3989 deletions
diff --git a/wsmeext/__init__.py b/wsmeext/__init__.py
deleted file mode 100644
index ece379c..0000000
--- a/wsmeext/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-import pkg_resources
-pkg_resources.declare_namespace(__name__)
diff --git a/wsmeext/cornice.py b/wsmeext/cornice.py
deleted file mode 100644
index 6c81386..0000000
--- a/wsmeext/cornice.py
+++ /dev/null
@@ -1,168 +0,0 @@
-"""
-WSME for cornice
-
-
-Activate it::
-
- config.include('wsme.cornice')
-
-
-And use it::
-
- @hello.get()
- @wsexpose(Message, wsme.types.text)
- def get_hello(who=u'World'):
- return Message(text='Hello %s' % who)
-"""
-from __future__ import absolute_import
-
-import inspect
-import sys
-
-import wsme
-from wsme.rest import json as restjson
-from wsme.rest import xml as restxml
-import wsme.runtime
-import wsme.api
-import functools
-
-from wsme.rest.args import (
- args_from_args, args_from_params, args_from_body, combine_args
-)
-
-
-class WSMEJsonRenderer(object):
- def __init__(self, info):
- pass
-
- def __call__(self, data, context):
- response = context['request'].response
- response.content_type = 'application/json'
- if 'faultcode' in data:
- if 'orig_code' in data:
- response.status_code = data['orig_code']
- elif data['faultcode'] == 'Client':
- response.status_code = 400
- else:
- response.status_code = 500
- return restjson.encode_error(None, data)
- obj = data['result']
- if isinstance(obj, wsme.api.Response):
- response.status_code = obj.status_code
- if obj.error:
- return restjson.encode_error(None, obj.error)
- obj = obj.obj
- return restjson.encode_result(obj, data['datatype'])
-
-
-class WSMEXmlRenderer(object):
- def __init__(self, info):
- pass
-
- def __call__(self, data, context):
- response = context['request'].response
- if 'faultcode' in data:
- if data['faultcode'] == 'Client':
- response.status_code = 400
- else:
- response.status_code = 500
- return restxml.encode_error(None, data)
- response.content_type = 'text/xml'
- return restxml.encode_result(data['result'], data['datatype'])
-
-
-def get_outputformat(request):
- df = None
- if 'Accept' in request.headers:
- if 'application/json' in request.headers['Accept']:
- df = 'json'
- elif 'text/xml' in request.headers['Accept']:
- df = 'xml'
- if df is None and 'Content-Type' in request.headers:
- if 'application/json' in request.headers['Content-Type']:
- df = 'json'
- elif 'text/xml' in request.headers['Content-Type']:
- df = 'xml'
- return df if df else 'json'
-
-
-def signature(*args, **kwargs):
- sig = wsme.signature(*args, **kwargs)
-
- def decorate(f):
- args = inspect.getargspec(f)[0]
- with_self = args[0] == 'self' if args else False
- f = sig(f)
- funcdef = wsme.api.FunctionDefinition.get(f)
- funcdef.resolve_types(wsme.types.registry)
-
- @functools.wraps(f)
- def callfunction(*args):
- if with_self:
- if len(args) == 1:
- self = args[0]
- request = self.request
- elif len(args) == 2:
- self, request = args
- else:
- raise ValueError("Cannot do anything with these arguments")
- else:
- request = args[0]
- request.override_renderer = 'wsme' + get_outputformat(request)
- try:
- args, kwargs = combine_args(funcdef, (
- args_from_args(funcdef, (), request.matchdict),
- args_from_params(funcdef, request.params),
- args_from_body(funcdef, request.body, request.content_type)
- ))
- wsme.runtime.check_arguments(funcdef, args, kwargs)
- if funcdef.pass_request:
- kwargs[funcdef.pass_request] = request
- if with_self:
- args.insert(0, self)
-
- result = f(*args, **kwargs)
- return {
- 'datatype': funcdef.return_type,
- 'result': result
- }
- except Exception:
- try:
- exception_info = sys.exc_info()
- orig_exception = exception_info[1]
- orig_code = getattr(orig_exception, 'code', None)
- data = wsme.api.format_exception(exception_info)
- if orig_code is not None:
- data['orig_code'] = orig_code
- return data
- finally:
- del exception_info
-
- callfunction.wsme_func = f
- return callfunction
- return decorate
-
-
-def scan_api(root=None):
- from cornice.service import get_services
- for service in get_services():
- for method, func, options in service.definitions:
- wsme_func = getattr(func, 'wsme_func')
- basepath = service.path.split('/')
- if basepath and not basepath[0]:
- del basepath[0]
- if wsme_func:
- yield (
- basepath + [method.lower()],
- wsme_func._wsme_definition
- )
-
-
-def includeme(config):
- import pyramid.wsgi
- wsroot = wsme.WSRoot(scan_api=scan_api, webpath='/ws')
- wsroot.addprotocol('extdirect')
- config.add_renderer('wsmejson', WSMEJsonRenderer)
- config.add_renderer('wsmexml', WSMEXmlRenderer)
- config.add_route('wsme', '/ws/*path')
- config.add_view(pyramid.wsgi.wsgiapp(wsroot.wsgiapp()), route_name='wsme')
diff --git a/wsmeext/extdirect/__init__.py b/wsmeext/extdirect/__init__.py
deleted file mode 100644
index ff14bd5..0000000
--- a/wsmeext/extdirect/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from wsmeext.extdirect.protocol import ExtDirectProtocol # noqa
diff --git a/wsmeext/extdirect/datastore.py b/wsmeext/extdirect/datastore.py
deleted file mode 100644
index 287e3a7..0000000
--- a/wsmeext/extdirect/datastore.py
+++ /dev/null
@@ -1,121 +0,0 @@
-import wsme
-import wsme.types
-
-try:
- import simplejson as json
-except ImportError:
- import json
-
-
-class ReadResultBase(wsme.types.Base):
- total = int
- success = bool
- message = wsme.types.text
-
-
-def make_readresult(datatype):
- ReadResult = type(
- datatype.__name__ + 'ReadResult',
- (ReadResultBase,), {
- 'data': [datatype]
- }
- )
- return ReadResult
-
-
-class DataStoreControllerMeta(type):
- def __init__(cls, name, bases, dct):
- if cls.__datatype__ is None:
- return
- if getattr(cls, '__readresulttype__', None) is None:
- cls.__readresulttype__ = make_readresult(cls.__datatype__)
-
- cls.create = wsme.expose(
- cls.__readresulttype__,
- extdirect_params_notation='positional')(cls.create)
- cls.create = wsme.validate(cls.__datatype__)(cls.create)
-
- cls.read = wsme.expose(
- cls.__readresulttype__,
- extdirect_params_notation='named')(cls.read)
- cls.read = wsme.validate(str, str, int, int, int)(cls.read)
-
- cls.update = wsme.expose(
- cls.__readresulttype__,
- extdirect_params_notation='positional')(cls.update)
- cls.update = wsme.validate(cls.__datatype__)(cls.update)
-
- cls.destroy = wsme.expose(
- cls.__readresulttype__,
- extdirect_params_notation='positional')(cls.destroy)
- cls.destroy = wsme.validate(cls.__idtype__)(cls.destroy)
-
-
-class DataStoreControllerMixin(object):
- __datatype__ = None
- __idtype__ = int
-
- __readresulttype__ = None
-
- def create(self, obj):
- pass
-
- def read(self, query=None, sort=None, page=None, start=None, limit=None):
- pass
-
- def update(self, obj):
- pass
-
- def destroy(self, obj_id):
- pass
-
- def model(self):
- tpl = """
-Ext.define('%(appns)s.model.%(classname)s', {
- extend: 'Ext.data.Model',
- fields: %(fields)s,
-
- proxy: {
- type: 'direct',
- api: {
- create: %(appns)s.%(controllerns)s.create,
- read: %(appns)s.%(controllerns)s.read,
- update: %(appns)s.%(controllerns)s.update,
- destroy: %(appns)s.%(controllerns)s.destroy
- },
- reader: {
- root: 'data'
- }
- }
-});
- """
- fields = [
- attr.name for attr in self.__datatype__._wsme_attributes
- ]
- d = {
- 'appns': 'Demo',
- 'controllerns': 'stores.' + self.__datatype__.__name__.lower(),
- 'classname': self.__datatype__.__name__,
- 'fields': json.dumps(fields)
- }
- return tpl % d
-
- def store(self):
- tpl = """
-Ext.define('%(appns)s.store.%(classname)s', {
- extend: 'Ext.data.Store',
- model: '%(appns)s.model.%(classname)s'
-});
-"""
- d = {
- 'appns': 'Demo',
- 'classname': self.__datatype__.__name__,
- }
-
- return tpl % d
-
-
-DataStoreController = DataStoreControllerMeta(
- 'DataStoreController',
- (DataStoreControllerMixin,), {}
-)
diff --git a/wsmeext/extdirect/protocol.py b/wsmeext/extdirect/protocol.py
deleted file mode 100644
index 23793b4..0000000
--- a/wsmeext/extdirect/protocol.py
+++ /dev/null
@@ -1,450 +0,0 @@
-import datetime
-import decimal
-
-from simplegeneric import generic
-
-from wsme.exc import ClientSideError
-from wsme.protocol import CallContext, Protocol, expose
-from wsme.utils import parse_isodate, parse_isodatetime, parse_isotime
-from wsme.rest.args import from_params
-from wsme.types import iscomplex, isusertype, list_attributes, Unset
-import wsme.types
-
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-
-from six import u
-
-
-class APIDefinitionGenerator(object):
- tpl = """\
-Ext.ns("%(rootns)s");
-
-if (!%(rootns)s.wsroot) {
- %(rootns)s.wsroot = "%(webpath)s.
-}
-
-%(descriptors)s
-
-Ext.syncRequire(['Ext.direct.*'], function() {
- %(providers)s
-});
-"""
- descriptor_tpl = """\
-Ext.ns("%(fullns)s");
-
-%(fullns)s.Descriptor = {
- "url": %(rootns)s.wsroot + "extdirect/router/%(ns)s",
- "namespace": "%(fullns)s",
- "type": "remoting",
- "actions": %(actions)s
- "enableBuffer": true
-};
-"""
- provider_tpl = """\
- Ext.direct.Manager.addProvider(%(fullns)s.Descriptor);
-"""
-
- def __init__(self):
- pass
-
- def render(self, rootns, webpath, namespaces, fullns):
- descriptors = u('')
- for ns in sorted(namespaces):
- descriptors += self.descriptor_tpl % {
- 'ns': ns,
- 'rootns': rootns,
- 'fullns': fullns(ns),
- 'actions': '\n'.join((
- ' ' * 4 + line
- for line
- in json.dumps(namespaces[ns], indent=4).split('\n')
- ))
- }
-
- providers = u('')
- for ns in sorted(namespaces):
- providers += self.provider_tpl % {
- 'fullns': fullns(ns)
- }
-
- r = self.tpl % {
- 'rootns': rootns,
- 'webpath': webpath,
- 'descriptors': descriptors,
- 'providers': providers,
- }
- return r
-
-
-@generic
-def fromjson(datatype, value):
- if value is None:
- return None
- if iscomplex(datatype):
- newvalue = datatype()
- for attrdef in list_attributes(datatype):
- if attrdef.name in value:
- setattr(newvalue, attrdef.key,
- fromjson(attrdef.datatype, value[attrdef.name]))
- value = newvalue
- elif isusertype(datatype):
- value = datatype.frombasetype(fromjson(datatype.basetype, value))
- return value
-
-
-@generic
-def tojson(datatype, value):
- if value is None:
- return value
- if iscomplex(datatype):
- d = {}
- for attrdef in list_attributes(datatype):
- attrvalue = getattr(value, attrdef.key)
- if attrvalue is not Unset:
- d[attrdef.name] = tojson(attrdef.datatype, attrvalue)
- value = d
- elif isusertype(datatype):
- value = tojson(datatype.basetype, datatype.tobasetype(value))
- return value
-
-
-@fromjson.when_type(wsme.types.ArrayType)
-def array_fromjson(datatype, value):
- return [fromjson(datatype.item_type, item) for item in value]
-
-
-@tojson.when_type(wsme.types.ArrayType)
-def array_tojson(datatype, value):
- if value is None:
- return value
- return [tojson(datatype.item_type, item) for item in value]
-
-
-@fromjson.when_type(wsme.types.DictType)
-def dict_fromjson(datatype, value):
- if value is None:
- return value
- return dict((
- (fromjson(datatype.key_type, key),
- fromjson(datatype.value_type, value))
- for key, value in value.items()
- ))
-
-
-@tojson.when_type(wsme.types.DictType)
-def dict_tojson(datatype, value):
- if value is None:
- return value
- return dict((
- (tojson(datatype.key_type, key),
- tojson(datatype.value_type, value))
- for key, value in value.items()
- ))
-
-
-@tojson.when_object(wsme.types.bytes)
-def bytes_tojson(datatype, value):
- if value is None:
- return value
- return value.decode('ascii')
-
-
-# raw strings
-@fromjson.when_object(wsme.types.bytes)
-def bytes_fromjson(datatype, value):
- if value is not None:
- value = value.encode('ascii')
- return value
-
-
-# unicode strings
-
-@fromjson.when_object(wsme.types.text)
-def text_fromjson(datatype, value):
- if isinstance(value, wsme.types.bytes):
- return value.decode('utf-8')
- return value
-
-
-# datetime.time
-
-@fromjson.when_object(datetime.time)
-def time_fromjson(datatype, value):
- if value is None or value == '':
- return None
- return parse_isotime(value)
-
-
-@tojson.when_object(datetime.time)
-def time_tojson(datatype, value):
- if value is None:
- return value
- return value.isoformat()
-
-
-# datetime.date
-
-@fromjson.when_object(datetime.date)
-def date_fromjson(datatype, value):
- if value is None or value == '':
- return None
- return parse_isodate(value)
-
-
-@tojson.when_object(datetime.date)
-def date_tojson(datatype, value):
- if value is None:
- return value
- return value.isoformat()
-
-
-# datetime.datetime
-
-@fromjson.when_object(datetime.datetime)
-def datetime_fromjson(datatype, value):
- if value is None or value == '':
- return None
- return parse_isodatetime(value)
-
-
-@tojson.when_object(datetime.datetime)
-def datetime_tojson(datatype, value):
- if value is None:
- return value
- return value.isoformat()
-
-
-# decimal.Decimal
-
-@fromjson.when_object(decimal.Decimal)
-def decimal_fromjson(datatype, value):
- if value is None:
- return value
- return decimal.Decimal(value)
-
-
-@tojson.when_object(decimal.Decimal)
-def decimal_tojson(datatype, value):
- if value is None:
- return value
- return str(value)
-
-
-class ExtCallContext(CallContext):
- def __init__(self, request, namespace, calldata):
- super(ExtCallContext, self).__init__(request)
- self.namespace = namespace
-
- self.tid = calldata['tid']
- self.action = calldata['action']
- self.method = calldata['method']
- self.params = calldata['data']
-
-
-class FormExtCallContext(CallContext):
- def __init__(self, request, namespace):
- super(FormExtCallContext, self).__init__(request)
- self.namespace = namespace
-
- self.tid = request.params['extTID']
- self.action = request.params['extAction']
- self.method = request.params['extMethod']
- self.params = []
-
-
-class ExtDirectProtocol(Protocol):
- """
- ExtDirect protocol.
-
- For more detail on the protocol, see
- http://www.sencha.com/products/extjs/extdirect.
-
- .. autoattribute:: name
- .. autoattribute:: content_types
- """
- name = 'extdirect'
- displayname = 'ExtDirect'
- content_types = ['application/json', 'text/javascript']
-
- def __init__(self, namespace='', params_notation='named', nsfolder=None):
- self.namespace = namespace
- self.appns, self.apins = namespace.rsplit('.', 2) \
- if '.' in namespace else (namespace, '')
- self.default_params_notation = params_notation
- self.appnsfolder = nsfolder
-
- @property
- def api_alias(self):
- if self.appnsfolder:
- alias = '/%s/%s.js' % (
- self.appnsfolder,
- self.apins.replace('.', '/'))
- return alias
-
- def accept(self, req):
- path = req.path
- assert path.startswith(self.root._webpath)
- path = path[len(self.root._webpath):]
-
- return (
- path == self.api_alias or
- path == "/extdirect/api" or
- path.startswith("/extdirect/router")
- )
-
- def iter_calls(self, req):
- path = req.path
-
- assert path.startswith(self.root._webpath)
- path = path[len(self.root._webpath):].strip()
-
- assert path.startswith('/extdirect/router'), path
- path = path[17:].strip('/')
-
- if path:
- namespace = path.split('.')
- else:
- namespace = []
-
- if 'extType' in req.params:
- req.wsme_extdirect_batchcall = False
- yield FormExtCallContext(req, namespace)
- else:
- data = json.loads(req.body.decode('utf8'))
- req.wsme_extdirect_batchcall = isinstance(data, list)
- if not req.wsme_extdirect_batchcall:
- data = [data]
- req.callcount = len(data)
-
- for call in data:
- yield ExtCallContext(req, namespace, call)
-
- def extract_path(self, context):
- path = list(context.namespace)
-
- if context.action:
- path.append(context.action)
-
- path.append(context.method)
-
- return path
-
- def read_std_arguments(self, context):
- funcdef = context.funcdef
- notation = funcdef.extra_options.get('extdirect_params_notation',
- self.default_params_notation)
- args = context.params
- if notation == 'positional':
- kw = dict(
- (argdef.name, fromjson(argdef.datatype, arg))
- for argdef, arg in zip(funcdef.arguments, args)
- )
- elif notation == 'named':
- if len(args) == 0:
- args = [{}]
- elif len(args) > 1:
- raise ClientSideError(
- "Named arguments: takes a single object argument")
- args = args[0]
- kw = dict(
- (argdef.name, fromjson(argdef.datatype, args[argdef.name]))
- for argdef in funcdef.arguments if argdef.name in args
- )
- else:
- raise ValueError("Invalid notation: %s" % notation)
- return kw
-
- def read_form_arguments(self, context):
- kw = {}
- for argdef in context.funcdef.arguments:
- value = from_params(argdef.datatype, context.request.params,
- argdef.name, set())
- if value is not Unset:
- kw[argdef.name] = value
- return kw
-
- def read_arguments(self, context):
- if isinstance(context, ExtCallContext):
- kwargs = self.read_std_arguments(context)
- elif isinstance(context, FormExtCallContext):
- kwargs = self.read_form_arguments(context)
- wsme.runtime.check_arguments(context.funcdef, (), kwargs)
- return kwargs
-
- def encode_result(self, context, result):
- return json.dumps({
- 'type': 'rpc',
- 'tid': context.tid,
- 'action': context.action,
- 'method': context.method,
- 'result': tojson(context.funcdef.return_type, result)
- })
-
- def encode_error(self, context, infos):
- return json.dumps({
- 'type': 'exception',
- 'tid': context.tid,
- 'action': context.action,
- 'method': context.method,
- 'message': '%(faultcode)s: %(faultstring)s' % infos,
- 'where': infos['debuginfo']})
-
- def prepare_response_body(self, request, results):
- r = ",\n".join(results)
- if request.wsme_extdirect_batchcall:
- return "[\n%s\n]" % r
- else:
- return r
-
- def get_response_status(self, request):
- return 200
-
- def get_response_contenttype(self, request):
- return "text/javascript"
-
- def fullns(self, ns):
- return ns and '%s.%s' % (self.namespace, ns) or self.namespace
-
- @expose('/extdirect/api', "text/javascript")
- @expose('${api_alias}', "text/javascript")
- def api(self):
- namespaces = {}
- for path, funcdef in self.root.getapi():
- if len(path) > 1:
- namespace = '.'.join(path[:-2])
- action = path[-2]
- else:
- namespace = ''
- action = ''
- if namespace not in namespaces:
- namespaces[namespace] = {}
- if action not in namespaces[namespace]:
- namespaces[namespace][action] = []
- notation = funcdef.extra_options.get('extdirect_params_notation',
- self.default_params_notation)
- method = {
- 'name': funcdef.name}
-
- if funcdef.extra_options.get('extdirect_formhandler', False):
- method['formHandler'] = True
- method['len'] = 1 if notation == 'named' \
- else len(funcdef.arguments)
- namespaces[namespace][action].append(method)
- webpath = self.root._webpath
- if webpath and not webpath.endswith('/'):
- webpath += '/'
- return APIDefinitionGenerator().render(
- namespaces=namespaces,
- webpath=webpath,
- rootns=self.namespace,
- fullns=self.fullns,
- )
-
- def encode_sample_value(self, datatype, value, format=False):
- r = tojson(datatype, value)
- content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0,
- sort_keys=format)
- return ('javascript', content)
diff --git a/wsmeext/extdirect/sadatastore.py b/wsmeext/extdirect/sadatastore.py
deleted file mode 100644
index 44d79cb..0000000
--- a/wsmeext/extdirect/sadatastore.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from wsmeext.extdirect import datastore
-
-
-class SADataStoreController(datastore.DataStoreController):
- __dbsession__ = None
- __datatype__ = None
-
- def read(self, query=None, sort=None, page=None, start=None, limit=None):
- q = self.__dbsession__.query(self.__datatype__.__saclass__)
- total = q.count()
- if start is not None and limit is not None:
- q = q.slice(start, limit)
- return self.__readresulttype__(
- data=[
- self.__datatype__(o) for o in q
- ],
- success=True,
- total=total
- )
diff --git a/wsmeext/flask.py b/wsmeext/flask.py
deleted file mode 100644
index 4d9e446..0000000
--- a/wsmeext/flask.py
+++ /dev/null
@@ -1,108 +0,0 @@
-from __future__ import absolute_import
-
-import functools
-import logging
-import sys
-import inspect
-
-import wsme
-import wsme.api
-import wsme.rest.json
-import wsme.rest.xml
-import wsme.rest.args
-from wsme.utils import is_valid_code
-
-import flask
-
-log = logging.getLogger(__name__)
-
-
-TYPES = {
- 'application/json': wsme.rest.json,
- 'application/xml': wsme.rest.xml,
- 'text/xml': wsme.rest.xml
-}
-
-
-def get_dataformat():
- if 'Accept' in flask.request.headers:
- for t in TYPES:
- if t in flask.request.headers['Accept']:
- return TYPES[t]
-
- # Look for the wanted data format in the request.
- req_dataformat = getattr(flask.request, 'response_type', None)
- if req_dataformat in TYPES:
- return TYPES[req_dataformat]
-
- log.info('''Could not determine what format is wanted by the
- caller, falling back to json''')
- return wsme.rest.json
-
-
-def signature(*args, **kw):
- sig = wsme.signature(*args, **kw)
-
- def decorator(f):
- args = inspect.getargspec(f)[0]
- ismethod = args and args[0] == 'self'
- sig(f)
- funcdef = wsme.api.FunctionDefinition.get(f)
- funcdef.resolve_types(wsme.types.registry)
-
- @functools.wraps(f)
- def wrapper(*args, **kwargs):
- if ismethod:
- self, args = args[0], args[1:]
- args, kwargs = wsme.rest.args.get_args(
- funcdef, args, kwargs,
- flask.request.args, flask.request.form,
- flask.request.data,
- flask.request.mimetype
- )
-
- if funcdef.pass_request:
- kwargs[funcdef.pass_request] = flask.request
-
- dataformat = get_dataformat()
-
- try:
- if ismethod:
- args = [self] + list(args)
- result = f(*args, **kwargs)
-
- # NOTE: Support setting of status_code with default 20
- status_code = funcdef.status_code
- if isinstance(result, wsme.api.Response):
- status_code = result.status_code
- result = result.obj
-
- res = flask.make_response(
- dataformat.encode_result(
- result,
- funcdef.return_type
- )
- )
- res.mimetype = dataformat.content_type
- res.status_code = status_code
- except Exception:
- try:
- exception_info = sys.exc_info() or None
- orig_exception = exception_info[1]
- orig_code = getattr(orig_exception, 'code', None)
- data = wsme.api.format_exception(exception_info)
- finally:
- del exception_info
-
- res = flask.make_response(dataformat.encode_error(None, data))
- if orig_code and is_valid_code(orig_code):
- res.status_code = orig_code
- elif data['faultcode'].lower() == 'client':
- res.status_code = 400
- else:
- res.status_code = 500
- return res
-
- wrapper.wsme_func = f
- return wrapper
- return decorator
diff --git a/wsmeext/pecan.py b/wsmeext/pecan.py
deleted file mode 100644
index 9d63dde..0000000
--- a/wsmeext/pecan.py
+++ /dev/null
@@ -1,142 +0,0 @@
-from __future__ import absolute_import
-
-import functools
-import inspect
-import sys
-
-import wsme
-import wsme.rest.args
-import wsme.rest.json
-import wsme.rest.xml
-
-import pecan
-
-from wsme.utils import is_valid_code
-
-
-class JSonRenderer(object):
- @staticmethod
- def __init__(path, extra_vars):
- pass
-
- @staticmethod
- def render(template_path, namespace):
- if 'faultcode' in namespace:
- return wsme.rest.json.encode_error(None, namespace)
- return wsme.rest.json.encode_result(
- namespace['result'],
- namespace['datatype']
- )
-
-
-class XMLRenderer(object):
- @staticmethod
- def __init__(path, extra_vars):
- pass
-
- @staticmethod
- def render(template_path, namespace):
- if 'faultcode' in namespace:
- return wsme.rest.xml.encode_error(None, namespace)
- return wsme.rest.xml.encode_result(
- namespace['result'],
- namespace['datatype']
- )
-
-
-pecan.templating._builtin_renderers['wsmejson'] = JSonRenderer
-pecan.templating._builtin_renderers['wsmexml'] = XMLRenderer
-
-pecan_json_decorate = pecan.expose(
- template='wsmejson:',
- content_type='application/json',
- generic=False)
-pecan_xml_decorate = pecan.expose(
- template='wsmexml:',
- content_type='application/xml',
- generic=False
-)
-pecan_text_xml_decorate = pecan.expose(
- template='wsmexml:',
- content_type='text/xml',
- generic=False
-)
-
-
-def wsexpose(*args, **kwargs):
- sig = wsme.signature(*args, **kwargs)
-
- def decorate(f):
- sig(f)
- funcdef = wsme.api.FunctionDefinition.get(f)
- funcdef.resolve_types(wsme.types.registry)
-
- @functools.wraps(f)
- def callfunction(self, *args, **kwargs):
- return_type = funcdef.return_type
-
- try:
- args, kwargs = wsme.rest.args.get_args(
- funcdef, args, kwargs, pecan.request.params, None,
- pecan.request.body, pecan.request.content_type
- )
- if funcdef.pass_request:
- kwargs[funcdef.pass_request] = pecan.request
- result = f(self, *args, **kwargs)
-
- # NOTE: Support setting of status_code with default 201
- pecan.response.status = funcdef.status_code
- if isinstance(result, wsme.api.Response):
- pecan.response.status = result.status_code
-
- # NOTE(lucasagomes): If the return code is 204
- # (No Response) we have to make sure that we are not
- # returning anything in the body response and the
- # content-length is 0
- if result.status_code == 204:
- return_type = None
- elif not isinstance(result.return_type,
- wsme.types.UnsetType):
- return_type = result.return_type
-
- result = result.obj
-
- except Exception:
- try:
- exception_info = sys.exc_info()
- orig_exception = exception_info[1]
- orig_code = getattr(orig_exception, 'code', None)
- data = wsme.api.format_exception(
- exception_info,
- pecan.conf.get('wsme', {}).get('debug', False)
- )
- finally:
- del exception_info
-
- if orig_code and is_valid_code(orig_code):
- pecan.response.status = orig_code
- else:
- pecan.response.status = 500
-
- return data
-
- if return_type is None:
- pecan.request.pecan['content_type'] = None
- pecan.response.content_type = None
- return ''
-
- return dict(
- datatype=return_type,
- result=result
- )
-
- if 'xml' in funcdef.rest_content_types:
- pecan_xml_decorate(callfunction)
- pecan_text_xml_decorate(callfunction)
- if 'json' in funcdef.rest_content_types:
- pecan_json_decorate(callfunction)
- pecan.util._cfg(callfunction)['argspec'] = inspect.getargspec(f)
- callfunction._wsme_definition = funcdef
- return callfunction
-
- return decorate
diff --git a/wsmeext/soap/__init__.py b/wsmeext/soap/__init__.py
deleted file mode 100644
index 7a237ba..0000000
--- a/wsmeext/soap/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from __future__ import absolute_import
-
-from wsmeext.soap.protocol import SoapProtocol
-
-__all__ = ['SoapProtocol']
diff --git a/wsmeext/soap/protocol.py b/wsmeext/soap/protocol.py
deleted file mode 100644
index 0d87af8..0000000
--- a/wsmeext/soap/protocol.py
+++ /dev/null
@@ -1,478 +0,0 @@
-"""
-A SOAP implementation for wsme.
-Parts of the code were taken from the tgwebservices soap implmentation.
-"""
-from __future__ import absolute_import
-
-import pkg_resources
-import datetime
-import decimal
-import base64
-import logging
-
-import six
-
-from wsmeext.soap.simplegeneric import generic
-from wsmeext.soap.wsdl import WSDLGenerator
-
-try:
- from lxml import etree as ET
- use_lxml = True
-except ImportError:
- from xml.etree import cElementTree as ET # noqa
- use_lxml = False
-
-from wsme.protocol import CallContext, Protocol, expose
-
-import wsme.types
-import wsme.runtime
-
-from wsme import exc
-from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime
-
-log = logging.getLogger(__name__)
-
-xsd_ns = 'http://www.w3.org/2001/XMLSchema'
-xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance'
-soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
-
-if not use_lxml:
- ET.register_namespace('soap', soapenv_ns)
-
-type_qn = '{%s}type' % xsi_ns
-nil_qn = '{%s}nil' % xsi_ns
-
-Envelope_qn = '{%s}Envelope' % soapenv_ns
-Body_qn = '{%s}Body' % soapenv_ns
-Fault_qn = '{%s}Fault' % soapenv_ns
-faultcode_qn = '{%s}faultcode' % soapenv_ns
-faultstring_qn = '{%s}faultstring' % soapenv_ns
-detail_qn = '{%s}detail' % soapenv_ns
-
-
-type_registry = {
- wsme.types.bytes: 'xs:string',
- wsme.types.text: 'xs:string',
- int: 'xs:int',
- float: "xs:float",
- bool: "xs:boolean",
- datetime.datetime: "xs:dateTime",
- datetime.date: "xs:date",
- datetime.time: "xs:time",
- decimal.Decimal: "xs:decimal",
- wsme.types.binary: "xs:base64Binary",
-}
-
-if not six.PY3:
- type_registry[long] = "xs:long" # noqa
-
-array_registry = {
- wsme.types.text: "String_Array",
- wsme.types.bytes: "String_Array",
- int: "Int_Array",
- float: "Float_Array",
- bool: "Boolean_Array",
-}
-
-if not six.PY3:
- array_registry[long] = "Long_Array" # noqa
-
-
-def soap_array(datatype, ns):
- if datatype.item_type in array_registry:
- name = array_registry[datatype.item_type]
- else:
- name = soap_type(datatype.item_type, False) + '_Array'
- if ns:
- name = 'types:' + name
- return name
-
-
-def soap_type(datatype, ns):
- name = None
- if wsme.types.isarray(datatype):
- return soap_array(datatype, ns)
- if wsme.types.isdict(datatype):
- return None
- if datatype in type_registry:
- stype = type_registry[datatype]
- if not ns:
- stype = stype[3:]
- return stype
- if wsme.types.iscomplex(datatype):
- name = datatype.__name__
- if name and ns:
- name = 'types:' + name
- return name
- if wsme.types.isusertype(datatype):
- return soap_type(datatype.basetype, ns)
-
-
-def soap_fname(path, funcdef):
- return "".join([path[0]] + [i.capitalize() for i in path[1:]])
-
-
-class SoapEncoder(object):
- def __init__(self, types_ns):
- self.types_ns = types_ns
-
- def make_soap_element(self, datatype, tag, value, xsitype=None):
- el = ET.Element(tag)
- if value is None:
- el.set(nil_qn, 'true')
- elif xsitype is not None:
- el.set(type_qn, xsitype)
- el.text = value
- elif wsme.types.isusertype(datatype):
- return self.tosoap(datatype.basetype, tag,
- datatype.tobasetype(value))
- elif wsme.types.iscomplex(datatype):
- el.set(type_qn, 'types:%s' % (datatype.__name__))
- for attrdef in wsme.types.list_attributes(datatype):
- attrvalue = getattr(value, attrdef.key)
- if attrvalue is not wsme.types.Unset:
- el.append(self.tosoap(
- attrdef.datatype,
- '{%s}%s' % (self.types_ns, attrdef.name),
- attrvalue
- ))
- else:
- el.set(type_qn, type_registry.get(datatype))
- if not isinstance(value, wsme.types.text):
- value = wsme.types.text(value)
- el.text = value
- return el
-
- @generic
- def tosoap(self, datatype, tag, value):
- """Converts a value into xml Element objects for inclusion in the SOAP
- response output (after adding the type to the type_registry).
-
- If a non-complex user specific type is to be used in the api,
- a specific toxml should be added::
-
- from wsme.protocol.soap import tosoap, make_soap_element, \
- type_registry
-
- class MySpecialType(object):
- pass
-
- type_registry[MySpecialType] = 'xs:MySpecialType'
-
- @tosoap.when_object(MySpecialType)
- def myspecialtype_tosoap(datatype, tag, value):
- return make_soap_element(datatype, tag, str(value))
- """
- return self.make_soap_element(datatype, tag, value)
-
- @tosoap.when_type(wsme.types.ArrayType)
- def array_tosoap(self, datatype, tag, value):
- el = ET.Element(tag)
- el.set(type_qn, soap_array(datatype, self.types_ns))
- if value is None:
- el.set(nil_qn, 'true')
- elif len(value) == 0:
- el.append(ET.Element('item'))
- else:
- for item in value:
- el.append(self.tosoap(datatype.item_type, 'item', item))
- return el
-
- @tosoap.when_object(bool)
- def bool_tosoap(self, datatype, tag, value):
- return self.make_soap_element(
- datatype,
- tag,
- 'true' if value is True else 'false' if value is False else None
- )
-
- @tosoap.when_object(wsme.types.bytes)
- def bytes_tosoap(self, datatype, tag, value):
- log.debug('(bytes_tosoap, %s, %s, %s, %s)', datatype,
- tag, value, type(value))
- if isinstance(value, wsme.types.bytes):
- value = value.decode('ascii')
- return self.make_soap_element(datatype, tag, value)
-
- @tosoap.when_object(datetime.datetime)
- def datetime_tosoap(self, datatype, tag, value):
- return self.make_soap_element(
- datatype,
- tag,
- value is not None and value.isoformat() or None
- )
-
- @tosoap.when_object(wsme.types.binary)
- def binary_tosoap(self, datatype, tag, value):
- log.debug("(%s, %s, %s)", datatype, tag, value)
- value = base64.encodestring(value) if value is not None else None
- if six.PY3:
- value = value.decode('ascii')
- return self.make_soap_element(
- datatype.basetype, tag, value, 'xs:base64Binary'
- )
-
- @tosoap.when_object(None)
- def None_tosoap(self, datatype, tag, value):
- return self.make_soap_element(datatype, tag, None)
-
-
-@generic
-def fromsoap(datatype, el, ns):
- """
- A generic converter from soap elements to python datatype.
-
- If a non-complex user specific type is to be used in the api,
- a specific fromsoap should be added.
- """
- if el.get(nil_qn) == 'true':
- return None
- if datatype in type_registry:
- value = datatype(el.text)
- elif wsme.types.isusertype(datatype):
- value = datatype.frombasetype(
- fromsoap(datatype.basetype, el, ns))
- else:
- value = datatype()
- for attr in wsme.types.list_attributes(datatype):
- child = el.find('{%s}%s' % (ns['type'], attr.name))
- if child is not None:
- setattr(value, attr.key, fromsoap(attr.datatype, child, ns))
- return value
-
-
-@fromsoap.when_type(wsme.types.ArrayType)
-def array_fromsoap(datatype, el, ns):
- if len(el) == 1:
- if datatype.item_type \
- not in wsme.types.pod_types + wsme.types.dt_types \
- and len(el[0]) == 0:
- return []
- return [fromsoap(datatype.item_type, child, ns) for child in el]
-
-
-@fromsoap.when_object(wsme.types.bytes)
-def bytes_fromsoap(datatype, el, ns):
- if el.get(nil_qn) == 'true':
- return None
- if el.get(type_qn) not in (None, 'xs:string'):
- raise exc.InvalidInput(el.tag, ET.tostring(el))
- return el.text.encode('ascii') if el.text else six.b('')
-
-
-@fromsoap.when_object(wsme.types.text)
-def text_fromsoap(datatype, el, ns):
- if el.get(nil_qn) == 'true':
- return None
- if el.get(type_qn) not in (None, 'xs:string'):
- raise exc.InvalidInput(el.tag, ET.tostring(el))
- return datatype(el.text if el.text else '')
-
-
-@fromsoap.when_object(bool)
-def bool_fromsoap(datatype, el, ns):
- if el.get(nil_qn) == 'true':
- return None
- if el.get(type_qn) not in (None, 'xs:boolean'):
- raise exc.InvalidInput(el.tag, ET.tostring(el))
- return el.text.lower() != 'false'
-
-
-@fromsoap.when_object(datetime.date)
-def date_fromsoap(datatype, el, ns):
- if el.get(nil_qn) == 'true':
- return None
- if el.get(type_qn) not in (None, 'xs:date'):
- raise exc.InvalidInput(el.tag, ET.tostring(el))
- return parse_isodate(el.text)
-
-
-@fromsoap.when_object(datetime.time)
-def time_fromsoap(datatype, el, ns):
- if el.get(nil_qn) == 'true':
- return None
- if el.get(type_qn) not in (None, 'xs:time'):
- raise exc.InvalidInput(el.tag, ET.tostring(el))
- return parse_isotime(el.text)
-
-
-@fromsoap.when_object(datetime.datetime)
-def datetime_fromsoap(datatype, el, ns):
- if el.get(nil_qn) == 'true':
- return None
- if el.get(type_qn) not in (None, 'xs:dateTime'):
- raise exc.InvalidInput(el.tag, ET.tostring(el))
- return parse_isodatetime(el.text)
-
-
-@fromsoap.when_object(wsme.types.binary)
-def binary_fromsoap(datatype, el, ns):
- if el.get(nil_qn) == 'true':
- return None
- if el.get(type_qn) not in (None, 'xs:base64Binary'):
- raise exc.InvalidInput(el.tag, ET.tostring(el))
- return base64.decodestring(el.text.encode('ascii'))
-
-
-class SoapProtocol(Protocol):
- """
- SOAP protocol.
-
- .. autoattribute:: name
- .. autoattribute:: content_types
- """
- name = 'soap'
- displayname = 'SOAP'
- content_types = ['application/soap+xml']
-
- ns = {
- "soap": "http://www.w3.org/2001/12/soap-envelope",
- "soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
- "soapenc": "http://schemas.xmlsoap.org/soap/encoding/",
- }
-
- def __init__(self, tns=None, typenamespace=None, baseURL=None,
- servicename='MyApp'):
- self.tns = tns
- self.typenamespace = typenamespace
- self.servicename = servicename
- self.baseURL = baseURL
- self._name_mapping = {}
-
- self.encoder = SoapEncoder(typenamespace)
-
- def get_name_mapping(self, service=None):
- if service not in self._name_mapping:
- self._name_mapping[service] = dict(
- (soap_fname(path, f), path)
- for path, f in self.root.getapi()
- if service is None or (path and path[0] == service)
- )
- return self._name_mapping[service]
-
- def accept(self, req):
- for ct in self.content_types:
- if req.headers['Content-Type'].startswith(ct):
- return True
- if req.headers.get("Soapaction"):
- return True
- return False
-
- def iter_calls(self, request):
- yield CallContext(request)
-
- def extract_path(self, context):
- request = context.request
- el = ET.fromstring(request.body)
- body = el.find('{%(soapenv)s}Body' % self.ns)
- # Extract the service name from the tns
- message = list(body)[0]
- fname = message.tag
- if fname.startswith('{%s}' % self.typenamespace):
- fname = fname[len(self.typenamespace) + 2:]
- mapping = self.get_name_mapping()
- if fname not in mapping:
- raise exc.UnknownFunction(fname)
- path = mapping[fname]
- context.soap_message = message
- return path
- return None
-
- def read_arguments(self, context):
- kw = {}
- if not hasattr(context, 'soap_message'):
- return kw
- msg = context.soap_message
- for param in msg:
- # FIX for python2.6 (only for lxml)
- if use_lxml and isinstance(param, ET._Comment):
- continue
- name = param.tag[len(self.typenamespace) + 2:]
- arg = context.funcdef.get_arg(name)
- value = fromsoap(arg.datatype, param, {
- 'type': self.typenamespace,
- })
- kw[name] = value
- wsme.runtime.check_arguments(context.funcdef, (), kw)
- return kw
-
- def soap_response(self, path, funcdef, result):
- r = ET.Element('{%s}%sResponse' % (
- self.typenamespace, soap_fname(path, funcdef)
- ))
- log.debug('(soap_response, %s, %s)', funcdef.return_type, result)
- r.append(self.encoder.tosoap(
- funcdef.return_type, '{%s}result' % self.typenamespace, result
- ))
- return r
-
- def encode_result(self, context, result):
- log.debug('(encode_result, %s)', result)
- if use_lxml:
- envelope = ET.Element(
- Envelope_qn,
- nsmap={'xs': xsd_ns, 'types': self.typenamespace}
- )
- else:
- envelope = ET.Element(Envelope_qn, {
- 'xmlns:xs': xsd_ns,
- 'xmlns:types': self.typenamespace
- })
- body = ET.SubElement(envelope, Body_qn)
- body.append(self.soap_response(context.path, context.funcdef, result))
- s = ET.tostring(envelope)
- return s
-
- def get_template(self, name):
- return pkg_resources.resource_string(
- __name__, '%s.html' % name)
-
- def encode_error(self, context, infos):
- envelope = ET.Element(Envelope_qn)
- body = ET.SubElement(envelope, Body_qn)
- fault = ET.SubElement(body, Fault_qn)
- ET.SubElement(fault, faultcode_qn).text = infos['faultcode']
- ET.SubElement(fault, faultstring_qn).text = infos['faultstring']
- if 'debuginfo' in infos:
- ET.SubElement(fault, detail_qn).text = infos['debuginfo']
- s = ET.tostring(envelope)
- return s
-
- @expose('/api.wsdl', 'text/xml')
- def api_wsdl(self, service=None):
- if service is None:
- servicename = self.servicename
- else:
- servicename = self.servicename + service.capitalize()
- return WSDLGenerator(
- tns=self.tns,
- types_ns=self.typenamespace,
- soapenc=self.ns['soapenc'],
- service_name=servicename,
- complex_types=self.root.__registry__.complex_types,
- funclist=self.root.getapi(),
- arrays=self.root.__registry__.array_types,
- baseURL=self.baseURL,
- soap_array=soap_array,
- soap_type=soap_type,
- soap_fname=soap_fname,
- ).generate(True)
-
- def encode_sample_value(self, datatype, value, format=False):
- r = self.encoder.make_soap_element(datatype, 'value', value)
- if format:
- xml_indent(r)
- return ('xml', six.text_type(r))
-
-
-def xml_indent(elem, level=0):
- i = "\n" + level * " "
- if len(elem):
- if not elem.text or not elem.text.strip():
- elem.text = i + " "
- for e in elem:
- xml_indent(e, level + 1)
- if not e.tail or not e.tail.strip():
- e.tail = i
- if level and (not elem.tail or not elem.tail.strip()):
- elem.tail = i
diff --git a/wsmeext/soap/simplegeneric.py b/wsmeext/soap/simplegeneric.py
deleted file mode 100644
index 97c169b..0000000
--- a/wsmeext/soap/simplegeneric.py
+++ /dev/null
@@ -1,107 +0,0 @@
-import inspect
-
-__all__ = ["generic"]
-try:
- from types import ClassType, InstanceType
- classtypes = type, ClassType
-except ImportError:
- classtypes = type
- InstanceType = None
-
-
-def generic(func, argpos=None):
- """Create a simple generic function"""
-
- if argpos is None:
- if hasattr(func, 'argpos'):
- argpos = func.argpos
- else:
- argnames = inspect.getargspec(func)[0]
- if argnames and argnames[0] == 'self':
- argpos = 1
- else:
- argpos = 0
-
- _sentinel = object()
-
- def _by_class(*args, **kw):
- cls = args[argpos].__class__
- for t in type(cls.__name__, (cls, object), {}).__mro__:
- f = _gbt(t, _sentinel)
- if f is not _sentinel:
- return f(*args, **kw)
- else:
- return func(*args, **kw)
-
- _by_type = {object: func, InstanceType: _by_class}
- _gbt = _by_type.get
-
- def when_type(*types):
- """Decorator to add a method that will be called for the given types"""
- for t in types:
- if not isinstance(t, classtypes):
- raise TypeError(
- "%r is not a type or class" % (t,)
- )
-
- def decorate(f):
- for t in types:
- if _by_type.setdefault(t, f) is not f:
- raise TypeError(
- "%r already has method for type %r" % (func, t)
- )
- return f
- return decorate
-
- _by_object = {}
- _gbo = _by_object.get
-
- def when_object(*obs):
- """Decorator to add a method to be called for the given object(s)"""
- def decorate(f):
- for o in obs:
- if _by_object.setdefault(id(o), (o, f))[1] is not f:
- raise TypeError(
- "%r already has method for object %r" % (func, o)
- )
- return f
- return decorate
-
- def dispatch(*args, **kw):
- f = _gbo(id(args[argpos]), _sentinel)
- if f is _sentinel:
- for t in type(args[argpos]).__mro__:
- f = _gbt(t, _sentinel)
- if f is not _sentinel:
- return f(*args, **kw)
- else:
- return func(*args, **kw)
- else:
- return f[1](*args, **kw)
-
- dispatch.__name__ = func.__name__
- dispatch.__dict__ = func.__dict__.copy()
- dispatch.__doc__ = func.__doc__
- dispatch.__module__ = func.__module__
-
- dispatch.when_type = when_type
- dispatch.when_object = when_object
- dispatch.default = func
- dispatch.has_object = lambda o: id(o) in _by_object
- dispatch.has_type = lambda t: t in _by_type
- dispatch.argpos = argpos
- return dispatch
-
-
-def test_suite():
- import doctest
- return doctest.DocFileSuite(
- 'README.txt',
- optionflags=doctest.ELLIPSIS | doctest.REPORT_ONLY_FIRST_FAILURE,
- )
-
-
-if __name__ == '__main__':
- import unittest
- r = unittest.TextTestRunner()
- r.run(test_suite())
diff --git a/wsmeext/soap/wsdl.py b/wsmeext/soap/wsdl.py
deleted file mode 100644
index b60aff4..0000000
--- a/wsmeext/soap/wsdl.py
+++ /dev/null
@@ -1,297 +0,0 @@
-import six
-import wsme.types
-
-try:
- from lxml import etree as ET
- use_lxml = True
-except ImportError:
- from xml.etree import cElementTree as ET # noqa
- use_lxml = False
-
-
-def xml_tostring(el, pretty_print=False):
- if use_lxml:
- return ET.tostring(el, pretty_print=pretty_print)
- return ET.tostring(el)
-
-
-class NS(object):
- def __init__(self, url):
- self.url = url
-
- def __call__(self, name):
- return self.qn(name)
-
- def __str__(self):
- return self.url
-
- def qn(self, name):
- return '{%s}%s' % (self.url, name)
-
-
-wsdl_ns = NS("http://schemas.xmlsoap.org/wsdl/")
-soap_ns = NS("http://schemas.xmlsoap.org/wsdl/soap/")
-xs_ns = NS("http://www.w3.org/2001/XMLSchema")
-soapenc_ns = NS("http://schemas.xmlsoap.org/soap/encoding/")
-
-
-class WSDLGenerator(object):
- def __init__(
- self,
- tns,
- types_ns,
- soapenc,
- service_name,
- complex_types,
- funclist,
- arrays,
- baseURL,
- soap_array,
- soap_type,
- soap_fname):
-
- self.tns = NS(tns)
- self.types_ns = NS(types_ns)
- self.soapenc = soapenc
- self.service_name = service_name
- self.complex_types = complex_types
- self.funclist = funclist
- self.arrays = arrays
- self.baseURL = baseURL or ''
- self.soap_array = soap_array
- self.soap_fname = soap_fname
- self.soap_type = soap_type
-
- def gen_complex_type(self, cls):
- complexType = ET.Element(xs_ns('complexType'))
- complexType.set('name', cls.__name__)
- sequence = ET.SubElement(complexType, xs_ns('sequence'))
- for attrdef in wsme.types.list_attributes(cls):
- soap_type = self.soap_type(attrdef.datatype, str(self.types_ns))
- if soap_type is None:
- continue
- element = ET.SubElement(sequence, xs_ns('element'))
- element.set('name', attrdef.name)
- element.set('type', soap_type)
- element.set('minOccurs', '1' if attrdef.mandatory else '0')
- element.set('maxOccurs', '1')
- return complexType
-
- def gen_array(self, array):
- complexType = ET.Element(xs_ns('complexType'))
- complexType.set('name', self.soap_array(array, False))
- ET.SubElement(
- ET.SubElement(complexType, xs_ns('sequence')),
- xs_ns('element'),
- name='item',
- maxOccurs='unbounded',
- nillable='true',
- type=self.soap_type(array.item_type, self.types_ns)
- )
- return complexType
-
- def gen_function_types(self, path, funcdef):
- args_el = ET.Element(
- xs_ns('element'),
- name=self.soap_fname(path, funcdef)
- )
-
- sequence = ET.SubElement(
- ET.SubElement(args_el, xs_ns('complexType')),
- xs_ns('sequence')
- )
-
- for farg in funcdef.arguments:
- t = self.soap_type(farg.datatype, True)
- if t is None:
- continue
- element = ET.SubElement(
- sequence, xs_ns('element'),
- name=farg.name,
- type=self.soap_type(farg.datatype, True)
- )
- if not farg.mandatory:
- element.set('minOccurs', '0')
-
- response_el = ET.Element(
- xs_ns('element'),
- name=self.soap_fname(path, funcdef) + 'Response'
- )
- element = ET.SubElement(
- ET.SubElement(
- ET.SubElement(
- response_el,
- xs_ns('complexType')
- ),
- xs_ns('sequence')
- ),
- xs_ns('element'),
- name='result'
- )
- return_soap_type = self.soap_type(funcdef.return_type, True)
- if return_soap_type is not None:
- element.set('type', return_soap_type)
-
- return args_el, response_el
-
- def gen_types(self):
- types = ET.Element(wsdl_ns('types'))
- schema = ET.SubElement(types, xs_ns('schema'))
- schema.set('elementFormDefault', 'qualified')
- schema.set('targetNamespace', str(self.types_ns))
- for cls in self.complex_types:
- schema.append(self.gen_complex_type(cls))
- for array in self.arrays:
- schema.append(self.gen_array(array))
- for path, funcdef in self.funclist:
- schema.extend(self.gen_function_types(path, funcdef))
- return types
-
- def gen_functions(self):
- messages = []
-
- binding = ET.Element(
- wsdl_ns('binding'),
- name='%s_Binding' % self.service_name,
- type='tns:%s_PortType' % self.service_name
- )
- ET.SubElement(
- binding,
- soap_ns('binding'),
- style='document',
- transport='http://schemas.xmlsoap.org/soap/http'
- )
-
- portType = ET.Element(
- wsdl_ns('portType'),
- name='%s_PortType' % self.service_name
- )
-
- for path, funcdef in self.funclist:
- soap_fname = self.soap_fname(path, funcdef)
-
- # message
- req_message = ET.Element(
- wsdl_ns('message'),
- name=soap_fname + 'Request',
- xmlns=str(self.types_ns)
- )
- ET.SubElement(
- req_message,
- wsdl_ns('part'),
- name='parameters',
- element='types:%s' % soap_fname
- )
- messages.append(req_message)
-
- res_message = ET.Element(
- wsdl_ns('message'),
- name=soap_fname + 'Response',
- xmlns=str(self.types_ns)
- )
- ET.SubElement(
- res_message,
- wsdl_ns('part'),
- name='parameters',
- element='types:%sResponse' % soap_fname
- )
- messages.append(res_message)
-
- # portType/operation
- operation = ET.SubElement(
- portType,
- wsdl_ns('operation'),
- name=soap_fname
- )
- if funcdef.doc:
- ET.SubElement(
- operation,
- wsdl_ns('documentation')
- ).text = funcdef.doc
- ET.SubElement(
- operation, wsdl_ns('input'),
- message='tns:%sRequest' % soap_fname
- )
- ET.SubElement(
- operation, wsdl_ns('output'),
- message='tns:%sResponse' % soap_fname
- )
-
- # binding/operation
- operation = ET.SubElement(
- binding,
- wsdl_ns('operation'),
- name=soap_fname
- )
- ET.SubElement(
- operation,
- soap_ns('operation'),
- soapAction=soap_fname
- )
- ET.SubElement(
- ET.SubElement(
- operation,
- wsdl_ns('input')
- ),
- soap_ns('body'),
- use='literal'
- )
- ET.SubElement(
- ET.SubElement(
- operation,
- wsdl_ns('output')
- ),
- soap_ns('body'),
- use='literal'
- )
-
- return messages + [portType, binding]
-
- def gen_service(self):
- service = ET.Element(wsdl_ns('service'), name=self.service_name)
- ET.SubElement(
- service,
- wsdl_ns('documentation')
- ).text = six.u('WSDL File for %s') % self.service_name
- ET.SubElement(
- ET.SubElement(
- service,
- wsdl_ns('port'),
- binding='tns:%s_Binding' % self.service_name,
- name='%s_PortType' % self.service_name
- ),
- soap_ns('address'),
- location=self.baseURL
- )
-
- return service
-
- def gen_definitions(self):
- attrib = {
- 'name': self.service_name,
- 'targetNamespace': str(self.tns)
- }
- if use_lxml:
- definitions = ET.Element(
- wsdl_ns('definitions'),
- attrib=attrib,
- nsmap={
- 'xs': str(xs_ns),
- 'soap': str(soap_ns),
- 'types': str(self.types_ns),
- 'tns': str(self.tns)
- }
- )
- else:
- definitions = ET.Element(wsdl_ns('definitions'), **attrib)
- definitions.set('xmlns:types', str(self.types_ns))
- definitions.set('xmlns:tns', str(self.tns))
-
- definitions.set('name', self.service_name)
- definitions.append(self.gen_types())
- definitions.extend(self.gen_functions())
- definitions.append(self.gen_service())
- return definitions
-
- def generate(self, format=False):
- return xml_tostring(self.gen_definitions(), pretty_print=format)
diff --git a/wsmeext/sphinxext.py b/wsmeext/sphinxext.py
deleted file mode 100644
index 7c45a0f..0000000
--- a/wsmeext/sphinxext.py
+++ /dev/null
@@ -1,600 +0,0 @@
-import inspect
-import re
-import sys
-
-import six
-
-from sphinx import addnodes
-from sphinx.ext import autodoc
-from sphinx.domains.python import PyClasslike, PyClassmember
-from sphinx.domains import Domain, ObjType
-from sphinx.directives import ObjectDescription
-from sphinx.util.docfields import Field
-from sphinx.util.nodes import make_refnode
-
-from sphinx.roles import XRefRole
-from sphinx.locale import l_, _
-
-from docutils.parsers.rst import Directive
-from docutils.parsers.rst import directives
-
-import wsme
-import wsme.types
-import wsme.rest.json
-import wsme.rest.xml
-
-field_re = re.compile(r':(?P<field>\w+)(\s+(?P<name>\w+))?:')
-
-
-def datatypename(datatype):
- if isinstance(datatype, wsme.types.UserType):
- return datatype.name
- if isinstance(datatype, wsme.types.DictType):
- return 'dict(%s: %s)' % (datatypename(datatype.key_type),
- datatypename(datatype.value_type))
- if isinstance(datatype, wsme.types.ArrayType):
- return 'list(%s)' % datatypename(datatype.item_type)
- return datatype.__name__
-
-
-def make_sample_object(datatype):
- if datatype is wsme.types.bytes:
- return six.b('samplestring')
- if datatype is wsme.types.text:
- return u'sample unicode'
- if datatype is int:
- return 5
- sample_obj = getattr(datatype, 'sample', datatype)()
- return sample_obj
-
-
-def get_protocols(names):
- names = list(names)
- protocols = []
- if 'rest' in names:
- names.remove('rest')
- protocols.extend('restjson', 'restxml')
- if 'restjson' in names:
- names.remove('restjson')
- protocols.append(('Json', wsme.rest.json))
- if 'restxml' in names:
- names.remove('restxml')
- protocols.append(('XML', wsme.rest.xml))
- for name in names:
- p = wsme.protocol.getprotocol(name)
- protocols.append((p.displayname or p.name, p))
- return protocols
-
-
-class SampleType(object):
- """A Sample Type"""
-
- #: A Int
- aint = int
-
- def __init__(self, aint=None):
- if aint:
- self.aint = aint
-
- @classmethod
- def sample(cls):
- return cls(10)
-
-
-class SampleService(wsme.WSRoot):
- @wsme.expose(SampleType)
- @wsme.validate(SampleType, int, str)
- def change_aint(data, aint, dummy='useless'):
- """
- :param aint: The new value
-
- :return: The data object with its aint field value changed.
- """
- data.aint = aint
- return data
-
-
-def getroot(env, force=False):
- root = env.temp_data.get('wsme:root')
- if not force and root:
- return root
- rootpath = env.temp_data.get('wsme:rootpath', env.app.config.wsme_root)
-
- if rootpath is None:
- return None
-
- modname, classname = rootpath.rsplit('.', 1)
- __import__(modname)
- module = sys.modules[modname]
- root = getattr(module, classname)
- env.temp_data['wsme:root'] = root
- return root
-
-
-def scan_services(service, path=[]):
- has_functions = False
- for name in dir(service):
- if name.startswith('_'):
- continue
- a = getattr(service, name)
- if inspect.ismethod(a):
- if hasattr(a, '_wsme_definition'):
- has_functions = True
- if inspect.isclass(a):
- continue
- if len(path) > wsme.rest.APIPATH_MAXLEN:
- raise ValueError("Path is too long: " + str(path))
- for value in scan_services(a, path + [name]):
- yield value
- if has_functions:
- yield service, path
-
-
-def find_service_path(env, service):
- root = getroot(env)
- if service == root:
- return []
- for s, path in scan_services(root):
- if s == service:
- return path
- return None
-
-
-class TypeDirective(PyClasslike):
- def get_index_text(self, modname, name_cls):
- return _('%s (webservice type)') % name_cls[0]
-
- def add_target_and_index(self, name_cls, sig, signode):
- ret = super(TypeDirective, self).add_target_and_index(
- name_cls, sig, signode
- )
- name = name_cls[0]
- types = self.env.domaindata['wsme']['types']
- if name in types:
- self.state_machine.reporter.warning(
- 'duplicate type description of %s ' % name)
- types[name] = self.env.docname
- return ret
-
-
-class AttributeDirective(PyClassmember):
- doc_field_types = [
- Field('datatype', label=l_('Type'), has_arg=False,
- names=('type', 'datatype'))
- ]
-
-
-def check_samples_slot(value):
- """Validate the samples_slot option to the TypeDocumenter.
-
- Valid positions are 'before-docstring' and
- 'after-docstring'. Using the explicit 'none' disables sample
- output. The default is after-docstring.
- """
- if not value:
- return 'after-docstring'
- val = directives.choice(
- value,
- ('none', # do not include
- 'before-docstring', # show samples then docstring
- 'after-docstring', # show docstring then samples
- ))
- return val
-
-
-class TypeDocumenter(autodoc.ClassDocumenter):
- objtype = 'type'
- directivetype = 'type'
- domain = 'wsme'
-
- required_arguments = 1
- default_samples_slot = 'after-docstring'
-
- option_spec = dict(
- autodoc.ClassDocumenter.option_spec,
- **{'protocols': lambda l: [v.strip() for v in l.split(',')],
- 'samples-slot': check_samples_slot,
- })
-
- @staticmethod
- def can_document_member(member, membername, isattr, parent):
- # we don't want to be automaticaly used
- # TODO check if the member is registered an an exposed type
- return False
-
- def format_name(self):
- return self.object.__name__
-
- def format_signature(self):
- return u''
-
- def add_directive_header(self, sig):
- super(TypeDocumenter, self).add_directive_header(sig)
- # remove the :module: option that was added by ClassDocumenter
- result_len = len(self.directive.result)
- for index, item in zip(reversed(range(result_len)),
- reversed(self.directive.result)):
- if ':module:' in item:
- self.directive.result.pop(index)
-
- def import_object(self):
- if super(TypeDocumenter, self).import_object():
- wsme.types.register_type(self.object)
- return True
- else:
- return False
-
- def add_content(self, more_content, no_docstring=False):
- # Check where to include the samples
- samples_slot = self.options.samples_slot or self.default_samples_slot
-
- def add_docstring():
- super(TypeDocumenter, self).add_content(
- more_content, no_docstring)
-
- def add_samples():
- protocols = get_protocols(
- self.options.protocols or self.env.app.config.wsme_protocols
- )
- content = []
- if protocols:
- sample_obj = make_sample_object(self.object)
- content.extend([
- l_(u'Data samples:'),
- u'',
- u'.. cssclass:: toggle',
- u''
- ])
- for name, protocol in protocols:
- language, sample = protocol.encode_sample_value(
- self.object, sample_obj, format=True)
- content.extend([
- name,
- u' .. code-block:: ' + language,
- u'',
- ])
- content.extend(
- u' ' * 8 + line
- for line in six.text_type(sample).split('\n'))
- for line in content:
- self.add_line(line, u'<wsmeext.sphinxext')
-
- self.add_line(u'', '<wsmeext.sphinxext>')
-
- if samples_slot == 'after-docstring':
- add_docstring()
- add_samples()
- elif samples_slot == 'before-docstring':
- add_samples()
- add_docstring()
- else:
- add_docstring()
-
-
-class AttributeDocumenter(autodoc.AttributeDocumenter):
- datatype = None
- domain = 'wsme'
-
- @staticmethod
- def can_document_member(member, membername, isattr, parent):
- return isinstance(parent, TypeDocumenter)
-
- def import_object(self):
- success = super(AttributeDocumenter, self).import_object()
- if success:
- self.datatype = self.object.datatype
- return success
-
- def add_content(self, more_content, no_docstring=False):
- self.add_line(
- u':type: %s' % datatypename(self.datatype),
- '<wsmeext.sphinxext>'
- )
- self.add_line(u'', '<wsmeext.sphinxext>')
- super(AttributeDocumenter, self).add_content(
- more_content, no_docstring)
-
- def add_directive_header(self, sig):
- super(AttributeDocumenter, self).add_directive_header(sig)
-
-
-class RootDirective(Directive):
- """
- This directive is to tell what class is the Webservice root
- """
- has_content = False
- required_arguments = 1
- optional_arguments = 0
- final_argument_whitespace = False
- option_spec = {
- 'webpath': directives.unchanged
- }
-
- def run(self):
- env = self.state.document.settings.env
- rootpath = self.arguments[0].strip()
- env.temp_data['wsme:rootpath'] = rootpath
- if 'wsme:root' in env.temp_data:
- del env.temp_data['wsme:root']
- if 'webpath' in self.options:
- env.temp_data['wsme:webpath'] = self.options['webpath']
- return []
-
-
-class ServiceDirective(ObjectDescription):
- name = 'service'
-
- optional_arguments = 1
-
- def handle_signature(self, sig, signode):
- path = sig.split('/')
-
- namespace = '/'.join(path[:-1])
- if namespace and not namespace.endswith('/'):
- namespace += '/'
-
- servicename = path[-1]
-
- if not namespace and not servicename:
- servicename = '/'
-
- signode += addnodes.desc_annotation('service ', 'service ')
-
- if namespace:
- signode += addnodes.desc_addname(namespace, namespace)
-
- signode += addnodes.desc_name(servicename, servicename)
-
- return sig
-
-
-class ServiceDocumenter(autodoc.ClassDocumenter):
- domain = 'wsme'
- objtype = 'service'
- directivetype = 'service'
-
- def add_directive_header(self, sig):
- super(ServiceDocumenter, self).add_directive_header(sig)
- # remove the :module: option that was added by ClassDocumenter
- result_len = len(self.directive.result)
- for index, item in zip(reversed(range(result_len)),
- reversed(self.directive.result)):
- if ':module:' in item:
- self.directive.result.pop(index)
-
- def format_signature(self):
- return u''
-
- def format_name(self):
- path = find_service_path(self.env, self.object)
- if path is None:
- return
- return '/' + '/'.join(path)
-
-
-class FunctionDirective(PyClassmember):
- name = 'function'
- objtype = 'function'
-
- def get_signature_prefix(self, sig):
- return 'function '
-
-
-def document_function(funcdef, docstrings=None, protocols=['restjson']):
- """A helper function to complete a function documentation with return and
- parameter types"""
- # If the function doesn't have a docstring, add an empty list
- # so the default behaviors below work correctly.
- if not docstrings:
- docstrings = [[]]
- found_params = set()
-
- for si, docstring in enumerate(docstrings):
- for i, line in enumerate(docstring):
- m = field_re.match(line)
- if m and m.group('field') == 'param':
- found_params.add(m.group('name'))
-
- next_param_pos = (0, 0)
-
- for arg in funcdef.arguments:
- content = [
- u':type %s: :wsme:type:`%s`' % (
- arg.name, datatypename(arg.datatype))
- ]
- if arg.name not in found_params:
- content.insert(0, u':param %s: ' % (arg.name))
- pos = next_param_pos
- else:
- for si, docstring in enumerate(docstrings):
- for i, line in enumerate(docstring):
- m = field_re.match(line)
- if m and m.group('field') == 'param' \
- and m.group('name') == arg.name:
- pos = (si, i + 1)
- break
- docstring = docstrings[pos[0]]
- docstring[pos[1]:pos[1]] = content
- next_param_pos = (pos[0], pos[1] + len(content))
-
- if funcdef.return_type:
- content = [
- u':rtype: %s' % datatypename(funcdef.return_type)
- ]
- pos = None
- for si, docstring in enumerate(docstrings):
- for i, line in enumerate(docstring):
- m = field_re.match(line)
- if m and m.group('field') == 'return':
- pos = (si, i + 1)
- break
- else:
- pos = next_param_pos
- docstring = docstrings[pos[0]]
- docstring[pos[1]:pos[1]] = content
-
- codesamples = []
-
- if protocols:
- params = []
- for arg in funcdef.arguments:
- params.append((
- arg.name,
- arg.datatype,
- make_sample_object(arg.datatype)
- ))
- codesamples.extend([
- u':%s:' % l_(u'Parameters samples'),
- u' .. cssclass:: toggle',
- u''
- ])
- for name, protocol in protocols:
- language, sample = protocol.encode_sample_params(
- params, format=True)
- codesamples.extend([
- u' ' * 4 + name,
- u' .. code-block:: ' + language,
- u'',
- ])
- codesamples.extend((
- u' ' * 12 + line
- for line in six.text_type(sample).split('\n')
- ))
-
- if funcdef.return_type:
- codesamples.extend([
- u':%s:' % l_(u'Return samples'),
- u' .. cssclass:: toggle',
- u''
- ])
- sample_obj = make_sample_object(funcdef.return_type)
- for name, protocol in protocols:
- language, sample = protocol.encode_sample_result(
- funcdef.return_type, sample_obj, format=True)
- codesamples.extend([
- u' ' * 4 + name,
- u' .. code-block:: ' + language,
- u'',
- ])
- codesamples.extend((
- u' ' * 12 + line
- for line in six.text_type(sample).split('\n')
- ))
-
- docstrings[0:0] = [codesamples]
- return docstrings
-
-
-class FunctionDocumenter(autodoc.MethodDocumenter):
- domain = 'wsme'
- directivetype = 'function'
- objtype = 'function'
- priority = 1
-
- option_spec = {
- 'path': directives.unchanged,
- 'method': directives.unchanged
- }
-
- @staticmethod
- def can_document_member(member, membername, isattr, parent):
- return (isinstance(parent, ServiceDocumenter) and
- wsme.api.iswsmefunction(member))
-
- def import_object(self):
- ret = super(FunctionDocumenter, self).import_object()
- self.directivetype = 'function'
- self.wsme_fd = wsme.api.FunctionDefinition.get(self.object)
- self.retann = datatypename(self.wsme_fd.return_type)
- return ret
-
- def format_args(self):
- args = [arg.name for arg in self.wsme_fd.arguments]
- defaults = [
- arg.default
- for arg in self.wsme_fd.arguments if not arg.mandatory
- ]
- return inspect.formatargspec(args, defaults=defaults)
-
- def get_doc(self, encoding=None):
- """Inject the type and param fields into the docstrings so that the
- user can add its own param fields to document the parameters"""
- docstrings = super(FunctionDocumenter, self).get_doc(encoding)
-
- protocols = get_protocols(
- self.options.protocols or self.env.app.config.wsme_protocols
- )
-
- return document_function(
- self.wsme_fd, docstrings, protocols
- )
-
- def add_content(self, more_content, no_docstring=False):
- super(FunctionDocumenter, self).add_content(more_content, no_docstring)
-
- def format_name(self):
- return self.wsme_fd.name
-
- def add_directive_header(self, sig):
- super(FunctionDocumenter, self).add_directive_header(sig)
- # remove the :module: option that was added by ClassDocumenter
- result_len = len(self.directive.result)
- for index, item in zip(reversed(range(result_len)),
- reversed(self.directive.result)):
- if ':module:' in item:
- self.directive.result.pop(index)
-
-
-class WSMEDomain(Domain):
- name = 'wsme'
- label = 'WSME'
-
- object_types = {
- 'type': ObjType(l_('type'), 'type', 'obj'),
- 'service': ObjType(l_('service'), 'service', 'obj')
- }
-
- directives = {
- 'type': TypeDirective,
- 'attribute': AttributeDirective,
- 'service': ServiceDirective,
- 'root': RootDirective,
- 'function': FunctionDirective,
- }
-
- roles = {
- 'type': XRefRole()
- }
-
- initial_data = {
- 'types': {}, # fullname -> docname
- }
-
- def clear_doc(self, docname):
- keys = list(self.data['types'].keys())
- for key in keys:
- value = self.data['types'][key]
- if value == docname:
- del self.data['types'][key]
-
- def resolve_xref(self, env, fromdocname, builder,
- type, target, node, contnode):
- if target not in self.data['types']:
- return None
- todocname = self.data['types'][target]
- return make_refnode(
- builder, fromdocname, todocname, target, contnode, target)
-
-
-def setup(app):
- app.add_domain(WSMEDomain)
- app.add_autodocumenter(TypeDocumenter)
- app.add_autodocumenter(AttributeDocumenter)
- app.add_autodocumenter(ServiceDocumenter)
- app.add_autodocumenter(FunctionDocumenter)
-
- app.add_config_value('wsme_root', None, 'env')
- app.add_config_value('wsme_webpath', '/', 'env')
- app.add_config_value('wsme_protocols', ['restjson', 'restxml'], 'env')
- app.add_javascript('toggle.js')
- app.add_stylesheet('toggle.css')
diff --git a/wsmeext/sqlalchemy/__init__.py b/wsmeext/sqlalchemy/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wsmeext/sqlalchemy/__init__.py
+++ /dev/null
diff --git a/wsmeext/sqlalchemy/controllers.py b/wsmeext/sqlalchemy/controllers.py
deleted file mode 100644
index 504a46a..0000000
--- a/wsmeext/sqlalchemy/controllers.py
+++ /dev/null
@@ -1,97 +0,0 @@
-from wsme.rest import expose, validate
-import wsme.types
-
-from wsmeext.sqlalchemy.types import SQLAlchemyRegistry
-
-
-class CRUDControllerMeta(type):
- def __init__(cls, name, bases, dct):
- if cls.__saclass__ is not None:
- if cls.__registry__ is None:
- cls.__registry__ = wsme.types.registry
- if cls.__wstype__ is None:
- cls.__wstype__ = cls.__registry__.resolve_type(
- SQLAlchemyRegistry.get(
- cls.__registry__).getdatatype(cls.__saclass__))
-
- cls.create = expose(
- cls.__wstype__,
- method='PUT',
- wrap=True
- )(cls.create)
- cls.create = validate(cls.__wstype__)(cls.create)
-
- cls.read = expose(
- cls.__wstype__,
- method='GET',
- wrap=True
- )(cls.read)
- cls.read = validate(cls.__wstype__)(cls.read)
-
- cls.update = expose(
- cls.__wstype__,
- method='POST',
- wrap=True
- )(cls.update)
- cls.update = validate(cls.__wstype__)(cls.update)
-
- cls.delete = expose(
- method='DELETE',
- wrap=True
- )(cls.delete)
- cls.delete = validate(cls.__wstype__)(cls.delete)
-
- super(CRUDControllerMeta, cls).__init__(name, bases, dct)
-
-
-class CRUDControllerBase(object):
- __registry__ = None
- __saclass__ = None
- __wstype__ = None
- __dbsession__ = None
-
- def _create_one(self, data):
- obj = self.__saclass__()
- data.to_instance(obj)
- self.__dbsession__.add(obj)
- return obj
-
- def _get_one(self, ref):
- q = self.__dbsession__.query(self.__saclass__)
- q = q.filter(ref.get_ref_criterion())
- return q.one()
-
- def _update_one(self, data):
- obj = self._get_one(data)
- if obj is None:
- raise ValueError("No match for data=%s" % data)
- data.to_instance(obj)
- return obj
-
- def _delete(self, ref):
- obj = self._get_one(ref)
- self.__dbsession__.delete(obj)
-
- def create(self, data):
- obj = self._create_one(data)
- self.__dbsession__.flush()
- return self.__wstype__(obj)
-
- def read(self, ref):
- obj = self._get_one(ref)
- return self.__wstype__(obj)
-
- def update(self, data):
- obj = self._update_one(data)
- self.__dbsession__.flush()
- return self.__wstype__(obj)
-
- def delete(self, ref):
- self._delete(ref)
- self.__dbsession__.flush()
- return None
-
-
-CRUDController = CRUDControllerMeta(
- 'CRUDController', (CRUDControllerBase,), {}
-)
diff --git a/wsmeext/sqlalchemy/types.py b/wsmeext/sqlalchemy/types.py
deleted file mode 100644
index 414bf52..0000000
--- a/wsmeext/sqlalchemy/types.py
+++ /dev/null
@@ -1,200 +0,0 @@
-import datetime
-import decimal
-import logging
-
-import six
-
-from sqlalchemy.orm import class_mapper
-from sqlalchemy.orm.properties import ColumnProperty, RelationProperty
-
-import sqlalchemy.types
-
-import wsme.types
-
-log = logging.getLogger(__name__)
-
-
-class SQLAlchemyRegistry(object):
- @classmethod
- def get(cls, registry):
- if not hasattr(registry, 'sqlalchemy'):
- registry.sqlalchemy = cls()
- return registry.sqlalchemy
-
- def __init__(self):
- self.types = {}
- self.satypeclasses = {
- sqlalchemy.types.Integer: int,
- sqlalchemy.types.Boolean: bool,
- sqlalchemy.types.Float: float,
- sqlalchemy.types.Numeric: decimal.Decimal,
- sqlalchemy.types.Date: datetime.date,
- sqlalchemy.types.Time: datetime.time,
- sqlalchemy.types.DateTime: datetime.datetime,
- sqlalchemy.types.String: wsme.types.text,
- sqlalchemy.types.Unicode: wsme.types.text,
- }
-
- def getdatatype(self, sadatatype):
- if sadatatype.__class__ in self.satypeclasses:
- return self.satypeclasses[sadatatype.__class__]
- elif sadatatype in self.types:
- return self.types[sadatatype]
- else:
- return sadatatype.__name__
-
-
-def register_saclass(registry, saclass, typename=None):
- """Associate a webservice type name to a SQLAlchemy mapped class.
- The default typename if the saclass name itself.
- """
- if typename is None:
- typename = saclass.__name__
-
- SQLAlchemyRegistry.get(registry).types[saclass] = typename
-
-
-class wsattr(wsme.types.wsattr):
- def __init__(self, datatype, saproperty=None, **kw):
- super(wsattr, self).__init__(datatype, **kw)
- self.saname = saproperty.key
- self.saproperty = saproperty
- self.isrelation = isinstance(saproperty, RelationProperty)
-
-
-def make_wsattr(registry, saproperty):
- datatype = None
- if isinstance(saproperty, ColumnProperty):
- if len(saproperty.columns) > 1:
- log.warning("Cannot handle multi-column ColumnProperty")
- return None
- datatype = SQLAlchemyRegistry.get(registry).getdatatype(
- saproperty.columns[0].type)
- elif isinstance(saproperty, RelationProperty):
- other_saclass = saproperty.mapper.class_
- datatype = SQLAlchemyRegistry.get(registry).getdatatype(other_saclass)
- if saproperty.uselist:
- datatype = [datatype]
- else:
- log.warning("Don't know how to handle %s attributes" %
- saproperty.__class__)
-
- if datatype:
- return wsattr(datatype, saproperty)
-
-
-class BaseMeta(wsme.types.BaseMeta):
- def __new__(cls, name, bases, dct):
- if '__registry__' not in dct:
- dct['__registry__'] = wsme.types.registry
- return type.__new__(cls, name, bases, dct)
-
- def __init__(cls, name, bases, dct):
- saclass = getattr(cls, '__saclass__', None)
- if saclass:
- mapper = class_mapper(saclass)
- cls._pkey_attrs = []
- cls._ref_attrs = []
- for prop in mapper.iterate_properties:
- key = prop.key
- if hasattr(cls, key):
- continue
- if key.startswith('_'):
- continue
- attr = make_wsattr(cls.__registry__, prop)
- if attr is not None:
- setattr(cls, key, attr)
-
- if attr and isinstance(prop, ColumnProperty) and \
- prop.columns[0] in mapper.primary_key:
- cls._pkey_attrs.append(attr)
- cls._ref_attrs.append(attr)
-
- register_saclass(cls.__registry__, cls.__saclass__, cls.__name__)
- super(BaseMeta, cls).__init__(name, bases, dct)
-
-
-class Base(six.with_metaclass(BaseMeta, wsme.types.Base)):
- def __init__(self, instance=None, keyonly=False, attrs=None, eagerload=[]):
- if instance:
- self.from_instance(instance, keyonly, attrs, eagerload)
-
- def from_instance(self, instance, keyonly=False, attrs=None, eagerload=[]):
- if keyonly:
- attrs = self._pkey_attrs + self._ref_attrs
- for attr in self._wsme_attributes:
- if not isinstance(attr, wsattr):
- continue
- if attrs and not attr.isrelation and attr.name not in attrs:
- continue
- if attr.isrelation and attr.name not in eagerload:
- continue
- value = getattr(instance, attr.saname)
- if attr.isrelation:
- attr_keyonly = attr.name not in eagerload
- attr_attrs = None
- attr_eagerload = []
- if not attr_keyonly:
- attr_attrs = [
- aname[len(attr.name) + 1:]
- for aname in attrs
- if aname.startswith(attr.name + '.')
- ]
- attr_eagerload = [
- aname[len(attr.name) + 1:]
- for aname in eagerload
- if aname.startswith(attr.name + '.')
- ]
- if attr.saproperty.uselist:
- value = [
- attr.datatype.item_type(
- o,
- keyonly=attr_keyonly,
- attrs=attr_attrs,
- eagerload=attr_eagerload
- )
- for o in value
- ]
- else:
- value = attr.datatype(
- value,
- keyonly=attr_keyonly,
- attrs=attr_attrs,
- eagerload=attr_eagerload
- )
- attr.__set__(self, value)
-
- def to_instance(self, instance):
- for attr in self._wsme_attributes:
- if isinstance(attr, wsattr):
- value = attr.__get__(self, self.__class__)
- if value is not wsme.types.Unset:
- setattr(instance, attr.saname, value)
-
- def get_ref_criterion(self):
- """Returns a criterion that match a database object
- having the pkey/ref attribute values of this webservice object"""
- criterions = []
- for attr in self._pkey_attrs + self._ref_attrs:
- value = attr.__get__(self, self.__class__)
- if value is not wsme.types.Unset:
- criterions.append(attr.saproperty == value)
-
-
-def generate_types(*classes, **kw):
- registry = kw.pop('registry', wsme.types.registry)
- prefix = kw.pop('prefix', '')
- postfix = kw.pop('postfix', '')
- makename = kw.pop('makename', lambda s: prefix + s + postfix)
-
- newtypes = {}
- for c in classes:
- if isinstance(c, list):
- newtypes.update(generate_types(c))
- else:
- name = makename(c.__name__)
- newtypes[name] = BaseMeta(name, (Base, ), {
- '__saclass__': c,
- '__registry__': registry
- })
- return newtypes
diff --git a/wsmeext/tests/__init__.py b/wsmeext/tests/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wsmeext/tests/__init__.py
+++ /dev/null
diff --git a/wsmeext/tests/test_extdirect.py b/wsmeext/tests/test_extdirect.py
deleted file mode 100644
index 4c5bea8..0000000
--- a/wsmeext/tests/test_extdirect.py
+++ /dev/null
@@ -1,243 +0,0 @@
-import base64
-import datetime
-import decimal
-
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-
-import wsme.tests.protocol
-from wsme.utils import parse_isodatetime, parse_isodate, parse_isotime
-from wsme.types import isarray, isdict, isusertype
-
-import six
-
-if six.PY3:
- from urllib.parse import urlencode
-else:
- from urllib import urlencode # noqa
-
-
-def encode_arg(value):
- if isinstance(value, tuple):
- value, datatype = value
- else:
- datatype = type(value)
-
- if isinstance(datatype, list):
- value = [encode_arg((item, datatype[0])) for item in value]
- elif isinstance(datatype, dict):
- key_type, value_type = list(datatype.items())[0]
- value = dict((
- (encode_arg((key, key_type)),
- encode_arg((value, value_type)))
- for key, value in value.items()
- ))
- elif datatype in (datetime.date, datetime.time, datetime.datetime):
- value = value.isoformat()
- elif datatype == wsme.types.binary:
- value = base64.encodestring(value).decode('ascii')
- elif datatype == wsme.types.bytes:
- value = value.decode('ascii')
- elif datatype == decimal.Decimal:
- value = str(value)
- return value
-
-
-def decode_result(value, datatype):
- if value is None:
- return None
- if datatype == wsme.types.binary:
- value = base64.decodestring(value.encode('ascii'))
- return value
- if isusertype(datatype):
- datatype = datatype.basetype
- if isinstance(datatype, list):
- value = [decode_result(item, datatype[0]) for item in value]
- elif isarray(datatype):
- value = [decode_result(item, datatype.item_type) for item in value]
- elif isinstance(datatype, dict):
- key_type, value_type = list(datatype.items())[0]
- value = dict((
- (decode_result(key, key_type),
- decode_result(value, value_type))
- for key, value in value.items()
- ))
- elif isdict(datatype):
- key_type, value_type = datatype.key_type, datatype.value_type
- value = dict((
- (decode_result(key, key_type),
- decode_result(value, value_type))
- for key, value in value.items()
- ))
- elif datatype == datetime.time:
- value = parse_isotime(value)
- elif datatype == datetime.date:
- value = parse_isodate(value)
- elif datatype == datetime.datetime:
- value = parse_isodatetime(value)
- elif hasattr(datatype, '_wsme_attributes'):
- for attr in datatype._wsme_attributes:
- if attr.key not in value:
- continue
- value[attr.key] = decode_result(value[attr.key], attr.datatype)
- elif datatype == decimal.Decimal:
- value = decimal.Decimal(value)
- elif datatype == wsme.types.bytes:
- value = value.encode('ascii')
- elif datatype is not None and type(value) != datatype:
- value = datatype(value)
- return value
-
-
-class TestExtDirectProtocol(wsme.tests.protocol.ProtocolTestCase):
- protocol = 'extdirect'
- protocol_options = {
- 'namespace': 'MyNS.api',
- 'nsfolder': 'app'
- }
-
- def call(self, fname, _rt=None, _no_result_decode=False, _accept=None,
- **kw):
- path = fname.split('/')
- try:
- func, funcdef, args = self.root._lookup_function(path)
- arguments = funcdef.arguments
- except Exception:
- arguments = []
- if len(path) == 1:
- ns, action, fname = '', '', path[0]
- elif len(path) == 2:
- ns, action, fname = '', path[0], path[1]
- else:
- ns, action, fname = '.'.join(path[:-2]), path[-2], path[-1]
- print(kw)
-
- args = [
- dict(
- (arg.name, encode_arg(kw[arg.name]))
- for arg in arguments if arg.name in kw
- )
- ]
- print("args =", args)
- data = json.dumps({
- 'type': 'rpc',
- 'tid': 0,
- 'action': action,
- 'method': fname,
- 'data': args,
- })
- print(data)
- headers = {'Content-Type': 'application/json'}
- if _accept:
- headers['Accept'] = _accept
- res = self.app.post('/extdirect/router/%s' % ns, data, headers=headers,
- expect_errors=True)
-
- print(res.body)
-
- if _no_result_decode:
- return res
-
- data = json.loads(res.text)
- if data['type'] == 'rpc':
- r = data['result']
- return decode_result(r, _rt)
- elif data['type'] == 'exception':
- faultcode, faultstring = data['message'].split(': ', 1)
- debuginfo = data.get('where')
- raise wsme.tests.protocol.CallException(
- faultcode, faultstring, debuginfo)
-
- def test_api_alias(self):
- assert self.root._get_protocol('extdirect').api_alias == '/app/api.js'
-
- def test_get_api(self):
- res = self.app.get('/app/api.js')
- print(res.body)
- assert res.body
-
- def test_positional(self):
- self.root._get_protocol('extdirect').default_params_notation = \
- 'positional'
-
- data = json.dumps({
- 'type': 'rpc',
- 'tid': 0,
- 'action': 'misc',
- 'method': 'multiply',
- 'data': [2, 5],
- })
- headers = {'Content-Type': 'application/json'}
- res = self.app.post('/extdirect/router', data, headers=headers)
-
- print(res.body)
-
- data = json.loads(res.text)
- assert data['type'] == 'rpc'
- r = data['result']
- assert r == 10
-
- def test_batchcall(self):
- data = json.dumps([{
- 'type': 'rpc',
- 'tid': 1,
- 'action': 'argtypes',
- 'method': 'setdate',
- 'data': [{'value': '2011-04-06'}],
- }, {
- 'type': 'rpc',
- 'tid': 2,
- 'action': 'returntypes',
- 'method': 'getbytes',
- 'data': []
- }])
- print(data)
- headers = {'Content-Type': 'application/json'}
- res = self.app.post('/extdirect/router', data, headers=headers)
-
- print(res.body)
-
- rdata = json.loads(res.text)
-
- assert len(rdata) == 2
-
- assert rdata[0]['tid'] == 1
- assert rdata[0]['result'] == '2011-04-06'
- assert rdata[1]['tid'] == 2
- assert rdata[1]['result'] == 'astring'
-
- def test_form_call(self):
- params = {
- 'value[0].inner.aint': 54,
- 'value[1].inner.aint': 55,
- 'extType': 'rpc',
- 'extTID': 1,
- 'extAction': 'argtypes',
- 'extMethod': 'setnestedarray',
- }
-
- body = urlencode(params)
- r = self.app.post(
- '/extdirect/router',
- body,
- headers={'Content-Type': 'application/x-www-form-urlencoded'}
- )
- print(r)
-
- assert json.loads(r.text) == {
- "tid": "1",
- "action": "argtypes",
- "type": "rpc",
- "method": "setnestedarray",
- "result": [{
- "inner": {
- "aint": 54
- }
- }, {
- "inner": {
- "aint": 55
- }
- }]
- }
diff --git a/wsmeext/tests/test_soap.py b/wsmeext/tests/test_soap.py
deleted file mode 100644
index bc17696..0000000
--- a/wsmeext/tests/test_soap.py
+++ /dev/null
@@ -1,423 +0,0 @@
-import decimal
-import datetime
-import base64
-
-import six
-
-import wsme.tests.protocol
-
-try:
- import xml.etree.ElementTree as et
-except ImportError:
- import cElementTree as et # noqa
-
-import suds.cache
-import suds.client
-import suds.transport
-
-import wsme.utils
-
-
-class XDecimal(suds.xsd.sxbuiltin.XBuiltin):
- def translate(self, value, topython=True):
- if topython:
- if isinstance(value, six.string_types) and len(value):
- return decimal.Decimal(value)
- else:
- if isinstance(value, (decimal.Decimal, int, float)):
- return str(value)
- return value
-
-
-suds.xsd.sxbuiltin.Factory.tags['decimal'] = XDecimal
-
-
-class WebtestSudsTransport(suds.transport.Transport):
- def __init__(self, app):
- suds.transport.Transport.__init__(self)
- self.app = app
-
- def open(self, request):
- res = self.app.get(request.url, headers=request.headers)
- return six.BytesIO(res.body)
-
- def send(self, request):
- res = self.app.post(
- request.url,
- request.message,
- headers=dict((
- (key, str(value)) for key, value in request.headers.items()
- )),
- expect_errors=True
- )
- return suds.transport.Reply(
- res.status_int,
- dict(res.headers),
- res.body
- )
-
-
-class SudsCache(suds.cache.Cache):
- def __init__(self):
- self.d = {}
-
- def get(self, id):
- return self.d.get(id)
-
- def getf(self, id):
- b = self.get(id)
- if b is not None:
- return six.StringIO(self.get(id))
-
- def put(self, id, bfr):
- self.d[id] = bfr
-
- def putf(self, id, fp):
- self.put(id, fp.read())
-
- def purge(self, id):
- try:
- del self.d[id]
- except KeyError:
- pass
-
- def clear(self, id):
- self.d = {}
-
-
-sudscache = SudsCache()
-
-tns = "http://foo.bar.baz/soap/"
-typenamespace = "http://foo.bar.baz/types/"
-
-soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
-xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance'
-body_qn = '{%s}Body' % soapenv_ns
-fault_qn = '{%s}Fault' % soapenv_ns
-faultcode_qn = '{%s}faultcode' % soapenv_ns
-faultstring_qn = '{%s}faultstring' % soapenv_ns
-faultdetail_qn = '{%s}detail' % soapenv_ns
-type_qn = '{%s}type' % xsi_ns
-nil_qn = '{%s}nil' % xsi_ns
-
-
-def build_soap_message(method, params=""):
- message = """<?xml version="1.0"?>
-<soap:Envelope
-xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
-xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
-
- <soap:Body xmlns="%(typenamespace)s">
- <%(method)s>
- %(params)s
- </%(method)s>
- </soap:Body>
-
-</soap:Envelope>
-""" % dict(method=method, params=params, typenamespace=typenamespace)
- return message
-
-
-python_types = {
- int: ('xs:int', str),
- float: ('xs:float', str),
- bool: ('xs:boolean', str),
- wsme.types.bytes: (
- 'xs:string',
- lambda x: x.decode('ascii') if isinstance(x, wsme.types.bytes) else x
- ),
- wsme.types.text: ('xs:string', wsme.types.text),
- wsme.types.binary: (
- 'xs:base64Binary',
- lambda x: base64.encodestring(x).decode('ascii')
- ),
- decimal.Decimal: ('xs:decimal', str),
- datetime.date: ('xs:date', datetime.date.isoformat),
- datetime.time: ('xs:time', datetime.time.isoformat),
- datetime.datetime: ('xs:dateTime', datetime.datetime.isoformat),
-}
-
-array_types = {
- wsme.types.bytes: "String_Array",
- wsme.types.text: "String_Array",
- int: "Int_Array",
- float: "Float_Array",
- bool: "Boolean_Array",
- datetime.datetime: "dateTime_Array"
-}
-
-if not six.PY3:
- array_types[long] = "Long_Array" # noqa
-
-
-def tosoap(tag, value):
- el = et.Element(tag)
- if isinstance(value, tuple):
- value, datatype = value
- else:
- datatype = type(value)
- if value is None:
- el.set('xsi:nil', 'true')
- return el
- if datatype in python_types:
- stype, conv = python_types[datatype]
- el.text = conv(value)
- el.set('xsi:type', stype)
- el.text = str(value)
- return el
-
-
-def tosuds(client, value):
- if value is None:
- return None
- if isinstance(value, tuple):
- value, datatype = value
- else:
- datatype = type(value)
- if value is None:
- return None
- if isinstance(datatype, list):
- if datatype[0] in array_types:
- tname = array_types[datatype[0]]
- else:
- tname = datatype[0].__name__ + '_Array'
- o = client.factory.create('types:' + tname)
- o.item = [tosuds(client, (item, datatype[0])) for item in value]
- return o
- elif datatype in python_types:
- return python_types[datatype][1](value)
- else:
- o = client.factory.create('types:' + datatype.__name__)
-
- for attr in datatype._wsme_attributes:
- if attr.name in value:
- setattr(
- o, attr.name,
- tosuds(client, (value[attr.name], attr.datatype))
- )
- return o
-
-
-def read_bool(value):
- return value == 'true'
-
-
-soap_types = {
- 'xs:string': wsme.types.text,
- 'xs:int': int,
- 'xs:long': int if six.PY3 else long, # noqa
- 'xs:float': float,
- 'xs:decimal': decimal.Decimal,
- 'xs:boolean': read_bool,
- 'xs:date': wsme.utils.parse_isodate,
- 'xs:time': wsme.utils.parse_isotime,
- 'xs:dateTime': wsme.utils.parse_isodatetime,
- 'xs:base64Binary': base64.decodestring,
-}
-
-
-def fromsoap(el):
- if el.get(nil_qn) == 'true':
- return None
- t = el.get(type_qn)
- if t == 'xs:string':
- return wsme.types.text(el.text if el.text else '')
- if t in soap_types:
- return soap_types[t](el.text)
- elif t and t.endswith('_Array'):
- return [fromsoap(i) for i in el]
- else:
- d = {}
- for child in el:
- name = child.tag
- assert name.startswith('{%s}' % typenamespace), name
- name = name[len(typenamespace) + 2:]
- d[name] = fromsoap(child)
- return d
-
-
-def tobytes(value):
- if isinstance(value, wsme.types.text):
- value = value.encode()
- return value
-
-
-def tobin(value):
- value = base64.decodestring(value.encode())
- return value
-
-
-fromsuds_types = {
- wsme.types.binary: tobin,
- wsme.types.bytes: tobytes,
- decimal.Decimal: decimal.Decimal,
-}
-
-
-def fromsuds(dt, value):
- if value is None:
- return None
- if isinstance(dt, list):
- return [fromsuds(dt[0], item) for item in value.item]
- if wsme.types.isarray(dt):
- return [fromsuds(dt.item_type, item) for item in value.item]
- if wsme.types.isusertype(dt) and dt not in fromsuds_types:
- dt = dt.basetype
- if dt in fromsuds_types:
- print(dt, value)
- value = fromsuds_types[dt](value)
- print(value)
- return value
- if wsme.types.iscomplex(dt):
- d = {}
- for attrdef in dt._wsme_attributes:
- if not hasattr(value, attrdef.name):
- continue
- d[attrdef.name] = fromsuds(
- attrdef.datatype, getattr(value, attrdef.name)
- )
- return d
- return value
-
-
-class TestSOAP(wsme.tests.protocol.ProtocolTestCase):
- protocol = 'soap'
- protocol_options = dict(tns=tns, typenamespace=typenamespace)
- ws_path = '/'
- _sudsclient = None
-
- def setUp(self):
- wsme.tests.protocol.ProtocolTestCase.setUp(self)
-
- def test_simple_call(self):
- message = build_soap_message('touch')
- print(message)
- res = self.app.post(
- self.ws_path,
- message,
- headers={"Content-Type": "application/soap+xml; charset=utf-8"},
- expect_errors=True
- )
- print(res.body)
- assert res.status.startswith('200')
-
- def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False,
- **kw):
-
- if _no_result_decode or _accept or self._testMethodName in (
- 'test_missing_argument', 'test_invalid_path', 'test_settext_empty',
- 'test_settext_none'
- ):
- return self.raw_call(fpath, _rt, _accept, _no_result_decode, **kw)
-
- path = fpath.strip('/').split('/')
- methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
-
- m = getattr(self.sudsclient.service, methodname)
- kw = dict((
- (key, tosuds(self.sudsclient, value)) for key, value in kw.items()
- ))
- print(kw)
- try:
- return fromsuds(_rt, m(**kw))
- except suds.WebFault as exc:
- raise wsme.tests.protocol.CallException(
- exc.fault.faultcode,
- exc.fault.faultstring,
- getattr(exc.fault, 'detail', None) or None
- )
-
- def raw_call(self, fpath, _rt=None, _accept=None, _no_result_decode=False,
- **kw):
- path = fpath.strip('/').split('/')
- methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
- # get the actual definition so we can build the adequate request
- if kw:
- el = et.Element('parameters')
- for key, value in kw.items():
- el.append(tosoap(key, value))
-
- params = six.b("\n").join(et.tostring(el) for el in el)
- else:
- params = ""
- methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
- message = build_soap_message(methodname, params)
- print(message)
- headers = {"Content-Type": "application/soap+xml; charset=utf-8"}
- if _accept is not None:
- headers['Accept'] = _accept
- res = self.app.post(
- self.ws_path,
- message,
- headers=headers,
- expect_errors=True
- )
- print("Status: ", res.status, "Received:", res.body)
-
- if _no_result_decode:
- return res
-
- el = et.fromstring(res.body)
- body = el.find(body_qn)
- print(body)
-
- if res.status_int == 200:
- response_tag = '{%s}%sResponse' % (typenamespace, methodname)
- r = body.find(response_tag)
- result = r.find('{%s}result' % typenamespace)
- print("Result element: ", result)
- return fromsoap(result)
- elif res.status_int == 400:
- fault = body.find(fault_qn)
- raise wsme.tests.protocol.CallException(
- fault.find(faultcode_qn).text,
- fault.find(faultstring_qn).text,
- "")
-
- elif res.status_int == 500:
- fault = body.find(fault_qn)
- raise wsme.tests.protocol.CallException(
- fault.find(faultcode_qn).text,
- fault.find(faultstring_qn).text,
- fault.find(faultdetail_qn) is not None and
- fault.find(faultdetail_qn).text or None)
-
- @property
- def sudsclient(self):
- if self._sudsclient is None:
- self._sudsclient = suds.client.Client(
- self.ws_path + 'api.wsdl',
- transport=WebtestSudsTransport(self.app),
- cache=sudscache
- )
- return self._sudsclient
-
- def test_wsdl(self):
- c = self.sudsclient
- assert c.wsdl.tns[1] == tns, c.wsdl.tns
-
- sd = c.sd[0]
-
- assert len(sd.ports) == 1
- port, methods = sd.ports[0]
- self.assertEqual(len(methods), 51)
-
- methods = dict(methods)
-
- assert 'returntypesGettext' in methods
- print(methods)
-
- assert methods['argtypesSettime'][0][0] == 'value'
-
- def test_return_nesteddict(self):
- pass
-
- def test_setnesteddict(self):
- pass
-
- def test_return_objectdictattribute(self):
- pass
-
- def test_setnested_nullobj(self):
- pass # TODO write a soap adapted version of this test.
diff --git a/wsmeext/tests/test_sqlalchemy_controllers.py b/wsmeext/tests/test_sqlalchemy_controllers.py
deleted file mode 100644
index 1956788..0000000
--- a/wsmeext/tests/test_sqlalchemy_controllers.py
+++ /dev/null
@@ -1,223 +0,0 @@
-import datetime
-
-try:
- import json
-except ImportError:
- import simplejson as json
-
-from webtest import TestApp
-
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import Column, Integer, Unicode, Date, ForeignKey
-from sqlalchemy.orm import relation
-
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker, scoped_session
-
-from wsme import WSRoot
-import wsme.types
-
-from wsmeext.sqlalchemy.types import generate_types
-from wsmeext.sqlalchemy.controllers import CRUDController
-
-from six import u
-
-engine = create_engine('sqlite:///')
-DBSession = scoped_session(sessionmaker(autocommit=False, autoflush=False,
- bind=engine))
-DBBase = declarative_base()
-
-registry = wsme.types.Registry()
-
-
-class DBPerson(DBBase):
- __tablename__ = 'person'
-
- id = Column(Integer, primary_key=True)
- name = Column(Unicode(50))
- birthdate = Column(Date)
-
- addresses = relation('DBAddress')
-
-
-class DBAddress(DBBase):
- __tablename__ = 'address'
-
- id = Column(Integer, primary_key=True)
-
- _person_id = Column('person_id', ForeignKey(DBPerson.id))
-
- street = Column(Unicode(50))
- city = Column(Unicode(50))
-
- person = relation(DBPerson)
-
-
-globals().update(
- generate_types(DBPerson, DBAddress, makename=lambda s: s[2:],
- registry=registry))
-
-
-class PersonController(CRUDController):
- __saclass__ = DBPerson
- __dbsession__ = DBSession
- __registry__ = registry
-
-
-class AddressController(CRUDController):
- __saclass__ = DBAddress
- __dbsession__ = DBSession
- __registry__ = registry
-
-
-class Root(WSRoot):
- __registry__ = registry
-
- person = PersonController()
- address = AddressController()
-
-
-class TestCRUDController():
- def setUp(self):
- DBBase.metadata.create_all(DBSession.bind)
-
- self.root = Root()
- self.root.getapi()
- self.root.addprotocol('restjson')
-
- self.app = TestApp(self.root.wsgiapp())
-
- def tearDown(self):
- DBBase.metadata.drop_all(DBSession.bind)
-
- def test_create(self):
- data = dict(data=dict(
- name=u('Pierre-Joseph'),
- birthdate=u('1809-01-15')
- ))
- r = self.app.post('/person/create', json.dumps(data),
- headers={'Content-Type': 'application/json'})
- r = json.loads(r.text)
- print(r)
- assert r['name'] == u('Pierre-Joseph')
- assert r['birthdate'] == u('1809-01-15')
-
- def test_PUT(self):
- data = dict(data=dict(
- name=u('Pierre-Joseph'),
- birthdate=u('1809-01-15')
- ))
- r = self.app.put('/person', json.dumps(data),
- headers={'Content-Type': 'application/json'})
- r = json.loads(r.text)
- print(r)
- assert r['name'] == u('Pierre-Joseph')
- assert r['birthdate'] == u('1809-01-15')
-
- def test_read(self):
- p = DBPerson(
- name=u('Pierre-Joseph'),
- birthdate=datetime.date(1809, 1, 15))
- DBSession.add(p)
- DBSession.flush()
- pid = p.id
- r = self.app.post('/person/read', '{"ref": {"id": %s}}' % pid,
- headers={'Content-Type': 'application/json'})
- r = json.loads(r.text)
- print(r)
- assert r['name'] == u('Pierre-Joseph')
- assert r['birthdate'] == u('1809-01-15')
-
- def test_GET(self):
- p = DBPerson(
- name=u('Pierre-Joseph'),
- birthdate=datetime.date(1809, 1, 15))
- DBSession.add(p)
- DBSession.flush()
- pid = p.id
- r = self.app.get('/person?ref.id=%s' % pid,
- headers={'Accept': 'application/json'})
- r = json.loads(r.text)
- print(r)
- assert r['name'] == u('Pierre-Joseph')
- assert r['birthdate'] == u('1809-01-15')
-
- def test_GET_bad_accept(self):
- p = DBPerson(
- name=u('Pierre-Joseph'),
- birthdate=datetime.date(1809, 1, 15))
- DBSession.add(p)
- DBSession.flush()
- pid = p.id
- r = self.app.get('/person?ref.id=%s' % pid,
- headers={'Accept': 'text/plain'},
- status=406)
- assert r.text == ("Unacceptable Accept type: text/plain not in "
- "['application/json', 'text/javascript', "
- "'application/javascript', 'text/xml']")
-
- def test_update(self):
- p = DBPerson(
- name=u('Pierre-Joseph'),
- birthdate=datetime.date(1809, 1, 15))
- DBSession.add(p)
- DBSession.flush()
- pid = p.id
- data = {
- "id": pid,
- "name": u('Pierre-Joseph Proudon')
- }
- r = self.app.post('/person/update', json.dumps(dict(data=data)),
- headers={'Content-Type': 'application/json'})
- r = json.loads(r.text)
- print(r)
- assert r['name'] == u('Pierre-Joseph Proudon')
- assert r['birthdate'] == u('1809-01-15')
-
- def test_POST(self):
- p = DBPerson(
- name=u('Pierre-Joseph'),
- birthdate=datetime.date(1809, 1, 15))
- DBSession.add(p)
- DBSession.flush()
- pid = p.id
- data = {
- "id": pid,
- "name": u('Pierre-Joseph Proudon')
- }
- r = self.app.post('/person', json.dumps(dict(data=data)),
- headers={'Content-Type': 'application/json'})
- r = json.loads(r.text)
- print(r)
- assert r['name'] == u('Pierre-Joseph Proudon')
- assert r['birthdate'] == u('1809-01-15')
-
- def test_delete(self):
- p = DBPerson(
- name=u('Pierre-Joseph'),
- birthdate=datetime.date(1809, 1, 15))
- DBSession.add(p)
- DBSession.flush()
- pid = p.id
- r = self.app.post('/person/delete', json.dumps(
- dict(ref=dict(id=pid))),
- headers={
- 'Content-Type': 'application/json'
- })
- print(r)
- assert DBSession.query(DBPerson).get(pid) is None
-
- def test_DELETE(self):
- p = DBPerson(
- name=u('Pierre-Joseph'),
- birthdate=datetime.date(1809, 1, 15))
- DBSession.add(p)
- DBSession.flush()
- pid = p.id
- r = self.app.delete('/person?ref.id=%s' % pid,
- headers={'Content-Type': 'application/json'})
- print(r)
- assert DBSession.query(DBPerson).get(pid) is None
-
- def test_nothing(self):
- pass
diff --git a/wsmeext/tests/test_sqlalchemy_types.py b/wsmeext/tests/test_sqlalchemy_types.py
deleted file mode 100644
index 8512015..0000000
--- a/wsmeext/tests/test_sqlalchemy_types.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import datetime
-
-import wsmeext.sqlalchemy.types
-
-from wsme.types import text, Unset, isarray
-
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import Column, Integer, String, Date, ForeignKey
-from sqlalchemy.orm import relation
-
-from six import u
-
-SABase = declarative_base()
-
-
-class SomeClass(SABase):
- __tablename__ = 'some_table'
- id = Column(Integer, primary_key=True)
- name = Column(String(50))
-
- adate = Column(Date)
-
-
-def test_complextype():
- class AType(wsmeext.sqlalchemy.types.Base):
- __saclass__ = SomeClass
-
- assert AType.id.datatype is int
- assert AType.name.datatype is text
- assert AType.adate.datatype is datetime.date
-
- a = AType()
- s = SomeClass(name=u('aname'), adate=datetime.date(2012, 6, 26))
- assert s.name == u('aname')
-
- a.from_instance(s)
- assert a.name == u('aname')
- assert a.adate == datetime.date(2012, 6, 26)
-
- a.name = u('test')
- del a.adate
- assert a.adate is Unset
-
- a.to_instance(s)
- assert s.name == u('test')
- assert s.adate == datetime.date(2012, 6, 26)
-
-
-def test_generate():
- class A(SABase):
- __tablename__ = 'a'
- id = Column(Integer, primary_key=True)
- name = Column(String(50))
-
- _b_id = Column(ForeignKey('b.id'))
-
- b = relation('B')
-
- class B(SABase):
- __tablename__ = 'b'
- id = Column(Integer, primary_key=True)
- name = Column(String(50))
-
- alist = relation(A)
-
- newtypes = wsmeext.sqlalchemy.types.generate_types(A, B)
-
- assert newtypes['A'].id.datatype is int
- assert newtypes['A'].b.datatype is newtypes['B']
- assert newtypes['B'].id.datatype is int
- assert isarray(newtypes['B'].alist.datatype)
- assert newtypes['B'].alist.datatype.item_type is newtypes['A']
diff --git a/wsmeext/tg1.py b/wsmeext/tg1.py
deleted file mode 100644
index b978166..0000000
--- a/wsmeext/tg1.py
+++ /dev/null
@@ -1,173 +0,0 @@
-try:
- import json
-except ImportError:
- import simplejson as json # noqa
-
-import functools
-import sys
-
-import cherrypy
-import webob
-from turbogears import expose, util
-import turbogears.view
-
-from wsme.rest import validate as wsvalidate
-import wsme.api
-import wsme.rest
-import wsme.rest.args
-import wsme.rest.json
-from wsme.utils import is_valid_code
-
-import inspect
-
-APIPATH_MAXLEN = 50
-
-__all__ = ['wsexpose', 'wsvalidate']
-
-
-def wsexpose(*args, **kwargs):
- tg_json_expose = expose(
- 'wsmejson:',
- accept_format='application/json',
- content_type='application/json',
- tg_format='json'
- )
- tg_altjson_expose = expose(
- 'wsmejson:',
- accept_format='text/javascript',
- content_type='application/json'
- )
- tg_xml_expose = expose(
- 'wsmexml:',
- accept_format='text/xml',
- content_type='text/xml',
- tg_format='xml'
- )
- sig = wsme.signature(*args, **kwargs)
-
- def decorate(f):
- sig(f)
- funcdef = wsme.api.FunctionDefinition.get(f)
-
- @functools.wraps(f)
- def callfunction(self, *args, **kwargs):
- args, kwargs = wsme.rest.args.get_args(
- funcdef, args, kwargs,
- cherrypy.request.params, None,
- cherrypy.request.body,
- cherrypy.request.headers['Content-Type']
- )
- if funcdef.pass_request:
- kwargs[funcdef.pass_request] = cherrypy.request
- try:
- result = f(self, *args, **kwargs)
- except Exception:
- try:
- exception_info = sys.exc_info()
- orig_exception = exception_info[1]
- if isinstance(orig_exception, cherrypy.HTTPError):
- orig_code = getattr(orig_exception, 'status', None)
- else:
- orig_code = getattr(orig_exception, 'code', None)
- data = wsme.api.format_exception(exception_info)
- finally:
- del exception_info
-
- cherrypy.response.status = 500
- if data['faultcode'] == 'client':
- cherrypy.response.status = 400
- elif orig_code and is_valid_code(orig_code):
- cherrypy.response.status = orig_code
-
- accept = cherrypy.request.headers.get('Accept', "").lower()
- accept = util.simplify_http_accept_header(accept)
-
- decorators = {'text/xml': wsme.rest.xml.encode_error}
- return decorators.get(
- accept,
- wsme.rest.json.encode_error
- )(None, data)
-
- return dict(
- datatype=funcdef.return_type,
- result=result
- )
-
- callfunction = tg_xml_expose(callfunction)
- callfunction = tg_altjson_expose(callfunction)
- callfunction = tg_json_expose(callfunction)
- callfunction._wsme_original_function = f
- return callfunction
-
- return decorate
-
-
-class AutoJSONTemplate(object):
- def __init__(self, extra_vars_func=None, options=None):
- pass
-
- def render(self, info, format="json", fragment=False, template=None):
- "Renders the template to a string using the provided info."
- return wsme.rest.json.encode_result(
- info['result'], info['datatype']
- )
-
- def get_content_type(self, user_agent):
- return "application/json"
-
-
-class AutoXMLTemplate(object):
- def __init__(self, extra_vars_func=None, options=None):
- pass
-
- def render(self, info, format="json", fragment=False, template=None):
- "Renders the template to a string using the provided info."
- return wsme.rest.xml.encode_result(
- info['result'], info['datatype']
- )
-
- def get_content_type(self, user_agent):
- return "text/xml"
-
-
-turbogears.view.engines['wsmejson'] = AutoJSONTemplate(turbogears.view.stdvars)
-turbogears.view.engines['wsmexml'] = AutoXMLTemplate(turbogears.view.stdvars)
-
-
-class Controller(object):
- def __init__(self, wsroot):
- self._wsroot = wsroot
-
- @expose()
- def default(self, *args, **kw):
- req = webob.Request(cherrypy.request.wsgi_environ)
- res = self._wsroot._handle_request(req)
- cherrypy.response.header_list = res.headerlist
- cherrypy.response.status = res.status
- return res.body
-
-
-def _scan_api(controller, path=[], objects=[]):
- """
- Recursively iterate a controller api entries.
- """
- for name in dir(controller):
- if name.startswith('_'):
- continue
- a = getattr(controller, name)
- if a in objects:
- continue
- if inspect.ismethod(a):
- if wsme.api.iswsmefunction(a):
- yield path + [name], a._wsme_original_function, [controller]
- elif inspect.isclass(a):
- continue
- else:
- if len(path) > APIPATH_MAXLEN:
- raise ValueError("Path is too long: " + str(path))
- for i in _scan_api(a, path + [name], objects + [a]):
- yield i
-
-
-def scan_api(root=None):
- return _scan_api(cherrypy.root)
diff --git a/wsmeext/tg11.py b/wsmeext/tg11.py
deleted file mode 100644
index 80ec50c..0000000
--- a/wsmeext/tg11.py
+++ /dev/null
@@ -1,40 +0,0 @@
-from turbogears import config
-import cherrypy
-from cherrypy.filters.basefilter import BaseFilter
-from turbogears.startup import call_on_startup, call_on_shutdown
-from wsmeext.tg1 import wsexpose, wsvalidate
-import wsmeext.tg1
-
-__all__ = ['adapt', 'wsexpose', 'wsvalidate']
-
-
-class WSMECherrypyFilter(BaseFilter):
- def __init__(self, controller):
- self.controller = controller
- self.webpath = None
-
- def on_start_resource(self):
- path = cherrypy.request.path
- if path.startswith(self.controller._wsroot._webpath):
- cherrypy.request.processRequestBody = False
-
-
-def adapt(wsroot):
- wsroot._scan_api = wsmeext.tg1.scan_api
- controller = wsmeext.tg1.Controller(wsroot)
- filter_ = WSMECherrypyFilter(controller)
-
- def install_filter():
- filter_.webpath = config.get('server.webpath') or ''
- controller._wsroot._webpath = \
- filter_.webpath + controller._wsroot._webpath
- cherrypy.root._cp_filters.append(filter_)
-
- def uninstall_filter():
- cherrypy.root._cp_filters.remove(filter_)
- controller._wsroot._webpath = \
- controller._wsroot._webpath[len(filter_.webpath):]
-
- call_on_startup.append(install_filter)
- call_on_shutdown.insert(0, uninstall_filter)
- return controller
diff --git a/wsmeext/tg15.py b/wsmeext/tg15.py
deleted file mode 100644
index 4b8a9b1..0000000
--- a/wsmeext/tg15.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import cherrypy
-
-from wsmeext.tg1 import wsexpose, wsvalidate
-import wsmeext.tg1
-
-
-__all__ = ['adapt', 'wsexpose', 'wsvalidate']
-
-
-def scan_api(root=None):
- for baseurl, instance in cherrypy.tree.apps.items():
- path = [token for token in baseurl.split('/') if token]
- for i in wsmeext.tg1._scan_api(instance.root, path):
- yield i
-
-
-def adapt(wsroot):
- wsroot._scan_api = scan_api
- controller = wsmeext.tg1.Controller(wsroot)
- return controller