diff options
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 38e0da3f..1996fd0a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -100,7 +100,7 @@ init(Q = #amqqueue { name = QName }) -> Node = node(), case rabbit_misc:execute_mnesia_transaction( fun() -> init_it(Self, GM, Node, QName) end) of - {new, QPid} -> + {new, QPid, GMPids} -> erlang:monitor(process, QPid), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [Self]), @@ -120,13 +120,14 @@ init(Q = #amqqueue { name = QName }) -> msg_id_ack = dict:new(), msg_id_status = dict:new(), - known_senders = pmon:new(), + known_senders = pmon:new(delegate), depth_delta = undefined }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), ok = gm:broadcast(GM, request_depth), + ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; @@ -144,7 +145,7 @@ init_it(Self, GM, Node, QName) -> mnesia:read({rabbit_queue, QName}), case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of [] -> add_slave(Q, Self, GM), - {new, QPid}; + {new, QPid, GMPids}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; false -> {stale, QPid} @@ -156,7 +157,7 @@ init_it(Self, GM, Node, QName) -> gm_pids = [T || T = {_, S} <- GMPids, S =/= SPid] }, add_slave(Q1, Self, GM), - {new, QPid} + {new, QPid, GMPids} end end. @@ -273,7 +274,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, noreply(State); handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> - noreply(local_sender_death(ChPid, State)); + local_sender_death(ChPid, State), + noreply(State); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -604,7 +606,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref). ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> State #state { known_senders = pmon:monitor(ChPid, KS) }. -local_sender_death(ChPid, State = #state { known_senders = KS }) -> +local_sender_death(ChPid, #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a delivery %% from it but not heard about its death from the master. So if it %% is monitored we need to point the death out to the master (see @@ -612,8 +614,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> confirm_sender_death(ChPid) - end, - State. + end. confirm_sender_death(Pid) -> %% We have to deal with the possibility that we'll be promoted to |