diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-07-29 13:26:43 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-07-29 13:26:43 +0100 |
commit | 3e96fcafe76f0df5df315c4fbf88e666ff7461c4 (patch) | |
tree | a62808817909cdaf948eda78a27fa07a04e36323 | |
parent | 3fcd2b90cade09d9906b2b656570a767be5ba206 (diff) | |
parent | 0fa2b4f6be0580f2fa76eb80d272ae3b37850ead (diff) | |
download | rabbitmq-server-3e96fcafe76f0df5df315c4fbf88e666ff7461c4.tar.gz |
Merge in bug26307 and update.
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rw-r--r-- | src/pmon.erl | 22 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 19 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 28 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 1 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 38 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 37 | ||||
-rw-r--r-- | src/vm_memory_monitor_tests.erl | 35 |
11 files changed, 165 insertions, 48 deletions
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 27c882df..0d5ec7ce 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: RabbitMQ Team <packaging@rabbitmq.com> -Uploaders: Emile Joubert <emile@rabbitmq.com> +Uploaders: Blair Hester <bhester@gopivotal.com> Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:13.b.3), erlang-src (>= 1:13.b.3), unzip, zip Standards-Version: 3.9.2 diff --git a/src/pmon.erl b/src/pmon.erl index ae1be40c..de3e9fea 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -54,8 +54,13 @@ new(Module) -> #state{dict = dict:new(), monitor(Item, S = #state{dict = M, module = Module}) -> case dict:is_key(Item, M) of true -> S; - false -> S#state{dict = dict:store( - Item, Module:monitor(process, Item), M)} + false -> case node_alive_shortcut(Item) of + true -> Ref = Module:monitor(process, Item), + S#state{dict = dict:store(Item, Ref, M)}; + false -> self() ! {'DOWN', fake_ref, process, Item, + nodedown}, + S + end end. monitor_all([], S) -> S; %% optimisation @@ -76,3 +81,16 @@ erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}. monitored(#state{dict = M}) -> dict:fetch_keys(M). is_empty(#state{dict = M}) -> dict:size(M) == 0. + +%%---------------------------------------------------------------------------- + +%% We check here to see if the node is alive in order to avoid trying +%% to connect to it if it isn't - this can cause substantial +%% slowdowns. We can't perform this shortcut if passed {Name, Node} +%% since we would need to convert that into a pid for the fake 'DOWN' +%% message, so we always return true here - but that's OK, it's just +%% an optimisation. +node_alive_shortcut(P) when is_pid(P) -> + lists:member(node(P), [node() | nodes()]); +node_alive_shortcut({_Name, _Node}) -> + true. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 04a38b78..8f44d761 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -103,7 +103,8 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). -info_keys() -> ?INFO_KEYS. +info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). +statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- @@ -824,17 +825,15 @@ i(down_slave_nodes, #q{q = #amqqueue{name = Name, end; i(state, #q{status = running}) -> credit_flow:state(); i(state, #q{status = State}) -> State; -i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> - BQ:status(BQS); -i(Item, _) -> - throw({bad_argument, Item}). +i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:info(Item, BQS). emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> ExtraKs = [K || {K, _} <- Extra], - Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State), + Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State), not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). @@ -934,7 +933,7 @@ handle_call({init, Recover}, From, end; handle_call(info, _From, State) -> - reply(infos(?INFO_KEYS, State), State); + reply(infos(info_keys(), State), State); handle_call({info, Items}, _From, State) -> try diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 8f37bf60..595a05d3 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -16,6 +16,13 @@ -module(rabbit_backing_queue). +-export([info_keys/0]). + +-define(INFO_KEYS, [messages_ram, messages_ready_ram, + messages_unacknowledged_ram, messages_persistent, + message_bytes, message_bytes_ram, message_bytes_persistent, + backing_queue_status]). + -ifdef(use_specs). %% We can't specify a per-queue ack/state with callback signatures @@ -37,6 +44,8 @@ -type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). + %% Called on startup with a list of durable queue names. The queues %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency @@ -216,9 +225,7 @@ %% inbound messages and outbound messages at the moment. -callback msg_rates(state()) -> {float(), float()}. -%% Exists for debugging purposes, to be able to expose state via -%% rabbitmqctl list_queues backing_queue_status --callback status(state()) -> [{atom(), any()}]. +-callback info(atom(), state()) -> any(). %% Passed a function to be invoked with the relevant backing queue's %% state. Useful for when the backing queue or other components need @@ -243,9 +250,11 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1}, - {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {info_keys, 0}, + {infos, 2}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. -endif. + +info_keys() -> ?INFO_KEYS. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index b8b6afad..70fe10e6 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -115,6 +115,12 @@ {"Policies", rabbit_policy, list_formatted, info_keys}, {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]). +-define(COMMANDS_NOT_REQUIRING_APP, + [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, + join_cluster, change_cluster_node_type, update_cluster_nodes, + forget_cluster_node, cluster_status, status, environment, eval, + force_boot]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -156,7 +162,7 @@ start() -> %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values - case catch action(Command, Node, Args, Opts, Inform) of + case catch do_action(Command, Node, Args, Opts, Inform) of ok -> case Quiet of true -> ok; @@ -251,6 +257,15 @@ parse_arguments(CmdLine, NodeStr) -> %%---------------------------------------------------------------------------- +do_action(Command, Node, Args, Opts, Inform) -> + case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of + false -> case ensure_app_running(Node) of + ok -> action(Command, Node, Args, Opts, Inform); + E -> E + end; + true -> action(Command, Node, Args, Opts, Inform) + end. + action(stop, Node, Args, _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), Res = call(Node, {rabbit, stop_and_halt, []}), @@ -744,6 +759,17 @@ unsafe_rpc(Node, Mod, Fun, Args) -> Normal -> Normal end. +ensure_app_running(Node) -> + case call(Node, {rabbit, is_running, []}) of + true -> ok; + false -> {error_string, + rabbit_misc:format( + "rabbit application is not running on node ~s.~n" + " * Suggestion: start it with \"rabbitmqctl start_app\" " + "and try again", [Node])}; + Other -> Other + end. + call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 24b22d4c..9bccf5dd 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, - msg_rates/1, status/1, invoke/3, is_duplicate/2]). + msg_rates/1, info/2, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -374,10 +374,13 @@ resume(State = #state { backing_queue = BQ, msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:msg_rates(BQS). -status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:status(BQS) ++ +info(backing_queue_status, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:info(backing_queue_status, BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, - {mirror_senders, sets:size(State #state.known_senders)} ]. + {mirror_senders, sets:size(State #state.known_senders)} ]; +info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:info(Item, BQS). invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 180993a5..09355f3f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -912,8 +912,13 @@ ntoab(IP) -> _ -> "[" ++ Str ++ "]" end. +%% We try to avoid reconnecting to down nodes here; this is used in a +%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur +%% would be bad news. is_process_alive(Pid) -> - rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true. + Node = node(Pid), + lists:member(Node, [node() | nodes()]) andalso + rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true. pget(K, P) -> proplists:get_value(K, P). pget(K, P, D) -> proplists:get_value(K, P, D). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ee97603b..c96000a9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -76,6 +76,7 @@ all_tests() -> passed end), passed = test_configurable_server_properties(), + passed = vm_memory_monitor_tests:all_tests(), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bfd95b98..00a10530 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). + info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -837,18 +837,24 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> {AvgIngressRate, AvgEgressRate}. -status(#vqstate { +info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> + RamMsgCount; +info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) -> + gb_trees:size(RPA); +info(messages_ram, State) -> + info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); +info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> + PersistentCount; +info(message_bytes, #vqstate{bytes = Bytes}) -> + Bytes; +info(message_bytes_ram, #vqstate{ram_msg_bytes = RamMsgBytes}) -> + RamMsgBytes; %% TODO this is really message_bytes_ready_ram! +info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> + PersistentBytes; +info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - bytes = Bytes, - ram_pending_ack = RPA, - disk_pending_ack = DPA, target_ram_count = TargetRamCount, - ram_msg_count = RamMsgCount, - ram_msg_bytes = RamMsgBytes, next_seq_id = NextSeqId, - persistent_count = PersistentCount, - persistent_bytes = PersistentBytes, rates = #rates { in = AvgIngressRate, out = AvgEgressRate, ack_in = AvgAckIngressRate, @@ -859,20 +865,14 @@ status(#vqstate { {delta , Delta}, {q3 , ?QUEUE:len(Q3)}, {q4 , ?QUEUE:len(Q4)}, - {len , Len}, - {bytes , Bytes}, - {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)}, {target_ram_count , TargetRamCount}, - {ram_msg_count , RamMsgCount}, - {ram_msg_bytes , RamMsgBytes}, - {ram_ack_count , gb_trees:size(RPA)}, {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, - {persistent_bytes , PersistentBytes}, {avg_ingress_rate , AvgIngressRate}, {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]. + {avg_ack_egress_rate , AvgAckEgressRate} ]; +info(Item, _) -> + throw({bad_argument, Item}). invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); invoke( _, _, State) -> State. diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 5fb1e472..948956a3 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -37,6 +37,9 @@ get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1, get_memory_limit/0]). +%% for tests +-export([parse_line_linux/1]). + -define(SERVER, ?MODULE). -define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). @@ -77,7 +80,15 @@ %% Public API %%---------------------------------------------------------------------------- -get_total_memory() -> get_total_memory(os:type()). +get_total_memory() -> + try + get_total_memory(os:type()) + catch _:Error -> + rabbit_log:warning( + "Failed to get total system memory: ~n~p~n~p~n", + [Error, erlang:get_stacktrace()]), + unknown + end. get_vm_limit() -> get_vm_limit(os:type()). @@ -306,14 +317,24 @@ parse_line_mach(Line) -> {list_to_atom(Name), list_to_integer(Value)} end. -%% A line looks like "FooBar: 123456 kB" +%% A line looks like "MemTotal: 502968 kB" +%% or (with broken OS/modules) "Readahead 123456 kB" parse_line_linux(Line) -> - [Name, RHS | _Rest] = string:tokens(Line, ":"), - [Value | UnitsRest] = string:tokens(RHS, " "), - Value1 = case UnitsRest of - [] -> list_to_integer(Value); %% no units - ["kB"] -> list_to_integer(Value) * 1024 - end, + {Name, Value, UnitRest} = + case string:tokens(Line, ":") of + %% no colon in the line + [S] -> + [K, RHS] = re:split(S, "\s", [{parts, 2}, {return, list}]), + [V | Unit] = string:tokens(RHS, " "), + {K, V, Unit}; + [K, RHS | _Rest] -> + [V | Unit] = string:tokens(RHS, " "), + {K, V, Unit} + end, + Value1 = case UnitRest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, {list_to_atom(Name), Value1}. %% A line looks like "Memory size: 1024 Megabytes" diff --git a/src/vm_memory_monitor_tests.erl b/src/vm_memory_monitor_tests.erl new file mode 100644 index 00000000..1f7cea33 --- /dev/null +++ b/src/vm_memory_monitor_tests.erl @@ -0,0 +1,35 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(vm_memory_monitor_tests). + +-export([all_tests/0]). + +%% --------------------------------------------------------------------------- +%% Tests +%% --------------------------------------------------------------------------- + +all_tests() -> + lists:foreach(fun ({S, {K, V}}) -> + {K, V} = vm_memory_monitor:parse_line_linux(S) + end, + [{"MemTotal: 0 kB", {'MemTotal', 0}}, + {"MemTotal: 502968 kB ", {'MemTotal', 515039232}}, + {"MemFree: 178232 kB", {'MemFree', 182509568}}, + {"MemTotal: 50296888", {'MemTotal', 50296888}}, + {"MemTotal 502968 kB", {'MemTotal', 515039232}}, + {"MemTotal 50296866 ", {'MemTotal', 50296866}}]), + passed. |