summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl17
1 files changed, 9 insertions, 8 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 38e0da3f..1996fd0a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -100,7 +100,7 @@ init(Q = #amqqueue { name = QName }) ->
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
fun() -> init_it(Self, GM, Node, QName) end) of
- {new, QPid} ->
+ {new, QPid, GMPids} ->
erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -120,13 +120,14 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- known_senders = pmon:new(),
+ known_senders = pmon:new(delegate),
depth_delta = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_depth),
+ ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
@@ -144,7 +145,7 @@ init_it(Self, GM, Node, QName) ->
mnesia:read({rabbit_queue, QName}),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self, GM),
- {new, QPid};
+ {new, QPid, GMPids};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
@@ -156,7 +157,7 @@ init_it(Self, GM, Node, QName) ->
gm_pids = [T || T = {_, S} <- GMPids,
S =/= SPid] },
add_slave(Q1, Self, GM),
- {new, QPid}
+ {new, QPid, GMPids}
end
end.
@@ -273,7 +274,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
noreply(State);
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
- noreply(local_sender_death(ChPid, State));
+ local_sender_death(ChPid, State),
+ noreply(State);
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -604,7 +606,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
-local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+local_sender_death(ChPid, #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
@@ -612,8 +614,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
- end,
- State.
+ end.
confirm_sender_death(Pid) ->
%% We have to deal with the possibility that we'll be promoted to