From 83af5102e995e854a1980b90f1400afdd098da37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Reni=C3=A9?= Date: Thu, 28 Aug 2014 14:40:01 +0200 Subject: Use base unittest or unittest2 depending on python version --- test/test_codec.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'test/test_codec.py') diff --git a/test/test_codec.py b/test/test_codec.py index 2e6f67e..c409052 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -1,5 +1,5 @@ import struct -import unittest2 +from . import unittest from kafka.codec import ( has_snappy, gzip_encode, gzip_decode, @@ -10,21 +10,21 @@ from kafka.protocol import ( ) from testutil import * -class TestCodec(unittest2.TestCase): +class TestCodec(unittest.TestCase): def test_gzip(self): for i in xrange(1000): s1 = random_string(100) s2 = gzip_decode(gzip_encode(s1)) self.assertEquals(s1, s2) - @unittest2.skipUnless(has_snappy(), "Snappy not available") + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy(self): for i in xrange(1000): s1 = random_string(100) s2 = snappy_decode(snappy_encode(s1)) self.assertEquals(s1, s2) - @unittest2.skipUnless(has_snappy(), "Snappy not available") + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy_detect_xerial(self): import kafka as kafka1 _detect_xerial_stream = kafka1.codec._detect_xerial_stream @@ -41,7 +41,7 @@ class TestCodec(unittest2.TestCase): self.assertFalse(_detect_xerial_stream(random_snappy)) self.assertFalse(_detect_xerial_stream(short_data)) - @unittest2.skipUnless(has_snappy(), "Snappy not available") + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy_decode_xerial(self): header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' random_snappy = snappy_encode('SNAPPY' * 50) @@ -55,7 +55,7 @@ class TestCodec(unittest2.TestCase): self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50)) - @unittest2.skipUnless(has_snappy(), "Snappy not available") + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy_encode_xerial(self): to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \ '\x00\x00\x00\x18' + \ -- cgit v1.2.1 From cf0b7f0530e765f2cf710bd35daf53bb4ea205d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Reni=C3=A9?= Date: Thu, 28 Aug 2014 20:50:20 +0200 Subject: Make all unit tests pass on py3.3/3.4 --- test/test_codec.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) (limited to 'test/test_codec.py') diff --git a/test/test_codec.py b/test/test_codec.py index c409052..b30e994 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -1,4 +1,5 @@ import struct +from six.moves import xrange from . import unittest from kafka.codec import ( @@ -8,7 +9,7 @@ from kafka.codec import ( from kafka.protocol import ( create_gzip_message, create_message, create_snappy_message, KafkaProtocol ) -from testutil import * +from .testutil import * class TestCodec(unittest.TestCase): def test_gzip(self): @@ -31,7 +32,7 @@ class TestCodec(unittest.TestCase): header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' - random_snappy = snappy_encode('SNAPPY' * 50) + random_snappy = snappy_encode(b'SNAPPY' * 50) short_data = b'\x01\x02\x03\x04' self.assertTrue(_detect_xerial_stream(header)) @@ -44,26 +45,28 @@ class TestCodec(unittest.TestCase): @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy_decode_xerial(self): header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' - random_snappy = snappy_encode('SNAPPY' * 50) + random_snappy = snappy_encode(b'SNAPPY' * 50) block_len = len(random_snappy) - random_snappy2 = snappy_encode('XERIAL' * 50) + random_snappy2 = snappy_encode(b'XERIAL' * 50) block_len2 = len(random_snappy2) to_test = header \ + struct.pack('!i', block_len) + random_snappy \ + struct.pack('!i', block_len2) + random_snappy2 \ - self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50)) + self.assertEquals(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50)) @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy_encode_xerial(self): - to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \ - '\x00\x00\x00\x18' + \ - '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \ - '\x00\x00\x00\x18' + \ - '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + to_ensure = ( + b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + b'\x00\x00\x00\x18' + b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + b'\x00\x00\x00\x18' + b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + ) - to_test = ('SNAPPY' * 50) + ('XERIAL' * 50) + to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50) compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) self.assertEquals(compressed, to_ensure) -- cgit v1.2.1