1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved.
%%
-module(rabbit_prequeue).
%% This is the initial gen_server that all queue processes start off
%% as. It handles the decision as to whether we need to start a new
%% slave, a new master/unmirrored, or whether we are restarting (and
%% if so, as what). Thus a crashing queue process can restart from here
%% and always do the right thing.
-export([start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-behaviour(gen_server2).
-include("rabbit.hrl").
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-export_type([start_mode/0]).
-type(start_mode() :: 'declare' | 'recovery' | 'slave').
-spec(start_link/3 :: (rabbit_types:amqqueue(), start_mode(), pid())
-> rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
start_link(Q, StartMode, Marker) ->
gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []).
%%----------------------------------------------------------------------------
init({Q, StartMode, Marker}) ->
init(Q, case {is_process_alive(Marker), StartMode} of
{true, slave} -> slave;
{true, _} -> master;
{false, _} -> restart
end).
init(Q, master) -> rabbit_amqqueue_process:init(Q);
init(Q, slave) -> rabbit_mirror_queue_slave:init(Q);
init(#amqqueue{name = QueueName}, restart) ->
{ok, Q = #amqqueue{pid = QPid,
slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
LocalOrMasterDown = node(QPid) =:= node()
orelse not rabbit_mnesia:on_running_node(QPid),
Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)],
case rabbit_mnesia:is_process_alive(QPid) of
true -> false = LocalOrMasterDown, %% assertion
rabbit_mirror_queue_slave:go(self(), async),
rabbit_mirror_queue_slave:init(Q); %% [1]
false -> case LocalOrMasterDown andalso Slaves =:= [] of
true -> crash_restart(Q); %% [2]
false -> timer:sleep(25),
init(Q, restart) %% [3]
end
end.
%% [1] There is a master on another node. Regardless of whether we
%% were originally a master or a slave, we are now a new slave.
%%
%% [2] Nothing is alive. We are the last best hope. Try to restart as a master.
%%
%% [3] The current master is dead but either there are alive slaves to
%% take over or it's all happening on a different node anyway. This is
%% not a stable situation. Sleep and wait for somebody else to make a
%% move.
crash_restart(Q = #amqqueue{name = QueueName}) ->
rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
gen_server2:cast(self(), init),
rabbit_amqqueue_process:init(Q#amqqueue{pid = self()}).
%%----------------------------------------------------------------------------
%% This gen_server2 always hands over to some other module at the end
%% of init/1.
handle_call(_Msg, _From, _State) -> exit(unreachable).
handle_cast(_Msg, _State) -> exit(unreachable).
handle_info(_Msg, _State) -> exit(unreachable).
terminate(_Reason, _State) -> exit(unreachable).
code_change(_OldVsn, _State, _Extra) -> exit(unreachable).
|