summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-16 12:32:29 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-16 12:32:29 -0800
commitd5c05c811e453c507ac6f7f85bceffc5a7ba1661 (patch)
treeb6ece84d24b88316164e08013302bcc4cf82c59d
parentd1daeaad2520fceba1651f4d2bd7201a5699f6be (diff)
downloadkafka-python-d5c05c811e453c507ac6f7f85bceffc5a7ba1661.tar.gz
Make sure all consumers are in same generation before stopping group test
-rw-r--r--test/test_consumer_group.py13
1 files changed, 13 insertions, 0 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 03656fa..6ef2020 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -76,10 +76,23 @@ def test_group(kafka_broker, topic):
timeout = time.time() + 35
while True:
for c in range(num_consumers):
+
+ # Verify all consumers have been created
if c not in consumers:
break
+
+ # Verify all consumers have an assignment
elif not consumers[c].assignment():
break
+
+ # Verify all consumers are in the same generation
+ generations = set()
+ for consumer in six.itervalues(consumers):
+ generations.add(consumer._coordinator.generation)
+ if len(generations) != 1:
+ break
+
+ # If all checks passed, log state and break while loop
else:
for c in range(num_consumers):
logging.info("[%s] %s %s: %s", c,