diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-02-05 20:28:16 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-02-05 20:28:16 +0000 |
commit | a7b3464eb73e66f61a218134f03055b6b037ec5e (patch) | |
tree | 9549e3b8fb3459dd946c665afddd8605b12419e2 | |
parent | 82571f5e68c898a79fac70bdf7c65c1de0f2dcd6 (diff) | |
parent | a8e6ccc9735461e74e501ee3f176ea3c0970c1d5 (diff) | |
download | rabbitmq-server-bug22310.tar.gz |
merge default into bug22310bug22310
in order to resolve merge conflict going the other way
-rw-r--r-- | codegen.py | 7 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.pod | 108 | ||||
-rw-r--r-- | src/pg_local.erl | 213 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 17 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 106 | ||||
-rw-r--r-- | src/rabbit_control.erl | 53 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 18 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 62 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 11 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 29 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 58 |
13 files changed, 665 insertions, 58 deletions
@@ -214,6 +214,8 @@ def genErl(spec): elif type == 'table': print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \ (f.index, f.index) + elif type == 'shortstr': + print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index) else: pass @@ -246,7 +248,10 @@ def genErl(spec): elif type == 'table': print " F%dTab = rabbit_binary_generator:generate_table(F%d)," % (f.index, f.index) print " F%dLen = size(F%dTab)," % (f.index, f.index) - elif type in ['shortstr', 'longstr']: + elif type == 'shortstr': + print " F%dLen = size(F%d)," % (f.index, f.index) + print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index) + elif type == 'longstr': print " F%dLen = size(F%d)," % (f.index, f.index) else: pass diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 47c4d168..e26767ab 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -98,6 +98,13 @@ nodes determined by I<clusternode> option(s). See L<http://www.rabbitmq.com/clustering.html> for more information about clustering. +=item close_connection I<connectionpid> I<explanation> + +Instruct the broker to close the connection associated with the Erlang +process id I<connectionpid> (see also the I<list_connections> +command), passing the I<explanation> string to the connected client as +part of the AMQP connection shutdown protocol. + =back =head2 USER MANAGEMENT @@ -202,6 +209,22 @@ queue arguments id of the Erlang process associated with the queue +=item owner_pid + +id of the Erlang process representing the connection which is the +exclusive owner of the queue, or empty if the queue is non-exclusive + +=item exclusive_consumer_pid + +id of the Erlang process representing the channel of the exclusive +consumer subscribed to this queue, or empty if there is no exclusive +consumer + +=item exclusive_consumer_tag + +consumer tag of the exclusive consumer subscribed to this queue, or +empty if there is no exclusive consumer + =item messages_ready number of messages ready to be delivered to clients @@ -367,10 +390,87 @@ send queue size =back -The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results, -defaulting to I<"/">. The default can be overridden with the B<-p> -flag. +=over + +=item list_channels [I<channelinfoitem> ...] + +List channel information. Each line printed describes a channel, with +the requested I<channelinfoitem> values separated by tab characters. +If no I<channelinfoitem>s are specified then I<pid>, I<user>, +I<transactional>, I<consumer_count>, and I<messages_unacknowledged> +are assumed. + +The list includes channels which are part of ordinary AMQP connections +(as listed by list_connections) and channels created by various +plug-ins and other extensions. + +=back + +=head3 Channel information items + +=over + +=item pid + +id of the Erlang process associated with the channel + +=item connection + +id of the Erlang process associated with the connection to which the +channel belongs + +=item number + +the number of the channel, which uniquely identifies it within a +connection + +=item user + +username associated with the channel + +=item vhost + +virtual host in which the channel operates + +=item transactional + +true if the channel is in transactional mode, false otherwise + +=item consumer_count + +number of logical AMQP consumers retrieving messages via the channel + +=item messages_unacknowledged + +number of messages delivered via this channel but not yet acknowledged + +=item acks_uncommitted + +number of acknowledgements received in an as yet uncommitted +transaction + +=item prefetch_count + +QoS prefetch count limit in force, 0 if unlimited + +=back + +=item list_consumers + +List consumers, i.e. subscriptions to a queue's message stream. Each +line printed shows, separated by tab characters, the name of the queue +subscribed to, the id of the channel process via which the +subscription was created and is managed, the consumer tag which +uniquely identifies the subscription within a channel, and a boolean +indicating whether acknowledgements are expected for messages +delivered to this consumer. + +=back + +The list_queues, list_exchanges, list_bindings and list_consumers +commands accept an optional virtual host parameter for which to +display results, defaulting to I<"/">. The default can be overridden +with the B<-p> flag. =head1 OUTPUT ESCAPING diff --git a/src/pg_local.erl b/src/pg_local.erl new file mode 100644 index 00000000..fa41fe46 --- /dev/null +++ b/src/pg_local.erl @@ -0,0 +1,213 @@ +%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP +%% distribution, with the following modifications: +%% +%% 1) Process groups are node-local only. +%% +%% 2) Groups are created/deleted implicitly. +%% +%% 3) 'join' and 'leave' are asynchronous. +%% +%% 4) the type specs of the exported non-callback functions have been +%% extracted into a separate, guarded section, and rewritten in +%% old-style spec syntax, for better compatibility with older +%% versions of Erlang/OTP. The remaining type specs have been +%% removed. + +%% All modifications are (C) 2010 LShift Ltd. + +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(pg_local). + +-export([join/2, leave/2, get_members/1]). +-export([sync/0]). %% intended for testing only; not part of official API +-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, + terminate/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(name() :: term()). + +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(start/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(join/2 :: (name(), pid()) -> 'ok'). +-spec(leave/2 :: (name(), pid()) -> 'ok'). +-spec(get_members/1 :: (name()) -> [pid()]). + +-spec(sync/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +%%% As of R13B03 monitors are used instead of links. + +%%% +%%% Exported functions +%%% + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +start() -> + ensure_started(). + +join(Name, Pid) when is_pid(Pid) -> + ensure_started(), + gen_server:cast(?MODULE, {join, Name, Pid}). + +leave(Name, Pid) when is_pid(Pid) -> + ensure_started(), + gen_server:cast(?MODULE, {leave, Name, Pid}). + +get_members(Name) -> + ensure_started(), + group_members(Name). + +sync() -> + ensure_started(), + gen_server:call(?MODULE, sync). + +%%% +%%% Callback functions from gen_server +%%% + +-record(state, {}). + +init([]) -> + pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]), + {ok, #state{}}. + +handle_call(sync, _From, S) -> + {reply, ok, S}; + +handle_call(Request, From, S) -> + error_logger:warning_msg("The pg_local server received an unexpected message:\n" + "handle_call(~p, ~p, _)\n", + [Request, From]), + {noreply, S}. + +handle_cast({join, Name, Pid}, S) -> + join_group(Name, Pid), + {noreply, S}; +handle_cast({leave, Name, Pid}, S) -> + leave_group(Name, Pid), + {noreply, S}; +handle_cast(_, S) -> + {noreply, S}. + +handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> + member_died(MonitorRef), + {noreply, S}; +handle_info(_, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + true = ets:delete(pg_local_table), + ok. + +%%% +%%% Local functions +%%% + +%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the +%%% table is ordered_set, and the fast matching of partially +%%% instantiated keys is used extensively. +%%% +%%% {{ref, Pid}, MonitorRef, Counter} +%%% {{ref, MonitorRef}, Pid} +%%% Each process has one monitor. Counter is incremented when the +%%% Pid joins some group. +%%% {{member, Name, Pid}, _} +%%% Pid is a member of group Name, GroupCounter is incremented when the +%%% Pid joins the group Name. +%%% {{pid, Pid, Name}} +%%% Pid is a member of group Name. + +member_died(Ref) -> + [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}), + Names = member_groups(Pid), + _ = [leave_group(Name, P) || + Name <- Names, + P <- member_in_group(Pid, Name)], + ok. + +join_group(Name, Pid) -> + Ref_Pid = {ref, Pid}, + try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1}) + catch _:_ -> + Ref = erlang:monitor(process, Pid), + true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}), + true = ets:insert(pg_local_table, {{ref, Ref}, Pid}) + end, + Member_Name_Pid = {member, Name, Pid}, + try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1}) + catch _:_ -> + true = ets:insert(pg_local_table, {Member_Name_Pid, 1}), + true = ets:insert(pg_local_table, {{pid, Pid, Name}}) + end. + +leave_group(Name, Pid) -> + Member_Name_Pid = {member, Name, Pid}, + try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of + N -> + if + N =:= 0 -> + true = ets:delete(pg_local_table, {pid, Pid, Name}), + true = ets:delete(pg_local_table, Member_Name_Pid); + true -> + ok + end, + Ref_Pid = {ref, Pid}, + case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of + 0 -> + [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid), + true = ets:delete(pg_local_table, {ref, Ref}), + true = ets:delete(pg_local_table, Ref_Pid), + true = erlang:demonitor(Ref, [flush]), + ok; + _ -> + ok + end + catch _:_ -> + ok + end. + +group_members(Name) -> + [P || + [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}), + _ <- lists:seq(1, N)]. + +member_in_group(Pid, Name) -> + [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}), + lists:duplicate(N, Pid). + +member_groups(Pid) -> + [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})]. + +ensure_started() -> + case whereis(?MODULE) of + undefined -> + C = {pg_local, {?MODULE, start_link, []}, permanent, + 1000, worker, [?MODULE]}, + supervisor:start_child(kernel_safe_sup, C); + PgLocalPid -> + {ok, PgLocalPid} + end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index db7461b0..08ad2d78 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,6 +37,7 @@ -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2]). @@ -74,6 +75,9 @@ -spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). +-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]). +-spec(consumers_all/1 :: + (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]). -spec(stat/1 :: (amqqueue()) -> qstats()). -spec(stat_all/0 :: () -> [qstats()]). -spec(delete/3 :: @@ -96,7 +100,8 @@ -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) -> + (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), + boolean(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). @@ -240,6 +245,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +consumers(#amqqueue{ pid = QPid }) -> + gen_server2:pcall(QPid, 9, consumers, infinity). + +consumers_all(VHostPath) -> + lists:concat( + map(VHostPath, + fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} || + {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] + end)). + stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 06e68a1b..29d428c7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -77,6 +77,9 @@ auto_delete, arguments, pid, + owner_pid, + exclusive_consumer_pid, + exclusive_consumer_tag, messages_ready, messages_unacknowledged, messages_uncommitted, @@ -511,6 +514,18 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(pid, _) -> self(); +i(owner_pid, #q{owner = none}) -> + ''; +i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) -> + ReaderPid; +i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> + ''; +i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> + ChPid; +i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> + ''; +i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> + ConsumerTag; i(messages_ready, #q{message_buffer = MessageBuffer}) -> queue:len(MessageBuffer); i(messages_unacknowledged, _) -> @@ -547,6 +562,15 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; +handle_call(consumers, _From, + State = #q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + reply(rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); + handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index b8e161a6..b903a6ee 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -196,12 +196,16 @@ generate_array(Array) when is_list(Array) -> fun ({Type, Value}) -> field_value_to_binary(Type, Value) end, Array)). -short_string_to_binary(String) when is_binary(String) and (size(String) < 256) -> - [<<(size(String)):8>>, String]; +short_string_to_binary(String) when is_binary(String) -> + Len = size(String), + if Len < 256 -> [<<(size(String)):8>>, String]; + true -> exit(content_properties_shortstr_overflow) + end; short_string_to_binary(String) -> StringLength = length(String), - true = (StringLength < 256), % assertion - [<<StringLength:8>>, String]. + if StringLength < 256 -> [<<StringLength:8>>, String]; + true -> exit(content_properties_shortstr_overflow) + end. long_string_to_binary(String) when is_binary(String) -> [<<(size(String)):32>>, String]; @@ -239,7 +243,10 @@ encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, Flags end. encode_property(shortstr, String) -> - Len = size(String), <<Len:8/unsigned, String:Len/binary>>; + Len = size(String), + if Len < 256 -> <<Len:8/unsigned, String:Len/binary>>; + true -> exit(content_properties_shortstr_overflow) + end; encode_property(longstr, String) -> Len = size(String), <<Len:32/unsigned, String:Len/binary>>; encode_property(octet, Int) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8f11b4ed..b9940eb9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,8 +37,10 @@ -export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). +-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, + handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, @@ -46,10 +48,23 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping}). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(INFO_KEYS, + [pid, + connection, + number, + user, + vhost, + transactional, + consumer_count, + messages_unacknowledged, + acks_uncommitted, + prefetch_count]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -62,6 +77,12 @@ -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(list/0 :: () -> [pid()]). +-spec(info_keys/0 :: () -> [info_key()]). +-spec(info/1 :: (pid()) -> [info()]). +-spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(info_all/0 :: () -> [[info()]]). +-spec(info_all/1 :: ([info_key()]) -> [[info()]]). -endif. @@ -91,12 +112,33 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> conserve_memory(Pid, Conserve) -> gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). +list() -> + pg_local:get_members(rabbit_channels). + +info_keys() -> ?INFO_KEYS. + +info(Pid) -> + gen_server2:pcall(Pid, 9, info, infinity). + +info(Pid, Items) -> + case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + +info_all() -> + rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()). + +info_all(Items) -> + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + ok = pg_local:join(rabbit_channels, self()), {ok, #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -110,7 +152,18 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}}. + consumer_mapping = dict:new()}, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); + +handle_call({info, Items}, _From, State) -> + try + reply({ok, infos(Items, State)}, State) + catch Error -> reply({error, Error}, State) + end; handle_call(_Request, _From, State) -> noreply(State). @@ -162,33 +215,31 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; + {stop, Reason, State}. -handle_info(timeout, State) -> +handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {noreply, State, hibernate}. + {hibernate, State}. -terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, - state = terminating}) -> - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid); +terminate(_Reason, State = #ch{state = terminating}) -> + terminate(State); -terminate(Reason, State = #ch{writer_pid = WriterPid, - limiter_pid = LimiterPid}) -> +terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok end, - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + terminate(State). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%--------------------------------------------------------------------------- -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. + +noreply(NewState) -> {noreply, NewState, hibernate}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -953,3 +1004,28 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. + +terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> + pg_local:leave(rabbit_channels, self()), + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid). + +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _) -> self(); +i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; +i(number, #ch{channel = Channel}) -> Channel; +i(user, #ch{username = Username}) -> Username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> + dict:size(ConsumerMapping); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, + uncommitted_ack_q = UAQ}) -> + queue:len(UAMQ) + queue:len(UAQ); +i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) -> + queue:len(UAQ); +i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> + rabbit_limiter:get_limit(LimiterPid); +i(Item, _) -> + throw({bad_argument, Item}). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 2fe3f33e..771c0e72 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -81,6 +81,9 @@ start() -> {error, Reason} -> error("~p", [Reason]), halt(2); + {badrpc, {'EXIT', Reason}} -> + error("~p", [Reason]), + halt(2); {badrpc, Reason} -> error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), @@ -139,6 +142,7 @@ Available commands: cluster <ClusterNode> ... status rotate_logs [Suffix] + close_connection <ConnectionPid> <ExplanationString> add_user <UserName> <Password> delete_user <UserName> @@ -158,6 +162,8 @@ Available commands: list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...] + list_channels [<ChannelInfoItem> ...] + list_consumers [-p <VHostPath>] Quiet output mode is selected with the \"-q\" flag. Informational messages are suppressed when quiet mode is in effect. @@ -174,7 +180,8 @@ optional virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, -auto_delete, arguments, pid, messages_ready, messages_unacknowledged, +auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid, +exclusive_consumer_tag, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -191,6 +198,17 @@ frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address, peer_port and state. +<ChannelInfoItem> must be a member of the list [pid, connection, +number, user, vhost, transactional, consumer_count, +messages_unacknowledged, acks_uncommitted, prefetch_count]. The +default is to display pid, user, transactional, consumer_count, +messages_unacknowledged. + +The output format for \"list_consumers\" is a list of rows containing, +in order, the queue name, channel process id, consumer tag, and a +boolean indicating whether acknowledgements are expected from the +consumer. + "), halt(1). @@ -235,6 +253,11 @@ action(rotate_logs, Node, Args = [Suffix], Inform) -> Inform("Rotating logs to files with suffix ~p", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); +action(close_connection, Node, [PidStr, Explanation], Inform) -> + Inform("Closing connection ~s", [PidStr]), + rpc_call(Node, rabbit_networking, close_connection, + [rabbit_misc:string_to_pid(PidStr), Explanation]); + action(add_user, Node, Args = [Username, _Password], Inform) -> Inform("Creating user ~p", [Username]), call(Node, {rabbit_access_control, add_user, Args}); @@ -291,8 +314,7 @@ action(list_bindings, Node, Args, Inform) -> display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], - InfoKeys), - ok; + InfoKeys); action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), @@ -301,6 +323,22 @@ action(list_connections, Node, Args, Inform) -> [ArgAtoms]), ArgAtoms); +action(list_channels, Node, Args, Inform) -> + Inform("Listing channels", []), + ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count, + messages_unacknowledged]), + display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), + ArgAtoms); + +action(list_consumers, Node, Args, Inform) -> + Inform("Listing consumers", []), + {VHostArg, _} = parse_vhost_flag_bin(Args), + InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required], + display_info_list( + [lists:zip(InfoKeys, tuple_to_list(X)) || + X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])], + InfoKeys); + action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). @@ -358,7 +396,7 @@ format_info_item(Key, Items) -> is_tuple(Value) -> inet_parse:ntoa(Value); Value when is_pid(Value) -> - pid_to_string(Value); + rabbit_misc:pid_to_string(Value); Value when is_binary(Value) -> escape(Value); Value when is_atom(Value) -> @@ -416,10 +454,3 @@ prettify_typed_amqp_value(Type, Value) -> array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; _ -> Value end. - -%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7) -pid_to_string(Pid) -> - <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> - = term_to_binary(Pid), - Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index cf91270e..83df15ce 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -31,12 +31,13 @@ -module(rabbit_limiter). --behaviour(gen_server). +-behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([get_limit/1]). %%---------------------------------------------------------------------------- @@ -51,6 +52,7 @@ -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). +-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). -endif. @@ -69,7 +71,7 @@ %%---------------------------------------------------------------------------- start_link(ChPid, UnackedMsgCount) -> - {ok, Pid} = gen_server:start_link(?MODULE, [ChPid, UnackedMsgCount], []), + {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []), Pid. shutdown(undefined) -> @@ -104,6 +106,13 @@ register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). unregister(undefined, _QPid) -> ok; unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). +get_limit(undefined) -> + 0; +get_limit(Pid) -> + rabbit_misc:with_exit_handler( + fun () -> 0 end, + fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -118,7 +127,10 @@ handle_call({can_send, QPid, AckRequired}, _From, false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} - end. + end; + +handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> + {reply, PrefetchCount, State}. handle_cast(shutdown, State) -> {stop, normal, State}; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 172e27f4..92d03789 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,6 +56,7 @@ -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). +-export([pid_to_string/1, string_to_pid/1]). -import(mnesia). -import(lists). @@ -127,6 +128,8 @@ -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). +-spec(pid_to_string/1 :: (pid()) -> string()). +-spec(string_to_pid/1 :: (string()) -> pid()). -endif. @@ -499,3 +502,62 @@ queue_fold(Fun, Init, Q) -> {empty, _Q} -> Init; {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. + +%% This provides a string representation of a pid that is the same +%% regardless of what node we are running on. The representation also +%% permits easy identification of the pid's node. +pid_to_string(Pid) when is_pid(Pid) -> + %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and + %% 8.7) + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). + +%% inverse of above +string_to_pid(Str) -> + ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end, + %% TODO: simplify this code by using the 're' module, once we drop + %% support for R11 + %% + %% 1) sanity check + %% The \ before the trailing $ is only there to keep emacs + %% font-lock from getting confused. + case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of + {match, _, _} -> + %% 2) strip <> + Str1 = string:substr(Str, 2, string:len(Str) - 2), + %% 3) extract three constituent parts, taking care to + %% handle dots in the node part (hence the reverse and concat) + [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")), + NodeStr = lists:concat(lists:reverse(Rest)), + %% 4) construct a triple term from the three parts + TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.", + [NodeStr, IdStr, SerStr])), + %% 5) parse the triple + Tokens = case erl_scan:string(TripleStr) of + {ok, Tokens1, _} -> Tokens1; + {error, _, _} -> ErrorFun() + end, + Term = case erl_parse:parse_term(Tokens) of + {ok, Term1} -> Term1; + {error, _} -> ErrorFun() + end, + {Node, Id, Ser} = + case Term of + {Node1, Id1, Ser1} when is_atom(Node1) andalso + is_integer(Id1) andalso + is_integer(Ser1) -> + Term; + _ -> + ErrorFun() + end, + %% 6) turn the triple into a pid - see pid_to_string + <<131,NodeEnc/binary>> = term_to_binary(Node), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); + nomatch -> + ErrorFun(); + Error -> + %% invalid regexp - shouldn't happen + throw(Error) + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 06e2b40e..717eccc4 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -35,7 +35,8 @@ stop_tcp_listener/2, on_node_down/1, active_listeners/0, node_listeners/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, - connection_info_all/0, connection_info_all/1]). + connection_info_all/0, connection_info_all/1, + close_connection/2]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). @@ -76,6 +77,7 @@ -spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]). -spec(connection_info_all/0 :: () -> [[info()]]). -spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]). +-spec(close_connection/2 :: (pid(), string()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) -> {ip_address(), atom()}). @@ -224,6 +226,13 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items). connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). +close_connection(Pid, Explanation) -> + case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end, + supervisor:which_children(rabbit_tcp_client_sup)) of + true -> rabbit_reader:shutdown(Pid, Explanation); + false -> throw({error, {not_a_connection_pid, Pid}}) + end. + %%-------------------------------------------------------------------- tcp_host({0,0,0,0}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f5bdb985..d0d8860f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,7 +33,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2]). +-export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -132,6 +132,7 @@ -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(shutdown/2 :: (pid(), string()) -> 'ok'). -endif. @@ -140,6 +141,9 @@ start_link() -> {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +shutdown(Pid, Explanation) -> + gen_server:call(Pid, {shutdown, Explanation}, infinity). + init(Parent) -> Deb = sys:debug_options([]), receive @@ -267,13 +271,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); {'EXIT', Parent, Reason} -> - if State#v1.connection_state =:= running -> - send_exception(State, 0, - rabbit_misc:amqp_error(connection_forced, - "broker forced connection closure with reason '~w'", - [Reason], none)); - true -> ok - end, + terminate(io_lib:format("broker forced connection closure " + "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to %% http://www.erlang.org/doc/man/sys.html %% @@ -301,6 +300,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; timeout -> throw({timeout, State#v1.connection_state}); + {'$gen_call', From, {shutdown, Explanation}} -> + {ForceTermination, NewState} = terminate(Explanation, State), + gen_server:reply(From, ok), + case ForceTermination of + force -> ok; + normal -> mainloop(Parent, Deb, NewState) + end; {'$gen_call', From, info} -> gen_server:reply(From, infos(?INFO_KEYS, State)), mainloop(Parent, Deb, State); @@ -323,6 +329,13 @@ switch_callback(OldState, NewCallback, Length) -> OldState#v1{callback = NewCallback, recv_ref = Ref}. +terminate(Explanation, State = #v1{connection_state = running}) -> + {normal, send_exception(State, 0, + rabbit_misc:amqp_error( + connection_forced, Explanation, [], none))}; +terminate(_Explanation, State) -> + {force, State}. + close_connection(State = #v1{connection = #connection{ timeout_sec = TimeoutSec}}) -> %% We terminate the connection after the specified interval, but diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 833ccc26..46a8f7df 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_pg_local(), passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), @@ -183,6 +184,28 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_pg_local() -> + [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]], + check_pg_local(ok, [], []), + check_pg_local(pg_local:join(a, P), [P], []), + check_pg_local(pg_local:join(b, P), [P], [P]), + check_pg_local(pg_local:join(a, P), [P, P], [P]), + check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]), + check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]), + check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]), + check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]), + check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]), + check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), + check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), + [X ! done || X <- [P, Q]], + check_pg_local(ok, [], []), + passed. + +check_pg_local(ok, APids, BPids) -> + ok = pg_local:sync(), + [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) || + {Key, Pids} <- [{a, APids}, {b, BPids}]]. + test_unfold() -> {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), List = lists:seq(2,20,2), @@ -689,10 +712,17 @@ test_user_management() -> test_server_status() -> - %% create a queue so we have something to list - Q = #amqqueue{} = rabbit_amqqueue:declare( - rabbit_misc:r(<<"/">>, queue, <<"foo">>), - false, false, []), + %% create a few things so there is some useful information to list + Writer = spawn(fun () -> receive shutdown -> ok end end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), + [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( + rabbit_misc:r(<<"/">>, queue, Name), + false, false, []) || + Name <- [<<"foo">>, <<"bar">>]], + + ok = rabbit_amqqueue:claim_queue(Q, self()), + ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined, + <<"ctag">>, true, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -703,19 +733,29 @@ test_server_status() -> %% list bindings ok = control_action(list_bindings, []), - %% cleanup - {ok, _} = rabbit_amqqueue:delete(Q, false, false), - %% list connections [#listener{host = H, port = P} | _] = [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), N =:= node()], - {ok, C} = gen_tcp:connect(H, P, []), + {ok, _C} = gen_tcp:connect(H, P, []), timer:sleep(100), ok = info_action(list_connections, rabbit_networking:connection_info_keys(), false), - ok = gen_tcp:close(C), + %% close_connection + [ConnPid] = rabbit_networking:connections(), + ok = control_action(close_connection, [rabbit_misc:pid_to_string(ConnPid), + "go away"]), + + %% list channels + ok = info_action(list_channels, rabbit_channel:info_keys(), false), + + %% list consumers + ok = control_action(list_consumers, []), + + %% cleanup + [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], + ok = rabbit_channel:shutdown(Ch), passed. |