diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-15 15:39:23 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-15 15:39:23 +0100 |
commit | 07faa347870c297de4c54b633742c4c1f4199086 (patch) | |
tree | 0500a6df3a0c27d4cf1c013ce29d6daae774361d /src | |
parent | 894b6d98993b48ce763d68efc8a6947d20775ad1 (diff) | |
download | rabbitmq-server-07faa347870c297de4c54b633742c4c1f4199086.tar.gz |
Reify queue crashed state.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 37 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 3 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 18 |
5 files changed, 45 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 54431bcc..221fe3cc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -272,7 +272,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> slave_pids = [], sync_slave_pids = [], down_slave_nodes = [], - gm_pids = []})), + gm_pids = [], + state = live})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call( rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), @@ -280,7 +281,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); + fun () -> + ok = store_queue(Q#amqqueue{state = live}), + rabbit_misc:const(Q) + end); internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> @@ -288,7 +292,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [] -> case not_found_or_absent(QueueName) of not_found -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), + Q2 = Q#amqqueue{state = live}, + ok = store_queue(Q2), B = add_default_binding(Q1), fun () -> B(), Q1 end; {absent, _Q, _} = R -> rabbit_misc:const(R) @@ -381,6 +386,8 @@ not_found_or_absent_dirty(Name) -> with(Name, F, E) -> case lookup(Name) of + {ok, Q = #amqqueue{state = crashed}} -> + E({absent, Q, crashed}); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -390,11 +397,8 @@ with(Name, F, E) -> %% the retry loop. rabbit_misc:with_exit_handler( fun () -> false = rabbit_misc:is_process_alive(QPid), - case crashed_or_recovering(Q) of - crashed -> E({absent, Q, crashed}); - recovering -> timer:sleep(25), - with(Name, F, E) - end + timer:sleep(25), + with(Name, F, E) end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -407,20 +411,6 @@ with_or_die(Name, F) -> ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end). -%% TODO we could say we are crashed when we mean recovering if we -%% happen to call in the middle of a crash-failover. We could try to -%% figure out whether that's happening by looking for the supervisor - -%% but we'd need some additional book keeping to know what it is. And -%% it will just mean a temporary glitch while crashing, which is -%% fairly tolerable. -crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) -> - case lists:member(node(QPid), [node() | nodes()]) of - true -> crashed; - false -> recovering - end; -crashed_or_recovering(_Q) -> - recovering. - assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, Durable, AutoDelete, RequiredArgs, Owner) -> @@ -779,7 +769,8 @@ immutable(Q) -> Q#amqqueue{pid = none, down_slave_nodes = none, gm_pids = none, policy = none, - decorators = none}. + decorators = none, + state = none}. deliver([], _Delivery, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 12a3c9f0..fcf17381 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -235,8 +235,15 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate(normal, State) -> %% delete case terminate_shutdown(terminate_delete(true, normal, State), State); %% If we crashed don't try to clean up the BQS, probably best to leave it. -terminate(_Reason, State) -> - terminate_shutdown(fun (BQS) -> BQS end, State). +terminate(_Reason, State = #q{q = Q}) -> + terminate_shutdown(fun (BQS) -> + Q2 = Q#amqqueue{state = crashed}, + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_amqqueue:store_queue(Q2) + end), + BQS + end, State). terminate_delete(EmitStats, Reason, State = #q{q = #amqqueue{name = QName}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 1bea8042..5ce22271 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -110,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> [Q1 = #amqqueue{gm_pids = GMPids}] = mnesia:read({rabbit_queue, QName}), ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) + Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], + state = live}) end), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), %% We need synchronous add here (i.e. do not return until the diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index aec6f93d..826b6927 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -243,7 +243,8 @@ store_updated_slaves(Q = #amqqueue{pid = MPid, SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], Q1 = Q#amqqueue{sync_slave_pids = SSPids1, - down_slave_nodes = DSNs1}, + down_slave_nodes = DSNs1, + state = live}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1104f373..9f6dc21a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -49,6 +49,7 @@ -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). +-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). %% ------------------------------------------------------------------- @@ -80,6 +81,7 @@ -spec(internal_system_x/0 :: () -> 'ok'). -spec(cluster_name/0 :: () -> 'ok'). -spec(down_slave_nodes/0 :: () -> 'ok'). +-spec(queue_state/0 :: () -> 'ok'). -endif. @@ -400,6 +402,22 @@ down_slave_nodes(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]). +queue_state() -> + ok = queue_state(rabbit_queue), + ok = queue_state(rabbit_durable_queue). + +queue_state(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + live} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |