diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-21 23:30:36 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 09:51:37 -0700 |
commit | 7719946d74eea6aab6c7865d453d061514096689 (patch) | |
tree | ba90421ca0eb590b2508fbe12c8e2c7e5a36f4f3 | |
parent | 795cb9b29fa05d4425f807f54dfa639c125fc0dd (diff) | |
download | kafka-python-7719946d74eea6aab6c7865d453d061514096689.tar.gz |
Improve consumer group test loop
-rw-r--r-- | test/test_consumer_group.py | 26 |
1 files changed, 13 insertions, 13 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index d8a0041..04ed9bb 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -87,21 +87,21 @@ def test_group(kafka_broker, topic): elif not consumers[c].assignment(): break + # If all consumers exist and have an assignment + else: + # Verify all consumers are in the same generation - generations = set() - for consumer in six.itervalues(consumers): - generations.add(consumer._coordinator.generation) - if len(generations) != 1: + # then log state and break while loop + generations = set([consumer._coordinator.generation + for consumer in list(consumers.values())]) + + if len(generations) == 1: + for c, consumer in list(consumers.items()): + logging.info("[%s] %s %s: %s", c, + consumer._coordinator.generation, + consumer._coordinator.member_id, + consumer.assignment()) break - - # If all checks passed, log state and break while loop - else: - for c in range(num_consumers): - logging.info("[%s] %s %s: %s", c, - consumers[c]._coordinator.generation, - consumers[c]._coordinator.member_id, - consumers[c].assignment()) - break assert time.time() < timeout, "timeout waiting for assignments" group_assignment = set() |