diff options
author | Jim Rollenhagen <jim@jimrollenhagen.com> | 2019-09-26 09:43:27 -0400 |
---|---|---|
committer | Jim Rollenhagen <jim@jimrollenhagen.com> | 2019-09-26 09:43:27 -0400 |
commit | e9c6edfe510f4ed407f8d2d84b4b931a382b48b3 (patch) | |
tree | 94bbd6a34bcf09e99f7ae1be88b19960192d6adb /wsmeext/soap | |
parent | 1d73d6e50411ebc45fb96a6ed3c63ca91a500323 (diff) | |
download | wsme-master.tar.gz |
Diffstat (limited to 'wsmeext/soap')
-rw-r--r-- | wsmeext/soap/__init__.py | 5 | ||||
-rw-r--r-- | wsmeext/soap/protocol.py | 478 | ||||
-rw-r--r-- | wsmeext/soap/simplegeneric.py | 107 | ||||
-rw-r--r-- | wsmeext/soap/wsdl.py | 297 |
4 files changed, 0 insertions, 887 deletions
diff --git a/wsmeext/soap/__init__.py b/wsmeext/soap/__init__.py deleted file mode 100644 index 7a237ba..0000000 --- a/wsmeext/soap/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from __future__ import absolute_import - -from wsmeext.soap.protocol import SoapProtocol - -__all__ = ['SoapProtocol'] diff --git a/wsmeext/soap/protocol.py b/wsmeext/soap/protocol.py deleted file mode 100644 index 0d87af8..0000000 --- a/wsmeext/soap/protocol.py +++ /dev/null @@ -1,478 +0,0 @@ -""" -A SOAP implementation for wsme. -Parts of the code were taken from the tgwebservices soap implmentation. -""" -from __future__ import absolute_import - -import pkg_resources -import datetime -import decimal -import base64 -import logging - -import six - -from wsmeext.soap.simplegeneric import generic -from wsmeext.soap.wsdl import WSDLGenerator - -try: - from lxml import etree as ET - use_lxml = True -except ImportError: - from xml.etree import cElementTree as ET # noqa - use_lxml = False - -from wsme.protocol import CallContext, Protocol, expose - -import wsme.types -import wsme.runtime - -from wsme import exc -from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime - -log = logging.getLogger(__name__) - -xsd_ns = 'http://www.w3.org/2001/XMLSchema' -xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance' -soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/' - -if not use_lxml: - ET.register_namespace('soap', soapenv_ns) - -type_qn = '{%s}type' % xsi_ns -nil_qn = '{%s}nil' % xsi_ns - -Envelope_qn = '{%s}Envelope' % soapenv_ns -Body_qn = '{%s}Body' % soapenv_ns -Fault_qn = '{%s}Fault' % soapenv_ns -faultcode_qn = '{%s}faultcode' % soapenv_ns -faultstring_qn = '{%s}faultstring' % soapenv_ns -detail_qn = '{%s}detail' % soapenv_ns - - -type_registry = { - wsme.types.bytes: 'xs:string', - wsme.types.text: 'xs:string', - int: 'xs:int', - float: "xs:float", - bool: "xs:boolean", - datetime.datetime: "xs:dateTime", - datetime.date: "xs:date", - datetime.time: "xs:time", - decimal.Decimal: "xs:decimal", - wsme.types.binary: "xs:base64Binary", -} - -if not six.PY3: - type_registry[long] = "xs:long" # noqa - -array_registry = { - wsme.types.text: "String_Array", - wsme.types.bytes: "String_Array", - int: "Int_Array", - float: "Float_Array", - bool: "Boolean_Array", -} - -if not six.PY3: - array_registry[long] = "Long_Array" # noqa - - -def soap_array(datatype, ns): - if datatype.item_type in array_registry: - name = array_registry[datatype.item_type] - else: - name = soap_type(datatype.item_type, False) + '_Array' - if ns: - name = 'types:' + name - return name - - -def soap_type(datatype, ns): - name = None - if wsme.types.isarray(datatype): - return soap_array(datatype, ns) - if wsme.types.isdict(datatype): - return None - if datatype in type_registry: - stype = type_registry[datatype] - if not ns: - stype = stype[3:] - return stype - if wsme.types.iscomplex(datatype): - name = datatype.__name__ - if name and ns: - name = 'types:' + name - return name - if wsme.types.isusertype(datatype): - return soap_type(datatype.basetype, ns) - - -def soap_fname(path, funcdef): - return "".join([path[0]] + [i.capitalize() for i in path[1:]]) - - -class SoapEncoder(object): - def __init__(self, types_ns): - self.types_ns = types_ns - - def make_soap_element(self, datatype, tag, value, xsitype=None): - el = ET.Element(tag) - if value is None: - el.set(nil_qn, 'true') - elif xsitype is not None: - el.set(type_qn, xsitype) - el.text = value - elif wsme.types.isusertype(datatype): - return self.tosoap(datatype.basetype, tag, - datatype.tobasetype(value)) - elif wsme.types.iscomplex(datatype): - el.set(type_qn, 'types:%s' % (datatype.__name__)) - for attrdef in wsme.types.list_attributes(datatype): - attrvalue = getattr(value, attrdef.key) - if attrvalue is not wsme.types.Unset: - el.append(self.tosoap( - attrdef.datatype, - '{%s}%s' % (self.types_ns, attrdef.name), - attrvalue - )) - else: - el.set(type_qn, type_registry.get(datatype)) - if not isinstance(value, wsme.types.text): - value = wsme.types.text(value) - el.text = value - return el - - @generic - def tosoap(self, datatype, tag, value): - """Converts a value into xml Element objects for inclusion in the SOAP - response output (after adding the type to the type_registry). - - If a non-complex user specific type is to be used in the api, - a specific toxml should be added:: - - from wsme.protocol.soap import tosoap, make_soap_element, \ - type_registry - - class MySpecialType(object): - pass - - type_registry[MySpecialType] = 'xs:MySpecialType' - - @tosoap.when_object(MySpecialType) - def myspecialtype_tosoap(datatype, tag, value): - return make_soap_element(datatype, tag, str(value)) - """ - return self.make_soap_element(datatype, tag, value) - - @tosoap.when_type(wsme.types.ArrayType) - def array_tosoap(self, datatype, tag, value): - el = ET.Element(tag) - el.set(type_qn, soap_array(datatype, self.types_ns)) - if value is None: - el.set(nil_qn, 'true') - elif len(value) == 0: - el.append(ET.Element('item')) - else: - for item in value: - el.append(self.tosoap(datatype.item_type, 'item', item)) - return el - - @tosoap.when_object(bool) - def bool_tosoap(self, datatype, tag, value): - return self.make_soap_element( - datatype, - tag, - 'true' if value is True else 'false' if value is False else None - ) - - @tosoap.when_object(wsme.types.bytes) - def bytes_tosoap(self, datatype, tag, value): - log.debug('(bytes_tosoap, %s, %s, %s, %s)', datatype, - tag, value, type(value)) - if isinstance(value, wsme.types.bytes): - value = value.decode('ascii') - return self.make_soap_element(datatype, tag, value) - - @tosoap.when_object(datetime.datetime) - def datetime_tosoap(self, datatype, tag, value): - return self.make_soap_element( - datatype, - tag, - value is not None and value.isoformat() or None - ) - - @tosoap.when_object(wsme.types.binary) - def binary_tosoap(self, datatype, tag, value): - log.debug("(%s, %s, %s)", datatype, tag, value) - value = base64.encodestring(value) if value is not None else None - if six.PY3: - value = value.decode('ascii') - return self.make_soap_element( - datatype.basetype, tag, value, 'xs:base64Binary' - ) - - @tosoap.when_object(None) - def None_tosoap(self, datatype, tag, value): - return self.make_soap_element(datatype, tag, None) - - -@generic -def fromsoap(datatype, el, ns): - """ - A generic converter from soap elements to python datatype. - - If a non-complex user specific type is to be used in the api, - a specific fromsoap should be added. - """ - if el.get(nil_qn) == 'true': - return None - if datatype in type_registry: - value = datatype(el.text) - elif wsme.types.isusertype(datatype): - value = datatype.frombasetype( - fromsoap(datatype.basetype, el, ns)) - else: - value = datatype() - for attr in wsme.types.list_attributes(datatype): - child = el.find('{%s}%s' % (ns['type'], attr.name)) - if child is not None: - setattr(value, attr.key, fromsoap(attr.datatype, child, ns)) - return value - - -@fromsoap.when_type(wsme.types.ArrayType) -def array_fromsoap(datatype, el, ns): - if len(el) == 1: - if datatype.item_type \ - not in wsme.types.pod_types + wsme.types.dt_types \ - and len(el[0]) == 0: - return [] - return [fromsoap(datatype.item_type, child, ns) for child in el] - - -@fromsoap.when_object(wsme.types.bytes) -def bytes_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:string'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return el.text.encode('ascii') if el.text else six.b('') - - -@fromsoap.when_object(wsme.types.text) -def text_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:string'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return datatype(el.text if el.text else '') - - -@fromsoap.when_object(bool) -def bool_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:boolean'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return el.text.lower() != 'false' - - -@fromsoap.when_object(datetime.date) -def date_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:date'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return parse_isodate(el.text) - - -@fromsoap.when_object(datetime.time) -def time_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:time'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return parse_isotime(el.text) - - -@fromsoap.when_object(datetime.datetime) -def datetime_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:dateTime'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return parse_isodatetime(el.text) - - -@fromsoap.when_object(wsme.types.binary) -def binary_fromsoap(datatype, el, ns): - if el.get(nil_qn) == 'true': - return None - if el.get(type_qn) not in (None, 'xs:base64Binary'): - raise exc.InvalidInput(el.tag, ET.tostring(el)) - return base64.decodestring(el.text.encode('ascii')) - - -class SoapProtocol(Protocol): - """ - SOAP protocol. - - .. autoattribute:: name - .. autoattribute:: content_types - """ - name = 'soap' - displayname = 'SOAP' - content_types = ['application/soap+xml'] - - ns = { - "soap": "http://www.w3.org/2001/12/soap-envelope", - "soapenv": "http://schemas.xmlsoap.org/soap/envelope/", - "soapenc": "http://schemas.xmlsoap.org/soap/encoding/", - } - - def __init__(self, tns=None, typenamespace=None, baseURL=None, - servicename='MyApp'): - self.tns = tns - self.typenamespace = typenamespace - self.servicename = servicename - self.baseURL = baseURL - self._name_mapping = {} - - self.encoder = SoapEncoder(typenamespace) - - def get_name_mapping(self, service=None): - if service not in self._name_mapping: - self._name_mapping[service] = dict( - (soap_fname(path, f), path) - for path, f in self.root.getapi() - if service is None or (path and path[0] == service) - ) - return self._name_mapping[service] - - def accept(self, req): - for ct in self.content_types: - if req.headers['Content-Type'].startswith(ct): - return True - if req.headers.get("Soapaction"): - return True - return False - - def iter_calls(self, request): - yield CallContext(request) - - def extract_path(self, context): - request = context.request - el = ET.fromstring(request.body) - body = el.find('{%(soapenv)s}Body' % self.ns) - # Extract the service name from the tns - message = list(body)[0] - fname = message.tag - if fname.startswith('{%s}' % self.typenamespace): - fname = fname[len(self.typenamespace) + 2:] - mapping = self.get_name_mapping() - if fname not in mapping: - raise exc.UnknownFunction(fname) - path = mapping[fname] - context.soap_message = message - return path - return None - - def read_arguments(self, context): - kw = {} - if not hasattr(context, 'soap_message'): - return kw - msg = context.soap_message - for param in msg: - # FIX for python2.6 (only for lxml) - if use_lxml and isinstance(param, ET._Comment): - continue - name = param.tag[len(self.typenamespace) + 2:] - arg = context.funcdef.get_arg(name) - value = fromsoap(arg.datatype, param, { - 'type': self.typenamespace, - }) - kw[name] = value - wsme.runtime.check_arguments(context.funcdef, (), kw) - return kw - - def soap_response(self, path, funcdef, result): - r = ET.Element('{%s}%sResponse' % ( - self.typenamespace, soap_fname(path, funcdef) - )) - log.debug('(soap_response, %s, %s)', funcdef.return_type, result) - r.append(self.encoder.tosoap( - funcdef.return_type, '{%s}result' % self.typenamespace, result - )) - return r - - def encode_result(self, context, result): - log.debug('(encode_result, %s)', result) - if use_lxml: - envelope = ET.Element( - Envelope_qn, - nsmap={'xs': xsd_ns, 'types': self.typenamespace} - ) - else: - envelope = ET.Element(Envelope_qn, { - 'xmlns:xs': xsd_ns, - 'xmlns:types': self.typenamespace - }) - body = ET.SubElement(envelope, Body_qn) - body.append(self.soap_response(context.path, context.funcdef, result)) - s = ET.tostring(envelope) - return s - - def get_template(self, name): - return pkg_resources.resource_string( - __name__, '%s.html' % name) - - def encode_error(self, context, infos): - envelope = ET.Element(Envelope_qn) - body = ET.SubElement(envelope, Body_qn) - fault = ET.SubElement(body, Fault_qn) - ET.SubElement(fault, faultcode_qn).text = infos['faultcode'] - ET.SubElement(fault, faultstring_qn).text = infos['faultstring'] - if 'debuginfo' in infos: - ET.SubElement(fault, detail_qn).text = infos['debuginfo'] - s = ET.tostring(envelope) - return s - - @expose('/api.wsdl', 'text/xml') - def api_wsdl(self, service=None): - if service is None: - servicename = self.servicename - else: - servicename = self.servicename + service.capitalize() - return WSDLGenerator( - tns=self.tns, - types_ns=self.typenamespace, - soapenc=self.ns['soapenc'], - service_name=servicename, - complex_types=self.root.__registry__.complex_types, - funclist=self.root.getapi(), - arrays=self.root.__registry__.array_types, - baseURL=self.baseURL, - soap_array=soap_array, - soap_type=soap_type, - soap_fname=soap_fname, - ).generate(True) - - def encode_sample_value(self, datatype, value, format=False): - r = self.encoder.make_soap_element(datatype, 'value', value) - if format: - xml_indent(r) - return ('xml', six.text_type(r)) - - -def xml_indent(elem, level=0): - i = "\n" + level * " " - if len(elem): - if not elem.text or not elem.text.strip(): - elem.text = i + " " - for e in elem: - xml_indent(e, level + 1) - if not e.tail or not e.tail.strip(): - e.tail = i - if level and (not elem.tail or not elem.tail.strip()): - elem.tail = i diff --git a/wsmeext/soap/simplegeneric.py b/wsmeext/soap/simplegeneric.py deleted file mode 100644 index 97c169b..0000000 --- a/wsmeext/soap/simplegeneric.py +++ /dev/null @@ -1,107 +0,0 @@ -import inspect - -__all__ = ["generic"] -try: - from types import ClassType, InstanceType - classtypes = type, ClassType -except ImportError: - classtypes = type - InstanceType = None - - -def generic(func, argpos=None): - """Create a simple generic function""" - - if argpos is None: - if hasattr(func, 'argpos'): - argpos = func.argpos - else: - argnames = inspect.getargspec(func)[0] - if argnames and argnames[0] == 'self': - argpos = 1 - else: - argpos = 0 - - _sentinel = object() - - def _by_class(*args, **kw): - cls = args[argpos].__class__ - for t in type(cls.__name__, (cls, object), {}).__mro__: - f = _gbt(t, _sentinel) - if f is not _sentinel: - return f(*args, **kw) - else: - return func(*args, **kw) - - _by_type = {object: func, InstanceType: _by_class} - _gbt = _by_type.get - - def when_type(*types): - """Decorator to add a method that will be called for the given types""" - for t in types: - if not isinstance(t, classtypes): - raise TypeError( - "%r is not a type or class" % (t,) - ) - - def decorate(f): - for t in types: - if _by_type.setdefault(t, f) is not f: - raise TypeError( - "%r already has method for type %r" % (func, t) - ) - return f - return decorate - - _by_object = {} - _gbo = _by_object.get - - def when_object(*obs): - """Decorator to add a method to be called for the given object(s)""" - def decorate(f): - for o in obs: - if _by_object.setdefault(id(o), (o, f))[1] is not f: - raise TypeError( - "%r already has method for object %r" % (func, o) - ) - return f - return decorate - - def dispatch(*args, **kw): - f = _gbo(id(args[argpos]), _sentinel) - if f is _sentinel: - for t in type(args[argpos]).__mro__: - f = _gbt(t, _sentinel) - if f is not _sentinel: - return f(*args, **kw) - else: - return func(*args, **kw) - else: - return f[1](*args, **kw) - - dispatch.__name__ = func.__name__ - dispatch.__dict__ = func.__dict__.copy() - dispatch.__doc__ = func.__doc__ - dispatch.__module__ = func.__module__ - - dispatch.when_type = when_type - dispatch.when_object = when_object - dispatch.default = func - dispatch.has_object = lambda o: id(o) in _by_object - dispatch.has_type = lambda t: t in _by_type - dispatch.argpos = argpos - return dispatch - - -def test_suite(): - import doctest - return doctest.DocFileSuite( - 'README.txt', - optionflags=doctest.ELLIPSIS | doctest.REPORT_ONLY_FIRST_FAILURE, - ) - - -if __name__ == '__main__': - import unittest - r = unittest.TextTestRunner() - r.run(test_suite()) diff --git a/wsmeext/soap/wsdl.py b/wsmeext/soap/wsdl.py deleted file mode 100644 index b60aff4..0000000 --- a/wsmeext/soap/wsdl.py +++ /dev/null @@ -1,297 +0,0 @@ -import six -import wsme.types - -try: - from lxml import etree as ET - use_lxml = True -except ImportError: - from xml.etree import cElementTree as ET # noqa - use_lxml = False - - -def xml_tostring(el, pretty_print=False): - if use_lxml: - return ET.tostring(el, pretty_print=pretty_print) - return ET.tostring(el) - - -class NS(object): - def __init__(self, url): - self.url = url - - def __call__(self, name): - return self.qn(name) - - def __str__(self): - return self.url - - def qn(self, name): - return '{%s}%s' % (self.url, name) - - -wsdl_ns = NS("http://schemas.xmlsoap.org/wsdl/") -soap_ns = NS("http://schemas.xmlsoap.org/wsdl/soap/") -xs_ns = NS("http://www.w3.org/2001/XMLSchema") -soapenc_ns = NS("http://schemas.xmlsoap.org/soap/encoding/") - - -class WSDLGenerator(object): - def __init__( - self, - tns, - types_ns, - soapenc, - service_name, - complex_types, - funclist, - arrays, - baseURL, - soap_array, - soap_type, - soap_fname): - - self.tns = NS(tns) - self.types_ns = NS(types_ns) - self.soapenc = soapenc - self.service_name = service_name - self.complex_types = complex_types - self.funclist = funclist - self.arrays = arrays - self.baseURL = baseURL or '' - self.soap_array = soap_array - self.soap_fname = soap_fname - self.soap_type = soap_type - - def gen_complex_type(self, cls): - complexType = ET.Element(xs_ns('complexType')) - complexType.set('name', cls.__name__) - sequence = ET.SubElement(complexType, xs_ns('sequence')) - for attrdef in wsme.types.list_attributes(cls): - soap_type = self.soap_type(attrdef.datatype, str(self.types_ns)) - if soap_type is None: - continue - element = ET.SubElement(sequence, xs_ns('element')) - element.set('name', attrdef.name) - element.set('type', soap_type) - element.set('minOccurs', '1' if attrdef.mandatory else '0') - element.set('maxOccurs', '1') - return complexType - - def gen_array(self, array): - complexType = ET.Element(xs_ns('complexType')) - complexType.set('name', self.soap_array(array, False)) - ET.SubElement( - ET.SubElement(complexType, xs_ns('sequence')), - xs_ns('element'), - name='item', - maxOccurs='unbounded', - nillable='true', - type=self.soap_type(array.item_type, self.types_ns) - ) - return complexType - - def gen_function_types(self, path, funcdef): - args_el = ET.Element( - xs_ns('element'), - name=self.soap_fname(path, funcdef) - ) - - sequence = ET.SubElement( - ET.SubElement(args_el, xs_ns('complexType')), - xs_ns('sequence') - ) - - for farg in funcdef.arguments: - t = self.soap_type(farg.datatype, True) - if t is None: - continue - element = ET.SubElement( - sequence, xs_ns('element'), - name=farg.name, - type=self.soap_type(farg.datatype, True) - ) - if not farg.mandatory: - element.set('minOccurs', '0') - - response_el = ET.Element( - xs_ns('element'), - name=self.soap_fname(path, funcdef) + 'Response' - ) - element = ET.SubElement( - ET.SubElement( - ET.SubElement( - response_el, - xs_ns('complexType') - ), - xs_ns('sequence') - ), - xs_ns('element'), - name='result' - ) - return_soap_type = self.soap_type(funcdef.return_type, True) - if return_soap_type is not None: - element.set('type', return_soap_type) - - return args_el, response_el - - def gen_types(self): - types = ET.Element(wsdl_ns('types')) - schema = ET.SubElement(types, xs_ns('schema')) - schema.set('elementFormDefault', 'qualified') - schema.set('targetNamespace', str(self.types_ns)) - for cls in self.complex_types: - schema.append(self.gen_complex_type(cls)) - for array in self.arrays: - schema.append(self.gen_array(array)) - for path, funcdef in self.funclist: - schema.extend(self.gen_function_types(path, funcdef)) - return types - - def gen_functions(self): - messages = [] - - binding = ET.Element( - wsdl_ns('binding'), - name='%s_Binding' % self.service_name, - type='tns:%s_PortType' % self.service_name - ) - ET.SubElement( - binding, - soap_ns('binding'), - style='document', - transport='http://schemas.xmlsoap.org/soap/http' - ) - - portType = ET.Element( - wsdl_ns('portType'), - name='%s_PortType' % self.service_name - ) - - for path, funcdef in self.funclist: - soap_fname = self.soap_fname(path, funcdef) - - # message - req_message = ET.Element( - wsdl_ns('message'), - name=soap_fname + 'Request', - xmlns=str(self.types_ns) - ) - ET.SubElement( - req_message, - wsdl_ns('part'), - name='parameters', - element='types:%s' % soap_fname - ) - messages.append(req_message) - - res_message = ET.Element( - wsdl_ns('message'), - name=soap_fname + 'Response', - xmlns=str(self.types_ns) - ) - ET.SubElement( - res_message, - wsdl_ns('part'), - name='parameters', - element='types:%sResponse' % soap_fname - ) - messages.append(res_message) - - # portType/operation - operation = ET.SubElement( - portType, - wsdl_ns('operation'), - name=soap_fname - ) - if funcdef.doc: - ET.SubElement( - operation, - wsdl_ns('documentation') - ).text = funcdef.doc - ET.SubElement( - operation, wsdl_ns('input'), - message='tns:%sRequest' % soap_fname - ) - ET.SubElement( - operation, wsdl_ns('output'), - message='tns:%sResponse' % soap_fname - ) - - # binding/operation - operation = ET.SubElement( - binding, - wsdl_ns('operation'), - name=soap_fname - ) - ET.SubElement( - operation, - soap_ns('operation'), - soapAction=soap_fname - ) - ET.SubElement( - ET.SubElement( - operation, - wsdl_ns('input') - ), - soap_ns('body'), - use='literal' - ) - ET.SubElement( - ET.SubElement( - operation, - wsdl_ns('output') - ), - soap_ns('body'), - use='literal' - ) - - return messages + [portType, binding] - - def gen_service(self): - service = ET.Element(wsdl_ns('service'), name=self.service_name) - ET.SubElement( - service, - wsdl_ns('documentation') - ).text = six.u('WSDL File for %s') % self.service_name - ET.SubElement( - ET.SubElement( - service, - wsdl_ns('port'), - binding='tns:%s_Binding' % self.service_name, - name='%s_PortType' % self.service_name - ), - soap_ns('address'), - location=self.baseURL - ) - - return service - - def gen_definitions(self): - attrib = { - 'name': self.service_name, - 'targetNamespace': str(self.tns) - } - if use_lxml: - definitions = ET.Element( - wsdl_ns('definitions'), - attrib=attrib, - nsmap={ - 'xs': str(xs_ns), - 'soap': str(soap_ns), - 'types': str(self.types_ns), - 'tns': str(self.tns) - } - ) - else: - definitions = ET.Element(wsdl_ns('definitions'), **attrib) - definitions.set('xmlns:types', str(self.types_ns)) - definitions.set('xmlns:tns', str(self.tns)) - - definitions.set('name', self.service_name) - definitions.append(self.gen_types()) - definitions.extend(self.gen_functions()) - definitions.append(self.gen_service()) - return definitions - - def generate(self, format=False): - return xml_tostring(self.gen_definitions(), pretty_print=format) |