summaryrefslogtreecommitdiff
path: root/tests/twisted/simple-caps.py
blob: a16f5512f8a3148ad4e151841a78049eea2cb954 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

"""
Make sure ContactCaps works well enough.
"""

from twisted.words.xish import domish

from servicetest import assertEquals, assertContains, EventPattern
from hazetest import exec_test, sync_stream
import constants as cs

import config

import ns

# Assert that `rccs` holds exactly one entry: a (properties, names) pair
# describing a text channel to a contact handle, with cs.TARGET_HANDLE as the
# only listed name.
def check_text_only(rccs):
    text_props = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
    }
    text_names = [cs.TARGET_HANDLE]
    assertEquals([(text_props, text_names)], rccs)

# Assert that both GetContactCapabilities and GetContactAttributes report
# text-only capabilities for `handle` on connection `conn`.
def check_rccs(conn, handle):
    # Direct query: expect a mapping with a single entry, keyed by the handle.
    caps_by_handle = conn.ContactCapabilities.GetContactCapabilities([handle])
    assertEquals(1, len(caps_by_handle))
    check_text_only(caps_by_handle[handle])

    # Same information, fetched through the Contacts attribute interface.
    contact_attrs = conn.Contacts.GetContactAttributes(
        [handle], [cs.CONN_IFACE_CONTACT_CAPS], False)
    caps_key = cs.CONN_IFACE_CONTACT_CAPS + '/capabilities'
    check_text_only(contact_attrs[handle][caps_key])

# Connect, then check that our own handle advertises text capabilities only.
def test_self_handle(q, bus, conn, stream):
    conn.Connect()

    connected_args = [cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connected_args)

    check_rccs(conn, conn.Properties.Get(cs.CONN, 'SelfHandle'))

# Check that other contacts -- one on the roster, one not -- also advertise
# text capabilities only.
def test_someone_else(q, bus, conn, stream):
    conn.Connect()

    connecting = EventPattern(
        'dbus-signal', signal='StatusChanged',
        args=[cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED])
    authenticated = EventPattern('stream-authenticated')
    q.expect_many(connecting, authenticated)

    # Answer the roster query with a single contact, bob.
    roster_iq = q.expect('stream-iq', query_ns=ns.ROSTER)
    reply = roster_iq.stanza
    reply['type'] = 'result'

    roster_item = roster_iq.query.addElement('item')
    roster_item['jid'] = 'bob@foo.com'
    roster_item['subscription'] = 'both'

    stream.send(reply)

    connected_args = [cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connected_args)

    # bob is on the roster sent above; amy is a randomer who isn't even in
    # our contact list. Both are checked the same way, bob first.
    for jid in ('bob@foo.com', 'amy@foo.com'):
        contact_handle = conn.RequestHandles(cs.HT_CONTACT, [jid])[0]
        check_rccs(conn, contact_handle)

# Advertise streamed-media channel classes for a client, then withdraw them,
# expecting a presence stanza on the stream after each update.
def test_media(q, bus, conn, stream):
    conn.Connect()

    connected_args = [cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connected_args)

    sync_stream(q, stream)

    client_name = 'im.telepathy1.Client.Foobar'
    audio_class = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.INITIAL_AUDIO: True,
    }
    video_class = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.INITIAL_VIDEO: True,
    }

    # First update: the client handles audio and video media channels.
    conn.ContactCapabilities.UpdateCapabilities(
        [(client_name, [audio_class, video_class], [])])

    # The stanza's contents are deliberately not inspected.
    q.expect('stream-presence')

    # Second update: the same client with no channel classes at all.
    conn.ContactCapabilities.UpdateCapabilities([(client_name, [], [])])

    # Again, only the arrival of a presence stanza is checked.
    q.expect('stream-presence')

if __name__ == '__main__':
    # These two tests always run, in this order.
    for test in (test_self_handle, test_someone_else):
        exec_test(test)

    # The media test only runs when config.MEDIA_ENABLED is set.
    if config.MEDIA_ENABLED:
        exec_test(test_media)