summaryrefslogtreecommitdiff
path: root/test/test_consumer_group.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-14 12:23:50 -0700
committerDana Powers <dana.powers@gmail.com>2016-03-14 12:23:50 -0700
commit0ae0e219550dfbc5a8ab8ea62790a0aca27f9e2d (patch)
tree6de6bd4e4af2269d8484f3e7411131a2598f1d07 /test/test_consumer_group.py
parentc902baafbee777ec65fc66c9fdbaa5b172b37917 (diff)
downloadkafka-python-0ae0e219550dfbc5a8ab8ea62790a0aca27f9e2d.tar.gz
join consumer threads in test_consumer_group cleanup
Diffstat (limited to 'test/test_consumer_group.py')
-rw-r--r--test/test_consumer_group.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 3d10f8f..34b1be4 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -52,6 +52,7 @@ def test_group(kafka_broker, topic):
connect_str = 'localhost:' + str(kafka_broker.port)
consumers = {}
stop = {}
+ threads = {}
messages = collections.defaultdict(list)
def consumer_thread(i):
assert i not in consumers
@@ -61,7 +62,7 @@ def test_group(kafka_broker, topic):
bootstrap_servers=connect_str,
heartbeat_interval_ms=500)
while not stop[i].is_set():
- for tp, records in six.itervalues(consumers[i].poll()):
+ for tp, records in six.itervalues(consumers[i].poll(100)):
messages[i][tp].extend(records)
consumers[i].close()
del consumers[i]
@@ -70,8 +71,8 @@ def test_group(kafka_broker, topic):
num_consumers = 4
for i in range(num_consumers):
t = threading.Thread(target=consumer_thread, args=(i,))
- t.daemon = True
t.start()
+ threads[i] = t
try:
timeout = time.time() + 35
@@ -116,6 +117,7 @@ def test_group(kafka_broker, topic):
finally:
for c in range(num_consumers):
stop[c].set()
+ threads[c].join()
@pytest.fixture