1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
"""
Make sure ContactCaps works well enough.
"""
from twisted.words.xish import domish
from servicetest import assertEquals, assertContains, EventPattern
from hazetest import exec_test, sync_stream
import constants as cs
import config
import ns
# assert this list of RCCs is only text
def check_text_only(rccs):
    """Assert that *rccs* is exactly the single text-to-contact entry.

    The expected value is a one-element list holding a
    ``(properties, [cs.TARGET_HANDLE])`` tuple, where ``properties`` maps
    ``cs.CHANNEL_TYPE`` to ``cs.CHANNEL_TYPE_TEXT`` and
    ``cs.TARGET_HANDLE_TYPE`` to ``cs.HT_CONTACT``.
    """
    text_properties = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
    }
    expected = [(text_properties, [cs.TARGET_HANDLE])]
    assertEquals(expected, rccs)
# assert GetContactCapabilities and GetContactAttributes return just text caps
def check_rccs(conn, handle):
    """Check that both capability lookups report text only for *handle*.

    Calls ``ContactCapabilities.GetContactCapabilities`` and
    ``Contacts.GetContactAttributes`` on *conn* for the single *handle*
    and hands each result for that handle to ``check_text_only``.
    """
    caps_by_handle = conn.ContactCapabilities.GetContactCapabilities([handle])
    # One handle was asked for, so exactly one entry is expected back.
    assertEquals(1, len(caps_by_handle))
    check_text_only(caps_by_handle[handle])

    attributes = conn.Contacts.GetContactAttributes(
        [handle], [cs.CONN_IFACE_CONTACT_CAPS], False)
    caps_key = cs.CONN_IFACE_CONTACT_CAPS + '/capabilities'
    check_text_only(attributes[handle][caps_key])
# do the self handle which will just be text
def test_self_handle(q, bus, conn, stream):
    """Connect, then verify the connection's SelfHandle has text-only caps."""
    conn.Connect()
    connected_args = [cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connected_args)

    own_handle = conn.Properties.Get(cs.CONN, 'SelfHandle')
    check_rccs(conn, own_handle)
# do someone else which will also just be text
def test_someone_else(q, bus, conn, stream):
    """Verify text-only caps for a roster contact and a non-roster contact.

    After connecting, the roster query is answered with a single item for
    'bob@foo.com'. Once the connection reports CONNECTED, the caps of bob
    and then of 'amy@foo.com' (never added to the roster reply) are checked
    with ``check_rccs``.
    """
    conn.Connect()

    connecting = EventPattern(
        'dbus-signal', signal='StatusChanged',
        args=[cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED])
    authenticated = EventPattern('stream-authenticated')
    q.expect_many(connecting, authenticated)

    # Turn the roster request into a result carrying one contact: bob.
    roster_iq = q.expect('stream-iq', query_ns=ns.ROSTER)
    roster_iq.stanza['type'] = 'result'
    bob_item = roster_iq.query.addElement('item')
    bob_item['jid'] = 'bob@foo.com'
    bob_item['subscription'] = 'both'
    stream.send(roster_iq.stanza)

    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    # bob was in the roster reply; amy is a randomer who isn't even in our
    # contact list. Both are checked the same way, bob first.
    for jid in ('bob@foo.com', 'amy@foo.com'):
        contact_handle = conn.RequestHandles(cs.HT_CONTACT, [jid])[0]
        check_rccs(conn, contact_handle)
def test_media(q, bus, conn, stream):
    """Advertise streamed-media channel classes for a client, then clear them.

    Each ``UpdateCapabilities`` call is followed by waiting for a
    'stream-presence' event; the content of that event is not inspected.
    """
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
    sync_stream(q, stream)

    client_name = 'im.telepathy1.Client.Foobar'
    audio_class = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.INITIAL_AUDIO: True,
    }
    video_class = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.INITIAL_VIDEO: True,
    }

    # First update: the client offers the audio and video classes.
    conn.ContactCapabilities.UpdateCapabilities(
        [(client_name, [audio_class, video_class], [])])
    q.expect('stream-presence')  # can't be bothered checking this

    # Second update: the same client now offers nothing.
    conn.ContactCapabilities.UpdateCapabilities([(client_name, [], [])])
    q.expect('stream-presence')  # can't be bothered checking this
if __name__ == '__main__':
    # The text-only capability tests always run, in this order.
    for test in (test_self_handle, test_someone_else):
        exec_test(test)

    # The media test runs only when config.MEDIA_ENABLED is truthy.
    if config.MEDIA_ENABLED:
        exec_test(test_media)
|