summaryrefslogtreecommitdiff
path: root/test/test_consumer_group.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-25 16:03:22 -0800
committerDana Powers <dana.powers@rd.io>2016-01-25 16:31:37 -0800
commit654f6b62470e88bf6e76fcf12ab1f9136eba7e1f (patch)
tree37e490b8551838250245a7ef52baba562c56b894 /test/test_consumer_group.py
parenta154d0471c9181a6a6461466140e881018df4b8b (diff)
downloadkafka-python-654f6b62470e88bf6e76fcf12ab1f9136eba7e1f.tar.gz
Remove test_correlation_id_rollover; use daemon threads for test consumers
Diffstat (limited to 'test/test_consumer_group.py')
-rw-r--r--test/test_consumer_group.py47
1 files changed, 3 insertions, 44 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index f153d2d..03656fa 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -29,14 +29,6 @@ def topic(simple_client):
return topic
-@pytest.fixture
-def topic_with_messages(simple_client, topic):
- producer = SimpleProducer(simple_client)
- for i in six.moves.xrange(100):
- producer.send_messages(topic, 'msg_%d' % i)
- return topic
-
-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_consumer(kafka_broker, version):
@@ -76,7 +68,9 @@ def test_group(kafka_broker, topic):
num_consumers = 4
for i in range(num_consumers):
- threading.Thread(target=consumer_thread, args=(i,)).start()
+ t = threading.Thread(target=consumer_thread, args=(i,))
+ t.daemon = True
+ t.start()
try:
timeout = time.time() + 35
@@ -108,38 +102,3 @@ def test_group(kafka_broker, topic):
finally:
for c in range(num_consumers):
stop[c].set()
-
-
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_correlation_id_rollover(kafka_broker):
- logging.getLogger('kafka.conn').setLevel(logging.ERROR)
- from kafka.protocol.metadata import MetadataRequest
- conn = BrokerConnection('localhost', kafka_broker.port,
- receive_buffer_bytes=131072,
- max_in_flight_requests_per_connection=100)
- req = MetadataRequest([])
- while not conn.connected():
- conn.connect()
- futures = collections.deque()
- start = time.time()
- done = 0
- for i in six.moves.xrange(2**13):
- if not conn.can_send_more():
- conn.recv(timeout=None)
- futures.append(conn.send(req))
- conn.recv()
- while futures and futures[0].is_done:
- f = futures.popleft()
- if not f.succeeded():
- raise f.exception
- done += 1
- if time.time() > start + 10:
- print ("%d done" % done)
- start = time.time()
-
- while futures:
- conn.recv()
- if futures[0].is_done:
- f = futures.popleft()
- if not f.succeeded():
- raise f.exception