summaryrefslogtreecommitdiff
path: root/test/test_consumer_group.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_consumer_group.py')
-rw-r--r--test/test_consumer_group.py26
1 files changed, 13 insertions, 13 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index d8a0041..04ed9bb 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -87,21 +87,21 @@ def test_group(kafka_broker, topic):
elif not consumers[c].assignment():
break
+ # If all consumers exist and have an assignment
+ else:
+
# Verify all consumers are in the same generation
- generations = set()
- for consumer in six.itervalues(consumers):
- generations.add(consumer._coordinator.generation)
- if len(generations) != 1:
+ # then log state and break while loop
+ generations = set([consumer._coordinator.generation
+ for consumer in list(consumers.values())])
+
+ if len(generations) == 1:
+ for c, consumer in list(consumers.items()):
+ logging.info("[%s] %s %s: %s", c,
+ consumer._coordinator.generation,
+ consumer._coordinator.member_id,
+ consumer.assignment())
break
-
- # If all checks passed, log state and break while loop
- else:
- for c in range(num_consumers):
- logging.info("[%s] %s %s: %s", c,
- consumers[c]._coordinator.generation,
- consumers[c]._coordinator.member_id,
- consumers[c].assignment())
- break
assert time.time() < timeout, "timeout waiting for assignments"
group_assignment = set()