diff options
Diffstat (limited to 'src/rabbit_control_main.erl')
-rw-r--r-- | src/rabbit_control_main.erl | 739 |
1 files changed, 739 insertions, 0 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl new file mode 100644 index 00000000..6f36f99d --- /dev/null +++ b/src/rabbit_control_main.erl @@ -0,0 +1,739 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_control_main). +-include("rabbit.hrl"). + +-export([start/0, stop/0, parse_arguments/2, action/5, + sync_queue/1, cancel_sync_queue/1]). + +-define(RPC_TIMEOUT, infinity). +-define(EXTERNAL_CHECK_INTERVAL, 1000). + +-define(QUIET_OPT, "-q"). +-define(NODE_OPT, "-n"). +-define(VHOST_OPT, "-p"). +-define(PRIORITY_OPT, "--priority"). +-define(APPLY_TO_OPT, "--apply-to"). +-define(RAM_OPT, "--ram"). +-define(OFFLINE_OPT, "--offline"). + +-define(QUIET_DEF, {?QUIET_OPT, flag}). +-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). +-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). +-define(PRIORITY_DEF, {?PRIORITY_OPT, {option, "0"}}). +-define(APPLY_TO_DEF, {?APPLY_TO_OPT, {option, "all"}}). +-define(RAM_DEF, {?RAM_OPT, flag}). +-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}). + +-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]). + +-define(COMMANDS, + [stop, + stop_app, + start_app, + wait, + reset, + force_reset, + rotate_logs, + + {join_cluster, [?RAM_DEF]}, + change_cluster_node_type, + update_cluster_nodes, + {forget_cluster_node, [?OFFLINE_DEF]}, + cluster_status, + {sync_queue, [?VHOST_DEF]}, + {cancel_sync_queue, [?VHOST_DEF]}, + + add_user, + delete_user, + change_password, + clear_password, + set_user_tags, + list_users, + + add_vhost, + delete_vhost, + list_vhosts, + {set_permissions, [?VHOST_DEF]}, + {clear_permissions, [?VHOST_DEF]}, + {list_permissions, [?VHOST_DEF]}, + list_user_permissions, + + {set_parameter, [?VHOST_DEF]}, + {clear_parameter, [?VHOST_DEF]}, + {list_parameters, [?VHOST_DEF]}, + + {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, + {clear_policy, [?VHOST_DEF]}, + {list_policies, [?VHOST_DEF]}, + + {list_queues, [?VHOST_DEF]}, + {list_exchanges, [?VHOST_DEF]}, + {list_bindings, [?VHOST_DEF]}, + {list_connections, [?VHOST_DEF]}, + list_channels, + {list_consumers, [?VHOST_DEF]}, + status, + environment, + report, + eval, + + close_connection, + {trace_on, [?VHOST_DEF]}, + {trace_off, [?VHOST_DEF]}, + set_vm_memory_high_watermark + ]). + +-define(GLOBAL_QUERIES, + [{"Connections", rabbit_networking, connection_info_all, + connection_info_keys}, + {"Channels", rabbit_channel, info_all, info_keys}]). + +-define(VHOST_QUERIES, + [{"Queues", rabbit_amqqueue, info_all, info_keys}, + {"Exchanges", rabbit_exchange, info_all, info_keys}, + {"Bindings", rabbit_binding, info_all, info_keys}, + {"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys}, + {"Permissions", rabbit_auth_backend_internal, list_vhost_permissions, + vhost_perms_info_keys}, + {"Policies", rabbit_policy, list_formatted, info_keys}, + {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). +-spec(action/5 :: + (atom(), node(), [string()], [{string(), any()}], + fun ((string(), [any()]) -> 'ok')) + -> 'ok'). +-spec(usage/0 :: () -> no_return()). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), + {Command, Opts, Args} = + case parse_arguments(init:get_plain_arguments(), NodeStr) of + {ok, Res} -> Res; + no_command -> print_error("could not recognise command", []), + usage() + end, + Quiet = proplists:get_bool(?QUIET_OPT, Opts), + Node = proplists:get_value(?NODE_OPT, Opts), + Inform = case Quiet of + true -> fun (_Format, _Args1) -> ok end; + false -> fun (Format, Args1) -> + io:format(Format ++ " ...~n", Args1) + end + end, + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, + + %% The reason we don't use a try/catch here is that rpc:call turns + %% thrown errors into normal return values + case catch action(Command, Node, Args, Opts, Inform) of + ok -> + case Quiet of + true -> ok; + false -> io:format("...done.~n") + end, + rabbit_misc:quit(0); + {ok, Info} -> + case Quiet of + true -> ok; + false -> io:format("...done (~p).~n", [Info]) + end, + rabbit_misc:quit(0); + {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15 + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15 + PrintInvalidCommandError(), + usage(); + {'EXIT', {badarg, _}} -> + print_error("invalid parameter: ~p", [Args]), + usage(); + {error, {Problem, Reason}} when is_atom(Problem), is_binary(Reason) -> + %% We handle this common case specially to avoid ~p since + %% that has i18n issues + print_error("~s: ~s", [Problem, Reason]), + rabbit_misc:quit(2); + {error, Reason} -> + print_error("~p", [Reason]), + rabbit_misc:quit(2); + {error_string, Reason} -> + print_error("~s", [Reason]), + rabbit_misc:quit(2); + {badrpc, {'EXIT', Reason}} -> + print_error("~p", [Reason]), + rabbit_misc:quit(2); + {badrpc, Reason} -> + print_error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics(Node), + rabbit_misc:quit(2); + Other -> + print_error("~p", [Other]), + rabbit_misc:quit(2) + end. + +fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). + +print_report(Node, {Descr, Module, InfoFun, KeysFun}) -> + io:format("~s:~n", [Descr]), + print_report0(Node, {Module, InfoFun, KeysFun}, []). + +print_report(Node, {Descr, Module, InfoFun, KeysFun}, VHostArg) -> + io:format("~s on ~s:~n", [Descr, VHostArg]), + print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg). + +print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) -> + case rpc_call(Node, Module, InfoFun, VHostArg) of + [_|_] = Results -> InfoItems = rpc_call(Node, Module, KeysFun, []), + display_row([atom_to_list(I) || I <- InfoItems]), + display_info_list(Results, InfoItems); + _ -> ok + end, + io:nl(). + +print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). + +print_badrpc_diagnostics(Node) -> + fmt_stderr(rabbit_nodes:diagnostics([Node]), []). + +stop() -> + ok. + +usage() -> + io:format("~s", [rabbit_ctl_usage:usage()]), + rabbit_misc:quit(1). + +parse_arguments(CmdLine, NodeStr) -> + case rabbit_misc:parse_arguments( + ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of + {ok, {Cmd, Opts0, Args}} -> + Opts = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; + _ -> {K, V} + end || {K, V} <- Opts0], + {ok, {Cmd, Opts, Args}}; + E -> + E + end. + +%%---------------------------------------------------------------------------- + +action(stop, Node, Args, _Opts, Inform) -> + Inform("Stopping and halting node ~p", [Node]), + Res = call(Node, {rabbit, stop_and_halt, []}), + case {Res, Args} of + {ok, [PidFile]} -> wait_for_process_death( + read_pid_file(PidFile, false)); + {ok, [_, _| _]} -> exit({badarg, Args}); + _ -> ok + end, + Res; + +action(stop_app, Node, [], _Opts, Inform) -> + Inform("Stopping node ~p", [Node]), + call(Node, {rabbit, stop, []}); + +action(start_app, Node, [], _Opts, Inform) -> + Inform("Starting node ~p", [Node]), + call(Node, {rabbit, start, []}); + +action(reset, Node, [], _Opts, Inform) -> + Inform("Resetting node ~p", [Node]), + call(Node, {rabbit_mnesia, reset, []}); + +action(force_reset, Node, [], _Opts, Inform) -> + Inform("Forcefully resetting node ~p", [Node]), + call(Node, {rabbit_mnesia, force_reset, []}); + +action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + NodeType = case proplists:get_bool(?RAM_OPT, Opts) of + true -> ram; + false -> disc + end, + Inform("Clustering node ~p with ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]); + +action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) -> + Inform("Turning ~p into a ram node", [Node]), + rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]); +action(change_cluster_node_type, Node, [Type], _Opts, Inform) + when Type =:= "disc" orelse Type =:= "disk" -> + Inform("Turning ~p into a disc node", [Node]), + rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]); + +action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]); + +action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts), + Inform("Removing node ~p from cluster", [ClusterNode]), + rpc_call(Node, rabbit_mnesia, forget_cluster_node, + [ClusterNode, RemoveWhenOffline]); + +action(sync_queue, Node, [Q], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)), + Inform("Synchronising ~s", [rabbit_misc:rs(QName)]), + rpc_call(Node, rabbit_control_main, sync_queue, [QName]); + +action(cancel_sync_queue, Node, [Q], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)), + Inform("Stopping synchronising ~s", [rabbit_misc:rs(QName)]), + rpc_call(Node, rabbit_control_main, cancel_sync_queue, [QName]); + +action(wait, Node, [PidFile], _Opts, Inform) -> + Inform("Waiting for ~p", [Node]), + wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); +action(wait, Node, [PidFile, App], _Opts, Inform) -> + Inform("Waiting for ~p on ~p", [App, Node]), + wait_for_application(Node, PidFile, list_to_atom(App), Inform); + +action(status, Node, [], _Opts, Inform) -> + Inform("Status of node ~p", [Node]), + display_call_result(Node, {rabbit, status, []}); + +action(cluster_status, Node, [], _Opts, Inform) -> + Inform("Cluster status of node ~p", [Node]), + display_call_result(Node, {rabbit_mnesia, status, []}); + +action(environment, Node, _App, _Opts, Inform) -> + Inform("Application environment of node ~p", [Node]), + display_call_result(Node, {rabbit, environment, []}); + +action(rotate_logs, Node, [], _Opts, Inform) -> + Inform("Reopening logs for node ~p", [Node]), + call(Node, {rabbit, rotate_logs, [""]}); +action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) -> + Inform("Rotating logs to files with suffix \"~s\"", [Suffix]), + call(Node, {rabbit, rotate_logs, Args}); + +action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> + Inform("Closing connection \"~s\"", [PidStr]), + rpc_call(Node, rabbit_networking, close_connection, + [rabbit_misc:string_to_pid(PidStr), Explanation]); + +action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> + Inform("Creating user \"~s\"", [Username]), + call(Node, {rabbit_auth_backend_internal, add_user, Args}); + +action(delete_user, Node, Args = [_Username], _Opts, Inform) -> + Inform("Deleting user \"~s\"", Args), + call(Node, {rabbit_auth_backend_internal, delete_user, Args}); + +action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> + Inform("Changing password for user \"~s\"", [Username]), + call(Node, {rabbit_auth_backend_internal, change_password, Args}); + +action(clear_password, Node, Args = [Username], _Opts, Inform) -> + Inform("Clearing password for user \"~s\"", [Username]), + call(Node, {rabbit_auth_backend_internal, clear_password, Args}); + +action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> + Tags = [list_to_atom(T) || T <- TagsStr], + Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]), + rpc_call(Node, rabbit_auth_backend_internal, set_tags, + [list_to_binary(Username), Tags]); + +action(list_users, Node, [], _Opts, Inform) -> + Inform("Listing users", []), + display_info_list( + call(Node, {rabbit_auth_backend_internal, list_users, []}), + rabbit_auth_backend_internal:user_info_keys()); + +action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> + Inform("Creating vhost \"~s\"", Args), + call(Node, {rabbit_vhost, add, Args}); + +action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> + Inform("Deleting vhost \"~s\"", Args), + call(Node, {rabbit_vhost, delete, Args}); + +action(list_vhosts, Node, Args, _Opts, Inform) -> + Inform("Listing vhosts", []), + ArgAtoms = default_if_empty(Args, [name]), + display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms); + +action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> + Inform("Listing permissions for user ~p", Args), + display_info_list(call(Node, {rabbit_auth_backend_internal, + list_user_permissions, Args}), + rabbit_auth_backend_internal:user_perms_info_keys()); + +action(list_queues, Node, Args, Opts, Inform) -> + Inform("Listing queues", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, messages]), + display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); + +action(list_exchanges, Node, Args, Opts, Inform) -> + Inform("Listing exchanges", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, type]), + display_info_list(rpc_call(Node, rabbit_exchange, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); + +action(list_bindings, Node, Args, Opts, Inform) -> + Inform("Listing bindings", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [source_name, source_kind, + destination_name, destination_kind, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); + +action(list_connections, Node, Args, _Opts, Inform) -> + Inform("Listing connections", []), + ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), + display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, + [ArgAtoms]), + ArgAtoms); + +action(list_channels, Node, Args, _Opts, Inform) -> + Inform("Listing channels", []), + ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, + messages_unacknowledged]), + display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), + ArgAtoms); + +action(list_consumers, Node, _Args, Opts, Inform) -> + Inform("Listing consumers", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]), + rabbit_amqqueue:consumer_info_keys()); + +action(trace_on, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Starting tracing for vhost \"~s\"", [VHost]), + rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]); + +action(trace_off, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Stopping tracing for vhost \"~s\"", [VHost]), + rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); + +action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> + Frac = list_to_float(case string:chr(Arg, $.) of + 0 -> Arg ++ ".0"; + _ -> Arg + end), + Inform("Setting memory threshold on ~p to ~p", [Node, Frac]), + rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); + +action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Setting permissions for user \"~s\" in vhost \"~s\"", + [Username, VHost]), + call(Node, {rabbit_auth_backend_internal, set_permissions, + [Username, VHost, CPerm, WPerm, RPerm]}); + +action(clear_permissions, Node, [Username], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Clearing permissions for user \"~s\" in vhost \"~s\"", + [Username, VHost]), + call(Node, {rabbit_auth_backend_internal, clear_permissions, + [Username, VHost]}); + +action(list_permissions, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Listing permissions in vhost \"~s\"", [VHost]), + display_info_list(call(Node, {rabbit_auth_backend_internal, + list_vhost_permissions, [VHost]}), + rabbit_auth_backend_internal:vhost_perms_info_keys()); + +action(set_parameter, Node, [Component, Key, Value], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Setting runtime parameter ~p for component ~p to ~p", + [Key, Component, Value]), + rpc_call(Node, rabbit_runtime_parameters, parse_set, + [VHostArg, list_to_binary(Component), list_to_binary(Key), Value]); + +action(clear_parameter, Node, [Component, Key], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]), + rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg, + list_to_binary(Component), + list_to_binary(Key)]); + +action(list_parameters, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing runtime parameters", []), + display_info_list( + rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), + rabbit_runtime_parameters:info_keys()); + +action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> + Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p", + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts), + ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)), + Inform(Msg, [Key, Pattern, Defn, PriorityArg]), + rpc_call( + Node, rabbit_policy, parse_set, + [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]); + +action(clear_policy, Node, [Key], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing policy ~p", [Key]), + rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); + +action(list_policies, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), + display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]), + rabbit_policy:info_keys()); + +action(report, Node, _Args, _Opts, Inform) -> + Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), + [begin ok = action(Action, N, [], [], Inform), io:nl() end || + N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]), + Action <- [status, cluster_status, environment]], + VHosts = unsafe_rpc(Node, rabbit_vhost, list, []), + [print_report(Node, Q) || Q <- ?GLOBAL_QUERIES], + [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], + ok; + +action(eval, Node, [Expr], _Opts, _Inform) -> + case erl_scan:string(Expr) of + {ok, Scanned, _} -> + case erl_parse:parse_exprs(Scanned) of + {ok, Parsed} -> {value, Value, _} = + unsafe_rpc( + Node, erl_eval, exprs, [Parsed, []]), + io:format("~p~n", [Value]), + ok; + {error, E} -> {error_string, format_parse_error(E)} + end; + {error, E, _} -> + {error_string, format_parse_error(E)} + end. + +format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). + +sync_queue(Q) -> + rabbit_amqqueue:with( + Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end). + +cancel_sync_queue(Q) -> + rabbit_amqqueue:with( + Q, fun(#amqqueue{pid = QPid}) -> + rabbit_amqqueue:cancel_sync_mirrors(QPid) + end). + +%%---------------------------------------------------------------------------- + +wait_for_application(Node, PidFile, Application, Inform) -> + Pid = read_pid_file(PidFile, true), + Inform("pid is ~s", [Pid]), + wait_for_application(Node, Pid, Application). + +wait_for_application(Node, Pid, rabbit_and_plugins) -> + wait_for_startup(Node, Pid); +wait_for_application(Node, Pid, Application) -> + while_process_is_alive( + Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end). + +wait_for_startup(Node, Pid) -> + while_process_is_alive( + Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end). + +while_process_is_alive(Node, Pid, Activity) -> + case process_up(Pid) of + true -> case Activity() of + true -> ok; + false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), + while_process_is_alive(Node, Pid, Activity) + end; + false -> {error, process_not_running} + end. + +wait_for_process_death(Pid) -> + case process_up(Pid) of + true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), + wait_for_process_death(Pid); + false -> ok + end. + +read_pid_file(PidFile, Wait) -> + case {file:read_file(PidFile), Wait} of + {{ok, Bin}, _} -> + S = binary_to_list(Bin), + {match, [PidS]} = re:run(S, "[^\\s]+", + [{capture, all, list}]), + try list_to_integer(PidS) + catch error:badarg -> + exit({error, {garbage_in_pid_file, PidFile}}) + end, + PidS; + {{error, enoent}, true} -> + timer:sleep(?EXTERNAL_CHECK_INTERVAL), + read_pid_file(PidFile, Wait); + {{error, _} = E, _} -> + exit({error, {could_not_read_pid, E}}) + end. + +% Test using some OS clunkiness since we shouldn't trust +% rpc:call(os, getpid, []) at this point +process_up(Pid) -> + with_os([{unix, fun () -> + run_ps(Pid) =:= 0 + end}, + {win32, fun () -> + Cmd = "tasklist /nh /fi \"pid eq " ++ Pid ++ "\" ", + Res = rabbit_misc:os_cmd(Cmd ++ "2>&1"), + case re:run(Res, "erl\\.exe", [{capture, none}]) of + match -> true; + _ -> false + end + end}]). + +with_os(Handlers) -> + {OsFamily, _} = os:type(), + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() + end. + +run_ps(Pid) -> + Port = erlang:open_port({spawn, "ps -p " ++ Pid}, + [exit_status, {line, 16384}, + use_stdio, stderr_to_stdout]), + exit_loop(Port). + +exit_loop(Port) -> + receive + {Port, {exit_status, Rc}} -> Rc; + {Port, _} -> exit_loop(Port) + end. + +%%---------------------------------------------------------------------------- + +default_if_empty(List, Default) when is_list(List) -> + if List == [] -> Default; + true -> [list_to_atom(X) || X <- List] + end. + +display_info_list(Results, InfoItemKeys) when is_list(Results) -> + lists:foreach( + fun (Result) -> display_row( + [format_info_item(proplists:get_value(X, Result)) || + X <- InfoItemKeys]) + end, lists:sort(Results)), + ok; +display_info_list(Other, _) -> + Other. + +display_row(Row) -> + io:fwrite(string:join(Row, "\t")), + io:nl(). + +-define(IS_U8(X), (X >= 0 andalso X =< 255)). +-define(IS_U16(X), (X >= 0 andalso X =< 65535)). + +format_info_item(#resource{name = Name}) -> + escape(Name); +format_info_item({N1, N2, N3, N4} = Value) when + ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) -> + rabbit_misc:ntoa(Value); +format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when + ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4), + ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) -> + rabbit_misc:ntoa(Value); +format_info_item(Value) when is_pid(Value) -> + rabbit_misc:pid_to_string(Value); +format_info_item(Value) when is_binary(Value) -> + escape(Value); +format_info_item(Value) when is_atom(Value) -> + escape(atom_to_list(Value)); +format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = + Value) when is_binary(TableEntryKey) andalso + is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item([T | _] = Value) + when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse + is_list(T) -> + "[" ++ + lists:nthtail(2, lists:append( + [", " ++ format_info_item(E) || E <- Value])) ++ "]"; +format_info_item(Value) -> + io_lib:format("~w", [Value]). + +display_call_result(Node, MFA) -> + case call(Node, MFA) of + {badrpc, _} = Res -> throw(Res); + Res -> io:format("~p~n", [Res]), + ok + end. + +unsafe_rpc(Node, Mod, Fun, Args) -> + case rpc_call(Node, Mod, Fun, Args) of + {badrpc, _} = Res -> throw(Res); + Normal -> Normal + end. + +call(Node, {Mod, Fun, Args}) -> + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)). + +rpc_call(Node, Mod, Fun, Args) -> + rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). + +%% escape does C-style backslash escaping of non-printable ASCII +%% characters. We don't escape characters above 127, since they may +%% form part of UTF-8 strings. + +escape(Atom) when is_atom(Atom) -> escape(atom_to_list(Atom)); +escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); +escape(L) when is_list(L) -> escape_char(lists:reverse(L), []). + +escape_char([$\\ | T], Acc) -> + escape_char(T, [$\\, $\\ | Acc]); +escape_char([X | T], Acc) when X >= 32, X /= 127 -> + escape_char(T, [X | Acc]); +escape_char([X | T], Acc) -> + escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), + $0 + (X band 7) | Acc]); +escape_char([], Acc) -> + Acc. + +prettify_amqp_table(Table) -> + [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table]. + +prettify_typed_amqp_value(longstr, Value) -> escape(Value); +prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value); +prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) || + {T, V} <- Value]; +prettify_typed_amqp_value(_Type, Value) -> Value. |