summaryrefslogtreecommitdiff
path: root/python/subunit/tests/test_chunked.py
blob: 21f481469253a049b71ef536f456552a267840ae (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#
#  subunit: extensions to python unittest to get test results from subprocesses.
#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
#  Copyright (C) 2011  Martin Pool <mbp@sourcefrog.net>
#
#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
#  license at the users choice. A copy of both licenses are available in the
#  project source as Apache-2.0 and BSD. You may not use this file except in
#  compliance with one of these two licences.
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
#  license you chose for the specific language governing permissions and
#  limitations under that license.
#

from io import BytesIO
import unittest

from testtools.compat import _b

import subunit.chunked


class TestDecode(unittest.TestCase):

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.output = BytesIO()
        self.decoder = subunit.chunked.Decoder(self.output)

    def test_close_read_length_short_errors(self):
        self.assertRaises(ValueError, self.decoder.close)

    def test_close_body_short_errors(self):
        self.assertEqual(None, self.decoder.write(_b('2\r\na')))
        self.assertRaises(ValueError, self.decoder.close)

    def test_close_body_buffered_data_errors(self):
        self.assertEqual(None, self.decoder.write(_b('2\r')))
        self.assertRaises(ValueError, self.decoder.close)

    def test_close_after_finished_stream_safe(self):
        self.assertEqual(None, self.decoder.write(_b('2\r\nab')))
        self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
        self.decoder.close()

    def test_decode_nothing(self):
        self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
        self.assertEqual(_b(''), self.output.getvalue())

    def test_decode_serialised_form(self):
        self.assertEqual(None, self.decoder.write(_b("F\r\n")))
        self.assertEqual(None, self.decoder.write(_b("serialised\n")))
        self.assertEqual(_b(''), self.decoder.write(_b("form0\r\n")))

    def test_decode_short(self):
        self.assertEqual(_b(''), self.decoder.write(_b('3\r\nabc0\r\n')))
        self.assertEqual(_b('abc'), self.output.getvalue())

    def test_decode_combines_short(self):
        self.assertEqual(_b(''), self.decoder.write(_b('6\r\nabcdef0\r\n')))
        self.assertEqual(_b('abcdef'), self.output.getvalue())

    def test_decode_excess_bytes_from_write(self):
        self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
        self.assertEqual(_b('abc'), self.output.getvalue())

    def test_decode_write_after_finished_errors(self):
        self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
        self.assertRaises(ValueError, self.decoder.write, _b(''))

    def test_decode_hex(self):
        self.assertEqual(_b(''), self.decoder.write(_b('A\r\n12345678900\r\n')))
        self.assertEqual(_b('1234567890'), self.output.getvalue())

    def test_decode_long_ranges(self):
        self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
        self.assertEqual(None, self.decoder.write(_b('1' * 65536)))
        self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
        self.assertEqual(None, self.decoder.write(_b('2' * 65536)))
        self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
        self.assertEqual(_b('1' * 65536 + '2' * 65536), self.output.getvalue())

    def test_decode_newline_nonstrict(self):
        """Tolerate chunk markers with no CR character."""
        # From <http://pad.lv/505078>
        self.decoder = subunit.chunked.Decoder(self.output, strict=False)
        self.assertEqual(None, self.decoder.write(_b('a\n')))
        self.assertEqual(None, self.decoder.write(_b('abcdeabcde')))
        self.assertEqual(_b(''), self.decoder.write(_b('0\n')))
        self.assertEqual(_b('abcdeabcde'), self.output.getvalue())

    def test_decode_strict_newline_only(self):
        """Reject chunk markers with no CR character in strict mode."""
        # From <http://pad.lv/505078>
        self.assertRaises(ValueError,
            self.decoder.write, _b('a\n'))

    def test_decode_strict_multiple_crs(self):
        self.assertRaises(ValueError,
            self.decoder.write, _b('a\r\r\n'))

    def test_decode_short_header(self):
        self.assertRaises(ValueError,
            self.decoder.write, _b('\n'))


class TestEncode(unittest.TestCase):

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.output = BytesIO()
        self.encoder = subunit.chunked.Encoder(self.output)

    def test_encode_nothing(self):
        self.encoder.close()
        self.assertEqual(_b('0\r\n'), self.output.getvalue())

    def test_encode_empty(self):
        self.encoder.write(_b(''))
        self.encoder.close()
        self.assertEqual(_b('0\r\n'), self.output.getvalue())

    def test_encode_short(self):
        self.encoder.write(_b('abc'))
        self.encoder.close()
        self.assertEqual(_b('3\r\nabc0\r\n'), self.output.getvalue())

    def test_encode_combines_short(self):
        self.encoder.write(_b('abc'))
        self.encoder.write(_b('def'))
        self.encoder.close()
        self.assertEqual(_b('6\r\nabcdef0\r\n'), self.output.getvalue())

    def test_encode_over_9_is_in_hex(self):
        self.encoder.write(_b('1234567890'))
        self.encoder.close()
        self.assertEqual(_b('A\r\n12345678900\r\n'), self.output.getvalue())

    def test_encode_long_ranges_not_combined(self):
        self.encoder.write(_b('1' * 65536))
        self.encoder.write(_b('2' * 65536))
        self.encoder.close()
        self.assertEqual(_b('10000\r\n' + '1' * 65536 + '10000\r\n' +
            '2' * 65536 + '0\r\n'), self.output.getvalue())