diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-11-06 23:07:39 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-11-06 23:07:39 +0000 |
commit | b14aef719be7f13e5395fa2e0cbdf072078cefdf (patch) | |
tree | 2e8dcec76883338a388359880f5f069e5df7d3af /src/rabbit_amqqueue_process.erl | |
parent | f58bbbdcb3d1265d8507fc5b0261fb1c017417f6 (diff) | |
download | rabbitmq-server-b14aef719be7f13e5395fa2e0cbdf072078cefdf.tar.gz |
beginnings of queue info API
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e687df84..b733d114 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -61,6 +61,16 @@ is_overload_protection_active, unsent_message_count}). +-define(INFO_KEYS, + [messages_ready, + messages_unacknowledged, + messages_uncommitted, + messages, + acks_uncommitted, + consumers, + transactions, + memory]). + %%---------------------------------------------------------------------------- start_link(Q) -> @@ -407,6 +417,9 @@ store_tx(Txn, Tx) -> erase_tx(Txn) -> erase({txn, Txn}). +all_tx_record() -> + [T || {{txn, _}, T} <- get()]. + all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. @@ -458,8 +471,51 @@ purge_message_buffer(QName, MessageBuffer) -> %% artifically ack them. persist_acks(none, QName, lists:append(Messages)). +infos(Items, State) -> [{Item, info(Item, State)} || Item <- Items]. + +info(messages_ready, #q{message_buffer = MessageBuffer}) -> + queue:len(MessageBuffer); +info(messages_unacknowledged, _) -> + lists:sum([dict:size(UAM) || + #cr{unacked_messages = UAM} <- all_ch_record()]); +info(messages_uncommitted, _) -> + lists:sum([length(Pending) || + #tx{pending_messages = Pending} <- all_tx_record()]); +info(messages, State) -> + lists:sum([info(Item, State) || Item <- [messages_ready, + messages_unacknowledged, + messages_uncommitted]]); +info(acks_uncommitted, _) -> + lists:sum([length(Pending) || + #tx{pending_acks = Pending} <- all_tx_record()]); +info(consumers, _) -> + lists:sum([length(Consumers) || + #cr{consumers = Consumers} <- all_ch_record()]); +info(transactions, _) -> + length(all_tx_record()); +info(memory, _) -> + {memory, M} = process_info(self(), memory), + M; +info(Item, _) -> + throw({bad_argument, Item}). + %--------------------------------------------------------------------------- +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); + +handle_call({info, Item}, _From, State) when is_atom(Item) -> + try + reply({ok, {Item, info(Item, State)}}, State) + catch Error -> reply({error, Error}, State) + end; + +handle_call({info, Items}, _From, State) when is_list(Items) -> + try + reply({ok, infos(Items, State)}, State) + catch Error -> reply({error, Error}, State) + end; + handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% Synchronous, "immediate" delivery mode %% |