summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-21 23:30:36 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 09:51:37 -0700
commit7719946d74eea6aab6c7865d453d061514096689 (patch)
treeba90421ca0eb590b2508fbe12c8e2c7e5a36f4f3
parent795cb9b29fa05d4425f807f54dfa639c125fc0dd (diff)
downloadkafka-python-7719946d74eea6aab6c7865d453d061514096689.tar.gz
Improve consumer group test loop
-rw-r--r--test/test_consumer_group.py26
1 files changed, 13 insertions, 13 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index d8a0041..04ed9bb 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -87,21 +87,21 @@ def test_group(kafka_broker, topic):
elif not consumers[c].assignment():
break
+ # If all consumers exist and have an assignment
+ else:
+
# Verify all consumers are in the same generation
- generations = set()
- for consumer in six.itervalues(consumers):
- generations.add(consumer._coordinator.generation)
- if len(generations) != 1:
+ # then log state and break while loop
+ generations = set([consumer._coordinator.generation
+ for consumer in list(consumers.values())])
+
+ if len(generations) == 1:
+ for c, consumer in list(consumers.items()):
+ logging.info("[%s] %s %s: %s", c,
+ consumer._coordinator.generation,
+ consumer._coordinator.member_id,
+ consumer.assignment())
break
-
- # If all checks passed, log state and break while loop
- else:
- for c in range(num_consumers):
- logging.info("[%s] %s %s: %s", c,
- consumers[c]._coordinator.generation,
- consumers[c]._coordinator.member_id,
- consumers[c].assignment())
- break
assert time.time() < timeout, "timeout waiting for assignments"
group_assignment = set()