1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
"""
Test text channel not being recreated because although there were still
pending messages, we destroyed it with extreme prejudice.
"""
import dbus
from twisted.words.xish import domish
from hazetest import exec_test
from servicetest import call_async, EventPattern, assertEquals, assertLength
import constants as cs
def test(q, bus, conn, stream):
self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
jid = 'foo@bar.com'
foo_handle = conn.get_contact_handle_sync(jid)
call_async(q, conn.Requests, 'CreateChannel',
{ cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
cs.TARGET_HANDLE: foo_handle,
})
ret, sig = q.expect_many(
EventPattern('dbus-return', method='CreateChannel'),
EventPattern('dbus-signal', signal='NewChannels'),
)
text_chan = bus.get_object(conn.bus_name, ret.value[0])
chan_iface = dbus.Interface(text_chan, cs.CHANNEL)
text_iface = dbus.Interface(text_chan, cs.CHANNEL_TYPE_TEXT)
destroyable_iface = dbus.Interface(text_chan, cs.CHANNEL_IFACE_DESTROYABLE)
assertLength(1, sig.args)
assertLength(1, sig.args[0]) # one channel
assertLength(2, sig.args[0][0]) # two struct members
assertEquals(ret.value, sig.args[0][0])
emitted_props = sig.args[0][0][1]
assertEquals(cs.CHANNEL_TYPE_TEXT, emitted_props[cs.CHANNEL_TYPE])
assertEquals(cs.HT_CONTACT, emitted_props[cs.TARGET_HANDLE_TYPE])
assertEquals(foo_handle, emitted_props[cs.TARGET_HANDLE])
assertEquals(jid, emitted_props[cs.TARGET_ID])
assertEquals(True, emitted_props[cs.REQUESTED])
assertEquals(self_handle, emitted_props[cs.INITIATOR_HANDLE])
assertEquals('test@localhost', emitted_props[cs.INITIATOR_ID])
channel_props = text_chan.GetAll(cs.CHANNEL,
dbus_interface=dbus.PROPERTIES_IFACE)
assert channel_props['TargetID'] == jid,\
(channel_props['TargetID'], jid)
assert channel_props['Requested'] == True
assert channel_props['InitiatorHandle'] == self_handle,\
(channel_props['InitiatorHandle'], self_handle)
assert channel_props['InitiatorID'] == 'test@localhost',\
channel_props['InitiatorID']
text_iface.Send(0, 'hey')
event = q.expect('stream-message')
elem = event.stanza
assert elem.name == 'message'
assert elem['type'] == 'chat'
found = False
for e in elem.elements():
if e.name == 'body':
found = True
e.children[0] == u'hey'
break
assert found, elem.toXml()
# <message type="chat"><body>hello</body</message>
m = domish.Element((None, 'message'))
m['from'] = 'foo@bar.com/Pidgin'
m['type'] = 'chat'
m.addElement('body', content='hello')
stream.send(m)
event = q.expect('dbus-signal', signal='Received')
hello_message_id = event.args[0]
hello_message_time = event.args[1]
assert event.args[2] == foo_handle
# message type: normal
assert event.args[3] == 0
# flags: none
assert event.args[4] == 0
# body
assert event.args[5] == 'hello'
messages = text_chan.ListPendingMessages(False,
dbus_interface=cs.CHANNEL_TYPE_TEXT)
assert messages == \
[(hello_message_id, hello_message_time, foo_handle,
0, 0, 'hello')], messages
# destroy the channel without acking the message; it does not come back
call_async(q, destroyable_iface, 'Destroy')
event = q.expect('dbus-signal', signal='Closed')
assertEquals(text_chan.object_path, event.path)
event = q.expect('dbus-return', method='Destroy')
# assert that it stays dead
try:
chan_iface.GetChannelType()
except dbus.DBusException:
pass
else:
raise AssertionError("Why won't it die?")
conn.Disconnect()
q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
if __name__ == '__main__':
exec_test(test)
|