summaryrefslogtreecommitdiff
path: root/test/test_consumer_group.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-12-21 14:46:10 -0800
committerGitHub <noreply@github.com>2017-12-21 14:46:10 -0800
commitad024d1e897dbf16bd629fa63895bd7af4a8d959 (patch)
treef1993351b2c6487e8e623cefabf42ddf7477f666 /test/test_consumer_group.py
parent995664c7d407009a0a1030c7541848eb5ad51c97 (diff)
downloadkafka-python-ad024d1e897dbf16bd629fa63895bd7af4a8d959.tar.gz
KAFKA-3888 Use background thread to process consumer heartbeats (#1266)
Diffstat (limited to 'test/test_consumer_group.py')
-rw-r--r--test/test_consumer_group.py45
1 files changed, 42 insertions, 3 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 8f25e9f..690d45a 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -9,6 +9,7 @@ import six
from kafka import SimpleClient
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
+from kafka.coordinator.base import MemberState, Generation
from kafka.structs import TopicPartition
from test.conftest import version
@@ -92,9 +93,10 @@ def test_group(kafka_broker, topic):
# If all consumers exist and have an assignment
else:
+ logging.info('All consumers have assignment... checking for stable group')
# Verify all consumers are in the same generation
# then log state and break while loop
- generations = set([consumer._coordinator.generation
+ generations = set([consumer._coordinator._generation.generation_id
for consumer in list(consumers.values())])
# New generation assignment is not complete until
@@ -105,12 +107,16 @@ def test_group(kafka_broker, topic):
if not rejoining and len(generations) == 1:
for c, consumer in list(consumers.items()):
logging.info("[%s] %s %s: %s", c,
- consumer._coordinator.generation,
- consumer._coordinator.member_id,
+ consumer._coordinator._generation.generation_id,
+ consumer._coordinator._generation.member_id,
consumer.assignment())
break
+ else:
+ logging.info('Rejoining: %s, generations: %s', rejoining, generations)
+ time.sleep(1)
assert time.time() < timeout, "timeout waiting for assignments"
+ logging.info('Group stabilized; verifying assignment')
group_assignment = set()
for c in range(num_consumers):
assert len(consumers[c].assignment()) != 0
@@ -120,9 +126,12 @@ def test_group(kafka_broker, topic):
assert group_assignment == set([
TopicPartition(topic, partition)
for partition in range(num_partitions)])
+ logging.info('Assignment looks good!')
finally:
+ logging.info('Shutting down %s consumers', num_consumers)
for c in range(num_consumers):
+ logging.info('Stopping consumer %s', c)
stop[c].set()
threads[c].join()
@@ -143,3 +152,33 @@ def test_paused(kafka_broker, topic):
consumer.unsubscribe()
assert set() == consumer.paused()
+
+
+@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_heartbeat_thread(kafka_broker, topic):
+ group_id = 'test-group-' + random_string(6)
+ consumer = KafkaConsumer(topic,
+ bootstrap_servers=get_connect_str(kafka_broker),
+ group_id=group_id,
+ heartbeat_interval_ms=500)
+
+ # poll until we have joined group / have assignment
+ while not consumer.assignment():
+ consumer.poll(timeout_ms=100)
+
+ assert consumer._coordinator.state is MemberState.STABLE
+ last_poll = consumer._coordinator.heartbeat.last_poll
+ last_beat = consumer._coordinator.heartbeat.last_send
+
+ timeout = time.time() + 30
+ while True:
+ if time.time() > timeout:
+ raise RuntimeError('timeout waiting for heartbeat')
+ if consumer._coordinator.heartbeat.last_send > last_beat:
+ break
+ time.sleep(0.5)
+
+ assert consumer._coordinator.heartbeat.last_poll == last_poll
+ consumer.poll(timeout_ms=100)
+ assert consumer._coordinator.heartbeat.last_poll > last_poll