diff options
Diffstat (limited to 'test/test_admin_integration.py')
| -rw-r--r-- | test/test_admin_integration.py | 102 |
1 files changed, 100 insertions, 2 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 37b1405..dc04537 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -1,10 +1,13 @@ import pytest -from test.testutil import env_kafka_version +from logging import info +from test.testutil import env_kafka_version, random_string +from threading import Event, Thread +from time import time, sleep -from kafka.errors import NoError from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) +from kafka.errors import (NoError, GroupCoordinatorNotAvailableError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -138,3 +141,98 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): with pytest.raises(ValueError): configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') +def test_describe_consumer_group_does_not_exist(kafka_admin_client): + """Tests that the describe consumer group call fails if the group coordinator is not available + """ + with pytest.raises(GroupCoordinatorNotAvailableError): + group_description = kafka_admin_client.describe_consumer_groups(['test']) + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') +def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): + """Tests that the describe consumer group call returns valid consumer group information + This test takes inspiration from the test 'test_group' in test_consumer_group.py. + """ + consumers = {} + stop = {} + threads = {} + random_group_id = 'test-group-' + random_string(6) + group_id_list = [random_group_id, random_group_id + '_2'] + generations = {group_id_list[0]: set(), group_id_list[1]: set()} + def consumer_thread(i, group_id): + assert i not in consumers + assert i not in stop + stop[i] = Event() + consumers[i] = kafka_consumer_factory(group_id=group_id) + while not stop[i].is_set(): + consumers[i].poll(20) + consumers[i].close() + consumers[i] = None + stop[i] = None + + num_consumers = 3 + for i in range(num_consumers): + group_id = group_id_list[i % 2] + t = Thread(target=consumer_thread, args=(i, group_id,)) + t.start() + threads[i] = t + + try: + timeout = time() + 35 + while True: + for c in range(num_consumers): + + # Verify all consumers have been created + if c not in consumers: + break + + # Verify all consumers have an assignment + elif not consumers[c].assignment(): + break + + # If all consumers exist and have an assignment + else: + + info('All consumers have assignment... checking for stable group') + # Verify all consumers are in the same generation + # then log state and break while loop + + for consumer in consumers.values(): + generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id) + + is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()]) + + # New generation assignment is not complete until + # coordinator.rejoining = False + rejoining = any([consumer._coordinator.rejoining + for consumer in list(consumers.values())]) + + if not rejoining and is_same_generation: + break + else: + sleep(1) + assert time() < timeout, "timeout waiting for assignments" + + info('Group stabilized; verifying assignment') + output = kafka_admin_client.describe_consumer_groups(group_id_list) + assert len(output) == 2 + consumer_groups = set() + for consumer_group in output: + assert(consumer_group.group in group_id_list) + if consumer_group.group == group_id_list[0]: + assert(len(consumer_group.members) == 2) + else: + assert(len(consumer_group.members) == 1) + for member in consumer_group.members: + assert(member.member_metadata.subscription[0] == topic) + assert(member.member_assignment.assignment[0][0] == topic) + consumer_groups.add(consumer_group.group) + assert(sorted(list(consumer_groups)) == group_id_list) + finally: + info('Shutting down %s consumers', num_consumers) + for c in range(num_consumers): + info('Stopping consumer %s', c) + stop[c].set() + threads[c].join() + threads[c] = None |
