From ef25dbf2c2b037d7b6c213213c88c7ffec70a291 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 10 Nov 2010 11:47:51 +0000 Subject: Refactored rabbit_heartbeat to provide start_heartbeat_fun --- src/rabbit_connection_sup.erl | 28 ++++++++++------------------ src/rabbit_heartbeat.erl | 35 +++++++++++++++++++++++++++-------- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b3821d3b..bb5ed916 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -79,21 +79,13 @@ init([]) -> {ok, {{one_for_all, 0, 1}, []}}. start_heartbeat_fun(SupPid) -> - fun (_Sock, 0) -> - none; - (Sock, TimeoutSec) -> - Parent = self(), - {ok, Sender} = - supervisor2:start_child( - SupPid, {heartbeat_sender, - {rabbit_heartbeat, start_heartbeat_sender, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, Receiver} = - supervisor2:start_child( - SupPid, {heartbeat_receiver, - {rabbit_heartbeat, start_heartbeat_receiver, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {Sender, Receiver} - end. + SendFun = fun(Sock) -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + + Parent = self(), + TimeoutFun = fun() -> + Parent ! timeout + end, + rabbit_heartbeat:start_heartbeat_fun(SupPid, SendFun, TimeoutFun). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index a9945af1..08462d74 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,7 +32,7 @@ -module(rabbit_heartbeat). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - pause_monitor/1, resume_monitor/1]). + start_heartbeat_fun/3, pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -43,12 +43,13 @@ -export_type([heartbeaters/0]). -type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(callback_fun() :: fun (() -> any())). -spec(start_heartbeat_sender/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), callback_fun()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), callback_fun()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -58,27 +59,45 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch rabbit_net:send( - Sock, rabbit_binary_generator:build_heartbeat_frame()), + SendFun(Sock), continue end}). -start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> +start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - Parent ! timeout, + TimeoutFun(), stop end}). +start_heartbeat_fun(SupPid, SendFun, TimeoutFun) -> + fun (_Sock, 0) -> + none; + (Sock, TimeoutSec) -> + {ok, Sender} = + supervisor2:start_child( + SupPid, {heartbeat_sender, + {rabbit_heartbeat, start_heartbeat_sender, + [Sock, TimeoutSec, SendFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, Receiver} = + supervisor2:start_child( + SupPid, {heartbeat_receiver, + {rabbit_heartbeat, start_heartbeat_receiver, + [Sock, TimeoutSec, TimeoutFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {Sender, Receiver} + end. + pause_monitor(none) -> ok; pause_monitor({_Sender, Receiver}) -> -- cgit v1.2.1 From fcf0ec3a214a97da1d4fee3f02bd730ed73c6840 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 10 Nov 2010 12:11:07 +0000 Subject: Fixed up the specs for the heartbeater --- src/rabbit_heartbeat.erl | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 08462d74..ebdfbdc6 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -43,15 +43,23 @@ -export_type([heartbeaters/0]). -type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). --type(callback_fun() :: fun (() -> any())). + +-type(send_fun() :: fun ((rabbit_net:socket()) -> any())). +-type(timeout_fun() :: fun (() -> any())). -spec(start_heartbeat_sender/3 :: - (rabbit_net:socket(), non_neg_integer(), callback_fun()) -> + (rabbit_net:socket(), non_neg_integer(), send_fun()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (rabbit_net:socket(), non_neg_integer(), callback_fun()) -> + (rabbit_net:socket(), non_neg_integer(), timeout_fun()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_fun/3 :: + (pid(), send_fun(), timeout_fun()) -> + fun((rabbit_net:socket(), non_neg_integer()) + -> heartbeaters())). + + -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). -- cgit v1.2.1 From 1298a4035b2d1a4791530ade8a586e22c4f48b1e Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 10 Nov 2010 15:39:12 +0000 Subject: Reworked start_heartbeat_fun to allow a different timeout for send/receive heartbeats --- src/rabbit_heartbeat.erl | 44 ++++++++++++++++++++++++-------------------- src/rabbit_reader.erl | 13 +++++-------- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ebdfbdc6..5f1e211e 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -41,11 +41,15 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). +-export_type([start_heartbeat_fun/0]). --type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). -type(send_fun() :: fun ((rabbit_net:socket()) -> any())). -type(timeout_fun() :: fun (() -> any())). +-type(start_heartbeat_fun() :: + fun((rabbit_net:socket(), non_neg_integer(), non_neg_integer()) -> + no_return())). -spec(start_heartbeat_sender/3 :: (rabbit_net:socket(), non_neg_integer(), send_fun()) -> @@ -55,9 +59,7 @@ rabbit_types:ok(pid())). -spec(start_heartbeat_fun/3 :: - (pid(), send_fun(), timeout_fun()) -> - fun((rabbit_net:socket(), non_neg_integer()) - -> heartbeaters())). + (pid(), send_fun(), timeout_fun()) -> start_heartbeat_fun()). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -87,38 +89,40 @@ start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) -> stop end}). -start_heartbeat_fun(SupPid, SendFun, TimeoutFun) -> - fun (_Sock, 0) -> - none; - (Sock, TimeoutSec) -> +start_heartbeat_fun(SupPid, SendFun, ReceiveFun) -> + fun (Sock, SendTimeoutSec, ReceiveTimeoutSec) -> {ok, Sender} = - supervisor2:start_child( - SupPid, {heartbeat_sender, - {rabbit_heartbeat, start_heartbeat_sender, - [Sock, TimeoutSec, SendFun]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), {ok, Receiver} = - supervisor2:start_child( - SupPid, {heartbeat_receiver, - {rabbit_heartbeat, start_heartbeat_receiver, - [Sock, TimeoutSec, TimeoutFun]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), {Sender, Receiver} end. -pause_monitor(none) -> +pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. -resume_monitor(none) -> +resume_monitor({_Sender, none}) -> ok; resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. %%---------------------------------------------------------------------------- +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> + {ok, none}; +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> + supervisor2:start_child( + SupPid, {Name, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 127467bb..54e51600 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -162,11 +162,7 @@ -ifdef(use_specs). --type(start_heartbeat_fun() :: - fun ((rabbit_networking:socket(), non_neg_integer()) -> - rabbit_heartbeat:heartbeaters())). - --spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> +-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). @@ -177,9 +173,10 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) + -> no_return()). -spec(start_connection/7 :: - (pid(), pid(), pid(), start_heartbeat_fun(), any(), + (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), rabbit_networking:socket(), fun ((rabbit_networking:socket()) -> rabbit_types:ok_or_error2( @@ -771,7 +768,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = SHF(Sock, ClientHeartbeat), + Heartbeater = SHF(Sock, ClientHeartbeat, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, -- cgit v1.2.1 From 63fab0f6cb17797c5effe58f5abcca2f87ec4a6c Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 10 Nov 2010 19:00:27 +0000 Subject: Reworked heartbeating so that it really works again and so we can specify different timeouts for send/receive --- src/rabbit_connection_sup.erl | 14 ++------------ src/rabbit_heartbeat.erl | 27 ++++++++++++++------------- src/rabbit_reader.erl | 14 +++++++++++++- 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index bb5ed916..22742fa9 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -66,7 +66,8 @@ start_link() -> supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + [ChannelSupSupPid, Collector, + rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. @@ -78,14 +79,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. -start_heartbeat_fun(SupPid) -> - SendFun = fun(Sock) -> - Frame = rabbit_binary_generator:build_heartbeat_frame(), - catch rabbit_net:send(Sock, Frame) - end, - - Parent = self(), - TimeoutFun = fun() -> - Parent ! timeout - end, - rabbit_heartbeat:start_heartbeat_fun(SupPid, SendFun, TimeoutFun). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 5f1e211e..589bf7cc 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,7 +32,7 @@ -module(rabbit_heartbeat). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - start_heartbeat_fun/3, pause_monitor/1, resume_monitor/1]). + start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -45,21 +45,22 @@ -type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). --type(send_fun() :: fun ((rabbit_net:socket()) -> any())). --type(timeout_fun() :: fun (() -> any())). +-type(heartbeat_callback() :: fun (() -> any())). + -type(start_heartbeat_fun() :: - fun((rabbit_net:socket(), non_neg_integer(), non_neg_integer()) -> + fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> no_return())). -spec(start_heartbeat_sender/3 :: - (rabbit_net:socket(), non_neg_integer(), send_fun()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (rabbit_net:socket(), non_neg_integer(), timeout_fun()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). --spec(start_heartbeat_fun/3 :: - (pid(), send_fun(), timeout_fun()) -> start_heartbeat_fun()). +-spec(start_heartbeat_fun/1 :: + (pid()) -> start_heartbeat_fun()). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -76,21 +77,21 @@ start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - SendFun(Sock), + SendFun(), continue end}). -start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - TimeoutFun(), + ReceiveFun(), stop end}). -start_heartbeat_fun(SupPid, SendFun, ReceiveFun) -> - fun (Sock, SendTimeoutSec, ReceiveTimeoutSec) -> +start_heartbeat_fun(SupPid) -> + fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 54e51600..c40e02b8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -768,7 +768,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = SHF(Sock, ClientHeartbeat, ClientHeartbeat), + SendFun = + fun() -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + + Parent = self(), + ReceiveFun = + fun() -> + Parent ! timeout + end, + Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, + ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, -- cgit v1.2.1