From ebe36ea41f0642da6c58648a872dabd537db214a Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 10 Sep 2014 14:10:17 +0100 Subject: Show down queues in "rabbitmqctl list_queues" --- docs/rabbitmqctl.1.xml | 12 ++++++++---- src/rabbit_amqqueue.erl | 46 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index eb3c7ef3..2b70587a 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1253,10 +1253,14 @@ message loss. - status - The status of the queue. Normally - 'running', but may be "{syncing, MsgCount}" if the queue is - synchronising. + state + The state of the queue. Normally + 'running', but may be "{syncing, MsgCount}" if the + queue is synchronising. Queues which are located on + cluster nodes that are currently down will be shown + with a status of 'down' (and most other + queueinfoitems will be + unavailable). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 692179fc..33c447f8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -24,6 +24,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([list_down/1]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). @@ -105,6 +106,7 @@ (name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(list/0 :: () -> [rabbit_types:amqqueue()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). +-spec(list_down/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:amqqueue()) -> rabbit_types:infos()). -spec(info/2 :: @@ -506,20 +508,30 @@ check_dlxrk_arg({Type, _}, _Args) -> list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). +list(VHostPath) -> list(VHostPath, rabbit_queue). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context -list(VHostPath) -> +list(VHostPath, TableName) -> mnesia:async_dirty( fun () -> mnesia:match_object( - rabbit_queue, + TableName, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}, read) end). +list_down(VHostPath) -> + Present = list(VHostPath), + Durable = list(VHostPath, rabbit_durable_queue), + PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]), + sets:to_list(sets:filter(fun (#amqqueue{name = N}) -> + not sets:is_element(N, PresentS) + end, sets:from_list(Durable))). + info_keys() -> rabbit_amqqueue_process:info_keys(). -map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). +map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info). @@ -529,9 +541,31 @@ info(#amqqueue{ pid = QPid }, Items) -> {error, Error} -> throw(Error) end. -info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). +info_down(Q) -> + info_down(Q, rabbit_amqqueue_process:info_keys()). + +info_down(Q, Items) -> [{Item, i_down(Item, Q)} || Item <- Items]. + +i_down(name, #amqqueue{name = Name}) -> Name; +i_down(durable, #amqqueue{durable = Durable}) -> Durable; +i_down(auto_delete, #amqqueue{auto_delete = AD}) -> AD; +i_down(arguments, #amqqueue{arguments = Args}) -> Args; +i_down(pid, #amqqueue{pid = QPid}) -> QPid; +i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}) -> DSN; +i_down(state, _Q) -> down; +i_down(K, _Q) -> + case lists:member(K, rabbit_amqqueue_process:info_keys()) of + true -> ''; + false -> throw({bad_argument, K}) + end. + +info_all(VHostPath) -> + map(list(VHostPath), fun (Q) -> info(Q) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q) end). -info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +info_all(VHostPath, Items) -> + map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q, Items) end). force_event_refresh(Ref) -> [gen_server2:cast(Q#amqqueue.pid, @@ -548,7 +582,7 @@ consumer_info_keys() -> ?CONSUMER_INFO_KEYS. consumers_all(VHostPath) -> ConsumerInfoKeys=consumer_info_keys(), lists:append( - map(VHostPath, + map(list(VHostPath), fun (Q) -> [lists:zip( ConsumerInfoKeys, -- cgit v1.2.1