1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
"""
Test text channel initiated by me.
"""
import dbus
from twisted.words.xish import domish
from hazetest import exec_test
from servicetest import call_async, EventPattern
import constants as cs
def test(q, bus, conn, stream):
self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
jid = 'foo@bar.com'
call_async(q, conn, 'RequestHandles', 1, [jid])
event = q.expect('dbus-return', method='RequestHandles')
foo_handle = event.value[0][0]
call_async(q, conn, 'RequestChannel',
'org.freedesktop.Telepathy.Channel.Type.Text', 1, foo_handle, True)
ret, sig = q.expect_many(
EventPattern('dbus-return', method='RequestChannel'),
EventPattern('dbus-signal', signal='NewChannel'),
)
text_chan = bus.get_object(conn.bus_name, ret.value[0])
assert sig.args[0] == ret.value[0], \
(sig.args[0], ret.value[0])
assert sig.args[1] == u'org.freedesktop.Telepathy.Channel.Type.Text',\
sig.args[1]
# check that handle type == contact handle
assert sig.args[2] == 1, sig.args[1]
assert sig.args[3] == foo_handle, (sig.args[3], foo_handle)
assert sig.args[4] == True # suppress handler
# Exercise basic Channel Properties from spec 0.17.7
channel_props = text_chan.GetAll(
'org.freedesktop.Telepathy.Channel',
dbus_interface=dbus.PROPERTIES_IFACE)
assert channel_props.get('TargetHandle') == foo_handle,\
(channel_props.get('TargetHandle'), foo_handle)
assert channel_props.get('TargetHandleType') == 1,\
channel_props.get('TargetHandleType')
assert channel_props.get('ChannelType') == \
'org.freedesktop.Telepathy.Channel.Type.Text',\
channel_props.get('ChannelType')
assert 'org.freedesktop.Telepathy.Channel.Interface.ChatState' in \
channel_props.get('Interfaces', ()), \
channel_props.get('Interfaces')
assert channel_props['TargetID'] == jid,\
(channel_props['TargetID'], jid)
assert channel_props['Requested'] == True
assert channel_props['InitiatorHandle'] == self_handle,\
(channel_props['InitiatorHandle'], self_handle)
assert channel_props['InitiatorID'] == 'test@localhost',\
channel_props['InitiatorID']
dbus.Interface(text_chan,
u'org.freedesktop.Telepathy.Channel.Type.Text').Send(0, 'hey')
event = q.expect('stream-message')
elem = event.stanza
assert elem.name == 'message'
assert elem['type'] == 'chat'
found = False
for e in elem.elements():
if e.name == 'body':
found = True
e.children[0] == u'hey'
break
assert found, elem.toXml()
# <message type="chat"><body>hello</body</message>
m = domish.Element((None, 'message'))
m['from'] = 'foo@bar.com/Pidgin'
m['type'] = 'chat'
m.addElement('body', content='hello')
stream.send(m)
event = q.expect('dbus-signal', signal='Received')
# message type: normal
assert event.args[3] == 0
# flags: none
assert event.args[4] == 0
# body
assert event.args[5] == 'hello'
conn.Disconnect()
q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
if __name__ == '__main__':
exec_test(test)
|