summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-02-05 14:00:38 +0000
committerMatthew Sackman <matthew@lshift.net>2010-02-05 14:00:38 +0000
commit3cd3130cb5a0f3c2ee6e265fd438d159cac3e4fa (patch)
tree1650197093118214aeb32c4740a47fff253ed592
parentab5f89cc1a7f19cab21034cbed5dac1aac1817fa (diff)
parent1511c73a4ae414dd2ea87aa79e76bee495dd2db8 (diff)
downloadrabbitmq-server-3cd3130cb5a0f3c2ee6e265fd438d159cac3e4fa.tar.gz
Merging bug 22309 onto default
-rw-r--r--docs/rabbitmqctl.1.pod7
-rw-r--r--src/rabbit_control.erl18
-rw-r--r--src/rabbit_misc.erl62
-rw-r--r--src/rabbit_reader.erl29
-rw-r--r--src/rabbit_tests.erl7
5 files changed, 105 insertions, 18 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 24efbded..0c95841e 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -98,6 +98,13 @@ nodes determined by I<clusternode> option(s). See
L<http://www.rabbitmq.com/clustering.html> for more information about
clustering.
+=item close_connection I<connectionpid> I<explanation>
+
+Instruct the broker to close the connection associated with the Erlang
+process id I<connectionpid> (see also the I<list_connections>
+command), passing the I<explanation> string to the connected client as
+part of the AMQP connection shutdown protocol.
+
=back
=head2 USER MANAGEMENT
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index a3f0341a..8a891205 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -81,6 +81,9 @@ start() ->
{error, Reason} ->
error("~p", [Reason]),
halt(2);
+ {badrpc, {'EXIT', Reason}} ->
+ error("~p", [Reason]),
+ halt(2);
{badrpc, Reason} ->
error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
@@ -139,6 +142,7 @@ Available commands:
cluster <ClusterNode> ...
status
rotate_logs [Suffix]
+ close_connection <ConnectionPid> <ExplanationString>
add_user <UserName> <Password>
delete_user <UserName>
@@ -243,6 +247,11 @@ action(rotate_logs, Node, Args = [Suffix], Inform) ->
Inform("Rotating logs to files with suffix ~p", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
+action(close_connection, Node, [PidStr, Explanation], Inform) ->
+ Inform("Closing connection ~s", [PidStr]),
+ rpc_call(Node, rabbit_reader, shutdown,
+ [rabbit_misc:string_to_pid(PidStr), Explanation]);
+
action(add_user, Node, Args = [Username, _Password], Inform) ->
Inform("Creating user ~p", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
@@ -373,7 +382,7 @@ format_info_item(Key, Items) ->
is_tuple(Value) ->
inet_parse:ntoa(Value);
Value when is_pid(Value) ->
- pid_to_string(Value);
+ rabbit_misc:pid_to_string(Value);
Value when is_binary(Value) ->
escape(Value);
Value when is_atom(Value) ->
@@ -431,10 +440,3 @@ prettify_typed_amqp_value(Type, Value) ->
array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
_ -> Value
end.
-
-%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7)
-pid_to_string(Pid) ->
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
- = term_to_binary(Pid),
- Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 172e27f4..92d03789 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
+-export([pid_to_string/1, string_to_pid/1]).
-import(mnesia).
-import(lists).
@@ -127,6 +128,8 @@
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
+-spec(pid_to_string/1 :: (pid()) -> string()).
+-spec(string_to_pid/1 :: (string()) -> pid()).
-endif.
@@ -499,3 +502,62 @@ queue_fold(Fun, Init, Q) ->
{empty, _Q} -> Init;
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
end.
+
+%% This provides a string representation of a pid that is the same
+%% regardless of what node we are running on. The representation also
+%% permits easy identification of the pid's node.
+pid_to_string(Pid) when is_pid(Pid) ->
+ %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
+ %% 8.7)
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
+
+%% inverse of above
+string_to_pid(Str) ->
+ ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end,
+ %% TODO: simplify this code by using the 're' module, once we drop
+ %% support for R11
+ %%
+ %% 1) sanity check
+ %% The \ before the trailing $ is only there to keep emacs
+ %% font-lock from getting confused.
+ case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of
+ {match, _, _} ->
+ %% 2) strip <>
+ Str1 = string:substr(Str, 2, string:len(Str) - 2),
+ %% 3) extract three constituent parts, taking care to
+ %% handle dots in the node part (hence the reverse and concat)
+ [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")),
+ NodeStr = lists:concat(lists:reverse(Rest)),
+ %% 4) construct a triple term from the three parts
+ TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.",
+ [NodeStr, IdStr, SerStr])),
+ %% 5) parse the triple
+ Tokens = case erl_scan:string(TripleStr) of
+ {ok, Tokens1, _} -> Tokens1;
+ {error, _, _} -> ErrorFun()
+ end,
+ Term = case erl_parse:parse_term(Tokens) of
+ {ok, Term1} -> Term1;
+ {error, _} -> ErrorFun()
+ end,
+ {Node, Id, Ser} =
+ case Term of
+ {Node1, Id1, Ser1} when is_atom(Node1) andalso
+ is_integer(Id1) andalso
+ is_integer(Ser1) ->
+ Term;
+ _ ->
+ ErrorFun()
+ end,
+ %% 6) turn the triple into a pid - see pid_to_string
+ <<131,NodeEnc/binary>> = term_to_binary(Node),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
+ nomatch ->
+ ErrorFun();
+ Error ->
+ %% invalid regexp - shouldn't happen
+ throw(Error)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f5bdb985..d0d8860f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,7 +33,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2]).
+-export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -132,6 +132,7 @@
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-endif.
@@ -140,6 +141,9 @@
start_link() ->
{ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+shutdown(Pid, Explanation) ->
+ gen_server:call(Pid, {shutdown, Explanation}, infinity).
+
init(Parent) ->
Deb = sys:debug_options([]),
receive
@@ -267,13 +271,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
- if State#v1.connection_state =:= running ->
- send_exception(State, 0,
- rabbit_misc:amqp_error(connection_forced,
- "broker forced connection closure with reason '~w'",
- [Reason], none));
- true -> ok
- end,
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
%% http://www.erlang.org/doc/man/sys.html
%%
@@ -301,6 +300,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, {shutdown, Explanation}} ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Parent, Deb, NewState)
+ end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
mainloop(Parent, Deb, State);
@@ -323,6 +329,13 @@ switch_callback(OldState, NewCallback, Length) ->
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
+terminate(Explanation, State = #v1{connection_state = running}) ->
+ {normal, send_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
+terminate(_Explanation, State) ->
+ {force, State}.
+
close_connection(State = #v1{connection = #connection{
timeout_sec = TimeoutSec}}) ->
%% We terminate the connection after the specified interval, but
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d4eb3ade..61f6d816 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -734,11 +734,14 @@ test_server_status() ->
[L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
N =:= node()],
- {ok, C} = gen_tcp:connect(H, P, []),
+ {ok, _C} = gen_tcp:connect(H, P, []),
timer:sleep(100),
ok = info_action(list_connections,
rabbit_networking:connection_info_keys(), false),
- ok = gen_tcp:close(C),
+ %% close_connection
+ [ConnPid] = rabbit_networking:connections(),
+ ok = control_action(close_connection, [rabbit_misc:pid_to_string(ConnPid),
+ "go away"]),
%% list channels
Writer = spawn(fun () -> receive shutdown -> ok end end),