summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-09-15 15:39:23 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-09-15 15:39:23 +0100
commit07faa347870c297de4c54b633742c4c1f4199086 (patch)
tree0500a6df3a0c27d4cf1c013ce29d6daae774361d /src
parent894b6d98993b48ce763d68efc8a6947d20775ad1 (diff)
downloadrabbitmq-server-07faa347870c297de4c54b633742c4c1f4199086.tar.gz
Reify queue crashed state.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl37
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_mirror_queue_master.erl3
-rw-r--r--src/rabbit_mirror_queue_misc.erl3
-rw-r--r--src/rabbit_upgrade_functions.erl18
5 files changed, 45 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 54431bcc..221fe3cc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -272,7 +272,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
slave_pids = [],
sync_slave_pids = [],
down_slave_nodes = [],
- gm_pids = []})),
+ gm_pids = [],
+ state = live})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(
rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
@@ -280,7 +281,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end);
+ fun () ->
+ ok = store_queue(Q#amqqueue{state = live}),
+ rabbit_misc:const(Q)
+ end);
internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
@@ -288,7 +292,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[] ->
case not_found_or_absent(QueueName) of
not_found -> Q1 = rabbit_policy:set(Q),
- ok = store_queue(Q1),
+ Q2 = Q#amqqueue{state = live},
+ ok = store_queue(Q2),
B = add_default_binding(Q1),
fun () -> B(), Q1 end;
{absent, _Q, _} = R -> rabbit_misc:const(R)
@@ -381,6 +386,8 @@ not_found_or_absent_dirty(Name) ->
with(Name, F, E) ->
case lookup(Name) of
+ {ok, Q = #amqqueue{state = crashed}} ->
+ E({absent, Q, crashed});
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -390,11 +397,8 @@ with(Name, F, E) ->
%% the retry loop.
rabbit_misc:with_exit_handler(
fun () -> false = rabbit_misc:is_process_alive(QPid),
- case crashed_or_recovering(Q) of
- crashed -> E({absent, Q, crashed});
- recovering -> timer:sleep(25),
- with(Name, F, E)
- end
+ timer:sleep(25),
+ with(Name, F, E)
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -407,20 +411,6 @@ with_or_die(Name, F) ->
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end).
-%% TODO we could say we are crashed when we mean recovering if we
-%% happen to call in the middle of a crash-failover. We could try to
-%% figure out whether that's happening by looking for the supervisor -
-%% but we'd need some additional book keeping to know what it is. And
-%% it will just mean a temporary glitch while crashing, which is
-%% fairly tolerable.
-crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) ->
- case lists:member(node(QPid), [node() | nodes()]) of
- true -> crashed;
- false -> recovering
- end;
-crashed_or_recovering(_Q) ->
- recovering.
-
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
Durable, AutoDelete, RequiredArgs, Owner) ->
@@ -779,7 +769,8 @@ immutable(Q) -> Q#amqqueue{pid = none,
down_slave_nodes = none,
gm_pids = none,
policy = none,
- decorators = none}.
+ decorators = none,
+ state = none}.
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 12a3c9f0..fcf17381 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -235,8 +235,15 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
%% If we crashed don't try to clean up the BQS, probably best to leave it.
-terminate(_Reason, State) ->
- terminate_shutdown(fun (BQS) -> BQS end, State).
+terminate(_Reason, State = #q{q = Q}) ->
+ terminate_shutdown(fun (BQS) ->
+ Q2 = Q#amqqueue{state = crashed},
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQS
+ end, State).
terminate_delete(EmitStats, Reason,
State = #q{q = #amqqueue{name = QName},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 1bea8042..5ce22271 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -110,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
[Q1 = #amqqueue{gm_pids = GMPids}]
= mnesia:read({rabbit_queue, QName}),
ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
+ state = live})
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
%% We need synchronous add here (i.e. do not return until the
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index aec6f93d..826b6927 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -243,7 +243,8 @@ store_updated_slaves(Q = #amqqueue{pid = MPid,
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
- down_slave_nodes = DSNs1},
+ down_slave_nodes = DSNs1,
+ state = live},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1104f373..9f6dc21a 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -49,6 +49,7 @@
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
+-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
%% -------------------------------------------------------------------
@@ -80,6 +81,7 @@
-spec(internal_system_x/0 :: () -> 'ok').
-spec(cluster_name/0 :: () -> 'ok').
-spec(down_slave_nodes/0 :: () -> 'ok').
+-spec(queue_state/0 :: () -> 'ok').
-endif.
@@ -400,6 +402,22 @@ down_slave_nodes(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+queue_state() ->
+ ok = queue_state(rabbit_queue),
+ ok = queue_state(rabbit_durable_queue).
+
+queue_state(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ live}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->