summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/delegate.erl101
-rw-r--r--src/pmon.erl42
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl27
-rw-r--r--src/rabbit_mirror_queue_slave.erl85
6 files changed, 176 insertions, 83 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index f680d94a..0331ca01 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,22 +18,32 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2,
+ monitor/2, demonitor/1, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-record(state, {node, monitors, name}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([monitor_ref/0]).
+
+-type(monitor_ref() :: reference() | {atom(), pid()}).
-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}).
+
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A;
([pid()], fun_or_mfa(A)) -> {[{pid(), A}],
[{pid(), term()}]}).
-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok').
+-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
+-spec(demonitor/1 :: (monitor_ref()) -> 'true').
+
-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -49,7 +59,8 @@
%%----------------------------------------------------------------------------
start_link(Num) ->
- gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+ Name = delegate_name(Num),
+ gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
apply1(FunOrMFA, Pid);
@@ -77,7 +88,7 @@ invoke(Pids, FunOrMFA) when is_list(Pids) ->
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(RemoteNodes),
+ RemoteNodes, delegate(self(), RemoteNodes),
{invoke, FunOrMFA, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
@@ -105,12 +116,25 @@ invoke_no_result(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
- {invoke, FunOrMFA, Grouped})
+ RemoteNodes -> gen_server2:abcast(
+ RemoteNodes, delegate(self(), RemoteNodes),
+ {invoke, FunOrMFA, Grouped})
end,
safe_invoke(LocalPids, FunOrMFA), %% must not die
ok.
+monitor(process, Pid) when node(Pid) =:= node() ->
+ erlang:monitor(process, Pid);
+monitor(process, Pid) ->
+ Name = delegate(Pid, [node(Pid)]),
+ gen_server2:cast(Name, {monitor, self(), Pid}),
+ {Name, Pid}.
+
+demonitor(Ref) when is_reference(Ref) ->
+ erlang:demonitor(Ref);
+demonitor({Name, Pid}) ->
+ gen_server2:cast(Name, {demonitor, self(), Pid}).
+
call(PidOrPids, Msg) ->
invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}).
@@ -133,10 +157,10 @@ group_pids_by_node(Pids) ->
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate(RemoteNodes) ->
+delegate(Pid, RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(self(),
+ erlang:phash2(Pid,
delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
@@ -157,23 +181,64 @@ apply1(Fun, Arg) -> Fun(Arg).
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, node(), hibernate,
+init([Name]) ->
+ {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({invoke, FunOrMFA, Grouped}, _From, Node) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), Node,
+handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State,
hibernate}.
-handle_cast({invoke, FunOrMFA, Grouped}, Node) ->
+handle_cast({monitor, MonitoringPid, Pid},
+ State = #state{monitors = Monitors}) ->
+ Monitors1 = case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Pids1 = gb_sets:add_element(MonitoringPid, Pids),
+ dict:store(Pid, {Ref, Pids1}, Monitors);
+ error ->
+ Ref = erlang:monitor(process, Pid),
+ Pids = gb_sets:singleton(MonitoringPid),
+ dict:store(Pid, {Ref, Pids}, Monitors)
+ end,
+ {noreply, State#state{monitors = Monitors1}, hibernate};
+
+handle_cast({demonitor, MonitoringPid, Pid},
+ State = #state{monitors = Monitors}) ->
+ Monitors1 = case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Pids1 = gb_sets:del_element(MonitoringPid, Pids),
+ case gb_sets:is_empty(Pids1) of
+ true -> erlang:demonitor(Ref),
+ dict:erase(Pid, Monitors);
+ false -> dict:store(Pid, {Ref, Pids1}, Monitors)
+ end;
+ error ->
+ Monitors
+ end,
+ {noreply, State#state{monitors = Monitors1}, hibernate};
+
+handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA),
- {noreply, Node, hibernate}.
-
-handle_info(_Info, Node) ->
- {noreply, Node, hibernate}.
+ {noreply, State, hibernate}.
+
+handle_info({'DOWN', Ref, process, Pid, Info},
+ State = #state{monitors = Monitors, name = Name}) ->
+ {noreply,
+ case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Msg = {'DOWN', {Name, Pid}, process, Pid, Info},
+ gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end,
+ none, Pids),
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
+
+handle_info(_Info, State) ->
+ {noreply, State, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, Node, _Extra) ->
- {ok, Node}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/pmon.erl b/src/pmon.erl
index b9db66fb..86308167 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -16,22 +16,26 @@
-module(pmon).
--export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- monitored/1, is_empty/1]).
+-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2,
+ is_monitored/2, erase/2, monitored/1, is_empty/1]).
-compile({no_auto_import, [monitor/2]}).
+-record(state, {dict, module}).
+
-ifdef(use_specs).
%%----------------------------------------------------------------------------
-export_type([?MODULE/0]).
--opaque(?MODULE() :: dict()).
+-opaque(?MODULE() :: #state{dict :: dict(),
+ module :: atom()}).
-type(item() :: pid() | {atom(), node()}).
-spec(new/0 :: () -> ?MODULE()).
+-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()).
-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
@@ -42,29 +46,33 @@
-endif.
-new() -> dict:new().
+new() -> new(erlang).
+
+new(Module) -> #state{dict = dict:new(),
+ module = Module}.
-monitor(Item, M) ->
+monitor(Item, S = #state{dict = M, module = Module}) ->
case dict:is_key(Item, M) of
- true -> M;
- false -> dict:store(Item, erlang:monitor(process, Item), M)
+ true -> S;
+ false -> S#state{dict = dict:store(
+ Item, Module:monitor(process, Item), M)}
end.
-monitor_all([], M) -> M; %% optimisation
-monitor_all([Item], M) -> monitor(Item, M); %% optimisation
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], S) -> S; %% optimisation
+monitor_all([Item], S) -> monitor(Item, S); %% optimisation
+monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items).
-demonitor(Item, M) ->
+demonitor(Item, S = #state{dict = M, module = Module}) ->
case dict:find(Item, M) of
- {ok, MRef} -> erlang:demonitor(MRef),
- dict:erase(Item, M);
+ {ok, MRef} -> Module:demonitor(MRef),
+ S#state{dict = dict:erase(Item, M)};
error -> M
end.
-is_monitored(Item, M) -> dict:is_key(Item, M).
+is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M).
-erase(Item, M) -> dict:erase(Item, M).
+erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
-monitored(M) -> dict:fetch_keys(M).
+monitored(#state{dict = M}) -> dict:fetch_keys(M).
-is_empty(M) -> dict:size(M) == 0.
+is_empty(#state{dict = M}) -> dict:size(M) == 0.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index a9974711..1b7fe6da 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -749,7 +749,7 @@ log_banner() ->
{"config file(s)" = K, []} ->
Format(K, "(none)");
{"config file(s)" = K, [V0 | Vs]} ->
- Format(K, V0), [Format("", V) || V <- Vs];
+ [Format(K, V0) | [Format("", V) || V <- Vs]];
{K, V} ->
Format(K, V)
end || S <- Settings]),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e61cba02..6e0eb9bf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -146,7 +146,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = queue:new(),
- senders = pmon:new(),
+ senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index c9918fed..f54e9bd1 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -222,20 +222,19 @@
%% sender_death message to all the slaves, saying the sender has
%% died. Once the slaves receive the sender_death message, they know
%% that they're not going to receive any more instructions from the gm
-%% regarding that sender, thus they throw away any publications from
-%% the sender pending publication instructions. However, it is
-%% possible that the coordinator receives the DOWN and communicates
-%% that to the master before the master has finished receiving and
-%% processing publishes from the sender. This turns out not to be a
-%% problem: the sender has actually died, and so will not need to
-%% receive confirms or other feedback, and should further messages be
-%% "received" from the sender, the master will ask the coordinator to
-%% set up a new monitor, and will continue to process the messages
-%% normally. Slaves may thus receive publishes via gm from previously
-%% declared "dead" senders, but again, this is fine: should the slave
-%% have just thrown out the message it had received directly from the
-%% sender (due to receiving a sender_death message via gm), it will be
-%% able to cope with the publication purely from the master via gm.
+%% regarding that sender. However, it is possible that the coordinator
+%% receives the DOWN and communicates that to the master before the
+%% master has finished receiving and processing publishes from the
+%% sender. This turns out not to be a problem: the sender has actually
+%% died, and so will not need to receive confirms or other feedback,
+%% and should further messages be "received" from the sender, the
+%% master will ask the coordinator to set up a new monitor, and
+%% will continue to process the messages normally. Slaves may thus
+%% receive publishes via gm from previously declared "dead" senders,
+%% but again, this is fine: should the slave have just thrown out the
+%% message it had received directly from the sender (due to receiving
+%% a sender_death message via gm), it will be able to cope with the
+%% publication purely from the master via gm.
%%
%% When a slave receives a DOWN message for a sender, if it has not
%% received the sender_death message from the master via gm already,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b74ae3a5..18f848c3 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -59,7 +59,7 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState}
msg_id_ack, %% :: MsgId -> AckTag
msg_id_status,
@@ -118,7 +118,7 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- known_senders = pmon:new(),
+ known_senders = pmon:new(delegate),
depth_delta = undefined
},
@@ -270,7 +270,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
noreply(State);
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
- noreply(local_sender_death(ChPid, State));
+ local_sender_death(ChPid, State),
+ noreply(maybe_forget_sender(ChPid, down_from_ch, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -558,10 +559,15 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- Delivery <- queue:to_list(PubQ)],
+ Deliveries = [Delivery ||
+ {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+ Delivery <- queue:to_list(PubQ)],
+ AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+ KS1 = lists:foldl(fun (ChPid0, KS0) ->
+ pmon:demonitor(ChPid0, KS0)
+ end, KS, AwaitGmDown),
rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
noreply(State) ->
@@ -601,7 +607,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
-local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+local_sender_death(ChPid, #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
@@ -609,8 +615,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
- end,
- State.
+ end.
confirm_sender_death(Pid) ->
%% We have to deal with the possibility that we'll be promoted to
@@ -639,12 +644,39 @@ confirm_sender_death(Pid) ->
State
end,
%% Note that we do not remove our knowledge of this ChPid until we
- %% get the sender_death from GM.
+ %% get the sender_death from GM as well as a DOWN notification.
{ok, _TRef} = timer:apply_after(
?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
[self(), rabbit_mirror_queue_master, Fun]),
ok.
+forget_sender(running, _) -> false;
+forget_sender(_, running) -> false;
+forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
+
+%% Record and process lifetime events from channels. Forget all about a channel
+%% only when down notifications are received from both the channel and from gm.
+maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
+ msg_id_status = MS,
+ known_senders = KS }) ->
+ case dict:find(ChPid, SQ) of
+ error ->
+ State;
+ {ok, {MQ, PendCh, ChStateRecord}} ->
+ case forget_sender(ChState, ChStateRecord) of
+ true ->
+ credit_flow:peer_down(ChPid),
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = lists:foldl(
+ fun dict:erase/2,
+ MS, sets:to_list(PendCh)),
+ known_senders = pmon:demonitor(ChPid, KS) };
+ false ->
+ SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+ State #state { sender_queues = SQ1 }
+ end
+ end.
+
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
@@ -653,9 +685,9 @@ maybe_enqueue_message(
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
MQ1 = queue:in(Delivery, MQ),
- SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
MS1 = send_or_record_confirm(
@@ -667,7 +699,7 @@ maybe_enqueue_message(
get_sender_queue(ChPid, SQ) ->
case dict:find(ChPid, SQ) of
- error -> {queue:new(), sets:new()};
+ error -> {queue:new(), sets:new(), running};
{ok, Val} -> Val
end.
@@ -675,19 +707,20 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
case dict:find(ChPid, SQ) of
error ->
SQ;
- {ok, {MQ, PendingCh}} ->
- dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
+ {ok, {MQ, PendingCh, ChState}} ->
+ dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+ SQ)
end.
publish_or_discard(Status, ChPid, MsgId,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
%% We really are going to do the publish/discard right now, even
%% though we may not have seen it directly from the channel. But
- %% we cannot issues confirms until the latter has happened. So we
+ %% we cannot issue confirms until the latter has happened. So we
%% need to keep track of the MsgId and its confirmation status in
%% the meantime.
State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
@@ -707,7 +740,7 @@ publish_or_discard(Status, ChPid, MsgId,
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
@@ -768,25 +801,13 @@ process_instruction({requeue, MsgIds},
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
- State = #state { sender_queues = SQ,
- msg_id_status = MS,
- known_senders = KS }) ->
+ State = #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a message
%% from it. In this case we just want to avoid doing work if we
%% never got any messages.
{ok, case pmon:is_monitored(ChPid, KS) of
false -> State;
- true -> MS1 = case dict:find(ChPid, SQ) of
- error ->
- MS;
- {ok, {_MQ, PendingCh}} ->
- lists:foldl(fun dict:erase/2, MS,
- sets:to_list(PendingCh))
- end,
- credit_flow:peer_down(ChPid),
- State #state { sender_queues = dict:erase(ChPid, SQ),
- msg_id_status = MS1,
- known_senders = pmon:demonitor(ChPid, KS) }
+ true -> maybe_forget_sender(ChPid, down_from_gm, State)
end};
process_instruction({depth, Depth},
State = #state { backing_queue = BQ,