summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorWill Thompson <will.thompson@collabora.co.uk>2010-11-19 14:20:41 +0000
committerWill Thompson <will.thompson@collabora.co.uk>2010-11-19 14:20:41 +0000
commit0c86c32e29492ada71ff96c8808287978ed2d226 (patch)
treef040239c5056012b9ff0e0d2fbf8194c933251ea /tests
parente03f7e94e1bab1227462571e18bd026f16f69063 (diff)
downloadtelepathy-haze-0c86c32e29492ada71ff96c8808287978ed2d226.tar.gz
Update hazetest from gabbletest!
Woo. I had to fix a few issues in gabbletest to make this work, which I am submitting upstream.
Diffstat (limited to 'tests')
-rw-r--r--tests/twisted/hazetest.py596
-rw-r--r--tests/twisted/ns.py1
2 files changed, 448 insertions, 149 deletions
diff --git a/tests/twisted/hazetest.py b/tests/twisted/hazetest.py
index 90cbb32..c179309 100644
--- a/tests/twisted/hazetest.py
+++ b/tests/twisted/hazetest.py
@@ -1,35 +1,43 @@
"""
Infrastructure code for testing Haze by pretending to be a Jabber server.
+
+This is based on gabbletest.py in telepathy-gabble. Haze-specific hacks should
+be marked with an 'XXX Haze' comment. This offends me too, but I don't have
+time to do anything better.
"""
import base64
import os
import hashlib
import sys
-import time
import random
+import re
+import traceback
import ns
+import constants as cs
import servicetest
+from servicetest import (
+ assertEquals, assertLength, assertContains, wrap_channel,
+ EventPattern, call_async, unwrap, Event)
import twisted
-from servicetest import Event, unwrap
from twisted.words.xish import domish, xpath
from twisted.words.protocols.jabber.client import IQ
from twisted.words.protocols.jabber import xmlstream
-from twisted.internet import reactor
+from twisted.internet import reactor, ssl
import dbus
-NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
-NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
-
-def make_result_iq(stream, iq):
+def make_result_iq(stream, iq, add_query_node=True):
result = IQ(stream, "result")
result["id"] = iq["id"]
+ to = iq.getAttribute('to')
+ if to is not None:
+ result["from"] = to
query = iq.firstChildElement()
- if query:
+ if query and add_query_node:
result.addElement((query.uri, query.name))
return result
@@ -37,24 +45,24 @@ def make_result_iq(stream, iq):
def acknowledge_iq(stream, iq):
stream.send(make_result_iq(stream, iq))
-def send_error_reply(stream, iq):
+def send_error_reply(stream, iq, error_stanza=None):
result = IQ(stream, "error")
result["id"] = iq["id"]
query = iq.firstChildElement()
+ to = iq.getAttribute('to')
+ if to is not None:
+ result["from"] = to
if query:
result.addElement((query.uri, query.name))
+ if error_stanza:
+ result.addChild(error_stanza)
+
stream.send(result)
def request_muc_handle(q, conn, stream, muc_jid):
servicetest.call_async(q, conn, 'RequestHandles', 2, [muc_jid])
- host = muc_jid.split('@')[1]
- event = q.expect('stream-iq', to=host, query_ns=ns.DISCO_INFO)
- result = make_result_iq(stream, event.stanza)
- feature = result.firstChildElement().addElement('feature')
- feature['var'] = ns.MUC
- stream.send(result)
event = q.expect('dbus-return', method='RequestHandles')
return event.value[0][0]
@@ -70,7 +78,7 @@ def make_muc_presence(affiliation, role, muc_jid, alias, jid=None):
return presence
def sync_stream(q, stream):
- """Used to ensure that the CM has processed all stanzas sent to it."""
+ """Used to ensure that Gabble has processed all stanzas sent to it."""
iq = IQ(stream, "get")
id = iq['id']
@@ -80,14 +88,18 @@ def sync_stream(q, stream):
predicate=(lambda event:
event.stanza['id'] == id and event.iq_type == 'result'))
-class JabberAuthenticator(xmlstream.Authenticator):
- "Trivial XML stream authenticator that accepts one username/digest pair."
-
- def __init__(self, username, password):
+class GabbleAuthenticator(xmlstream.Authenticator):
+ def __init__(self, username, password, resource=None):
self.username = username
self.password = password
+ self.resource = resource
+ self.bare_jid = None
+ self.full_jid = None
xmlstream.Authenticator.__init__(self)
+class JabberAuthenticator(GabbleAuthenticator):
+ "Trivial XML stream authenticator that accepts one username/digest pair."
+
# Patch in fix from http://twistedmatrix.com/trac/changeset/23418.
# This monkeypatch taken from Gadget source code
from twisted.words.xish.utility import EventDispatcher
@@ -133,72 +145,100 @@ class JabberAuthenticator(xmlstream.Authenticator):
assert map(str, digest) == [expect]
resource = xpath.queryForNodes('/iq/query/resource', iq)
- assert map(str, resource) == ['Resource']
+ assertLength(1, resource)
+ if self.resource is not None:
+ assertEquals(self.resource, str(resource[0]))
+
+ self.bare_jid = '%s@localhost' % self.username
+ self.full_jid = '%s/%s' % (self.bare_jid, resource)
result = IQ(self.xmlstream, "result")
result["id"] = iq["id"]
self.xmlstream.send(result)
self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
-
-class XmppAuthenticator(xmlstream.Authenticator):
- def __init__(self, username, password):
- xmlstream.Authenticator.__init__(self)
- self.username = username
- self.password = password
+class XmppAuthenticator(GabbleAuthenticator):
+ def __init__(self, username, password, resource=None):
+ GabbleAuthenticator.__init__(self, username, password, resource)
self.authenticated = False
- def streamStarted(self, root=None):
+ def streamInitialize(self, root):
if root:
self.xmlstream.sid = root.getAttribute('id')
+ if self.xmlstream.sid is None:
+ self.xmlstream.sid = '%x' % random.randint(1, sys.maxint)
+
self.xmlstream.sendHeader()
- if self.authenticated:
- # Initiator authenticated itself, and has started a new stream.
+ def streamIQ(self):
+ features = elem(xmlstream.NS_STREAMS, 'features')(
+ elem(ns.NS_XMPP_BIND, 'bind'),
+ elem(ns.NS_XMPP_SESSION, 'session'),
+ )
+ self.xmlstream.send(features)
+
+ self.xmlstream.addOnetimeObserver(
+ "/iq/bind[@xmlns='%s']" % ns.NS_XMPP_BIND, self.bindIq)
+ self.xmlstream.addOnetimeObserver(
+ "/iq/session[@xmlns='%s']" % ns.NS_XMPP_SESSION, self.sessionIq)
- features = domish.Element((xmlstream.NS_STREAMS, 'features'))
- bind = features.addElement((NS_XMPP_BIND, 'bind'))
- self.xmlstream.send(features)
+ def streamSASL(self):
+ features = domish.Element((xmlstream.NS_STREAMS, 'features'))
+ mechanisms = features.addElement((ns.NS_XMPP_SASL, 'mechanisms'))
+ mechanism = mechanisms.addElement('mechanism', content='PLAIN')
+ self.xmlstream.send(features)
- self.xmlstream.addOnetimeObserver(
- "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq)
- else:
- features = domish.Element((xmlstream.NS_STREAMS, 'features'))
- mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms'))
- mechanism = mechanisms.addElement('mechanism', content='PLAIN')
- self.xmlstream.send(features)
+ self.xmlstream.addOnetimeObserver("/auth", self.auth)
- self.xmlstream.addOnetimeObserver("/auth", self.auth)
+ def streamStarted(self, root=None):
+ self.streamInitialize(root)
+
+ if self.authenticated:
+ # Initiator authenticated itself, and has started a new stream.
+ self.streamIQ()
+ else:
+ self.streamSASL()
def auth(self, auth):
assert (base64.b64decode(str(auth)) ==
'\x00%s\x00%s' % (self.username, self.password))
- success = domish.Element((NS_XMPP_SASL, 'success'))
+ success = domish.Element((ns.NS_XMPP_SASL, 'success'))
self.xmlstream.send(success)
self.xmlstream.reset()
self.authenticated = True
def bindIq(self, iq):
- assert xpath.queryForString('/iq/bind/resource', iq) == 'Resource'
+ resource = xpath.queryForString('/iq/bind/resource', iq)
+ if self.resource is not None:
+ assertEquals(self.resource, resource)
+ else:
+ assert resource is not None
result = IQ(self.xmlstream, "result")
result["id"] = iq["id"]
- bind = result.addElement((NS_XMPP_BIND, 'bind'))
- jid = bind.addElement('jid', content='test@localhost/Resource')
+ bind = result.addElement((ns.NS_XMPP_BIND, 'bind'))
+ self.bare_jid = '%s@localhost' % self.username
+ self.full_jid = '%s/%s' % (self.bare_jid, resource)
+ jid = bind.addElement('jid', content=self.full_jid)
self.xmlstream.send(result)
self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
-def make_stream_event(type, stanza):
+ def sessionIq(self, iq):
+ self.xmlstream.send(make_result_iq(self.xmlstream, iq))
+
+def make_stream_event(type, stanza, stream):
event = servicetest.Event(type, stanza=stanza)
+ event.stream = stream
event.to = stanza.getAttribute("to")
return event
-def make_iq_event(iq):
- event = make_stream_event('stream-iq', iq)
+def make_iq_event(stream, iq):
+ event = make_stream_event('stream-iq', iq, stream)
event.iq_type = iq.getAttribute("type")
+ event.iq_id = iq.getAttribute("id")
query = iq.firstChildElement()
if query:
@@ -208,56 +248,191 @@ def make_iq_event(iq):
if query.getAttribute("node"):
event.query_node = query.getAttribute("node")
+ else:
+ event.query = None
return event
-def make_presence_event(stanza):
- event = make_stream_event('stream-presence', stanza)
+def make_presence_event(stream, stanza):
+ event = make_stream_event('stream-presence', stanza, stream)
event.presence_type = stanza.getAttribute('type')
+
+ statuses = xpath.queryForNodes('/presence/status', stanza)
+
+ if statuses:
+ event.presence_status = str(statuses[0])
+
return event
-def make_message_event(stanza):
- event = make_stream_event('stream-message', stanza)
+def make_message_event(stream, stanza):
+ event = make_stream_event('stream-message', stanza, stream)
event.message_type = stanza.getAttribute('type')
return event
+class StreamFactory(twisted.internet.protocol.Factory):
+ def __init__(self, streams, jids):
+ self.streams = streams
+ self.jids = jids
+ self.presences = {}
+ self.mappings = dict(map (lambda jid, stream: (jid, stream),
+ jids, streams))
+
+ # Make a copy of the streams
+ self.factory_streams = list(streams)
+ self.factory_streams.reverse()
+
+ # Do not add observers for single instances because it's unnecessary and
+ # some unit tests need to respond to the roster request, and we shouldn't
+ # answer it for them otherwise we break compatibility
+ if len(streams) > 1:
+ # We need to have a function here because lambda keeps a reference on
+ # the stream and jid and in the for loop, there is no context
+ def addObservers(stream, jid):
+ stream.addObserver('/iq', lambda x: \
+ self.forward_iq(stream, jid, x))
+ stream.addObserver('/presence', lambda x: \
+ self.got_presence(stream, jid, x))
+
+ for (jid, stream) in self.mappings.items():
+ addObservers(stream, jid)
+
+ def protocol(self, *args):
+ return self.factory_streams.pop()
+
+
+ def got_presence (self, stream, jid, stanza):
+ stanza.attributes['from'] = jid
+ self.presences[jid] = stanza
+
+ for dest_jid in self.presences.keys():
+ # Dispatch the new presence to other clients
+ stanza.attributes['to'] = dest_jid
+ self.mappings[dest_jid].send(stanza)
+
+ # Don't echo the presence twice
+ if dest_jid != jid:
+ # Dispatch other client's presence to this stream
+ presence = self.presences[dest_jid]
+ presence.attributes['to'] = jid
+ stream.send(presence)
+
+ def lost_presence(self, stream, jid):
+ if self.presences.has_key(jid):
+ del self.presences[jid]
+ for dest_jid in self.presences.keys():
+ presence = domish.Element(('jabber:client', 'presence'))
+ presence['from'] = jid
+ presence['to'] = dest_jid
+ presence['type'] = 'unavailable'
+ self.mappings[dest_jid].send(presence)
+
+ def forward_iq(self, stream, jid, stanza):
+ stanza.attributes['from'] = jid
+
+ query = stanza.firstChildElement()
+
+ # Fake other accounts as being part of our roster
+ if query and query.uri == ns.ROSTER:
+ roster = make_result_iq(stream, stanza)
+ query = roster.firstChildElement()
+ for roster_jid in self.mappings.keys():
+ if jid != roster_jid:
+ item = query.addElement('item')
+ item['jid'] = roster_jid
+ item['subscription'] = 'both'
+ stream.send(roster)
+ return
+
+ to = stanza.getAttribute('to')
+ dest = None
+ if to is not None:
+ dest = self.mappings.get(to)
+
+ if dest is not None:
+ dest.send(stanza)
+
class BaseXmlStream(xmlstream.XmlStream):
initiating = False
namespace = 'jabber:client'
+ pep_support = True
+ disco_features = []
+ handle_privacy_lists = True
def __init__(self, event_func, authenticator):
xmlstream.XmlStream.__init__(self, authenticator)
self.event_func = event_func
self.addObserver('//iq', lambda x: event_func(
- make_iq_event(x)))
+ make_iq_event(self, x)))
self.addObserver('//message', lambda x: event_func(
- make_message_event(x)))
+ make_message_event(self, x)))
self.addObserver('//presence', lambda x: event_func(
- make_presence_event(x)))
+ make_presence_event(self, x)))
self.addObserver('//event/stream/authd', self._cb_authd)
+ if self.handle_privacy_lists:
+ self.addObserver("/iq/query[@xmlns='%s']" % ns.PRIVACY,
+ self._cb_priv_list)
+
+ def _cb_priv_list(self, iq):
+ send_error_reply(self, iq)
def _cb_authd(self, _):
# called when stream is authenticated
+ assert self.authenticator.full_jid is not None
+ assert self.authenticator.bare_jid is not None
+
self.addObserver(
- "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
+ "/iq[@to='localhost']/query[@xmlns='http://jabber.org/protocol/disco#info']",
self._cb_disco_iq)
+ self.addObserver(
+ "/iq[@to='%s']/query[@xmlns='http://jabber.org/protocol/disco#info']"
+ % self.authenticator.bare_jid,
+ self._cb_bare_jid_disco_iq)
+ # XXX Haze
self.add_roster_observer()
self.event_func(servicetest.Event('stream-authenticated'))
def _cb_disco_iq(self, iq):
- if iq.getAttribute('to') == 'localhost':
- # add PEP support
- nodes = xpath.queryForNodes(
- "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
- iq)
- query = nodes[0]
- identity = query.addElement('identity')
- identity['category'] = 'pubsub'
- identity['type'] = 'pep'
-
- iq['type'] = 'result'
- self.send(iq)
+ nodes = xpath.queryForNodes(
+ "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']", iq)
+ query = nodes[0]
+
+ for feature in self.disco_features:
+ query.addChild(elem('feature', var=feature))
+
+ iq['type'] = 'result'
+ iq['from'] = iq['to']
+ self.send(iq)
+ def _cb_bare_jid_disco_iq(self, iq):
+ # advertise PEP support
+ nodes = xpath.queryForNodes(
+ "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
+ iq)
+ query = nodes[0]
+ identity = query.addElement('identity')
+ identity['category'] = 'pubsub'
+ identity['type'] = 'pep'
+
+ iq['type'] = 'result'
+ iq['from'] = iq['to']
+ self.send(iq)
+
+ def onDocumentEnd(self):
+ self.event_func(servicetest.Event('stream-closed'))
+ # We don't chain up XmlStream.onDocumentEnd() because it will
+ # disconnect the TCP connection making tests as
+ # connect/disconnect-timeout.py not working
+
+ def send_stream_error(self, error='system-shutdown'):
+ # Yes, there are meant to be two different STREAMS namespaces.
+ go_away = \
+ elem(xmlstream.NS_STREAMS, 'error')(
+ elem(ns.STREAMS, error)
+ )
+
+ self.send(go_away)
+
+ # XXX Haze: the next two methods are Haze-specific.
def add_roster_observer(self):
self.addObserver(
"/iq/query[@xmlns='jabber:iq:roster']",
@@ -275,56 +450,145 @@ class JabberXmlStream(BaseXmlStream):
class XmppXmlStream(BaseXmlStream):
version = (1, 0)
-def make_connection(bus, event_func, params=None):
+class GoogleXmlStream(BaseXmlStream):
+ version = (1, 0)
+
+ pep_support = False
+ disco_features = [ns.GOOGLE_ROSTER,
+ ns.GOOGLE_JINGLE_INFO,
+ ns.GOOGLE_MAIL_NOTIFY,
+ ]
+
+ def _cb_bare_jid_disco_iq(self, iq):
+ # Google talk doesn't support PEP :(
+ iq['type'] = 'result'
+ iq['from'] = iq['to']
+ self.send(iq)
+
+
+def make_connection(bus, event_func, params=None, suffix=''):
+ # Gabble accepts a resource in 'account', but the value of 'resource'
+ # overrides it if there is one.
+ # XXX Haze doesn't.
+ # account = 'test%s@localhost/%s' % (suffix, re.sub(r'.*/', '', sys.argv[0]))
+ account = 'test%s@localhost/Resource' % (suffix, )
+
default_params = {
- 'account': 'test@localhost/Resource',
+ 'account': account,
'password': 'pass',
- # FIXME: fd.o#14212
+ # XXX Haze: fd.o#14212
#'resource': 'Resource',
'server': 'localhost',
'port': dbus.UInt32(4242),
+ # XXX Haze
'require-encryption': False,
+ 'auth-plain-in-clear': True,
}
if params:
default_params.update(params)
- return servicetest.make_connection(bus, event_func, 'haze', 'jabber',
- default_params)
+ # Allow omitting the 'password' param
+ if default_params['password'] is None:
+ del default_params['password']
-def make_stream(event_func, authenticator=None, protocol=None, port=4242):
- # set up Jabber server
+ # Allow omitting the 'account' param
+ if default_params['account'] is None:
+ del default_params['account']
+ jid = default_params.get('account', None)
+ # XXX Haze
+ conn = servicetest.make_connection(bus, event_func, 'haze', 'jabber',
+ default_params)
+ return (conn, jid)
+
+def make_stream(event_func, authenticator=None, protocol=None,
+ resource=None, suffix=''):
+ # set up Jabber server
if authenticator is None:
- authenticator = JabberAuthenticator('test', 'pass')
+ authenticator = XmppAuthenticator('test%s' % suffix, 'pass', resource=resource)
if protocol is None:
- protocol = JabberXmlStream
+ protocol = XmppXmlStream
stream = protocol(event_func, authenticator)
- factory = twisted.internet.protocol.Factory()
- factory.protocol = lambda *args: stream
- port = reactor.listenTCP(port, factory)
- return (stream, port)
+ return stream
+
+def disconnect_conn(q, conn, stream, expected_before=[], expected_after=[]):
+ call_async(q, conn, 'Disconnect')
+
+ tmp = expected_before + [
+ EventPattern('dbus-signal', signal='StatusChanged', args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED]),
+ EventPattern('stream-closed')]
+
+ before_events = q.expect_many(*tmp)
-def exec_test_deferred (funs, params, protocol=None, timeout=None):
+ stream.sendFooter()
+
+ tmp = expected_after + [EventPattern('dbus-return', method='Disconnect')]
+ after_events = q.expect_many(*tmp)
+
+ return before_events[:-2], after_events[:-1]
+
+def exec_test_deferred(fun, params, protocol=None, timeout=None,
+ authenticator=None, num_instances=1):
# hack to ease debugging
domish.Element.__repr__ = domish.Element.toXml
colourer = None
- if sys.stdout.isatty():
+ if sys.stdout.isatty() or 'CHECK_FORCE_COLOR' in os.environ:
colourer = servicetest.install_colourer()
+ bus = dbus.SessionBus()
+
queue = servicetest.IteratingEventQueue(timeout)
queue.verbose = (
os.environ.get('CHECK_TWISTED_VERBOSE', '') != ''
or '-v' in sys.argv)
- bus = dbus.SessionBus()
- # conn = make_connection(bus, queue.append, params)
- (stream, port) = make_stream(queue.append, protocol=protocol)
+ conns = []
+ jids = []
+ streams = []
+ resource = params.get('resource') if params is not None else None
+ for i in range(0, num_instances):
+ if i == 0:
+ suffix = ''
+ else:
+ suffix = str(i)
+
+ try:
+ (conn, jid) = make_connection(bus, queue.append, params, suffix)
+ except Exception, e:
+ # Crap. This is normally because the connection's still kicking
+ # around on the bus. Let's bin any connections we *did* manage to
+ # get going and then bail out unceremoniously.
+ print e
+
+ for conn in conns:
+ conn.Disconnect()
+
+ os._exit(1)
+
+ conns.append(conn)
+ jids.append(jid)
+ streams.append(make_stream(queue.append, protocol=protocol,
+ authenticator=authenticator,
+ resource=resource, suffix=suffix))
+
+ factory = StreamFactory(streams, jids)
+ port = reactor.listenTCP(4242, factory)
def signal_receiver(*args, **kw):
+ if kw['path'] == '/org/freedesktop/DBus' and \
+ kw['member'] == 'NameOwnerChanged':
+ bus_name, old_name, new_name = args
+ if new_name == '':
+ for i, conn in enumerate(conns):
+ stream = streams[i]
+ jid = jids[i]
+ if conn._requested_bus_name == bus_name:
+ factory.lost_presence(stream, jid)
+ break
queue.append(Event('dbus-signal',
path=unwrap(kw['path']),
signal=kw['member'], args=map(unwrap, args),
@@ -344,82 +608,87 @@ def exec_test_deferred (funs, params, protocol=None, timeout=None):
error = None
try:
- for f in funs:
- conn = make_connection(bus, queue.append, params)
- f(queue, bus, conn, stream)
+ if len(conns) == 1:
+ fun(queue, bus, conns[0], streams[0])
+ else:
+ fun(queue, bus, conns, streams)
except Exception, e:
- import traceback
traceback.print_exc()
error = e
- try:
- if colourer:
- sys.stdout = colourer.fh
- d = port.stopListening()
- if error is None:
- d.addBoth((lambda *args: reactor.crash()))
- else:
- # please ignore the POSIX behind the curtain
- d.addBoth((lambda *args: os._exit(1)))
-
- conn.Disconnect()
-
- except dbus.DBusException, e:
- pass
+ if colourer:
+ sys.stdout = colourer.fh
+
+ d = port.stopListening()
+
+ # Does the Connection object still exist?
+ for i, conn in enumerate(conns):
+ if not bus.name_has_owner(conn.object.bus_name):
+ # Connection has already been disconnected and destroyed
+ continue
+ try:
+ if conn.GetStatus() == cs.CONN_STATUS_CONNECTED:
+ # Connection is connected, properly disconnect it
+ disconnect_conn(queue, conn, streams[i])
+ else:
+ # Connection is not connected, call Disconnect() to destroy it
+ conn.Disconnect()
+ except dbus.DBusException, e:
+ pass
+
+ try:
+ conn.Disconnect()
+ raise AssertionError("Connection didn't disappear; "
+ "all subsequent tests will probably fail")
+ except dbus.DBusException, e:
+ pass
+ except Exception, e:
+ traceback.print_exc()
+ error = e
+
+ if error is None:
+ d.addBoth((lambda *args: reactor.crash()))
+ else:
+ # please ignore the POSIX behind the curtain
+ d.addBoth((lambda *args: os._exit(1)))
-def exec_tests(funs, params=None, protocol=None, timeout=None):
- reactor.callWhenRunning (exec_test_deferred, funs, params, protocol, timeout)
- reactor.run()
-def exec_test(fun, params=None, protocol=None, timeout=None):
- exec_tests([fun], params, protocol, timeout)
+def exec_test(fun, params=None, protocol=None, timeout=None,
+ authenticator=None, num_instances=1):
+ reactor.callWhenRunning(
+ exec_test_deferred, fun, params, protocol, timeout, authenticator, num_instances)
+ reactor.run()
# Useful routines for server-side vCard handling
current_vcard = domish.Element(('vcard-temp', 'vCard'))
-def handle_get_vcard(event, data):
- iq = event.stanza
-
- if iq['type'] != 'get':
- return False
-
- if iq.uri != 'jabber:client':
- return False
+def expect_and_handle_get_vcard(q, stream):
+ get_vcard_event = q.expect('stream-iq', query_ns=ns.VCARD_TEMP,
+ query_name='vCard', iq_type='get')
- vcard = list(iq.elements())[0]
-
- if vcard.name != 'vCard':
- return False
+ iq = get_vcard_event.stanza
+ vcard = iq.firstChildElement()
+ assert vcard.name == 'vCard', vcard.toXml()
# Send back current vCard
- new_iq = IQ(data['stream'], 'result')
- new_iq['id'] = iq['id']
- new_iq.addChild(current_vcard)
- data['stream'].send(new_iq)
- return True
-
-def handle_set_vcard(event, data):
- global current_vcard
- iq = event.stanza
-
- if iq['type'] != 'set':
- return False
-
- if iq.uri != 'jabber:client':
- return False
-
- vcard = list(iq.elements())[0]
+ result = make_result_iq(stream, iq)
+ result.addChild(current_vcard)
+ stream.send(result)
- if vcard.name != 'vCard':
- return False
+def expect_and_handle_set_vcard(q, stream, check=None):
+ set_vcard_event = q.expect('stream-iq', query_ns=ns.VCARD_TEMP,
+ query_name='vCard', iq_type='set')
+ iq = set_vcard_event.stanza
+ vcard = iq.firstChildElement()
+ assert vcard.name == 'vCard', vcard.toXml()
- current_vcard = iq.firstChildElement()
+ if check is not None:
+ check(vcard)
- new_iq = IQ(data['stream'], 'result')
- new_iq['id'] = iq['id']
- data['stream'].send(new_iq)
- return True
+ # Update current vCard
+ current_vcard = vcard
+ stream.send(make_result_iq(stream, iq))
def _elem_add(elem, *children):
for child in children:
@@ -428,9 +697,10 @@ def _elem_add(elem, *children):
elif isinstance(child, unicode):
elem.addContent(child)
else:
- raise ValueError('invalid child object %r', child)
+ raise ValueError(
+ 'invalid child object %r (must be element or unicode)', child)
-def elem(a, b=None, **kw):
+def elem(a, b=None, attrs={}, **kw):
r"""
>>> elem('foo')().toXml()
u'<foo/>'
@@ -441,6 +711,10 @@ def elem(a, b=None, **kw):
>>> elem('foo', x='1')(u'hello',
... elem('http://foo.org', 'bar', y='2')(u'bye')).toXml()
u"<foo x='1'>hello<bar xmlns='http://foo.org' y='2'>bye</bar></foo>"
+ >>> elem('foo', attrs={'xmlns:bar': 'urn:bar', 'bar:cake': 'yum'})(
+ ... elem('bar:e')(u'i')
+ ... ).toXml()
+ u"<foo xmlns:bar='urn:bar' bar:cake='yum'><bar:e>i</bar:e></foo>"
"""
class _elem(domish.Element):
@@ -453,7 +727,22 @@ def elem(a, b=None, **kw):
else:
elem = _elem((None, a))
- for k, v in kw.iteritems():
+ # Can't just update kw into attrs, because that *modifies the parameter's
+ # default*. Thanks python.
+ allattrs = {}
+ allattrs.update(kw)
+ allattrs.update(attrs)
+
+ # First, let's pull namespaces out
+ realattrs = {}
+ for k, v in allattrs.iteritems():
+ if k.startswith('xmlns:'):
+ abbr = k[len('xmlns:'):]
+ elem.localPrefixes[abbr] = v
+ else:
+ realattrs[k] = v
+
+ for k, v in realattrs.iteritems():
if k == 'from_':
elem['from'] = v
else:
@@ -477,7 +766,8 @@ def elem_iq(server, type, **kw):
return iq
-def make_presence(_from, to='test@localhost', type=None, status=None, caps=None):
+def make_presence(_from, to='test@localhost', type=None, show=None,
+ status=None, caps=None, photo=None):
presence = domish.Element((None, 'presence'))
presence['from'] = _from
presence['to'] = to
@@ -485,6 +775,9 @@ def make_presence(_from, to='test@localhost', type=None, status=None, caps=None)
if type is not None:
presence['type'] = type
+ if show is not None:
+ presence.addElement('show', content=show)
+
if status is not None:
presence.addElement('status', content=status)
@@ -493,4 +786,9 @@ def make_presence(_from, to='test@localhost', type=None, status=None, caps=None)
for key,value in caps.items():
cel[key] = value
+ # <x xmlns="vcard-temp:x:update"><photo>4a1...</photo></x>
+ if photo is not None:
+ x = presence.addElement((ns.VCARD_TEMP_UPDATE, 'x'))
+ x.addElement('photo').addContent(photo)
+
return presence
diff --git a/tests/twisted/ns.py b/tests/twisted/ns.py
index a483b27..758d00b 100644
--- a/tests/twisted/ns.py
+++ b/tests/twisted/ns.py
@@ -40,6 +40,7 @@ NICK = "http://jabber.org/protocol/nick"
NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'
+NS_XMPP_SESSION = 'urn:ietf:params:xml:ns:xmpp-session'
OLPC_ACTIVITIES = "http://laptop.org/xmpp/activities"
OLPC_ACTIVITIES_NOTIFY = "%s+notify" % OLPC_ACTIVITIES
OLPC_ACTIVITY = "http://laptop.org/xmpp/activity"