summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-05 20:28:16 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-05 20:28:16 +0000
commita7b3464eb73e66f61a218134f03055b6b037ec5e (patch)
tree9549e3b8fb3459dd946c665afddd8605b12419e2
parent82571f5e68c898a79fac70bdf7c65c1de0f2dcd6 (diff)
parenta8e6ccc9735461e74e501ee3f176ea3c0970c1d5 (diff)
downloadrabbitmq-server-bug22310.tar.gz
merge default into bug22310bug22310
in order to resolve merge conflict going the other way
-rw-r--r--codegen.py7
-rw-r--r--docs/rabbitmqctl.1.pod108
-rw-r--r--src/pg_local.erl213
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_binary_generator.erl17
-rw-r--r--src/rabbit_channel.erl106
-rw-r--r--src/rabbit_control.erl53
-rw-r--r--src/rabbit_limiter.erl18
-rw-r--r--src/rabbit_misc.erl62
-rw-r--r--src/rabbit_networking.erl11
-rw-r--r--src/rabbit_reader.erl29
-rw-r--r--src/rabbit_tests.erl58
13 files changed, 665 insertions, 58 deletions
diff --git a/codegen.py b/codegen.py
index 6f39574f..648983f1 100644
--- a/codegen.py
+++ b/codegen.py
@@ -214,6 +214,8 @@ def genErl(spec):
elif type == 'table':
print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \
(f.index, f.index)
+ elif type == 'shortstr':
+ print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index)
else:
pass
@@ -246,7 +248,10 @@ def genErl(spec):
elif type == 'table':
print " F%dTab = rabbit_binary_generator:generate_table(F%d)," % (f.index, f.index)
print " F%dLen = size(F%dTab)," % (f.index, f.index)
- elif type in ['shortstr', 'longstr']:
+ elif type == 'shortstr':
+ print " F%dLen = size(F%d)," % (f.index, f.index)
+ print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index)
+ elif type == 'longstr':
print " F%dLen = size(F%d)," % (f.index, f.index)
else:
pass
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 47c4d168..e26767ab 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -98,6 +98,13 @@ nodes determined by I<clusternode> option(s). See
L<http://www.rabbitmq.com/clustering.html> for more information about
clustering.
+=item close_connection I<connectionpid> I<explanation>
+
+Instruct the broker to close the connection associated with the Erlang
+process id I<connectionpid> (see also the I<list_connections>
+command), passing the I<explanation> string to the connected client as
+part of the AMQP connection shutdown protocol.
+
=back
=head2 USER MANAGEMENT
@@ -202,6 +209,22 @@ queue arguments
id of the Erlang process associated with the queue
+=item owner_pid
+
+id of the Erlang process representing the connection which is the
+exclusive owner of the queue, or empty if the queue is non-exclusive
+
+=item exclusive_consumer_pid
+
+id of the Erlang process representing the channel of the exclusive
+consumer subscribed to this queue, or empty if there is no exclusive
+consumer
+
+=item exclusive_consumer_tag
+
+consumer tag of the exclusive consumer subscribed to this queue, or
+empty if there is no exclusive consumer
+
=item messages_ready
number of messages ready to be delivered to clients
@@ -367,10 +390,87 @@ send queue size
=back
-The list_queues, list_exchanges and list_bindings commands accept an
-optional virtual host parameter for which to display results,
-defaulting to I<"/">. The default can be overridden with the B<-p>
-flag.
+=over
+
+=item list_channels [I<channelinfoitem> ...]
+
+List channel information. Each line printed describes a channel, with
+the requested I<channelinfoitem> values separated by tab characters.
+If no I<channelinfoitem>s are specified then I<pid>, I<user>,
+I<transactional>, I<consumer_count>, and I<messages_unacknowledged>
+are assumed.
+
+The list includes channels which are part of ordinary AMQP connections
+(as listed by list_connections) and channels created by various
+plug-ins and other extensions.
+
+=back
+
+=head3 Channel information items
+
+=over
+
+=item pid
+
+id of the Erlang process associated with the channel
+
+=item connection
+
+id of the Erlang process associated with the connection to which the
+channel belongs
+
+=item number
+
+the number of the channel, which uniquely identifies it within a
+connection
+
+=item user
+
+username associated with the channel
+
+=item vhost
+
+virtual host in which the channel operates
+
+=item transactional
+
+true if the channel is in transactional mode, false otherwise
+
+=item consumer_count
+
+number of logical AMQP consumers retrieving messages via the channel
+
+=item messages_unacknowledged
+
+number of messages delivered via this channel but not yet acknowledged
+
+=item acks_uncommitted
+
+number of acknowledgements received in an as yet uncommitted
+transaction
+
+=item prefetch_count
+
+QoS prefetch count limit in force, 0 if unlimited
+
+=back
+
+=item list_consumers
+
+List consumers, i.e. subscriptions to a queue's message stream. Each
+line printed shows, separated by tab characters, the name of the queue
+subscribed to, the id of the channel process via which the
+subscription was created and is managed, the consumer tag which
+uniquely identifies the subscription within a channel, and a boolean
+indicating whether acknowledgements are expected for messages
+delivered to this consumer.
+
+=back
+
+The list_queues, list_exchanges, list_bindings and list_consumers
+commands accept an optional virtual host parameter for which to
+display results, defaulting to I<"/">. The default can be overridden
+with the B<-p> flag.
=head1 OUTPUT ESCAPING
diff --git a/src/pg_local.erl b/src/pg_local.erl
new file mode 100644
index 00000000..fa41fe46
--- /dev/null
+++ b/src/pg_local.erl
@@ -0,0 +1,213 @@
+%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) Process groups are node-local only.
+%%
+%% 2) Groups are created/deleted implicitly.
+%%
+%% 3) 'join' and 'leave' are asynchronous.
+%%
+%% 4) the type specs of the exported non-callback functions have been
+%% extracted into a separate, guarded section, and rewritten in
+%% old-style spec syntax, for better compatibility with older
+%% versions of Erlang/OTP. The remaining type specs have been
+%% removed.
+
+%% All modifications are (C) 2010 LShift Ltd.
+
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(pg_local).
+
+-export([join/2, leave/2, get_members/1]).
+-export([sync/0]). %% intended for testing only; not part of official API
+-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
+ terminate/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(name() :: term()).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(start/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(join/2 :: (name(), pid()) -> 'ok').
+-spec(leave/2 :: (name(), pid()) -> 'ok').
+-spec(get_members/1 :: (name()) -> [pid()]).
+
+-spec(sync/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%%% As of R13B03 monitors are used instead of links.
+
+%%%
+%%% Exported functions
+%%%
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+start() ->
+ ensure_started().
+
+join(Name, Pid) when is_pid(Pid) ->
+ ensure_started(),
+ gen_server:cast(?MODULE, {join, Name, Pid}).
+
+leave(Name, Pid) when is_pid(Pid) ->
+ ensure_started(),
+ gen_server:cast(?MODULE, {leave, Name, Pid}).
+
+get_members(Name) ->
+ ensure_started(),
+ group_members(Name).
+
+sync() ->
+ ensure_started(),
+ gen_server:call(?MODULE, sync).
+
+%%%
+%%% Callback functions from gen_server
+%%%
+
+-record(state, {}).
+
+init([]) ->
+ pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
+ {ok, #state{}}.
+
+handle_call(sync, _From, S) ->
+ {reply, ok, S};
+
+handle_call(Request, From, S) ->
+ error_logger:warning_msg("The pg_local server received an unexpected message:\n"
+ "handle_call(~p, ~p, _)\n",
+ [Request, From]),
+ {noreply, S}.
+
+handle_cast({join, Name, Pid}, S) ->
+ join_group(Name, Pid),
+ {noreply, S};
+handle_cast({leave, Name, Pid}, S) ->
+ leave_group(Name, Pid),
+ {noreply, S};
+handle_cast(_, S) ->
+ {noreply, S}.
+
+handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
+ member_died(MonitorRef),
+ {noreply, S};
+handle_info(_, S) ->
+ {noreply, S}.
+
+terminate(_Reason, _S) ->
+ true = ets:delete(pg_local_table),
+ ok.
+
+%%%
+%%% Local functions
+%%%
+
+%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the
+%%% table is ordered_set, and the fast matching of partially
+%%% instantiated keys is used extensively.
+%%%
+%%% {{ref, Pid}, MonitorRef, Counter}
+%%% {{ref, MonitorRef}, Pid}
+%%% Each process has one monitor. Counter is incremented when the
+%%% Pid joins some group.
+%%% {{member, Name, Pid}, _}
+%%% Pid is a member of group Name, GroupCounter is incremented when the
+%%% Pid joins the group Name.
+%%% {{pid, Pid, Name}}
+%%% Pid is a member of group Name.
+
+member_died(Ref) ->
+ [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
+ Names = member_groups(Pid),
+ _ = [leave_group(Name, P) ||
+ Name <- Names,
+ P <- member_in_group(Pid, Name)],
+ ok.
+
+join_group(Name, Pid) ->
+ Ref_Pid = {ref, Pid},
+ try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
+ catch _:_ ->
+ Ref = erlang:monitor(process, Pid),
+ true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
+ true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
+ end,
+ Member_Name_Pid = {member, Name, Pid},
+ try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
+ catch _:_ ->
+ true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
+ true = ets:insert(pg_local_table, {{pid, Pid, Name}})
+ end.
+
+leave_group(Name, Pid) ->
+ Member_Name_Pid = {member, Name, Pid},
+ try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
+ N ->
+ if
+ N =:= 0 ->
+ true = ets:delete(pg_local_table, {pid, Pid, Name}),
+ true = ets:delete(pg_local_table, Member_Name_Pid);
+ true ->
+ ok
+ end,
+ Ref_Pid = {ref, Pid},
+ case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
+ 0 ->
+ [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
+ true = ets:delete(pg_local_table, {ref, Ref}),
+ true = ets:delete(pg_local_table, Ref_Pid),
+ true = erlang:demonitor(Ref, [flush]),
+ ok;
+ _ ->
+ ok
+ end
+ catch _:_ ->
+ ok
+ end.
+
+group_members(Name) ->
+ [P ||
+ [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
+ _ <- lists:seq(1, N)].
+
+member_in_group(Pid, Name) ->
+ [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
+ lists:duplicate(N, Pid).
+
+member_groups(Pid) ->
+ [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
+
+ensure_started() ->
+ case whereis(?MODULE) of
+ undefined ->
+ C = {pg_local, {?MODULE, start_link, []}, permanent,
+ 1000, worker, [?MODULE]},
+ supervisor:start_child(kernel_safe_sup, C);
+ PgLocalPid ->
+ {ok, PgLocalPid}
+ end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index db7461b0..08ad2d78 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,6 +37,7 @@
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2]).
@@ -74,6 +75,9 @@
-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
+-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
+-spec(consumers_all/1 ::
+ (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -96,7 +100,8 @@
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) ->
+ (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
+ boolean(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
@@ -240,6 +245,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+consumers(#amqqueue{ pid = QPid }) ->
+ gen_server2:pcall(QPid, 9, consumers, infinity).
+
+consumers_all(VHostPath) ->
+ lists:concat(
+ map(VHostPath,
+ fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} ||
+ {ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
+ end)).
+
stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 06e68a1b..29d428c7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -77,6 +77,9 @@
auto_delete,
arguments,
pid,
+ owner_pid,
+ exclusive_consumer_pid,
+ exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
messages_uncommitted,
@@ -511,6 +514,18 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
+i(owner_pid, #q{owner = none}) ->
+ '';
+i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
+ ReaderPid;
+i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
+ '';
+i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
+ ChPid;
+i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
+ '';
+i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
+ ConsumerTag;
i(messages_ready, #q{message_buffer = MessageBuffer}) ->
queue:len(MessageBuffer);
i(messages_unacknowledged, _) ->
@@ -547,6 +562,15 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(consumers, _From,
+ State = #q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ reply(rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index b8e161a6..b903a6ee 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -196,12 +196,16 @@ generate_array(Array) when is_list(Array) ->
fun ({Type, Value}) -> field_value_to_binary(Type, Value) end,
Array)).
-short_string_to_binary(String) when is_binary(String) and (size(String) < 256) ->
- [<<(size(String)):8>>, String];
+short_string_to_binary(String) when is_binary(String) ->
+ Len = size(String),
+ if Len < 256 -> [<<(size(String)):8>>, String];
+ true -> exit(content_properties_shortstr_overflow)
+ end;
short_string_to_binary(String) ->
StringLength = length(String),
- true = (StringLength < 256), % assertion
- [<<StringLength:8>>, String].
+ if StringLength < 256 -> [<<StringLength:8>>, String];
+ true -> exit(content_properties_shortstr_overflow)
+ end.
long_string_to_binary(String) when is_binary(String) ->
[<<(size(String)):32>>, String];
@@ -239,7 +243,10 @@ encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, Flags
end.
encode_property(shortstr, String) ->
- Len = size(String), <<Len:8/unsigned, String:Len/binary>>;
+ Len = size(String),
+ if Len < 256 -> <<Len:8/unsigned, String:Len/binary>>;
+ true -> exit(content_properties_shortstr_overflow)
+ end;
encode_property(longstr, String) ->
Len = size(String), <<Len:32/unsigned, String:Len/binary>>;
encode_property(octet, Int) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8f11b4ed..b9940eb9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,8 +37,10 @@
-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
+-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3,
+ handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
@@ -46,10 +48,23 @@
username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(INFO_KEYS,
+ [pid,
+ connection,
+ number,
+ user,
+ vhost,
+ transactional,
+ consumer_count,
+ messages_unacknowledged,
+ acks_uncommitted,
+ prefetch_count]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -62,6 +77,12 @@
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(list/0 :: () -> [pid()]).
+-spec(info_keys/0 :: () -> [info_key()]).
+-spec(info/1 :: (pid()) -> [info()]).
+-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(info_all/0 :: () -> [[info()]]).
+-spec(info_all/1 :: ([info_key()]) -> [[info()]]).
-endif.
@@ -91,12 +112,33 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
conserve_memory(Pid, Conserve) ->
gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
+list() ->
+ pg_local:get_members(rabbit_channels).
+
+info_keys() -> ?INFO_KEYS.
+
+info(Pid) ->
+ gen_server2:pcall(Pid, 9, info, infinity).
+
+info(Pid, Items) ->
+ case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+info_all() ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()).
+
+info_all(Items) ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ ok = pg_local:join(rabbit_channels, self()),
{ok, #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -110,7 +152,18 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}}.
+ consumer_mapping = dict:new()},
+ hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
+
+handle_call({info, Items}, _From, State) ->
+ try
+ reply({ok, infos(Items, State)}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
handle_call(_Request, _From, State) ->
noreply(State).
@@ -162,33 +215,31 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
+ {stop, Reason, State}.
-handle_info(timeout, State) ->
+handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {noreply, State, hibernate}.
+ {hibernate, State}.
-terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
- state = terminating}) ->
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid);
+terminate(_Reason, State = #ch{state = terminating}) ->
+ terminate(State);
-terminate(Reason, State = #ch{writer_pid = WriterPid,
- limiter_pid = LimiterPid}) ->
+terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
_ -> ok
end,
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid).
+ terminate(State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%---------------------------------------------------------------------------
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+
+noreply(NewState) -> {noreply, NewState, hibernate}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -953,3 +1004,28 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
+
+terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid).
+
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _) -> self();
+i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
+i(number, #ch{channel = Channel}) -> Channel;
+i(user, #ch{username = Username}) -> Username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
+ dict:size(ConsumerMapping);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
+ uncommitted_ack_q = UAQ}) ->
+ queue:len(UAMQ) + queue:len(UAQ);
+i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) ->
+ queue:len(UAQ);
+i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
+ rabbit_limiter:get_limit(LimiterPid);
+i(Item, _) ->
+ throw({bad_argument, Item}).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 2fe3f33e..771c0e72 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -81,6 +81,9 @@ start() ->
{error, Reason} ->
error("~p", [Reason]),
halt(2);
+ {badrpc, {'EXIT', Reason}} ->
+ error("~p", [Reason]),
+ halt(2);
{badrpc, Reason} ->
error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
@@ -139,6 +142,7 @@ Available commands:
cluster <ClusterNode> ...
status
rotate_logs [Suffix]
+ close_connection <ConnectionPid> <ExplanationString>
add_user <UserName> <Password>
delete_user <UserName>
@@ -158,6 +162,8 @@ Available commands:
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
+ list_channels [<ChannelInfoItem> ...]
+ list_consumers [-p <VHostPath>]
Quiet output mode is selected with the \"-q\" flag. Informational
messages are suppressed when quiet mode is in effect.
@@ -174,7 +180,8 @@ optional virtual host parameter for which to display results. The
default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable,
-auto_delete, arguments, pid, messages_ready, messages_unacknowledged,
+auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid,
+exclusive_consumer_tag, messages_ready, messages_unacknowledged,
messages_uncommitted, messages, acks_uncommitted, consumers,
transactions, memory]. The default is to display name and (number of)
messages.
@@ -191,6 +198,17 @@ frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend]. The default is to display user, peer_address, peer_port
and state.
+<ChannelInfoItem> must be a member of the list [pid, connection,
+number, user, vhost, transactional, consumer_count,
+messages_unacknowledged, acks_uncommitted, prefetch_count]. The
+default is to display pid, user, transactional, consumer_count,
+messages_unacknowledged.
+
+The output format for \"list_consumers\" is a list of rows containing,
+in order, the queue name, channel process id, consumer tag, and a
+boolean indicating whether acknowledgements are expected from the
+consumer.
+
"),
halt(1).
@@ -235,6 +253,11 @@ action(rotate_logs, Node, Args = [Suffix], Inform) ->
Inform("Rotating logs to files with suffix ~p", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
+action(close_connection, Node, [PidStr, Explanation], Inform) ->
+ Inform("Closing connection ~s", [PidStr]),
+ rpc_call(Node, rabbit_networking, close_connection,
+ [rabbit_misc:string_to_pid(PidStr), Explanation]);
+
action(add_user, Node, Args = [Username, _Password], Inform) ->
Inform("Creating user ~p", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
@@ -291,8 +314,7 @@ action(list_bindings, Node, Args, Inform) ->
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
- InfoKeys),
- ok;
+ InfoKeys);
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
@@ -301,6 +323,22 @@ action(list_connections, Node, Args, Inform) ->
[ArgAtoms]),
ArgAtoms);
+action(list_channels, Node, Args, Inform) ->
+ Inform("Listing channels", []),
+ ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count,
+ messages_unacknowledged]),
+ display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
+ ArgAtoms);
+
+action(list_consumers, Node, Args, Inform) ->
+ Inform("Listing consumers", []),
+ {VHostArg, _} = parse_vhost_flag_bin(Args),
+ InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
+ display_info_list(
+ [lists:zip(InfoKeys, tuple_to_list(X)) ||
+ X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
+ InfoKeys);
+
action(Command, Node, Args, Inform) ->
{VHost, RemainingArgs} = parse_vhost_flag(Args),
action(Command, Node, VHost, RemainingArgs, Inform).
@@ -358,7 +396,7 @@ format_info_item(Key, Items) ->
is_tuple(Value) ->
inet_parse:ntoa(Value);
Value when is_pid(Value) ->
- pid_to_string(Value);
+ rabbit_misc:pid_to_string(Value);
Value when is_binary(Value) ->
escape(Value);
Value when is_atom(Value) ->
@@ -416,10 +454,3 @@ prettify_typed_amqp_value(Type, Value) ->
array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
_ -> Value
end.
-
-%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7)
-pid_to_string(Pid) ->
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
- = term_to_binary(Pid),
- Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index cf91270e..83df15ce 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -31,12 +31,13 @@
-module(rabbit_limiter).
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([get_limit/1]).
%%----------------------------------------------------------------------------
@@ -51,6 +52,7 @@
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
+-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
-endif.
@@ -69,7 +71,7 @@
%%----------------------------------------------------------------------------
start_link(ChPid, UnackedMsgCount) ->
- {ok, Pid} = gen_server:start_link(?MODULE, [ChPid, UnackedMsgCount], []),
+ {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []),
Pid.
shutdown(undefined) ->
@@ -104,6 +106,13 @@ register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
unregister(undefined, _QPid) -> ok;
unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}).
+get_limit(undefined) ->
+ 0;
+get_limit(Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> 0 end,
+ fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -118,7 +127,10 @@ handle_call({can_send, QPid, AckRequired}, _From,
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
end}}
- end.
+ end;
+
+handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
+ {reply, PrefetchCount, State}.
handle_cast(shutdown, State) ->
{stop, normal, State};
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 172e27f4..92d03789 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
+-export([pid_to_string/1, string_to_pid/1]).
-import(mnesia).
-import(lists).
@@ -127,6 +128,8 @@
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
+-spec(pid_to_string/1 :: (pid()) -> string()).
+-spec(string_to_pid/1 :: (string()) -> pid()).
-endif.
@@ -499,3 +502,62 @@ queue_fold(Fun, Init, Q) ->
{empty, _Q} -> Init;
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
end.
+
+%% This provides a string representation of a pid that is the same
+%% regardless of what node we are running on. The representation also
+%% permits easy identification of the pid's node.
+pid_to_string(Pid) when is_pid(Pid) ->
+ %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
+ %% 8.7)
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
+
+%% inverse of above
+string_to_pid(Str) ->
+ ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end,
+ %% TODO: simplify this code by using the 're' module, once we drop
+ %% support for R11
+ %%
+ %% 1) sanity check
+ %% The \ before the trailing $ is only there to keep emacs
+ %% font-lock from getting confused.
+ case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of
+ {match, _, _} ->
+ %% 2) strip <>
+ Str1 = string:substr(Str, 2, string:len(Str) - 2),
+ %% 3) extract three constituent parts, taking care to
+ %% handle dots in the node part (hence the reverse and concat)
+ [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")),
+ NodeStr = lists:concat(lists:reverse(Rest)),
+ %% 4) construct a triple term from the three parts
+ TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.",
+ [NodeStr, IdStr, SerStr])),
+ %% 5) parse the triple
+ Tokens = case erl_scan:string(TripleStr) of
+ {ok, Tokens1, _} -> Tokens1;
+ {error, _, _} -> ErrorFun()
+ end,
+ Term = case erl_parse:parse_term(Tokens) of
+ {ok, Term1} -> Term1;
+ {error, _} -> ErrorFun()
+ end,
+ {Node, Id, Ser} =
+ case Term of
+ {Node1, Id1, Ser1} when is_atom(Node1) andalso
+ is_integer(Id1) andalso
+ is_integer(Ser1) ->
+ Term;
+ _ ->
+ ErrorFun()
+ end,
+ %% 6) turn the triple into a pid - see pid_to_string
+ <<131,NodeEnc/binary>> = term_to_binary(Node),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
+ nomatch ->
+ ErrorFun();
+ Error ->
+ %% invalid regexp - shouldn't happen
+ throw(Error)
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 06e2b40e..717eccc4 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -35,7 +35,8 @@
stop_tcp_listener/2, on_node_down/1, active_listeners/0,
node_listeners/1, connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
- connection_info_all/0, connection_info_all/1]).
+ connection_info_all/0, connection_info_all/1,
+ close_connection/2]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
@@ -76,6 +77,7 @@
-spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]).
-spec(connection_info_all/0 :: () -> [[info()]]).
-spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]).
+-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) ->
{ip_address(), atom()}).
@@ -224,6 +226,13 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
+close_connection(Pid, Explanation) ->
+ case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end,
+ supervisor:which_children(rabbit_tcp_client_sup)) of
+ true -> rabbit_reader:shutdown(Pid, Explanation);
+ false -> throw({error, {not_a_connection_pid, Pid}})
+ end.
+
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f5bdb985..d0d8860f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,7 +33,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2]).
+-export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -132,6 +132,7 @@
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-endif.
@@ -140,6 +141,9 @@
start_link() ->
{ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+shutdown(Pid, Explanation) ->
+ gen_server:call(Pid, {shutdown, Explanation}, infinity).
+
init(Parent) ->
Deb = sys:debug_options([]),
receive
@@ -267,13 +271,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
- if State#v1.connection_state =:= running ->
- send_exception(State, 0,
- rabbit_misc:amqp_error(connection_forced,
- "broker forced connection closure with reason '~w'",
- [Reason], none));
- true -> ok
- end,
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
%% http://www.erlang.org/doc/man/sys.html
%%
@@ -301,6 +300,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, {shutdown, Explanation}} ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Parent, Deb, NewState)
+ end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
mainloop(Parent, Deb, State);
@@ -323,6 +329,13 @@ switch_callback(OldState, NewCallback, Length) ->
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
+terminate(Explanation, State = #v1{connection_state = running}) ->
+ {normal, send_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
+terminate(_Explanation, State) ->
+ {force, State}.
+
close_connection(State = #v1{connection = #connection{
timeout_sec = TimeoutSec}}) ->
%% We terminate the connection after the specified interval, but
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 833ccc26..46a8f7df 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_priority_queue(),
+ passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
passed = test_topic_matching(),
@@ -183,6 +184,28 @@ test_simple_n_element_queue(N) ->
{true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
+test_pg_local() ->
+ [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]],
+ check_pg_local(ok, [], []),
+ check_pg_local(pg_local:join(a, P), [P], []),
+ check_pg_local(pg_local:join(b, P), [P], [P]),
+ check_pg_local(pg_local:join(a, P), [P, P], [P]),
+ check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
+ check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]),
+ check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]),
+ check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
+ [X ! done || X <- [P, Q]],
+ check_pg_local(ok, [], []),
+ passed.
+
+check_pg_local(ok, APids, BPids) ->
+ ok = pg_local:sync(),
+ [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
+ {Key, Pids} <- [{a, APids}, {b, BPids}]].
+
test_unfold() ->
{[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
List = lists:seq(2,20,2),
@@ -689,10 +712,17 @@ test_user_management() ->
test_server_status() ->
- %% create a queue so we have something to list
- Q = #amqqueue{} = rabbit_amqqueue:declare(
- rabbit_misc:r(<<"/">>, queue, <<"foo">>),
- false, false, []),
+ %% create a few things so there is some useful information to list
+ Writer = spawn(fun () -> receive shutdown -> ok end end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
+ rabbit_misc:r(<<"/">>, queue, Name),
+ false, false, []) ||
+ Name <- [<<"foo">>, <<"bar">>]],
+
+ ok = rabbit_amqqueue:claim_queue(Q, self()),
+ ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
+ <<"ctag">>, true, undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -703,19 +733,29 @@ test_server_status() ->
%% list bindings
ok = control_action(list_bindings, []),
- %% cleanup
- {ok, _} = rabbit_amqqueue:delete(Q, false, false),
-
%% list connections
[#listener{host = H, port = P} | _] =
[L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
N =:= node()],
- {ok, C} = gen_tcp:connect(H, P, []),
+ {ok, _C} = gen_tcp:connect(H, P, []),
timer:sleep(100),
ok = info_action(list_connections,
rabbit_networking:connection_info_keys(), false),
- ok = gen_tcp:close(C),
+ %% close_connection
+ [ConnPid] = rabbit_networking:connections(),
+ ok = control_action(close_connection, [rabbit_misc:pid_to_string(ConnPid),
+ "go away"]),
+
+ %% list channels
+ ok = info_action(list_channels, rabbit_channel:info_keys(), false),
+
+ %% list consumers
+ ok = control_action(list_consumers, []),
+
+ %% cleanup
+ [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
+ ok = rabbit_channel:shutdown(Ch),
passed.