diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-25 16:03:22 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-25 16:31:37 -0800 |
commit | 654f6b62470e88bf6e76fcf12ab1f9136eba7e1f (patch) | |
tree | 37e490b8551838250245a7ef52baba562c56b894 | |
parent | a154d0471c9181a6a6461466140e881018df4b8b (diff) | |
download | kafka-python-654f6b62470e88bf6e76fcf12ab1f9136eba7e1f.tar.gz |
Remove test_correlation_id_rollover; use daemon threads for test consumers
-rw-r--r-- | test/test_consumer_group.py | 47 |
1 files changed, 3 insertions, 44 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index f153d2d..03656fa 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -29,14 +29,6 @@ def topic(simple_client): return topic -@pytest.fixture -def topic_with_messages(simple_client, topic): - producer = SimpleProducer(simple_client) - for i in six.moves.xrange(100): - producer.send_messages(topic, 'msg_%d' % i) - return topic - - @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") def test_consumer(kafka_broker, version): @@ -76,7 +68,9 @@ def test_group(kafka_broker, topic): num_consumers = 4 for i in range(num_consumers): - threading.Thread(target=consumer_thread, args=(i,)).start() + t = threading.Thread(target=consumer_thread, args=(i,)) + t.daemon = True + t.start() try: timeout = time.time() + 35 @@ -108,38 +102,3 @@ def test_group(kafka_broker, topic): finally: for c in range(num_consumers): stop[c].set() - - -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -def test_correlation_id_rollover(kafka_broker): - logging.getLogger('kafka.conn').setLevel(logging.ERROR) - from kafka.protocol.metadata import MetadataRequest - conn = BrokerConnection('localhost', kafka_broker.port, - receive_buffer_bytes=131072, - max_in_flight_requests_per_connection=100) - req = MetadataRequest([]) - while not conn.connected(): - conn.connect() - futures = collections.deque() - start = time.time() - done = 0 - for i in six.moves.xrange(2**13): - if not conn.can_send_more(): - conn.recv(timeout=None) - futures.append(conn.send(req)) - conn.recv() - while futures and futures[0].is_done: - f = futures.popleft() - if not f.succeeded(): - raise f.exception - done += 1 - if time.time() > start + 10: - print ("%d done" % done) - start = time.time() - - while futures: - conn.recv() - if futures[0].is_done: - f = futures.popleft() - if not f.succeeded(): - raise f.exception |