summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-11-06 23:07:39 +0000
committerMatthias Radestock <matthias@lshift.net>2008-11-06 23:07:39 +0000
commitb14aef719be7f13e5395fa2e0cbdf072078cefdf (patch)
tree2e8dcec76883338a388359880f5f069e5df7d3af
parentf58bbbdcb3d1265d8507fc5b0261fb1c017417f6 (diff)
downloadrabbitmq-server-b14aef719be7f13e5395fa2e0cbdf072078cefdf.tar.gz
beginnings of queue info API
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl56
2 files changed, 77 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 56d2c35d..93caf519 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -27,8 +27,9 @@
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
+-export([lookup/1, with/2, with_or_die/2, list/0, list_vhost_queues/1,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
+-export([info/1, info/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2]).
@@ -54,6 +55,9 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+-type(info_key() :: atom()).
+-type(info() :: {info_key(), any()}).
+
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
@@ -61,7 +65,12 @@
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
+-spec(list/0 :: () -> [amqqueue()]).
-spec(list_vhost_queues/1 :: (vhost()) -> [amqqueue()]).
+-spec(info/1 :: (amqqueue()) -> [info()]).
+-spec(info/2 ::
+ (amqqueue(), info_key()) -> info();
+ (amqqueue(), [info_key()]) -> [info()]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -178,10 +187,21 @@ with_or_die(Name, F) ->
not_found, "no ~s", [rabbit_misc:rs(Name)])
end).
+list() -> rabbit_misc:dirty_read_all(amqqueue).
+
list_vhost_queues(VHostPath) ->
mnesia:dirty_match_object(
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
+info(#amqqueue{ pid = QPid }) ->
+ gen_server:call(QPid, info).
+
+info(#amqqueue{ pid = QPid }, ItemOrItems) ->
+ case gen_server:call(QPid, {info, ItemOrItems}) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat).
stat_all() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e687df84..b733d114 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -61,6 +61,16 @@
is_overload_protection_active,
unsent_message_count}).
+-define(INFO_KEYS,
+ [messages_ready,
+ messages_unacknowledged,
+ messages_uncommitted,
+ messages,
+ acks_uncommitted,
+ consumers,
+ transactions,
+ memory]).
+
%%----------------------------------------------------------------------------
start_link(Q) ->
@@ -407,6 +417,9 @@ store_tx(Txn, Tx) ->
erase_tx(Txn) ->
erase({txn, Txn}).
+all_tx_record() ->
+ [T || {{txn, _}, T} <- get()].
+
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
@@ -458,8 +471,51 @@ purge_message_buffer(QName, MessageBuffer) ->
%% artifically ack them.
persist_acks(none, QName, lists:append(Messages)).
+infos(Items, State) -> [{Item, info(Item, State)} || Item <- Items].
+
+info(messages_ready, #q{message_buffer = MessageBuffer}) ->
+ queue:len(MessageBuffer);
+info(messages_unacknowledged, _) ->
+ lists:sum([dict:size(UAM) ||
+ #cr{unacked_messages = UAM} <- all_ch_record()]);
+info(messages_uncommitted, _) ->
+ lists:sum([length(Pending) ||
+ #tx{pending_messages = Pending} <- all_tx_record()]);
+info(messages, State) ->
+ lists:sum([info(Item, State) || Item <- [messages_ready,
+ messages_unacknowledged,
+ messages_uncommitted]]);
+info(acks_uncommitted, _) ->
+ lists:sum([length(Pending) ||
+ #tx{pending_acks = Pending} <- all_tx_record()]);
+info(consumers, _) ->
+ lists:sum([length(Consumers) ||
+ #cr{consumers = Consumers} <- all_ch_record()]);
+info(transactions, _) ->
+ length(all_tx_record());
+info(memory, _) ->
+ {memory, M} = process_info(self(), memory),
+ M;
+info(Item, _) ->
+ throw({bad_argument, Item}).
+
%---------------------------------------------------------------------------
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
+
+handle_call({info, Item}, _From, State) when is_atom(Item) ->
+ try
+ reply({ok, {Item, info(Item, State)}}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
+
+handle_call({info, Items}, _From, State) when is_list(Items) ->
+ try
+ reply({ok, infos(Items, State)}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
+
handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%