summaryrefslogtreecommitdiff
path: root/src/rabbit_heartbeat.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_heartbeat.erl')
-rw-r--r--src/rabbit_heartbeat.erl54
1 files changed, 43 insertions, 11 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index a9945af1..589bf7cc 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -32,7 +32,7 @@
-module(rabbit_heartbeat).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
- pause_monitor/1, resume_monitor/1]).
+ start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
-include("rabbit.hrl").
@@ -41,16 +41,28 @@
-ifdef(use_specs).
-export_type([heartbeaters/0]).
+-export_type([start_heartbeat_fun/0]).
--type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
+-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}).
+
+-type(heartbeat_callback() :: fun (() -> any())).
+
+-type(start_heartbeat_fun() ::
+ fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) ->
+ no_return())).
-spec(start_heartbeat_sender/3 ::
- (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
-spec(start_heartbeat_receiver/3 ::
- (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
+-spec(start_heartbeat_fun/1 ::
+ (pid()) -> start_heartbeat_fun()).
+
+
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -58,40 +70,60 @@
%%----------------------------------------------------------------------------
-start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater(
{Sock, TimeoutSec * 1000 div 2, send_oct, 0,
fun () ->
- catch rabbit_net:send(
- Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ SendFun(),
continue
end}).
-start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
- Parent ! timeout,
+ ReceiveFun(),
stop
end}).
-pause_monitor(none) ->
+start_heartbeat_fun(SupPid) ->
+ fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ {ok, Sender} =
+ start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ SendFun, heartbeat_sender,
+ start_heartbeat_sender),
+ {ok, Receiver} =
+ start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ ReceiveFun, heartbeat_receiver,
+ start_heartbeat_receiver),
+ {Sender, Receiver}
+ end.
+
+pause_monitor({_Sender, none}) ->
ok;
pause_monitor({_Sender, Receiver}) ->
Receiver ! pause,
ok.
-resume_monitor(none) ->
+resume_monitor({_Sender, none}) ->
ok;
resume_monitor({_Sender, Receiver}) ->
Receiver ! resume,
ok.
%%----------------------------------------------------------------------------
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+ {ok, none};
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+ supervisor2:start_child(
+ SupPid, {Name,
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
{ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.