summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-03 21:57:14 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-03 21:57:14 +0000
commit0f3e69630f48ae1b08713a4405f847b6c20c19f6 (patch)
tree61cac837d49a5891a499ce9a777d817c2ff83b22
parent30aa0f6084dd0a78f9ef196bc76d5302b269b7bc (diff)
parent1bff88281ef7df7574eee82dd50e705742367f92 (diff)
downloadrabbitmq-server-0f3e69630f48ae1b08713a4405f847b6c20c19f6.tar.gz
merge default into bug21966
-rw-r--r--docs/rabbitmqctl.1.pod55
-rw-r--r--src/pg_local.erl203
-rw-r--r--src/rabbit_channel.erl97
-rw-r--r--src/rabbit_control.erl13
-rw-r--r--src/rabbit_limiter.erl18
5 files changed, 368 insertions, 18 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 47c4d168..b82d5b87 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -367,6 +367,61 @@ send queue size
=back
+=over
+
+=item list_channels [I<channelinfoitem> ...]
+
+List channel information. Each line printed describes a channel, with
+the requested I<channelinfoitem> values separated by tab characters.
+If no I<channelinfoitem>s are specified then I<pid>, I<user>,
+I<transactional>, I<consumer_count>, and I<messages_unacknowledged>
+are assumed.
+
+The list includes channels which are part of ordinary AMQP connections
+(as listed by list_connections) and channels created by various
+plug-ins and other extensions.
+
+=back
+
+=head3 Channel information items
+
+=over
+
+=item pid
+
+id of the Erlang process associated with the channel
+
+=item connection
+
+id of the Erlang process associated with the connection to which the
+channel belongs
+
+=item user
+
+username associated with the channel
+
+=item vhost
+
+virtual host in which the channel operates
+
+=item transactional
+
+true if the channel is in transactional mode, false otherwise
+
+=item consumer_count
+
+number of logical AMQP consumers retrieving messages via the channel
+
+=item messages_unacknowledged
+
+number of messages delivered via this channel but not yet acknowledged
+
+=item prefetch_count
+
+QoS prefetch count limit in force, 0 if unlimited
+
+=back
+
The list_queues, list_exchanges and list_bindings commands accept an
optional virtual host parameter for which to display results,
defaulting to I<"/">. The default can be overridden with the B<-p>
diff --git a/src/pg_local.erl b/src/pg_local.erl
new file mode 100644
index 00000000..7f771f74
--- /dev/null
+++ b/src/pg_local.erl
@@ -0,0 +1,203 @@
+%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) Process groups are node-local only.
+%%
+%% 2) Groups are created/deleted implicitly.
+%%
+%% 3) 'join' and 'leave' are asynchronous.
+%%
+%% 4) the type specs of the exported non-callback functions have been
+%% extracted into a separate, guarded section, and rewritten in
+%% old-style spec syntax, for better compatibility with older
+%% versions of Erlang/OTP. The remaining type specs have been
+%% removed.
+
+%% All modifications are (C) 2010 LShift Ltd.
+
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(pg_local).
+
+-export([join/2, leave/2, get_members/1]).
+-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
+ terminate/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(name() :: term()).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(start/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(join/2 :: (name(), pid()) -> 'ok').
+-spec(leave/2 :: (name(), pid()) -> 'ok').
+-spec(get_members/1 :: (name()) -> [pid()]).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%%% As of R13B03 monitors are used instead of links.
+
+%%%
+%%% Exported functions
+%%%
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+start() ->
+ ensure_started().
+
+join(Name, Pid) when is_pid(Pid) ->
+ ensure_started(),
+ gen_server:cast(?MODULE, {join, Name, Pid}).
+
+leave(Name, Pid) when is_pid(Pid) ->
+ ensure_started(),
+ gen_server:cast(?MODULE, {leave, Name, Pid}).
+
+get_members(Name) ->
+ ensure_started(),
+ group_members(Name).
+
+%%%
+%%% Callback functions from gen_server
+%%%
+
+-record(state, {}).
+
+init([]) ->
+ pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
+ {ok, #state{}}.
+
+handle_call(Request, From, S) ->
+ error_logger:warning_msg("The pg_local server received an unexpected message:\n"
+ "handle_call(~p, ~p, _)\n",
+ [Request, From]),
+ {noreply, S}.
+
+handle_cast({join, Name, Pid}, S) ->
+ join_group(Name, Pid),
+ {noreply, S};
+handle_cast({leave, Name, Pid}, S) ->
+ leave_group(Name, Pid),
+ {noreply, S};
+handle_cast(_, S) ->
+ {noreply, S}.
+
+handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
+ member_died(MonitorRef),
+ {noreply, S};
+handle_info(_, S) ->
+ {noreply, S}.
+
+terminate(_Reason, _S) ->
+ true = ets:delete(pg_local_table),
+ ok.
+
+%%%
+%%% Local functions
+%%%
+
+%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the
+%%% table is ordered_set, and the fast matching of partially
+%%% instantiated keys is used extensively.
+%%%
+%%% {{ref, Pid}, MonitorRef, Counter}
+%%% {{ref, MonitorRef}, Pid}
+%%% Each process has one monitor. Counter is incremented when the
+%%% Pid joins some group.
+%%% {{member, Name, Pid}, _}
+%%% Pid is a member of group Name, GroupCounter is incremented when the
+%%% Pid joins the group Name.
+%%% {{pid, Pid, Name}}
+%%% Pid is a member of group Name.
+
+member_died(Ref) ->
+ [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
+ Names = member_groups(Pid),
+ _ = [leave_group(Name, P) ||
+ Name <- Names,
+ P <- member_in_group(Pid, Name)],
+ ok.
+
+join_group(Name, Pid) ->
+ Ref_Pid = {ref, Pid},
+ try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
+ catch _:_ ->
+ Ref = erlang:monitor(process, Pid),
+ true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
+ true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
+ end,
+ Member_Name_Pid = {member, Name, Pid},
+ try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
+ catch _:_ ->
+ true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
+ true = ets:insert(pg_local_table, {{pid, Pid, Name}})
+ end.
+
+leave_group(Name, Pid) ->
+ Member_Name_Pid = {member, Name, Pid},
+ try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
+ N ->
+ if
+ N =:= 0 ->
+ true = ets:delete(pg_local_table, {pid, Pid, Name}),
+ true = ets:delete(pg_local_table, Member_Name_Pid);
+ true ->
+ ok
+ end,
+ Ref_Pid = {ref, Pid},
+ case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
+ 0 ->
+ [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
+ true = ets:delete(pg_local_table, {ref, Ref}),
+ true = ets:delete(pg_local_table, Ref_Pid),
+ true = erlang:demonitor(Ref, [flush]),
+ ok;
+ _ ->
+ ok
+ end
+ catch _:_ ->
+ ok
+ end.
+
+group_members(Name) ->
+ [P ||
+ [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
+ _ <- lists:seq(1, N)].
+
+member_in_group(Pid, Name) ->
+ [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
+ lists:duplicate(N, Pid).
+
+member_groups(Pid) ->
+ [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
+
+ensure_started() ->
+ case whereis(?MODULE) of
+ undefined ->
+ C = {pg_local, {?MODULE, start_link, []}, permanent,
+ 1000, worker, [?MODULE]},
+ supervisor:start_child(kernel_safe_sup, C);
+ PgLocalPid ->
+ {ok, PgLocalPid}
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f8e10097..7db80425 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,8 +37,10 @@
-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
+-export([list/0, info/1, info/2, info_all/0, info_all/1]).
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3,
+ handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
@@ -46,10 +48,21 @@
username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(INFO_KEYS,
+ [pid,
+ connection,
+ user,
+ vhost,
+ transactional,
+ consumer_count,
+ messages_unacknowledged,
+ prefetch_count]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -62,6 +75,11 @@
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(list/0 :: () -> [pid()]).
+-spec(info/1 :: (pid()) -> [info()]).
+-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(info_all/0 :: () -> [[info()]]).
+-spec(info_all/1 :: ([info_key()]) -> [[info()]]).
-endif.
@@ -91,12 +109,31 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
conserve_memory(Pid, Conserve) ->
gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
+list() ->
+ pg_local:get_members(rabbit_channels).
+
+info(Pid) ->
+ gen_server2:pcall(Pid, 9, info, infinity).
+
+info(Pid, Items) ->
+ case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+info_all() ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()).
+
+info_all(Items) ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ ok = pg_local:join(rabbit_channels, self()),
{ok, #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -110,7 +147,18 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}}.
+ consumer_mapping = dict:new()},
+ hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
+
+handle_call({info, Items}, _From, State) ->
+ try
+ reply({ok, infos(Items, State)}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
handle_call(_Request, _From, State) ->
noreply(State).
@@ -162,33 +210,31 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
+ {stop, Reason, State}.
-handle_info(timeout, State) ->
+handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {noreply, State, hibernate}.
+ {hibernate, State}.
-terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
- state = terminating}) ->
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid);
+terminate(_Reason, State = #ch{state = terminating}) ->
+ terminate(State);
-terminate(Reason, State = #ch{writer_pid = WriterPid,
- limiter_pid = LimiterPid}) ->
+terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
_ -> ok
end,
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid).
+ terminate(State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%---------------------------------------------------------------------------
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+
+noreply(NewState) -> {noreply, NewState, hibernate}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -951,3 +997,24 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
+
+terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid).
+
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _) -> self();
+i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
+i(user, #ch{username = Username}) -> Username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
+ dict:size(ConsumerMapping);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
+ queue:len(UAMQ);
+i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
+ rabbit_limiter:get_limit(LimiterPid);
+i(Item, _) ->
+ throw({bad_argument, Item}).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 2fe3f33e..738ed444 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -158,6 +158,7 @@ Available commands:
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
+ list_channels [<ChannelInfoItem> ...]
Quiet output mode is selected with the \"-q\" flag. Informational
messages are suppressed when quiet mode is in effect.
@@ -191,6 +192,11 @@ frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend]. The default is to display user, peer_address, peer_port
and state.
+<ChannelInfoItem> must be a member of the list [pid, connection, user,
+vhost, transactional, consumer_count, messages_unacknowledged,
+prefetch_count]. The default is to display pid, user, transactional,
+consumer_count, messages_unacknowledged.
+
"),
halt(1).
@@ -301,6 +307,13 @@ action(list_connections, Node, Args, Inform) ->
[ArgAtoms]),
ArgAtoms);
+action(list_channels, Node, Args, Inform) ->
+ Inform("Listing channels", []),
+ ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count,
+ messages_unacknowledged]),
+ display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
+ ArgAtoms);
+
action(Command, Node, Args, Inform) ->
{VHost, RemainingArgs} = parse_vhost_flag(Args),
action(Command, Node, VHost, RemainingArgs, Inform).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 087a9f64..6bd803a2 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -31,12 +31,13 @@
-module(rabbit_limiter).
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([get_limit/1]).
%%----------------------------------------------------------------------------
@@ -51,6 +52,7 @@
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
+-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
-endif.
@@ -69,7 +71,7 @@
%%----------------------------------------------------------------------------
start_link(ChPid) ->
- {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
+ {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid], []),
Pid.
shutdown(undefined) ->
@@ -104,6 +106,13 @@ register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
unregister(undefined, _QPid) -> ok;
unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}).
+get_limit(undefined) ->
+ 0;
+get_limit(Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> 0 end,
+ fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -118,7 +127,10 @@ handle_call({can_send, QPid, AckRequired}, _From,
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
end}}
- end.
+ end;
+
+handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
+ {reply, PrefetchCount, State}.
handle_cast(shutdown, State) ->
{stop, normal, State};