summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-11 16:36:24 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-12 15:08:36 -0800
commitf58b7e9b2aaaa6891b6ad0331b6aa8cce55d97f4 (patch)
tree1546751938165c621cc699ecc9a20add5e3cce2f /test
parentccadb4dc8059865f9d7b0c4a65c5e480e65cd25f (diff)
downloadkafka-python-f58b7e9b2aaaa6891b6ad0331b6aa8cce55d97f4.tar.gz
Add test for unknown coordinator heartbeat task
Diffstat (limited to 'test')
-rw-r--r--test/test_coordinator.py16
1 files changed, 14 insertions, 2 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 94e0e66..847cbc1 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -380,16 +380,20 @@ def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
def patched_coord(mocker, coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
coordinator._subscription.needs_partition_assignment = False
- mocker.patch.object(coordinator, 'coordinator_unknown')
- coordinator.coordinator_unknown.return_value = False
+ mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
coordinator.coordinator_id = 0
+ coordinator.generation = 0
+ mocker.patch.object(coordinator, 'need_rejoin', return_value=False)
mocker.patch.object(coordinator._client, 'least_loaded_node',
return_value=1)
mocker.patch.object(coordinator._client, 'ready', return_value=True)
mocker.patch.object(coordinator._client, 'send')
+ mocker.patch.object(coordinator._client, 'schedule')
mocker.spy(coordinator, '_failed_request')
mocker.spy(coordinator, '_handle_offset_commit_response')
mocker.spy(coordinator, '_handle_offset_fetch_response')
+ mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success')
+ mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure')
return coordinator
@@ -573,3 +577,11 @@ def test_handle_offset_fetch_response(patched_coord, offsets,
assert future.value == offsets
assert patched_coord.coordinator_id is (None if dead else 0)
assert patched_coord._subscription.needs_partition_assignment is reassign
+
+
+def test_heartbeat(patched_coord):
+ patched_coord.coordinator_unknown.return_value = True
+
+ patched_coord.heartbeat_task()
+ assert patched_coord._client.schedule.call_count == 1
+ assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1