summaryrefslogtreecommitdiff
path: root/test/test_util.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-25 10:55:04 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-25 10:55:04 -0700
commit57913f9f914a959f52bc9040a172f8c9ff77e491 (patch)
treefe5cc6c14283a4c9d9175a748ef97f7d55df6fd7 /test/test_util.py
parent0e50f33ec678f6d656d488ce8a4537f95bba003e (diff)
downloadkafka-python-57913f9f914a959f52bc9040a172f8c9ff77e491.tar.gz
Various fixes
Bump version number to 0.9.1 Update readme to show supported Kafka/Python versions Validate arguments in consumer.py, add initial consumer unit test Make service kill() child processes when startup fails Add tests for util.py, fix Python 2.6 specific bug.
Diffstat (limited to 'test/test_util.py')
-rw-r--r--test/test_util.py92
1 files changed, 88 insertions, 4 deletions
diff --git a/test/test_util.py b/test/test_util.py
index b85585b..8179b01 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -3,16 +3,100 @@ import random
import struct
import unittest2
import kafka.util
+import kafka.common
class UtilTest(unittest2.TestCase):
@unittest2.skip("Unwritten")
def test_relative_unpack(self):
pass
- @unittest2.skip("Unwritten")
def test_write_int_string(self):
- pass
+ self.assertEqual(
+ kafka.util.write_int_string('some string'),
+ '\x00\x00\x00\x0bsome string'
+ )
+
+ def test_write_int_string__empty(self):
+ self.assertEqual(
+ kafka.util.write_int_string(''),
+ '\x00\x00\x00\x00'
+ )
+
+ def test_write_int_string__null(self):
+ self.assertEqual(
+ kafka.util.write_int_string(None),
+ '\xff\xff\xff\xff'
+ )
- @unittest2.skip("Unwritten")
def test_read_int_string(self):
- pass
+ self.assertEqual(kafka.util.read_int_string('\xff\xff\xff\xff', 0), (None, 4))
+ self.assertEqual(kafka.util.read_int_string('\x00\x00\x00\x00', 0), ('', 4))
+ self.assertEqual(kafka.util.read_int_string('\x00\x00\x00\x0bsome string', 0), ('some string', 15))
+
+ def test_read_int_string__insufficient_data(self):
+ with self.assertRaises(kafka.common.BufferUnderflowError):
+ kafka.util.read_int_string('\x00\x00\x00\x021', 0)
+
+ def test_write_short_string(self):
+ self.assertEqual(
+ kafka.util.write_short_string('some string'),
+ '\x00\x0bsome string'
+ )
+
+ def test_write_short_string__empty(self):
+ self.assertEqual(
+ kafka.util.write_short_string(''),
+ '\x00\x00'
+ )
+
+ def test_write_short_string__null(self):
+ self.assertEqual(
+ kafka.util.write_short_string(None),
+ '\xff\xff'
+ )
+
+ def test_write_short_string__too_long(self):
+ with self.assertRaises(struct.error):
+ kafka.util.write_short_string(' ' * 33000)
+
+ def test_read_short_string(self):
+ self.assertEqual(kafka.util.read_short_string('\xff\xff', 0), (None, 2))
+ self.assertEqual(kafka.util.read_short_string('\x00\x00', 0), ('', 2))
+ self.assertEqual(kafka.util.read_short_string('\x00\x0bsome string', 0), ('some string', 13))
+
+ def test_read_int_string__insufficient_data(self):
+ with self.assertRaises(kafka.common.BufferUnderflowError):
+ kafka.util.read_int_string('\x00\x021', 0)
+
+ def test_relative_unpack(self):
+ self.assertEqual(
+ kafka.util.relative_unpack('>hh', '\x00\x01\x00\x00\x02', 0),
+ ((1, 0), 4)
+ )
+
+ def test_relative_unpack(self):
+ with self.assertRaises(kafka.common.BufferUnderflowError):
+ kafka.util.relative_unpack('>hh', '\x00', 0)
+
+
+ def test_group_by_topic_and_partition(self):
+ t = kafka.common.TopicAndPartition
+
+ l = [
+ t("a", 1),
+ t("a", 1),
+ t("a", 2),
+ t("a", 3),
+ t("b", 3),
+ ]
+
+ self.assertEqual(kafka.util.group_by_topic_and_partition(l), {
+ "a" : {
+ 1 : t("a", 1),
+ 2 : t("a", 2),
+ 3 : t("a", 3),
+ },
+ "b" : {
+ 3 : t("b", 3),
+ }
+ })