diff options
Diffstat (limited to 'tests/twisted/text/test-chat-state.py')
-rw-r--r-- | tests/twisted/text/test-chat-state.py | 303 |
1 files changed, 303 insertions, 0 deletions
diff --git a/tests/twisted/text/test-chat-state.py b/tests/twisted/text/test-chat-state.py new file mode 100644 index 0000000..289f692 --- /dev/null +++ b/tests/twisted/text/test-chat-state.py @@ -0,0 +1,303 @@ +# coding=utf-8 +""" +Test that chat state notifications are correctly sent and received on text +channels. +""" + +from twisted.words.xish import domish + +from servicetest import assertEquals, assertLength, wrap_channel, EventPattern +from gabbletest import exec_test, make_result_iq, sync_stream, make_presence +import constants as cs +import ns + +def check_state_notification(elem, name, allow_body=False): + assertEquals('message', elem.name) + assertEquals('chat', elem['type']) + + children = list(elem.elements()) + notification = [x for x in children if x.uri == ns.CHAT_STATES][0] + assert notification.name == name, notification.toXml() + + if not allow_body: + assert len(children) == 1, elem.toXml() + +def make_message(jid, body=None, state=None): + m = domish.Element((None, 'message')) + m['from'] = jid + m['type'] = 'chat' + + if state is not None: + m.addElement((ns.CHAT_STATES, state)) + + if body is not None: + m.addElement('body', content=body) + + return m + +def test(q, bus, conn, stream): + conn.Connect() + q.expect('dbus-signal', signal='StatusChanged', + args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED]) + + self_handle = conn.GetSelfHandle() + + jid = 'foo@bar.com' + full_jid = 'foo@bar.com/Foo' + foo_handle = conn.RequestHandles(cs.HT_CONTACT, [jid])[0] + + path = conn.Requests.CreateChannel( + { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT, + cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT, + cs.TARGET_HANDLE: foo_handle, + })[0] + chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text', + ['ChatState', 'Destroyable']) + + presence = make_presence(full_jid, status='hello', + caps={ + 'node': 'http://telepathy.freedesktop.org/homeopathy', + 'ver' : '0.1', + }) + stream.send(presence) + + version_event = q.expect('stream-iq', to=full_jid, + query_ns=ns.DISCO_INFO, + query_node='http://telepathy.freedesktop.org/homeopathy#0.1') + + result = make_result_iq(stream, version_event.stanza) + query = result.firstChildElement() + feature = query.addElement('feature') + feature['var'] = ns.CHAT_STATES + stream.send(result) + + sync_stream(q, stream) + + # Receiving chat states: + + # Composing... + stream.send(make_message(full_jid, state='composing')) + + changed = q.expect('dbus-signal', signal='ChatStateChanged') + handle, state = changed.args + assertEquals(foo_handle, handle) + assertEquals(cs.CHAT_STATE_COMPOSING, state) + + # Message! + stream.send(make_message(full_jid, body='hello', state='active')) + + changed = q.expect('dbus-signal', signal='ChatStateChanged') + handle, state = changed.args + assertEquals(foo_handle, handle) + assertEquals(cs.CHAT_STATE_ACTIVE, state) + + # Sending chat states: + + # Composing... + chan.ChatState.SetChatState(cs.CHAT_STATE_COMPOSING) + + stream_message = q.expect('stream-message') + check_state_notification(stream_message.stanza, 'composing') + + # XEP 0085: + # every content message SHOULD contain an <active/> notification. + chan.Text.Send(0, 'hi.') + + stream_message = q.expect('stream-message') + elem = stream_message.stanza + assertEquals('chat', elem['type']) + + check_state_notification(elem, 'active', allow_body=True) + + def is_body(e): + if e.name == 'body': + assert e.children[0] == u'hi.', e.toXml() + return True + return False + + assert len([x for x in elem.elements() if is_body(x)]) == 1, elem.toXml() + + # Close the channel without acking the received message. The peer should + # get a <gone/> notification, and the channel should respawn. + chan.Close() + + gone, _, _ = q.expect_many( + EventPattern('stream-message'), + EventPattern('dbus-signal', signal='Closed'), + EventPattern('dbus-signal', signal='NewChannel'), + ) + check_state_notification(gone.stanza, 'gone') + + # Reusing the proxy object because we happen to know it'll be at the same + # path... + + # Destroy the channel. The peer shouldn't get a <gone/> notification, since + # we already said we were gone and haven't sent them any messages to the + # contrary. + es = [EventPattern('stream-message')] + q.forbid_events(es) + + chan.Destroyable.Destroy() + sync_stream(q, stream) + + # Make the channel anew. + path = conn.Requests.CreateChannel( + { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT, + cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT, + cs.TARGET_HANDLE: foo_handle, + })[0] + chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text', + ['ChatState', 'Destroyable']) + + # Close it immediately; the peer should again not get a <gone/> + # notification, since we haven't sent any notifications on that channel. + chan.Close() + sync_stream(q, stream) + q.unforbid_events(es) + + # XEP-0085 §5.1 defines how to negotiate support for chat states with a + # contact in the absence of capabilities. This is useful when talking to + # invisible contacts, for example. + + # First, if we receive a message from a contact, containing an <active/> + # notification, they support chat states, so we should send them. + + jid = 'i@example.com' + full_jid = jid + '/GTalk' + + path = conn.Requests.CreateChannel( + { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT, + cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT, + cs.TARGET_ID: jid, + })[0] + chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text', + ['ChatState']) + + stream.send(make_message(full_jid, body='i am invisible', state='active')) + + changed = q.expect('dbus-signal', signal='ChatStateChanged') + assertEquals(cs.CHAT_STATE_ACTIVE, changed.args[1]) + + # We've seen them send a chat state notification, so we should send them + # notifications when the UI tells us to. + chan.ChatState.SetChatState(cs.CHAT_STATE_COMPOSING) + stream_message = q.expect('stream-message', to=full_jid) + check_state_notification(stream_message.stanza, 'composing') + + chan.Text.Send(0, 'very convincing') + stream_message = q.expect('stream-message', to=full_jid) + check_state_notification(stream_message.stanza, 'active', allow_body=True) + + # Now, test the case where we start the negotiation, and the contact + # turns out to support chat state notifications. + + jid = 'c@example.com' + full_jid = jid + '/GTalk' + path = conn.Requests.CreateChannel( + { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT, + cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT, + cs.TARGET_ID: jid, + })[0] + chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text', + ['ChatState']) + + # We shouldn't send any notifications until we actually send a message. + e = EventPattern('stream-message', to=jid) + q.forbid_events([e]) + for i in [cs.CHAT_STATE_COMPOSING, cs.CHAT_STATE_INACTIVE, + cs.CHAT_STATE_PAUSED, cs.CHAT_STATE_ACTIVE]: + chan.ChatState.SetChatState(i) + sync_stream(q, stream) + q.unforbid_events([e]) + + # When we send a message, say we're active. + chan.Text.Send(0, 'is anyone there?') + stream_message = q.expect('stream-message', to=jid) + check_state_notification(stream_message.stanza, 'active', allow_body=True) + + # We get a notification back from our contact. + stream.send(make_message(full_jid, state='composing')) + + changed = q.expect('dbus-signal', signal='ChatStateChanged') + _, state = changed.args + assertEquals(cs.CHAT_STATE_COMPOSING, state) + + # So now we know they support notification, so should send notifications. + chan.ChatState.SetChatState(cs.CHAT_STATE_COMPOSING) + + # This doesn't check whether we're sending to the bare jid, or the + # jid+resource. In fact, the notification is sent to the bare jid, because + # we only update which jid we send to when we actually receive a message, + # not when we receive a notification. wjt thinks this is less surprising + # than the alternative: + # + # • I'm talking to you on my N900, and signed in on my laptop; + # • I enter one character in a tab to you on my laptop, and then delete + # it; + # • Now your messages to me appear on my laptop (until I send you another + # one from my N900)! + stream_message = q.expect('stream-message') + check_state_notification(stream_message.stanza, 'composing') + + # But! Now they start messaging us from a different client, which *doesn't* + # support notifications. + other_jid = jid + '/Library' + stream.send(make_message(other_jid, body='grr, library computers')) + q.expect('dbus-signal', signal='Received') + + # Okay, we should stop sending typing notifications. + e = EventPattern('stream-message', to=other_jid) + q.forbid_events([e]) + for i in [cs.CHAT_STATE_COMPOSING, cs.CHAT_STATE_INACTIVE, + cs.CHAT_STATE_PAUSED, cs.CHAT_STATE_ACTIVE]: + chan.ChatState.SetChatState(i) + sync_stream(q, stream) + q.unforbid_events([e]) + + # Now, test the case where we start the negotiation, and the contact + # does not support chat state notifications + + jid = 'twitterbot@example.com' + full_jid = jid + '/Nonsense' + path = conn.Requests.CreateChannel( + { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT, + cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT, + cs.TARGET_ID: jid, + })[0] + chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text', + ['ChatState']) + + # We shouldn't send any notifications until we actually send a message. + e = EventPattern('stream-message', to=jid) + q.forbid_events([e]) + for i in [cs.CHAT_STATE_COMPOSING, cs.CHAT_STATE_INACTIVE, + cs.CHAT_STATE_PAUSED, cs.CHAT_STATE_ACTIVE]: + chan.ChatState.SetChatState(i) + sync_stream(q, stream) + q.unforbid_events([e]) + + # When we send a message, say we're active. + chan.Text.Send(0, '#n900 #maemo #zomg #woo #yay http://bit.ly/n900') + stream_message = q.expect('stream-message', to=jid) + check_state_notification(stream_message.stanza, 'active', allow_body=True) + + # They reply without a chat state. + stream.send(make_message(full_jid, body="posted.")) + q.expect('dbus-signal', signal='Received') + + # Okay, we shouldn't send any more. + e = EventPattern('stream-message', to=other_jid) + q.forbid_events([e]) + for i in [cs.CHAT_STATE_COMPOSING, cs.CHAT_STATE_INACTIVE, + cs.CHAT_STATE_PAUSED, cs.CHAT_STATE_ACTIVE]: + chan.ChatState.SetChatState(i) + sync_stream(q, stream) + q.unforbid_events([e]) + + chan.Text.Send(0, '@stephenfry simmer down') + message = q.expect('stream-message') + states = [x for x in message.stanza.elements() if x.uri == ns.CHAT_STATES] + assertLength(0, states) + +if __name__ == '__main__': + exec_test(test) |