From 481f88068bdf0a18f12fd7a811b795f889d35fc7 Mon Sep 17 00:00:00 2001 From: Richard Lee Date: Thu, 12 Jul 2018 11:39:29 -0700 Subject: Add KafkaAdmin class Requires cluster version > 0.10.0.0, and uses new wire protocol classes to do many things via broker connection that previously needed to be done directly in zookeeper. --- test/test_admin.py | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 test/test_admin.py (limited to 'test') diff --git a/test/test_admin.py b/test/test_admin.py new file mode 100644 index 0000000..fd9c54d --- /dev/null +++ b/test/test_admin.py @@ -0,0 +1,47 @@ +import pytest + +import kafka.admin +from kafka.errors import IllegalArgumentError + + +def test_config_resource(): + with pytest.raises(KeyError): + bad_resource = kafka.admin.ConfigResource('something', 'foo') + good_resource = kafka.admin.ConfigResource('broker', 'bar') + assert(good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER) + assert(good_resource.name == 'bar') + assert(good_resource.configs is None) + good_resource = kafka.admin.ConfigResource(kafka.admin.ConfigResourceType.TOPIC, 'baz', {'frob' : 'nob'}) + assert(good_resource.resource_type == kafka.admin.ConfigResourceType.TOPIC) + assert(good_resource.name == 'baz') + assert(good_resource.configs == {'frob' : 'nob'}) + + +def test_new_partitions(): + good_partitions = kafka.admin.NewPartitions(6) + assert(good_partitions.total_count == 6) + assert(good_partitions.new_assignments is None) + good_partitions = kafka.admin.NewPartitions(7, [[1, 2, 3]]) + assert(good_partitions.total_count == 7) + assert(good_partitions.new_assignments == [[1, 2, 3]]) + + +def test_new_topic(): + with pytest.raises(IllegalArgumentError): + bad_topic = kafka.admin.NewTopic('foo', -1, -1) + with pytest.raises(IllegalArgumentError): + bad_topic = kafka.admin.NewTopic('foo', 1, -1) + with pytest.raises(IllegalArgumentError): + bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1 : [1, 1, 1]}) + good_topic = kafka.admin.NewTopic('foo', 1, 2) + assert(good_topic.name == 'foo') + assert(good_topic.num_partitions == 1) + assert(good_topic.replication_factor == 2) + assert(good_topic.replica_assignments == {}) + assert(good_topic.topic_configs == {}) + good_topic = kafka.admin.NewTopic('bar', -1, -1, {1 : [1, 2, 3]}, {'key' : 'value'}) + assert(good_topic.name == 'bar') + assert(good_topic.num_partitions == -1) + assert(good_topic.replication_factor == -1) + assert(good_topic.replica_assignments == {1: [1, 2, 3]}) + assert(good_topic.topic_configs == {'key' : 'value'}) -- cgit v1.2.1