diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-03-14 12:23:50 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-03-14 12:23:50 -0700 |
commit | 0ae0e219550dfbc5a8ab8ea62790a0aca27f9e2d (patch) | |
tree | 6de6bd4e4af2269d8484f3e7411131a2598f1d07 /test | |
parent | c902baafbee777ec65fc66c9fdbaa5b172b37917 (diff) | |
download | kafka-python-0ae0e219550dfbc5a8ab8ea62790a0aca27f9e2d.tar.gz |
join consumer threads in test_consumer_group cleanup
Diffstat (limited to 'test')
-rw-r--r-- | test/test_consumer_group.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 3d10f8f..34b1be4 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -52,6 +52,7 @@ def test_group(kafka_broker, topic): connect_str = 'localhost:' + str(kafka_broker.port) consumers = {} stop = {} + threads = {} messages = collections.defaultdict(list) def consumer_thread(i): assert i not in consumers @@ -61,7 +62,7 @@ def test_group(kafka_broker, topic): bootstrap_servers=connect_str, heartbeat_interval_ms=500) while not stop[i].is_set(): - for tp, records in six.itervalues(consumers[i].poll()): + for tp, records in six.itervalues(consumers[i].poll(100)): messages[i][tp].extend(records) consumers[i].close() del consumers[i] @@ -70,8 +71,8 @@ def test_group(kafka_broker, topic): num_consumers = 4 for i in range(num_consumers): t = threading.Thread(target=consumer_thread, args=(i,)) - t.daemon = True t.start() + threads[i] = t try: timeout = time.time() + 35 @@ -116,6 +117,7 @@ def test_group(kafka_broker, topic): finally: for c in range(num_consumers): stop[c].set() + threads[c].join() @pytest.fixture |