1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
import os
import random
import struct
import unittest
from mock import MagicMock, patch
from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError,
LeaderUnavailableError, PartitionUnavailableError
)
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
from kafka.protocol import (
create_gzip_message, create_message, create_snappy_message, KafkaProtocol
)
# Number of random round-trips performed by each codec round-trip test.
ITERATIONS = 1000
# Inclusive upper bound on the byte length produced by random_string().
STRLEN = 100
def random_string():
    """Return a random byte string whose length is between 1 and STRLEN."""
    length = random.randint(1, STRLEN)  # randint is inclusive on both ends
    return os.urandom(length)
class TestCodec(unittest.TestCase):
    """Round-trip and framing tests for the gzip and snappy codec helpers.

    Each test is skipped when the corresponding codec reports itself as
    unavailable via ``has_gzip()`` / ``has_snappy()``.
    """

    @unittest.skipUnless(has_gzip(), "Gzip not available")
    def test_gzip(self):
        """gzip_decode(gzip_encode(x)) returns x for random payloads."""
        for _ in range(ITERATIONS):
            original = random_string()
            round_tripped = gzip_decode(gzip_encode(original))
            self.assertEqual(original, round_tripped)

    @unittest.skipUnless(has_snappy(), "Snappy not available")
    def test_snappy(self):
        """snappy_decode(snappy_encode(x)) returns x for random payloads."""
        for _ in range(ITERATIONS):
            original = random_string()
            round_tripped = snappy_decode(snappy_encode(original))
            self.assertEqual(original, round_tripped)

    @unittest.skipUnless(has_snappy(), "Snappy not available")
    def test_snappy_detect_xerial(self):
        """_detect_xerial_stream accepts only the expected stream header."""
        # Private helper, so it is imported here rather than at module level.
        from kafka.codec import _detect_xerial_stream

        header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
        # Header-like bytes that start with the wrong leading byte.
        false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
        # Plain snappy output whose payload happens to contain "SNAPPY".
        random_snappy = snappy_encode(b'SNAPPY' * 50)
        short_data = b'\x01\x02\x03\x04'

        self.assertTrue(_detect_xerial_stream(header))
        self.assertFalse(_detect_xerial_stream(b''))
        self.assertFalse(_detect_xerial_stream(b'\x00'))
        self.assertFalse(_detect_xerial_stream(false_header))
        self.assertFalse(_detect_xerial_stream(random_snappy))
        self.assertFalse(_detect_xerial_stream(short_data))

    @unittest.skipUnless(has_snappy(), "Snappy not available")
    def test_snappy_decode_xerial(self):
        """snappy_decode concatenates the blocks of a framed stream."""
        header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
        first_block = snappy_encode(b'SNAPPY' * 50)
        second_block = snappy_encode(b'XERIAL' * 50)

        # Header followed by two blocks, each prefixed with its length packed
        # as a big-endian 4-byte int. Parentheses replace backslash
        # continuations so no trailing backslash can swallow the next line.
        to_test = (
            header
            + struct.pack('!i', len(first_block)) + first_block
            + struct.pack('!i', len(second_block)) + second_block
        )

        self.assertEqual(
            snappy_decode(to_test),
            (b'SNAPPY' * 50) + (b'XERIAL' * 50),
        )

    @unittest.skipUnless(has_snappy(), "Snappy not available")
    def test_snappy_encode_xerial(self):
        """snappy_encode in xerial mode emits the expected framed bytes."""
        # All pieces are bytes literals so the expected value has a single,
        # consistent type on both Python 2 and Python 3.
        to_ensure = (
            b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
            b'\x00\x00\x00\x18'
            b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
            b'\x00\x00\x00\x18'
            b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
        )

        # 300-byte blocks split the 600-byte input into exactly two blocks.
        to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)

        compressed = snappy_encode(
            to_test, xerial_compatible=True, xerial_blocksize=300
        )
        self.assertEqual(compressed, to_ensure)
|