summaryrefslogtreecommitdiff
path: root/test/test_admin_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_admin_integration.py')
-rw-r--r--test/test_admin_integration.py102
1 files changed, 100 insertions, 2 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 37b1405..dc04537 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -1,10 +1,13 @@
import pytest
-from test.testutil import env_kafka_version
+from logging import info
+from test.testutil import env_kafka_version, random_string
+from threading import Event, Thread
+from time import time, sleep
-from kafka.errors import NoError
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
+from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -138,3 +141,98 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
+def test_describe_consumer_group_does_not_exist(kafka_admin_client):
+ """Tests that the describe consumer group call fails if the group coordinator is not available
+ """
+ with pytest.raises(GroupCoordinatorNotAvailableError):
+ group_description = kafka_admin_client.describe_consumer_groups(['test'])
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
+def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
+ """Tests that the describe consumer group call returns valid consumer group information
+ This test takes inspiration from the test 'test_group' in test_consumer_group.py.
+ """
+ consumers = {}
+ stop = {}
+ threads = {}
+ random_group_id = 'test-group-' + random_string(6)
+ group_id_list = [random_group_id, random_group_id + '_2']
+ generations = {group_id_list[0]: set(), group_id_list[1]: set()}
+ def consumer_thread(i, group_id):
+ assert i not in consumers
+ assert i not in stop
+ stop[i] = Event()
+ consumers[i] = kafka_consumer_factory(group_id=group_id)
+ while not stop[i].is_set():
+ consumers[i].poll(20)
+ consumers[i].close()
+ consumers[i] = None
+ stop[i] = None
+
+ num_consumers = 3
+ for i in range(num_consumers):
+ group_id = group_id_list[i % 2]
+ t = Thread(target=consumer_thread, args=(i, group_id,))
+ t.start()
+ threads[i] = t
+
+ try:
+ timeout = time() + 35
+ while True:
+ for c in range(num_consumers):
+
+ # Verify all consumers have been created
+ if c not in consumers:
+ break
+
+ # Verify all consumers have an assignment
+ elif not consumers[c].assignment():
+ break
+
+ # If all consumers exist and have an assignment
+ else:
+
+ info('All consumers have assignment... checking for stable group')
+ # Verify all consumers are in the same generation
+ # then log state and break while loop
+
+ for consumer in consumers.values():
+ generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id)
+
+ is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()])
+
+ # New generation assignment is not complete until
+ # coordinator.rejoining = False
+ rejoining = any([consumer._coordinator.rejoining
+ for consumer in list(consumers.values())])
+
+ if not rejoining and is_same_generation:
+ break
+ else:
+ sleep(1)
+ assert time() < timeout, "timeout waiting for assignments"
+
+ info('Group stabilized; verifying assignment')
+ output = kafka_admin_client.describe_consumer_groups(group_id_list)
+ assert len(output) == 2
+ consumer_groups = set()
+ for consumer_group in output:
+ assert(consumer_group.group in group_id_list)
+ if consumer_group.group == group_id_list[0]:
+ assert(len(consumer_group.members) == 2)
+ else:
+ assert(len(consumer_group.members) == 1)
+ for member in consumer_group.members:
+ assert(member.member_metadata.subscription[0] == topic)
+ assert(member.member_assignment.assignment[0][0] == topic)
+ consumer_groups.add(consumer_group.group)
+ assert(sorted(list(consumer_groups)) == group_id_list)
+ finally:
+ info('Shutting down %s consumers', num_consumers)
+ for c in range(num_consumers):
+ info('Stopping consumer %s', c)
+ stop[c].set()
+ threads[c].join()
+ threads[c] = None