diff options
author | Will Thompson <will.thompson@collabora.co.uk> | 2010-11-19 14:20:41 +0000 |
---|---|---|
committer | Will Thompson <will.thompson@collabora.co.uk> | 2010-11-19 14:20:41 +0000 |
commit | 0c86c32e29492ada71ff96c8808287978ed2d226 (patch) | |
tree | f040239c5056012b9ff0e0d2fbf8194c933251ea /tests | |
parent | e03f7e94e1bab1227462571e18bd026f16f69063 (diff) | |
download | telepathy-haze-0c86c32e29492ada71ff96c8808287978ed2d226.tar.gz |
Update hazetest from gabbletest!
Woo. I had to fix a few issues in gabbletest to make this work, which I
am submitting upstream.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/twisted/hazetest.py | 596 | ||||
-rw-r--r-- | tests/twisted/ns.py | 1 |
2 files changed, 448 insertions, 149 deletions
diff --git a/tests/twisted/hazetest.py b/tests/twisted/hazetest.py index 90cbb32..c179309 100644 --- a/tests/twisted/hazetest.py +++ b/tests/twisted/hazetest.py @@ -1,35 +1,43 @@ """ Infrastructure code for testing Haze by pretending to be a Jabber server. + +This is based on gabbletest.py in telepathy-gabble. Haze-specific hacks should +be marked with an 'XXX Haze' comment. This offends me too, but I don't have +time to do anything better. """ import base64 import os import hashlib import sys -import time import random +import re +import traceback import ns +import constants as cs import servicetest +from servicetest import ( + assertEquals, assertLength, assertContains, wrap_channel, + EventPattern, call_async, unwrap, Event) import twisted -from servicetest import Event, unwrap from twisted.words.xish import domish, xpath from twisted.words.protocols.jabber.client import IQ from twisted.words.protocols.jabber import xmlstream -from twisted.internet import reactor +from twisted.internet import reactor, ssl import dbus -NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl' -NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind' - -def make_result_iq(stream, iq): +def make_result_iq(stream, iq, add_query_node=True): result = IQ(stream, "result") result["id"] = iq["id"] + to = iq.getAttribute('to') + if to is not None: + result["from"] = to query = iq.firstChildElement() - if query: + if query and add_query_node: result.addElement((query.uri, query.name)) return result @@ -37,24 +45,24 @@ def make_result_iq(stream, iq): def acknowledge_iq(stream, iq): stream.send(make_result_iq(stream, iq)) -def send_error_reply(stream, iq): +def send_error_reply(stream, iq, error_stanza=None): result = IQ(stream, "error") result["id"] = iq["id"] query = iq.firstChildElement() + to = iq.getAttribute('to') + if to is not None: + result["from"] = to if query: result.addElement((query.uri, query.name)) + if error_stanza: + result.addChild(error_stanza) + stream.send(result) def request_muc_handle(q, conn, stream, muc_jid): servicetest.call_async(q, conn, 'RequestHandles', 2, [muc_jid]) - host = muc_jid.split('@')[1] - event = q.expect('stream-iq', to=host, query_ns=ns.DISCO_INFO) - result = make_result_iq(stream, event.stanza) - feature = result.firstChildElement().addElement('feature') - feature['var'] = ns.MUC - stream.send(result) event = q.expect('dbus-return', method='RequestHandles') return event.value[0][0] @@ -70,7 +78,7 @@ def make_muc_presence(affiliation, role, muc_jid, alias, jid=None): return presence def sync_stream(q, stream): - """Used to ensure that the CM has processed all stanzas sent to it.""" + """Used to ensure that Gabble has processed all stanzas sent to it.""" iq = IQ(stream, "get") id = iq['id'] @@ -80,14 +88,18 @@ def sync_stream(q, stream): predicate=(lambda event: event.stanza['id'] == id and event.iq_type == 'result')) -class JabberAuthenticator(xmlstream.Authenticator): - "Trivial XML stream authenticator that accepts one username/digest pair." - - def __init__(self, username, password): +class GabbleAuthenticator(xmlstream.Authenticator): + def __init__(self, username, password, resource=None): self.username = username self.password = password + self.resource = resource + self.bare_jid = None + self.full_jid = None xmlstream.Authenticator.__init__(self) +class JabberAuthenticator(GabbleAuthenticator): + "Trivial XML stream authenticator that accepts one username/digest pair." + # Patch in fix from http://twistedmatrix.com/trac/changeset/23418. # This monkeypatch taken from Gadget source code from twisted.words.xish.utility import EventDispatcher @@ -133,72 +145,100 @@ class JabberAuthenticator(xmlstream.Authenticator): assert map(str, digest) == [expect] resource = xpath.queryForNodes('/iq/query/resource', iq) - assert map(str, resource) == ['Resource'] + assertLength(1, resource) + if self.resource is not None: + assertEquals(self.resource, str(resource[0])) + + self.bare_jid = '%s@localhost' % self.username + self.full_jid = '%s/%s' % (self.bare_jid, resource) result = IQ(self.xmlstream, "result") result["id"] = iq["id"] self.xmlstream.send(result) self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT) - -class XmppAuthenticator(xmlstream.Authenticator): - def __init__(self, username, password): - xmlstream.Authenticator.__init__(self) - self.username = username - self.password = password +class XmppAuthenticator(GabbleAuthenticator): + def __init__(self, username, password, resource=None): + GabbleAuthenticator.__init__(self, username, password, resource) self.authenticated = False - def streamStarted(self, root=None): + def streamInitialize(self, root): if root: self.xmlstream.sid = root.getAttribute('id') + if self.xmlstream.sid is None: + self.xmlstream.sid = '%x' % random.randint(1, sys.maxint) + self.xmlstream.sendHeader() - if self.authenticated: - # Initiator authenticated itself, and has started a new stream. + def streamIQ(self): + features = elem(xmlstream.NS_STREAMS, 'features')( + elem(ns.NS_XMPP_BIND, 'bind'), + elem(ns.NS_XMPP_SESSION, 'session'), + ) + self.xmlstream.send(features) + + self.xmlstream.addOnetimeObserver( + "/iq/bind[@xmlns='%s']" % ns.NS_XMPP_BIND, self.bindIq) + self.xmlstream.addOnetimeObserver( + "/iq/session[@xmlns='%s']" % ns.NS_XMPP_SESSION, self.sessionIq) - features = domish.Element((xmlstream.NS_STREAMS, 'features')) - bind = features.addElement((NS_XMPP_BIND, 'bind')) - self.xmlstream.send(features) + def streamSASL(self): + features = domish.Element((xmlstream.NS_STREAMS, 'features')) + mechanisms = features.addElement((ns.NS_XMPP_SASL, 'mechanisms')) + mechanism = mechanisms.addElement('mechanism', content='PLAIN') + self.xmlstream.send(features) - self.xmlstream.addOnetimeObserver( - "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq) - else: - features = domish.Element((xmlstream.NS_STREAMS, 'features')) - mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms')) - mechanism = mechanisms.addElement('mechanism', content='PLAIN') - self.xmlstream.send(features) + self.xmlstream.addOnetimeObserver("/auth", self.auth) - self.xmlstream.addOnetimeObserver("/auth", self.auth) + def streamStarted(self, root=None): + self.streamInitialize(root) + + if self.authenticated: + # Initiator authenticated itself, and has started a new stream. + self.streamIQ() + else: + self.streamSASL() def auth(self, auth): assert (base64.b64decode(str(auth)) == '\x00%s\x00%s' % (self.username, self.password)) - success = domish.Element((NS_XMPP_SASL, 'success')) + success = domish.Element((ns.NS_XMPP_SASL, 'success')) self.xmlstream.send(success) self.xmlstream.reset() self.authenticated = True def bindIq(self, iq): - assert xpath.queryForString('/iq/bind/resource', iq) == 'Resource' + resource = xpath.queryForString('/iq/bind/resource', iq) + if self.resource is not None: + assertEquals(self.resource, resource) + else: + assert resource is not None result = IQ(self.xmlstream, "result") result["id"] = iq["id"] - bind = result.addElement((NS_XMPP_BIND, 'bind')) - jid = bind.addElement('jid', content='test@localhost/Resource') + bind = result.addElement((ns.NS_XMPP_BIND, 'bind')) + self.bare_jid = '%s@localhost' % self.username + self.full_jid = '%s/%s' % (self.bare_jid, resource) + jid = bind.addElement('jid', content=self.full_jid) self.xmlstream.send(result) self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT) -def make_stream_event(type, stanza): + def sessionIq(self, iq): + self.xmlstream.send(make_result_iq(self.xmlstream, iq)) + +def make_stream_event(type, stanza, stream): event = servicetest.Event(type, stanza=stanza) + event.stream = stream event.to = stanza.getAttribute("to") return event -def make_iq_event(iq): - event = make_stream_event('stream-iq', iq) +def make_iq_event(stream, iq): + event = make_stream_event('stream-iq', iq, stream) event.iq_type = iq.getAttribute("type") + event.iq_id = iq.getAttribute("id") query = iq.firstChildElement() if query: @@ -208,56 +248,191 @@ def make_iq_event(iq): if query.getAttribute("node"): event.query_node = query.getAttribute("node") + else: + event.query = None return event -def make_presence_event(stanza): - event = make_stream_event('stream-presence', stanza) +def make_presence_event(stream, stanza): + event = make_stream_event('stream-presence', stanza, stream) event.presence_type = stanza.getAttribute('type') + + statuses = xpath.queryForNodes('/presence/status', stanza) + + if statuses: + event.presence_status = str(statuses[0]) + return event -def make_message_event(stanza): - event = make_stream_event('stream-message', stanza) +def make_message_event(stream, stanza): + event = make_stream_event('stream-message', stanza, stream) event.message_type = stanza.getAttribute('type') return event +class StreamFactory(twisted.internet.protocol.Factory): + def __init__(self, streams, jids): + self.streams = streams + self.jids = jids + self.presences = {} + self.mappings = dict(map (lambda jid, stream: (jid, stream), + jids, streams)) + + # Make a copy of the streams + self.factory_streams = list(streams) + self.factory_streams.reverse() + + # Do not add observers for single instances because it's unnecessary and + # some unit tests need to respond to the roster request, and we shouldn't + # answer it for them otherwise we break compatibility + if len(streams) > 1: + # We need to have a function here because lambda keeps a reference on + # the stream and jid and in the for loop, there is no context + def addObservers(stream, jid): + stream.addObserver('/iq', lambda x: \ + self.forward_iq(stream, jid, x)) + stream.addObserver('/presence', lambda x: \ + self.got_presence(stream, jid, x)) + + for (jid, stream) in self.mappings.items(): + addObservers(stream, jid) + + def protocol(self, *args): + return self.factory_streams.pop() + + + def got_presence (self, stream, jid, stanza): + stanza.attributes['from'] = jid + self.presences[jid] = stanza + + for dest_jid in self.presences.keys(): + # Dispatch the new presence to other clients + stanza.attributes['to'] = dest_jid + self.mappings[dest_jid].send(stanza) + + # Don't echo the presence twice + if dest_jid != jid: + # Dispatch other client's presence to this stream + presence = self.presences[dest_jid] + presence.attributes['to'] = jid + stream.send(presence) + + def lost_presence(self, stream, jid): + if self.presences.has_key(jid): + del self.presences[jid] + for dest_jid in self.presences.keys(): + presence = domish.Element(('jabber:client', 'presence')) + presence['from'] = jid + presence['to'] = dest_jid + presence['type'] = 'unavailable' + self.mappings[dest_jid].send(presence) + + def forward_iq(self, stream, jid, stanza): + stanza.attributes['from'] = jid + + query = stanza.firstChildElement() + + # Fake other accounts as being part of our roster + if query and query.uri == ns.ROSTER: + roster = make_result_iq(stream, stanza) + query = roster.firstChildElement() + for roster_jid in self.mappings.keys(): + if jid != roster_jid: + item = query.addElement('item') + item['jid'] = roster_jid + item['subscription'] = 'both' + stream.send(roster) + return + + to = stanza.getAttribute('to') + dest = None + if to is not None: + dest = self.mappings.get(to) + + if dest is not None: + dest.send(stanza) + class BaseXmlStream(xmlstream.XmlStream): initiating = False namespace = 'jabber:client' + pep_support = True + disco_features = [] + handle_privacy_lists = True def __init__(self, event_func, authenticator): xmlstream.XmlStream.__init__(self, authenticator) self.event_func = event_func self.addObserver('//iq', lambda x: event_func( - make_iq_event(x))) + make_iq_event(self, x))) self.addObserver('//message', lambda x: event_func( - make_message_event(x))) + make_message_event(self, x))) self.addObserver('//presence', lambda x: event_func( - make_presence_event(x))) + make_presence_event(self, x))) self.addObserver('//event/stream/authd', self._cb_authd) + if self.handle_privacy_lists: + self.addObserver("/iq/query[@xmlns='%s']" % ns.PRIVACY, + self._cb_priv_list) + + def _cb_priv_list(self, iq): + send_error_reply(self, iq) def _cb_authd(self, _): # called when stream is authenticated + assert self.authenticator.full_jid is not None + assert self.authenticator.bare_jid is not None + self.addObserver( - "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']", + "/iq[@to='localhost']/query[@xmlns='http://jabber.org/protocol/disco#info']", self._cb_disco_iq) + self.addObserver( + "/iq[@to='%s']/query[@xmlns='http://jabber.org/protocol/disco#info']" + % self.authenticator.bare_jid, + self._cb_bare_jid_disco_iq) + # XXX Haze self.add_roster_observer() self.event_func(servicetest.Event('stream-authenticated')) def _cb_disco_iq(self, iq): - if iq.getAttribute('to') == 'localhost': - # add PEP support - nodes = xpath.queryForNodes( - "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']", - iq) - query = nodes[0] - identity = query.addElement('identity') - identity['category'] = 'pubsub' - identity['type'] = 'pep' - - iq['type'] = 'result' - self.send(iq) + nodes = xpath.queryForNodes( + "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']", iq) + query = nodes[0] + + for feature in self.disco_features: + query.addChild(elem('feature', var=feature)) + + iq['type'] = 'result' + iq['from'] = iq['to'] + self.send(iq) + def _cb_bare_jid_disco_iq(self, iq): + # advertise PEP support + nodes = xpath.queryForNodes( + "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']", + iq) + query = nodes[0] + identity = query.addElement('identity') + identity['category'] = 'pubsub' + identity['type'] = 'pep' + + iq['type'] = 'result' + iq['from'] = iq['to'] + self.send(iq) + + def onDocumentEnd(self): + self.event_func(servicetest.Event('stream-closed')) + # We don't chain up XmlStream.onDocumentEnd() because it will + # disconnect the TCP connection making tests as + # connect/disconnect-timeout.py not working + + def send_stream_error(self, error='system-shutdown'): + # Yes, there are meant to be two different STREAMS namespaces. + go_away = \ + elem(xmlstream.NS_STREAMS, 'error')( + elem(ns.STREAMS, error) + ) + + self.send(go_away) + + # XXX Haze: the next two methods are Haze-specific. def add_roster_observer(self): self.addObserver( "/iq/query[@xmlns='jabber:iq:roster']", @@ -275,56 +450,145 @@ class JabberXmlStream(BaseXmlStream): class XmppXmlStream(BaseXmlStream): version = (1, 0) -def make_connection(bus, event_func, params=None): +class GoogleXmlStream(BaseXmlStream): + version = (1, 0) + + pep_support = False + disco_features = [ns.GOOGLE_ROSTER, + ns.GOOGLE_JINGLE_INFO, + ns.GOOGLE_MAIL_NOTIFY, + ] + + def _cb_bare_jid_disco_iq(self, iq): + # Google talk doesn't support PEP :( + iq['type'] = 'result' + iq['from'] = iq['to'] + self.send(iq) + + +def make_connection(bus, event_func, params=None, suffix=''): + # Gabble accepts a resource in 'account', but the value of 'resource' + # overrides it if there is one. + # XXX Haze doesn't. + # account = 'test%s@localhost/%s' % (suffix, re.sub(r'.*/', '', sys.argv[0])) + account = 'test%s@localhost/Resource' % (suffix, ) + default_params = { - 'account': 'test@localhost/Resource', + 'account': account, 'password': 'pass', - # FIXME: fd.o#14212 + # XXX Haze: fd.o#14212 #'resource': 'Resource', 'server': 'localhost', 'port': dbus.UInt32(4242), + # XXX Haze 'require-encryption': False, + 'auth-plain-in-clear': True, } if params: default_params.update(params) - return servicetest.make_connection(bus, event_func, 'haze', 'jabber', - default_params) + # Allow omitting the 'password' param + if default_params['password'] is None: + del default_params['password'] -def make_stream(event_func, authenticator=None, protocol=None, port=4242): - # set up Jabber server + # Allow omitting the 'account' param + if default_params['account'] is None: + del default_params['account'] + jid = default_params.get('account', None) + # XXX Haze + conn = servicetest.make_connection(bus, event_func, 'haze', 'jabber', + default_params) + return (conn, jid) + +def make_stream(event_func, authenticator=None, protocol=None, + resource=None, suffix=''): + # set up Jabber server if authenticator is None: - authenticator = JabberAuthenticator('test', 'pass') + authenticator = XmppAuthenticator('test%s' % suffix, 'pass', resource=resource) if protocol is None: - protocol = JabberXmlStream + protocol = XmppXmlStream stream = protocol(event_func, authenticator) - factory = twisted.internet.protocol.Factory() - factory.protocol = lambda *args: stream - port = reactor.listenTCP(port, factory) - return (stream, port) + return stream + +def disconnect_conn(q, conn, stream, expected_before=[], expected_after=[]): + call_async(q, conn, 'Disconnect') + + tmp = expected_before + [ + EventPattern('dbus-signal', signal='StatusChanged', args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED]), + EventPattern('stream-closed')] + + before_events = q.expect_many(*tmp) -def exec_test_deferred (funs, params, protocol=None, timeout=None): + stream.sendFooter() + + tmp = expected_after + [EventPattern('dbus-return', method='Disconnect')] + after_events = q.expect_many(*tmp) + + return before_events[:-2], after_events[:-1] + +def exec_test_deferred(fun, params, protocol=None, timeout=None, + authenticator=None, num_instances=1): # hack to ease debugging domish.Element.__repr__ = domish.Element.toXml colourer = None - if sys.stdout.isatty(): + if sys.stdout.isatty() or 'CHECK_FORCE_COLOR' in os.environ: colourer = servicetest.install_colourer() + bus = dbus.SessionBus() + queue = servicetest.IteratingEventQueue(timeout) queue.verbose = ( os.environ.get('CHECK_TWISTED_VERBOSE', '') != '' or '-v' in sys.argv) - bus = dbus.SessionBus() - # conn = make_connection(bus, queue.append, params) - (stream, port) = make_stream(queue.append, protocol=protocol) + conns = [] + jids = [] + streams = [] + resource = params.get('resource') if params is not None else None + for i in range(0, num_instances): + if i == 0: + suffix = '' + else: + suffix = str(i) + + try: + (conn, jid) = make_connection(bus, queue.append, params, suffix) + except Exception, e: + # Crap. This is normally because the connection's still kicking + # around on the bus. Let's bin any connections we *did* manage to + # get going and then bail out unceremoniously. + print e + + for conn in conns: + conn.Disconnect() + + os._exit(1) + + conns.append(conn) + jids.append(jid) + streams.append(make_stream(queue.append, protocol=protocol, + authenticator=authenticator, + resource=resource, suffix=suffix)) + + factory = StreamFactory(streams, jids) + port = reactor.listenTCP(4242, factory) def signal_receiver(*args, **kw): + if kw['path'] == '/org/freedesktop/DBus' and \ + kw['member'] == 'NameOwnerChanged': + bus_name, old_name, new_name = args + if new_name == '': + for i, conn in enumerate(conns): + stream = streams[i] + jid = jids[i] + if conn._requested_bus_name == bus_name: + factory.lost_presence(stream, jid) + break queue.append(Event('dbus-signal', path=unwrap(kw['path']), signal=kw['member'], args=map(unwrap, args), @@ -344,82 +608,87 @@ def exec_test_deferred (funs, params, protocol=None, timeout=None): error = None try: - for f in funs: - conn = make_connection(bus, queue.append, params) - f(queue, bus, conn, stream) + if len(conns) == 1: + fun(queue, bus, conns[0], streams[0]) + else: + fun(queue, bus, conns, streams) except Exception, e: - import traceback traceback.print_exc() error = e - try: - if colourer: - sys.stdout = colourer.fh - d = port.stopListening() - if error is None: - d.addBoth((lambda *args: reactor.crash())) - else: - # please ignore the POSIX behind the curtain - d.addBoth((lambda *args: os._exit(1))) - - conn.Disconnect() - - except dbus.DBusException, e: - pass + if colourer: + sys.stdout = colourer.fh + + d = port.stopListening() + + # Does the Connection object still exist? + for i, conn in enumerate(conns): + if not bus.name_has_owner(conn.object.bus_name): + # Connection has already been disconnected and destroyed + continue + try: + if conn.GetStatus() == cs.CONN_STATUS_CONNECTED: + # Connection is connected, properly disconnect it + disconnect_conn(queue, conn, streams[i]) + else: + # Connection is not connected, call Disconnect() to destroy it + conn.Disconnect() + except dbus.DBusException, e: + pass + + try: + conn.Disconnect() + raise AssertionError("Connection didn't disappear; " + "all subsequent tests will probably fail") + except dbus.DBusException, e: + pass + except Exception, e: + traceback.print_exc() + error = e + + if error is None: + d.addBoth((lambda *args: reactor.crash())) + else: + # please ignore the POSIX behind the curtain + d.addBoth((lambda *args: os._exit(1))) -def exec_tests(funs, params=None, protocol=None, timeout=None): - reactor.callWhenRunning (exec_test_deferred, funs, params, protocol, timeout) - reactor.run() -def exec_test(fun, params=None, protocol=None, timeout=None): - exec_tests([fun], params, protocol, timeout) +def exec_test(fun, params=None, protocol=None, timeout=None, + authenticator=None, num_instances=1): + reactor.callWhenRunning( + exec_test_deferred, fun, params, protocol, timeout, authenticator, num_instances) + reactor.run() # Useful routines for server-side vCard handling current_vcard = domish.Element(('vcard-temp', 'vCard')) -def handle_get_vcard(event, data): - iq = event.stanza - - if iq['type'] != 'get': - return False - - if iq.uri != 'jabber:client': - return False +def expect_and_handle_get_vcard(q, stream): + get_vcard_event = q.expect('stream-iq', query_ns=ns.VCARD_TEMP, + query_name='vCard', iq_type='get') - vcard = list(iq.elements())[0] - - if vcard.name != 'vCard': - return False + iq = get_vcard_event.stanza + vcard = iq.firstChildElement() + assert vcard.name == 'vCard', vcard.toXml() # Send back current vCard - new_iq = IQ(data['stream'], 'result') - new_iq['id'] = iq['id'] - new_iq.addChild(current_vcard) - data['stream'].send(new_iq) - return True - -def handle_set_vcard(event, data): - global current_vcard - iq = event.stanza - - if iq['type'] != 'set': - return False - - if iq.uri != 'jabber:client': - return False - - vcard = list(iq.elements())[0] + result = make_result_iq(stream, iq) + result.addChild(current_vcard) + stream.send(result) - if vcard.name != 'vCard': - return False +def expect_and_handle_set_vcard(q, stream, check=None): + set_vcard_event = q.expect('stream-iq', query_ns=ns.VCARD_TEMP, + query_name='vCard', iq_type='set') + iq = set_vcard_event.stanza + vcard = iq.firstChildElement() + assert vcard.name == 'vCard', vcard.toXml() - current_vcard = iq.firstChildElement() + if check is not None: + check(vcard) - new_iq = IQ(data['stream'], 'result') - new_iq['id'] = iq['id'] - data['stream'].send(new_iq) - return True + # Update current vCard + current_vcard = vcard + stream.send(make_result_iq(stream, iq)) def _elem_add(elem, *children): for child in children: @@ -428,9 +697,10 @@ def _elem_add(elem, *children): elif isinstance(child, unicode): elem.addContent(child) else: - raise ValueError('invalid child object %r', child) + raise ValueError( + 'invalid child object %r (must be element or unicode)', child) -def elem(a, b=None, **kw): +def elem(a, b=None, attrs={}, **kw): r""" >>> elem('foo')().toXml() u'<foo/>' @@ -441,6 +711,10 @@ def elem(a, b=None, **kw): >>> elem('foo', x='1')(u'hello', ... elem('http://foo.org', 'bar', y='2')(u'bye')).toXml() u"<foo x='1'>hello<bar xmlns='http://foo.org' y='2'>bye</bar></foo>" + >>> elem('foo', attrs={'xmlns:bar': 'urn:bar', 'bar:cake': 'yum'})( + ... elem('bar:e')(u'i') + ... ).toXml() + u"<foo xmlns:bar='urn:bar' bar:cake='yum'><bar:e>i</bar:e></foo>" """ class _elem(domish.Element): @@ -453,7 +727,22 @@ def elem(a, b=None, **kw): else: elem = _elem((None, a)) - for k, v in kw.iteritems(): + # Can't just update kw into attrs, because that *modifies the parameter's + # default*. Thanks python. + allattrs = {} + allattrs.update(kw) + allattrs.update(attrs) + + # First, let's pull namespaces out + realattrs = {} + for k, v in allattrs.iteritems(): + if k.startswith('xmlns:'): + abbr = k[len('xmlns:'):] + elem.localPrefixes[abbr] = v + else: + realattrs[k] = v + + for k, v in realattrs.iteritems(): if k == 'from_': elem['from'] = v else: @@ -477,7 +766,8 @@ def elem_iq(server, type, **kw): return iq -def make_presence(_from, to='test@localhost', type=None, status=None, caps=None): +def make_presence(_from, to='test@localhost', type=None, show=None, + status=None, caps=None, photo=None): presence = domish.Element((None, 'presence')) presence['from'] = _from presence['to'] = to @@ -485,6 +775,9 @@ def make_presence(_from, to='test@localhost', type=None, status=None, caps=None) if type is not None: presence['type'] = type + if show is not None: + presence.addElement('show', content=show) + if status is not None: presence.addElement('status', content=status) @@ -493,4 +786,9 @@ def make_presence(_from, to='test@localhost', type=None, status=None, caps=None) for key,value in caps.items(): cel[key] = value + # <x xmlns="vcard-temp:x:update"><photo>4a1...</photo></x> + if photo is not None: + x = presence.addElement((ns.VCARD_TEMP_UPDATE, 'x')) + x.addElement('photo').addContent(photo) + return presence diff --git a/tests/twisted/ns.py b/tests/twisted/ns.py index a483b27..758d00b 100644 --- a/tests/twisted/ns.py +++ b/tests/twisted/ns.py @@ -40,6 +40,7 @@ NICK = "http://jabber.org/protocol/nick" NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl' NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind' NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls' +NS_XMPP_SESSION = 'urn:ietf:params:xml:ns:xmpp-session' OLPC_ACTIVITIES = "http://laptop.org/xmpp/activities" OLPC_ACTIVITIES_NOTIFY = "%s+notify" % OLPC_ACTIVITIES OLPC_ACTIVITY = "http://laptop.org/xmpp/activity" |