diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-16 12:32:29 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-16 12:32:29 -0800 |
commit | d5c05c811e453c507ac6f7f85bceffc5a7ba1661 (patch) | |
tree | b6ece84d24b88316164e08013302bcc4cf82c59d | |
parent | d1daeaad2520fceba1651f4d2bd7201a5699f6be (diff) | |
download | kafka-python-d5c05c811e453c507ac6f7f85bceffc5a7ba1661.tar.gz |
Make sure all consumers are in same generation before stopping group test
-rw-r--r-- | test/test_consumer_group.py | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 03656fa..6ef2020 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -76,10 +76,23 @@ def test_group(kafka_broker, topic): timeout = time.time() + 35 while True: for c in range(num_consumers): + + # Verify all consumers have been created if c not in consumers: break + + # Verify all consumers have an assignment elif not consumers[c].assignment(): break + + # Verify all consumers are in the same generation + generations = set() + for consumer in six.itervalues(consumers): + generations.add(consumer._coordinator.generation) + if len(generations) != 1: + break + + # If all checks passed, log state and break while loop else: for c in range(num_consumers): logging.info("[%s] %s %s: %s", c, |