summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl148
1 files changed, 67 insertions, 81 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e4d78c45..964c3e24 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -19,17 +19,8 @@
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
%%
-%% We join the GM group before we add ourselves to the amqqueue
-%% record. As a result:
-%% 1. We can receive msgs from GM that correspond to messages we will
-%% never receive from publishers.
-%% 2. When we receive a message from publishers, we must receive a
-%% message from the GM group for it.
-%% 3. However, that instruction from the GM group can arrive either
-%% before or after the actual message. We need to be able to
-%% distinguish between GM instructions arriving early, and case (1)
-%% above.
-%%
+%% We receive messages from GM and from publishers, and the gm
+%% messages can arrive either before or after the 'actual' message.
%% All instructions from the GM group must be processed in the order
%% in which they're received.
@@ -89,28 +80,33 @@
synchronised
}).
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-info(QPid) ->
- gen_server2:call(QPid, info, infinity).
+info(QPid) -> gen_server2:call(QPid, info, infinity).
init(#amqqueue { name = QueueName } = Q) ->
+ %% We join the GM group before we add ourselves to the amqqueue
+ %% record. As a result:
+ %% 1. We can receive msgs from GM that correspond to messages we will
+ %% never receive from publishers.
+ %% 2. When we receive a message from publishers, we must receive a
+ %% message from the GM group for it.
+ %% 3. However, that instruction from the GM group can arrive either
+ %% before or after the actual message. We need to be able to
+ %% distinguish between GM instructions arriving early, and case (1)
+ %% above.
+ %%
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(fun() ->
- init_it(Self, Node,
- QueueName)
- end) of
+ case rabbit_misc:execute_mnesia_transaction(
+ fun() -> init_it(Self, Node, QueueName) end) of
{new, MPid} ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -153,24 +149,21 @@ init_it(Self, Node, QueueName) ->
[Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] ->
- MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid};
- [QPid] ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> duplicate_live_master;
- false -> {stale, QPid}
- end;
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> existing;
- false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid}
- end
+ [] -> MPids1 = MPids ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid};
+ [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid}
+ end
end.
handle_call({deliver, Delivery = #delivery { immediate = true }},
@@ -356,14 +349,10 @@ prioritise_info(Msg, _State) ->
%% GM
%% ---------------------------------------------------------------------------
-joined([SPid], _Members) ->
- SPid ! {joined, self()},
- ok.
+joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, []) ->
- ok;
-members_changed([SPid], _Births, Deaths) ->
- inform_deaths(SPid, Deaths).
+members_changed([_SPid], _Births, []) -> ok;
+members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
handle_msg([_SPid], _From, master_changed) ->
ok;
@@ -584,9 +573,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
confirm_messages(MsgIds, State #state {
backing_queue_state = BQS1 })),
case BQ:needs_timeout(BQS1) of
- false -> {stop_sync_timer(State1), hibernate};
- idle -> {stop_sync_timer(State1), 0 };
- timed -> {ensure_sync_timer(State1), 0 }
+ false -> {stop_sync_timer(State1), hibernate };
+ idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
+ timed -> {ensure_sync_timer(State1), 0 }
end.
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
@@ -680,26 +669,24 @@ maybe_enqueue_message(
%% msg_seq_no was at the time. We do now!
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { sender_queues = SQ1,
- msg_id_status = dict:erase(MsgId, MS) };
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
{ok, {published, ChPid}} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
- case needs_confirming(Delivery, State1) of
- never ->
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- eventually ->
- State1 #state {
- msg_id_status =
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 }
- end;
+ {MS1, SQ1} =
+ case needs_confirming(Delivery, State1) of
+ never -> {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)};
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ {dict:store(MsgId, MMS, MS), SQ};
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)}
+ end,
+ State1 #state { msg_id_status = MS1,
+ sender_queues = SQ1 };
{ok, discarded} ->
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
@@ -748,18 +735,17 @@ process_instruction(
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } },
_EnqueueOnPromotion}}, MQ2} ->
- %% We received the msg from the channel first. Thus we
- %% need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never ->
- {MQ2, PendingCh, MS};
- eventually ->
- {MQ2, PendingCh,
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- {MQ2, PendingCh, MS}
- end;
+ {MQ2, PendingCh,
+ %% We received the msg from the channel first. Thus
+ %% we need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never -> MS;
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ dict:store(MsgId, MMS , MS);
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ MS
+ end};
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}