summaryrefslogtreecommitdiff
path: root/tests/unit/moduleapi/stream.tcl
blob: 7ad1a3059830c05ebe30078e23518476af247412 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# Path of the compiled "stream" test module. file normalize turns the
# relative path into a normalized absolute one before it is handed to
# MODULE LOAD.
set testmodule [file normalize tests/modules/stream.so]

# All tests below share one server instance tagged "modules".
start_server {tags {"modules"}} {
    # Load the test module; presumably it registers the stream.* commands
    # used by the tests below (confirm against the module source).
    r module load $testmodule

    test {Module stream add and delete} {
        r del mystream

        # The first add targets a missing key, the second one an existing
        # stream.
        set first_id  [r stream.add mystream item 1 value a]
        set second_id [r stream.add mystream item 2 value b]

        # The reply of the first add must contain a dash, like a stream ID.
        assert {[string match {*-*} $first_id]}

        # Both entries must be visible, in insertion order, through XRANGE.
        set entries [r XRANGE mystream - +]
        set both_entries "{$first_id {item 1 value a}} {$second_id {item 2 value b}}"
        assert_equal $entries $both_entries

        # Deleting an existing ID succeeds; an unknown ID and a malformed ID
        # are both rejected, each with its own error.
        assert_equal OK [r stream.delete mystream $first_id]
        assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
        assert_error "Invalid stream ID*" {r stream.delete mystream foo}

        # Only the second entry is left after the delete.
        set remaining "{$second_id {item 2 value b}}"
        assert_equal $remaining [r XRANGE mystream - +]

        # Wrong-type key: turn mystream into a plain string and check that
        # both module commands report an error.
        r del mystream
        r set mystream mystring
        assert_error "ERR StreamAdd*" {r stream.add mystream item 1 value a}
        assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
    }

    test {Module stream add unblocks blocking xread} {
        # Checks that an entry added through the module command wakes up a
        # client blocked in XREAD, both when the key does not exist yet and
        # when the stream already holds data. Each deferring client is closed
        # once its reply has been consumed so that no connection outlives the
        # test.
        r del mystream

        # Blocking XREAD on an empty key
        set rd1 [redis_deferring_client]
        $rd1 XREAD BLOCK 3000 STREAMS mystream $
        # wait until client is actually blocked, otherwise the add below
        # could race with the XREAD and the entry would never be delivered
        wait_for_condition 50 100 {
            [s 0 blocked_clients] eq {1}
        } else {
            fail "Client is not blocked"
        }
        set id [r stream.add mystream field 1 value a]
        assert_equal "{mystream {{$id {field 1 value a}}}}" [$rd1 read]
        # release the first connection before opening the next one
        $rd1 close

        # Blocking XREAD on an existing stream
        set rd2 [redis_deferring_client]
        $rd2 XREAD BLOCK 3000 STREAMS mystream $
        # wait until client is actually blocked
        wait_for_condition 50 100 {
            [s 0 blocked_clients] eq {1}
        } else {
            fail "Client is not blocked"
        }
        set id [r stream.add mystream field 2 value b]
        assert_equal "{mystream {{$id {field 2 value b}}}}" [$rd2 read]
        $rd2 close
    }

    test {Module stream add benchmark (1M stream add)} {
        # Bulk-insert one million entries with a single stream.addn call; the
        # reply is expected to equal the requested count.
        r del mystream
        set requested 1000000
        set added [r stream.addn mystream $requested field value]
        assert_equal $added $requested
    }

    test {Module stream XADD big fields doesn't create empty key} {
        # Raise both limits to 2gb so the oversized argument can be sent.
        # NOTE(review): config_get_set presumably returns the previous value,
        # which is what gets restored at the end - confirm against the helper.
        set two_gb 2147483647
        set saved_proto [config_get_set proto-max-bulk-len $two_gb]
        set saved_query [config_get_set client-query-buffer-limit $two_gb]

        r del mystream
        # Hand-written command header: 4 arguments, the last of which (the
        # field value) is streamed afterwards as a 1gb bulk.
        r write "*4\r\n\$10\r\nstream.add\r\n\$8\r\nmystream\r\n\$5\r\nfield\r\n"
        catch {write_big_bulk 1073741824} reply

        # The add must fail and must not leave an empty key behind.
        assert {$reply eq "ERR StreamAdd failed"}
        assert_equal 0 [r exists mystream]

        # Put the original limits back; the OK reply of the last CONFIG SET
        # is the expected result of this test.
        r config set proto-max-bulk-len $saved_proto
        r config set client-query-buffer-limit $saved_query
    } {OK} {large-memory}

    test {Module stream iterator} {
        r del mystream
        set first_id  [r xadd mystream * item 1 value a]
        set second_id [r xadd mystream * item 2 value b]

        # Forward iteration over the full range must match XRANGE.
        set module_fwd [r stream.range mystream - +]
        set native_fwd [r xrange mystream - +]
        assert_equal $module_fwd $native_fwd

        # Swapped bounds request reverse iteration and must match XREVRANGE.
        set module_rev [r stream.range mystream + -]
        set native_rev [r xrevrange mystream + -]
        assert_equal $module_rev $native_rev

        # Range from the start up to the first ID: only the first entry.
        # The reply is checked both as a literal string and as a built list.
        set head [r stream.range mystream - $first_id]
        assert_equal $head "{$first_id {item 1 value a}}"
        assert_equal $head [list [list $first_id {item 1 value a}]]

        # Range with startid = endid: only the entry with that ID.
        set single [r stream.range mystream $second_id $second_id]
        assert_equal $single "{$second_id {item 2 value b}}"
        assert_equal $single [list [list $second_id {item 2 value b}]]
    }

    test {Module stream iterator delete} {
        r del mystream
        set keep_a [r xadd mystream * normal item]
        set doomed [r xadd mystream * selfdestruct yes]
        set keep_b [r xadd mystream * another item]

        # First pass: stream.range still returns the "selfdestruct" entry,
        # and deletes it after returning it.
        set before "{$keep_a {normal item}} {$doomed {selfdestruct yes}} {$keep_b {another item}}"
        assert_equal $before [r stream.range mystream - +]

        # Second pass: the "selfdestruct" entry is gone, the others remain.
        set after "{$keep_a {normal item}} {$keep_b {another item}}"
        assert_equal $after [r stream.range mystream - +]
    }

    test {Module stream trim by length} {
        r del mystream

        # Seed the stream with three entries.
        foreach {num val} {1 a 2 b 3 c} {
            r xadd mystream * item $num value $val
        }
        assert_equal 3 [r xlen mystream]

        # Exact maxlen: each triple is the limit, the expected number of
        # trimmed entries, and the expected length afterwards.
        foreach {limit trimmed remaining} {
            5 0 3
            1 2 1
        } {
            assert_equal $trimmed [r stream.trim mystream maxlen = $limit]
            assert_equal $remaining [r xlen mystream]
        }
        assert_equal 1 [r stream.trim mystream maxlen = 0]

        # Exact maxlen is not subject to any limit: all 20000 entries go in
        # one call.
        r stream.addn mystream 20000 item x value y
        assert_equal 20000 [r stream.trim mystream maxlen = 0]

        # Approximate maxlen (100 items per node implies default limit 10K
        # items): successive calls trim 10000, then 9900, then nothing more.
        r stream.addn mystream 20000 item x value y
        assert_equal 20000 [r xlen mystream]
        foreach evicted {10000 9900 0} {
            assert_equal $evicted [r stream.trim mystream maxlen ~ 2]
        }
        assert_equal 100 [r xlen mystream]
        assert_equal 100 [r stream.trim mystream maxlen ~ 0]
        assert_equal 0 [r xlen mystream]
    }

    test {Module stream trim by ID} {
        r del mystream

        # Seed three entries; after the loop newest_id holds the ID of the
        # last one added.
        foreach {num val} {1 a 2 b 3 c} {
            set newest_id [r xadd mystream * item $num value $val]
        }
        assert_equal 3 [r xlen mystream]

        # Exact minid: "-" trims nothing, the newest ID trims the two older
        # entries, and "+" trims the one that is left.
        assert_equal 0 [r stream.trim mystream minid = -]
        assert_equal 3 [r xlen mystream]
        assert_equal 2 [r stream.trim mystream minid = $newest_id]
        assert_equal 1 [r xlen mystream]
        assert_equal 1 [r stream.trim mystream minid = +]

        # Exact minid is not subject to any limit: all 20000 entries go in
        # one call.
        r stream.addn mystream 20000 item x value y
        assert_equal 20000 [r stream.trim mystream minid = +]

        # Approximate minid (100 items per node implies default limit 10K
        # items). The threshold entry is added after 19980 others and
        # followed by 19 more, for 20000 entries in total.
        r stream.addn mystream 19980 item x value y
        set threshold_id [r xadd mystream * item x value y]
        r stream.addn mystream 19 item x value y
        assert_equal 20000 [r xlen mystream]
        foreach evicted {10000 9900 0} {
            assert_equal $evicted [r stream.trim mystream minid ~ $threshold_id]
        }
        assert_equal 100 [r xlen mystream]
        assert_equal 100 [r stream.trim mystream minid ~ +]
        assert_equal 0 [r xlen mystream]
    }

    test "Unload the module - stream" {
        # Unloading the module by name must be acknowledged with OK.
        set unload_reply [r module unload stream]
        assert_equal {OK} $unload_reply
    }
}