summaryrefslogtreecommitdiff
path: root/tests/twisted/messages/contactinfo-request.py
blob: cd524a038d09b4c3d95b235e065fa7ec1e068c61 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

"""
Test RequestContactInfo implementation
"""

from idletest import exec_test
from servicetest import EventPattern, assertEquals, call_async
from constants import *
import dbus

CHANNEL_NAMES = ['#idletest0', '#idletest1']

def validate_vcard(vcard):
    channel_names = []
    for (name, parameters, value) in vcard:
        if name == 'fn':
            assertEquals('Test User', value[0])
        elif name == 'x-irc-channel':
            channel_names.append(value[0])
        elif name == 'x-irc-server':
            assertEquals('idle.test.server', value[0])
            assertEquals('Idle Test Server', value[1])
        elif name == 'x-host':
            assertEquals('localhost', value[0])
        elif name == 'x-idle-time':
            assertEquals('42', value[0]) # fake value
        elif name == 'nickname':
            assertEquals('test', value[0])
        elif name == 'x-presence-type':
            assertEquals(PRESENCE_AVAILABLE, int(value[0]))
        elif name == 'x-presence-status-identifier':
            assertEquals('available', value[0])
    assertEquals(CHANNEL_NAMES, channel_names)

def test(q, bus, conn, stream):
    conn.Connect()
    q.expect_many(
            EventPattern('dbus-signal', signal='StatusChanged', args=[1, 1]),
            EventPattern('irc-connected'))
    q.expect('dbus-signal', signal='SelfHandleChanged',
        args=[1])
    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])

    self_handle = conn.GetSelfHandle()

    room_handles = conn.RequestHandles(HT_ROOM, CHANNEL_NAMES)
    for room_handle in room_handles:
        call_async(q, conn, 'RequestChannel', CHANNEL_TYPE_TEXT, HT_ROOM, room_handle, True)
        q.expect('dbus-return', method='RequestChannel')
        q.expect('dbus-signal', signal='NewChannels')

    contact_info = dbus.Interface(conn, CONN_IFACE_CONTACT_INFO)

    # The test server alternatively returns success and RPL_TRYAGAIN in
    # response to WHOIS queries. The first WHOIS query is issued as part of the
    # handshake when the connection is initially made. So this one is going to
    # return a RPL_TRYAGAIN resulting in a SERVICE_BUSY error.
    call_async(q, contact_info, 'RequestContactInfo', self_handle)
    event = q.expect('dbus-error', method='RequestContactInfo')
    assertEquals(SERVICE_BUSY, event.name)

    call_async(q, contact_info, 'RequestContactInfo', self_handle)
    event = q.expect('dbus-return', method='RequestContactInfo')
    vcard = event.value[0]

    validate_vcard(vcard)

    call_async(q, conn, 'Disconnect')
    q.expect_many(
            EventPattern('dbus-return', method='Disconnect'),
            EventPattern('dbus-signal', signal='StatusChanged', args=[2, 1]))
    return True

if __name__ == '__main__':
    exec_test(test)