diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-02-05 14:00:38 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-02-05 14:00:38 +0000 |
commit | 3cd3130cb5a0f3c2ee6e265fd438d159cac3e4fa (patch) | |
tree | 1650197093118214aeb32c4740a47fff253ed592 | |
parent | ab5f89cc1a7f19cab21034cbed5dac1aac1817fa (diff) | |
parent | 1511c73a4ae414dd2ea87aa79e76bee495dd2db8 (diff) | |
download | rabbitmq-server-3cd3130cb5a0f3c2ee6e265fd438d159cac3e4fa.tar.gz |
Merging bug 22309 onto default
-rw-r--r-- | docs/rabbitmqctl.1.pod | 7 | ||||
-rw-r--r-- | src/rabbit_control.erl | 18 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 62 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 29 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 7 |
5 files changed, 105 insertions, 18 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 24efbded..0c95841e 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -98,6 +98,13 @@ nodes determined by I<clusternode> option(s). See L<http://www.rabbitmq.com/clustering.html> for more information about clustering. +=item close_connection I<connectionpid> I<explanation> + +Instruct the broker to close the connection associated with the Erlang +process id I<connectionpid> (see also the I<list_connections> +command), passing the I<explanation> string to the connected client as +part of the AMQP connection shutdown protocol. + =back =head2 USER MANAGEMENT diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index a3f0341a..8a891205 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -81,6 +81,9 @@ start() -> {error, Reason} -> error("~p", [Reason]), halt(2); + {badrpc, {'EXIT', Reason}} -> + error("~p", [Reason]), + halt(2); {badrpc, Reason} -> error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), @@ -139,6 +142,7 @@ Available commands: cluster <ClusterNode> ... status rotate_logs [Suffix] + close_connection <ConnectionPid> <ExplanationString> add_user <UserName> <Password> delete_user <UserName> @@ -243,6 +247,11 @@ action(rotate_logs, Node, Args = [Suffix], Inform) -> Inform("Rotating logs to files with suffix ~p", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); +action(close_connection, Node, [PidStr, Explanation], Inform) -> + Inform("Closing connection ~s", [PidStr]), + rpc_call(Node, rabbit_reader, shutdown, + [rabbit_misc:string_to_pid(PidStr), Explanation]); + action(add_user, Node, Args = [Username, _Password], Inform) -> Inform("Creating user ~p", [Username]), call(Node, {rabbit_access_control, add_user, Args}); @@ -373,7 +382,7 @@ format_info_item(Key, Items) -> is_tuple(Value) -> inet_parse:ntoa(Value); Value when is_pid(Value) -> - pid_to_string(Value); + rabbit_misc:pid_to_string(Value); Value when is_binary(Value) -> escape(Value); Value when is_atom(Value) -> @@ -431,10 +440,3 @@ prettify_typed_amqp_value(Type, Value) -> array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; _ -> Value end. - -%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7) -pid_to_string(Pid) -> - <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> - = term_to_binary(Pid), - Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 172e27f4..92d03789 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,6 +56,7 @@ -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). +-export([pid_to_string/1, string_to_pid/1]). -import(mnesia). -import(lists). @@ -127,6 +128,8 @@ -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). +-spec(pid_to_string/1 :: (pid()) -> string()). +-spec(string_to_pid/1 :: (string()) -> pid()). -endif. @@ -499,3 +502,62 @@ queue_fold(Fun, Init, Q) -> {empty, _Q} -> Init; {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. + +%% This provides a string representation of a pid that is the same +%% regardless of what node we are running on. The representation also +%% permits easy identification of the pid's node. +pid_to_string(Pid) when is_pid(Pid) -> + %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and + %% 8.7) + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). + +%% inverse of above +string_to_pid(Str) -> + ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end, + %% TODO: simplify this code by using the 're' module, once we drop + %% support for R11 + %% + %% 1) sanity check + %% The \ before the trailing $ is only there to keep emacs + %% font-lock from getting confused. + case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of + {match, _, _} -> + %% 2) strip <> + Str1 = string:substr(Str, 2, string:len(Str) - 2), + %% 3) extract three constituent parts, taking care to + %% handle dots in the node part (hence the reverse and concat) + [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")), + NodeStr = lists:concat(lists:reverse(Rest)), + %% 4) construct a triple term from the three parts + TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.", + [NodeStr, IdStr, SerStr])), + %% 5) parse the triple + Tokens = case erl_scan:string(TripleStr) of + {ok, Tokens1, _} -> Tokens1; + {error, _, _} -> ErrorFun() + end, + Term = case erl_parse:parse_term(Tokens) of + {ok, Term1} -> Term1; + {error, _} -> ErrorFun() + end, + {Node, Id, Ser} = + case Term of + {Node1, Id1, Ser1} when is_atom(Node1) andalso + is_integer(Id1) andalso + is_integer(Ser1) -> + Term; + _ -> + ErrorFun() + end, + %% 6) turn the triple into a pid - see pid_to_string + <<131,NodeEnc/binary>> = term_to_binary(Node), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); + nomatch -> + ErrorFun(); + Error -> + %% invalid regexp - shouldn't happen + throw(Error) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f5bdb985..d0d8860f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,7 +33,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2]). +-export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -132,6 +132,7 @@ -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(shutdown/2 :: (pid(), string()) -> 'ok'). -endif. @@ -140,6 +141,9 @@ start_link() -> {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +shutdown(Pid, Explanation) -> + gen_server:call(Pid, {shutdown, Explanation}, infinity). + init(Parent) -> Deb = sys:debug_options([]), receive @@ -267,13 +271,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); {'EXIT', Parent, Reason} -> - if State#v1.connection_state =:= running -> - send_exception(State, 0, - rabbit_misc:amqp_error(connection_forced, - "broker forced connection closure with reason '~w'", - [Reason], none)); - true -> ok - end, + terminate(io_lib:format("broker forced connection closure " + "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to %% http://www.erlang.org/doc/man/sys.html %% @@ -301,6 +300,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; timeout -> throw({timeout, State#v1.connection_state}); + {'$gen_call', From, {shutdown, Explanation}} -> + {ForceTermination, NewState} = terminate(Explanation, State), + gen_server:reply(From, ok), + case ForceTermination of + force -> ok; + normal -> mainloop(Parent, Deb, NewState) + end; {'$gen_call', From, info} -> gen_server:reply(From, infos(?INFO_KEYS, State)), mainloop(Parent, Deb, State); @@ -323,6 +329,13 @@ switch_callback(OldState, NewCallback, Length) -> OldState#v1{callback = NewCallback, recv_ref = Ref}. +terminate(Explanation, State = #v1{connection_state = running}) -> + {normal, send_exception(State, 0, + rabbit_misc:amqp_error( + connection_forced, Explanation, [], none))}; +terminate(_Explanation, State) -> + {force, State}. + close_connection(State = #v1{connection = #connection{ timeout_sec = TimeoutSec}}) -> %% We terminate the connection after the specified interval, but diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d4eb3ade..61f6d816 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -734,11 +734,14 @@ test_server_status() -> [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), N =:= node()], - {ok, C} = gen_tcp:connect(H, P, []), + {ok, _C} = gen_tcp:connect(H, P, []), timer:sleep(100), ok = info_action(list_connections, rabbit_networking:connection_info_keys(), false), - ok = gen_tcp:close(C), + %% close_connection + [ConnPid] = rabbit_networking:connections(), + ok = control_action(close_connection, [rabbit_misc:pid_to_string(ConnPid), + "go away"]), %% list channels Writer = spawn(fun () -> receive shutdown -> ok end end), |