1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
"""
Test text channel not being recreated because although there were still
pending messages, we destroyed it with extreme prejudice.
"""
import dbus
from twisted.words.xish import domish
from hazetest import exec_test
from servicetest import call_async, EventPattern, assertEquals
import constants as cs
def test(q, bus, conn, stream):
self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
jid = 'foo@bar.com'
call_async(q, conn, 'RequestHandles', 1, [jid])
event = q.expect('dbus-return', method='RequestHandles')
foo_handle = event.value[0][0]
call_async(q, conn, 'RequestChannel',
'org.freedesktop.Telepathy.Channel.Type.Text', 1, foo_handle, True)
ret, old_sig, new_sig = q.expect_many(
EventPattern('dbus-return', method='RequestChannel'),
EventPattern('dbus-signal', signal='NewChannel'),
EventPattern('dbus-signal', signal='NewChannels'),
)
text_chan = bus.get_object(conn.bus_name, ret.value[0])
chan_iface = dbus.Interface(text_chan,
'org.freedesktop.Telepathy.Channel')
text_iface = dbus.Interface(text_chan,
'org.freedesktop.Telepathy.Channel.Type.Text')
destroyable_iface = dbus.Interface(text_chan,
'org.freedesktop.Telepathy.Channel.Interface.Destroyable')
assert old_sig.args[0] == ret.value[0]
assert old_sig.args[1] == u'org.freedesktop.Telepathy.Channel.Type.Text'
# check that handle type == contact handle
assert old_sig.args[2] == 1
assert old_sig.args[3] == foo_handle
assert old_sig.args[4] == True # suppress handler
assert len(new_sig.args) == 1
assert len(new_sig.args[0]) == 1 # one channel
assert len(new_sig.args[0][0]) == 2 # two struct members
assert new_sig.args[0][0][0] == ret.value[0]
emitted_props = new_sig.args[0][0][1]
assert emitted_props['org.freedesktop.Telepathy.Channel.ChannelType'] ==\
'org.freedesktop.Telepathy.Channel.Type.Text'
assert emitted_props['org.freedesktop.Telepathy.Channel.'
'TargetHandleType'] == 1
assert emitted_props['org.freedesktop.Telepathy.Channel.TargetHandle'] ==\
foo_handle
assert emitted_props['org.freedesktop.Telepathy.Channel.TargetID'] == jid
assert emitted_props['org.freedesktop.Telepathy.Channel.'
'Requested'] == True
assert emitted_props['org.freedesktop.Telepathy.Channel.'
'InitiatorHandle'] == self_handle
assert emitted_props['org.freedesktop.Telepathy.Channel.'
'InitiatorID'] == 'test@localhost'
channel_props = text_chan.GetAll(
'org.freedesktop.Telepathy.Channel',
dbus_interface=dbus.PROPERTIES_IFACE)
assert channel_props['TargetID'] == jid,\
(channel_props['TargetID'], jid)
assert channel_props['Requested'] == True
assert channel_props['InitiatorHandle'] == self_handle,\
(channel_props['InitiatorHandle'], self_handle)
assert channel_props['InitiatorID'] == 'test@localhost',\
channel_props['InitiatorID']
text_iface.Send(0, 'hey')
event = q.expect('stream-message')
elem = event.stanza
assert elem.name == 'message'
assert elem['type'] == 'chat'
found = False
for e in elem.elements():
if e.name == 'body':
found = True
e.children[0] == u'hey'
break
assert found, elem.toXml()
# <message type="chat"><body>hello</body</message>
m = domish.Element((None, 'message'))
m['from'] = 'foo@bar.com/Pidgin'
m['type'] = 'chat'
m.addElement('body', content='hello')
stream.send(m)
event = q.expect('dbus-signal', signal='Received')
hello_message_id = event.args[0]
hello_message_time = event.args[1]
assert event.args[2] == foo_handle
# message type: normal
assert event.args[3] == 0
# flags: none
assert event.args[4] == 0
# body
assert event.args[5] == 'hello'
messages = text_chan.ListPendingMessages(False,
dbus_interface='org.freedesktop.Telepathy.Channel.Type.Text')
assert messages == \
[(hello_message_id, hello_message_time, foo_handle,
0, 0, 'hello')], messages
# destroy the channel without acking the message; it does not come back
call_async(q, destroyable_iface, 'Destroy')
event = q.expect('dbus-signal', signal='Closed')
assertEquals(text_chan.object_path, event.path)
event = q.expect('dbus-return', method='Destroy')
# assert that it stays dead
try:
chan_iface.GetChannelType()
except dbus.DBusException:
pass
else:
raise AssertionError("Why won't it die?")
conn.Disconnect()
q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
if __name__ == '__main__':
exec_test(test)
|