diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-02-03 21:57:14 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-02-03 21:57:14 +0000 |
commit | 0f3e69630f48ae1b08713a4405f847b6c20c19f6 (patch) | |
tree | 61cac837d49a5891a499ce9a777d817c2ff83b22 | |
parent | 30aa0f6084dd0a78f9ef196bc76d5302b269b7bc (diff) | |
parent | 1bff88281ef7df7574eee82dd50e705742367f92 (diff) | |
download | rabbitmq-server-0f3e69630f48ae1b08713a4405f847b6c20c19f6.tar.gz |
merge default into bug21966
-rw-r--r-- | docs/rabbitmqctl.1.pod | 55 | ||||
-rw-r--r-- | src/pg_local.erl | 203 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 97 | ||||
-rw-r--r-- | src/rabbit_control.erl | 13 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 18 |
5 files changed, 368 insertions, 18 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 47c4d168..b82d5b87 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -367,6 +367,61 @@ send queue size =back +=over + +=item list_channels [I<channelinfoitem> ...] + +List channel information. Each line printed describes a channel, with +the requested I<channelinfoitem> values separated by tab characters. +If no I<channelinfoitem>s are specified then I<pid>, I<user>, +I<transactional>, I<consumer_count>, and I<messages_unacknowledged> +are assumed. + +The list includes channels which are part of ordinary AMQP connections +(as listed by list_connections) and channels created by various +plug-ins and other extensions. + +=back + +=head3 Channel information items + +=over + +=item pid + +id of the Erlang process associated with the channel + +=item connection + +id of the Erlang process associated with the connection to which the +channel belongs + +=item user + +username associated with the channel + +=item vhost + +virtual host in which the channel operates + +=item transactional + +true if the channel is in transactional mode, false otherwise + +=item consumer_count + +number of logical AMQP consumers retrieving messages via the channel + +=item messages_unacknowledged + +number of messages delivered via this channel but not yet acknowledged + +=item prefetch_count + +QoS prefetch count limit in force, 0 if unlimited + +=back + The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results, defaulting to I<"/">. The default can be overridden with the B<-p> diff --git a/src/pg_local.erl b/src/pg_local.erl new file mode 100644 index 00000000..7f771f74 --- /dev/null +++ b/src/pg_local.erl @@ -0,0 +1,203 @@ +%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP +%% distribution, with the following modifications: +%% +%% 1) Process groups are node-local only. +%% +%% 2) Groups are created/deleted implicitly. +%% +%% 3) 'join' and 'leave' are asynchronous. +%% +%% 4) the type specs of the exported non-callback functions have been +%% extracted into a separate, guarded section, and rewritten in +%% old-style spec syntax, for better compatibility with older +%% versions of Erlang/OTP. The remaining type specs have been +%% removed. + +%% All modifications are (C) 2010 LShift Ltd. + +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(pg_local). + +-export([join/2, leave/2, get_members/1]). +-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, + terminate/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(name() :: term()). + +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(start/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(join/2 :: (name(), pid()) -> 'ok'). +-spec(leave/2 :: (name(), pid()) -> 'ok'). +-spec(get_members/1 :: (name()) -> [pid()]). + +-endif. + +%%---------------------------------------------------------------------------- + +%%% As of R13B03 monitors are used instead of links. + +%%% +%%% Exported functions +%%% + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +start() -> + ensure_started(). + +join(Name, Pid) when is_pid(Pid) -> + ensure_started(), + gen_server:cast(?MODULE, {join, Name, Pid}). + +leave(Name, Pid) when is_pid(Pid) -> + ensure_started(), + gen_server:cast(?MODULE, {leave, Name, Pid}). + +get_members(Name) -> + ensure_started(), + group_members(Name). + +%%% +%%% Callback functions from gen_server +%%% + +-record(state, {}). + +init([]) -> + pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]), + {ok, #state{}}. + +handle_call(Request, From, S) -> + error_logger:warning_msg("The pg_local server received an unexpected message:\n" + "handle_call(~p, ~p, _)\n", + [Request, From]), + {noreply, S}. + +handle_cast({join, Name, Pid}, S) -> + join_group(Name, Pid), + {noreply, S}; +handle_cast({leave, Name, Pid}, S) -> + leave_group(Name, Pid), + {noreply, S}; +handle_cast(_, S) -> + {noreply, S}. + +handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> + member_died(MonitorRef), + {noreply, S}; +handle_info(_, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + true = ets:delete(pg_local_table), + ok. + +%%% +%%% Local functions +%%% + +%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the +%%% table is ordered_set, and the fast matching of partially +%%% instantiated keys is used extensively. +%%% +%%% {{ref, Pid}, MonitorRef, Counter} +%%% {{ref, MonitorRef}, Pid} +%%% Each process has one monitor. Counter is incremented when the +%%% Pid joins some group. +%%% {{member, Name, Pid}, _} +%%% Pid is a member of group Name, GroupCounter is incremented when the +%%% Pid joins the group Name. +%%% {{pid, Pid, Name}} +%%% Pid is a member of group Name. + +member_died(Ref) -> + [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}), + Names = member_groups(Pid), + _ = [leave_group(Name, P) || + Name <- Names, + P <- member_in_group(Pid, Name)], + ok. + +join_group(Name, Pid) -> + Ref_Pid = {ref, Pid}, + try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1}) + catch _:_ -> + Ref = erlang:monitor(process, Pid), + true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}), + true = ets:insert(pg_local_table, {{ref, Ref}, Pid}) + end, + Member_Name_Pid = {member, Name, Pid}, + try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1}) + catch _:_ -> + true = ets:insert(pg_local_table, {Member_Name_Pid, 1}), + true = ets:insert(pg_local_table, {{pid, Pid, Name}}) + end. + +leave_group(Name, Pid) -> + Member_Name_Pid = {member, Name, Pid}, + try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of + N -> + if + N =:= 0 -> + true = ets:delete(pg_local_table, {pid, Pid, Name}), + true = ets:delete(pg_local_table, Member_Name_Pid); + true -> + ok + end, + Ref_Pid = {ref, Pid}, + case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of + 0 -> + [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid), + true = ets:delete(pg_local_table, {ref, Ref}), + true = ets:delete(pg_local_table, Ref_Pid), + true = erlang:demonitor(Ref, [flush]), + ok; + _ -> + ok + end + catch _:_ -> + ok + end. + +group_members(Name) -> + [P || + [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}), + _ <- lists:seq(1, N)]. + +member_in_group(Pid, Name) -> + [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}), + lists:duplicate(N, Pid). + +member_groups(Pid) -> + [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})]. + +ensure_started() -> + case whereis(?MODULE) of + undefined -> + C = {pg_local, {?MODULE, start_link, []}, permanent, + 1000, worker, [?MODULE]}, + supervisor:start_child(kernel_safe_sup, C); + PgLocalPid -> + {ok, PgLocalPid} + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f8e10097..7db80425 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,8 +37,10 @@ -export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). +-export([list/0, info/1, info/2, info_all/0, info_all/1]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, + handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, @@ -46,10 +48,21 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping}). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(INFO_KEYS, + [pid, + connection, + user, + vhost, + transactional, + consumer_count, + messages_unacknowledged, + prefetch_count]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -62,6 +75,11 @@ -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(list/0 :: () -> [pid()]). +-spec(info/1 :: (pid()) -> [info()]). +-spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(info_all/0 :: () -> [[info()]]). +-spec(info_all/1 :: ([info_key()]) -> [[info()]]). -endif. @@ -91,12 +109,31 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> conserve_memory(Pid, Conserve) -> gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). +list() -> + pg_local:get_members(rabbit_channels). + +info(Pid) -> + gen_server2:pcall(Pid, 9, info, infinity). + +info(Pid, Items) -> + case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + +info_all() -> + rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()). + +info_all(Items) -> + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + ok = pg_local:join(rabbit_channels, self()), {ok, #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -110,7 +147,18 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}}. + consumer_mapping = dict:new()}, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); + +handle_call({info, Items}, _From, State) -> + try + reply({ok, infos(Items, State)}, State) + catch Error -> reply({error, Error}, State) + end; handle_call(_Request, _From, State) -> noreply(State). @@ -162,33 +210,31 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; + {stop, Reason, State}. -handle_info(timeout, State) -> +handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {noreply, State, hibernate}. + {hibernate, State}. -terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, - state = terminating}) -> - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid); +terminate(_Reason, State = #ch{state = terminating}) -> + terminate(State); -terminate(Reason, State = #ch{writer_pid = WriterPid, - limiter_pid = LimiterPid}) -> +terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok end, - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + terminate(State). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%--------------------------------------------------------------------------- -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. + +noreply(NewState) -> {noreply, NewState, hibernate}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -951,3 +997,24 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. + +terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> + pg_local:leave(rabbit_channels, self()), + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid). + +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _) -> self(); +i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; +i(user, #ch{username = Username}) -> Username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> + dict:size(ConsumerMapping); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> + queue:len(UAMQ); +i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> + rabbit_limiter:get_limit(LimiterPid); +i(Item, _) -> + throw({bad_argument, Item}). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 2fe3f33e..738ed444 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -158,6 +158,7 @@ Available commands: list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...] + list_channels [<ChannelInfoItem> ...] Quiet output mode is selected with the \"-q\" flag. Informational messages are suppressed when quiet mode is in effect. @@ -191,6 +192,11 @@ frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address, peer_port and state. +<ChannelInfoItem> must be a member of the list [pid, connection, user, +vhost, transactional, consumer_count, messages_unacknowledged, +prefetch_count]. The default is to display pid, user, transactional, +consumer_count, messages_unacknowledged. + "), halt(1). @@ -301,6 +307,13 @@ action(list_connections, Node, Args, Inform) -> [ArgAtoms]), ArgAtoms); +action(list_channels, Node, Args, Inform) -> + Inform("Listing channels", []), + ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count, + messages_unacknowledged]), + display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), + ArgAtoms); + action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 087a9f64..6bd803a2 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -31,12 +31,13 @@ -module(rabbit_limiter). --behaviour(gen_server). +-behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([get_limit/1]). %%---------------------------------------------------------------------------- @@ -51,6 +52,7 @@ -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). +-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). -endif. @@ -69,7 +71,7 @@ %%---------------------------------------------------------------------------- start_link(ChPid) -> - {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), + {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid], []), Pid. shutdown(undefined) -> @@ -104,6 +106,13 @@ register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). unregister(undefined, _QPid) -> ok; unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). +get_limit(undefined) -> + 0; +get_limit(Pid) -> + rabbit_misc:with_exit_handler( + fun () -> 0 end, + fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -118,7 +127,10 @@ handle_call({can_send, QPid, AckRequired}, _From, false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} - end. + end; + +handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> + {reply, PrefetchCount, State}. handle_cast(shutdown, State) -> {stop, normal, State}; |