summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-07-29 13:26:43 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-07-29 13:26:43 +0100
commit3e96fcafe76f0df5df315c4fbf88e666ff7461c4 (patch)
treea62808817909cdaf948eda78a27fa07a04e36323
parent3fcd2b90cade09d9906b2b656570a767be5ba206 (diff)
parent0fa2b4f6be0580f2fa76eb80d272ae3b37850ead (diff)
downloadrabbitmq-server-3e96fcafe76f0df5df315c4fbf88e666ff7461c4.tar.gz
Merge in bug26307 and update.
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--src/pmon.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_backing_queue.erl19
-rw-r--r--src/rabbit_control_main.erl28
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_tests.erl1
-rw-r--r--src/rabbit_variable_queue.erl38
-rw-r--r--src/vm_memory_monitor.erl37
-rw-r--r--src/vm_memory_monitor_tests.erl35
11 files changed, 165 insertions, 48 deletions
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 27c882df..0d5ec7ce 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
-Uploaders: Emile Joubert <emile@rabbitmq.com>
+Uploaders: Blair Hester <bhester@gopivotal.com>
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:13.b.3), erlang-src (>= 1:13.b.3), unzip, zip
Standards-Version: 3.9.2
diff --git a/src/pmon.erl b/src/pmon.erl
index ae1be40c..de3e9fea 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -54,8 +54,13 @@ new(Module) -> #state{dict = dict:new(),
monitor(Item, S = #state{dict = M, module = Module}) ->
case dict:is_key(Item, M) of
true -> S;
- false -> S#state{dict = dict:store(
- Item, Module:monitor(process, Item), M)}
+ false -> case node_alive_shortcut(Item) of
+ true -> Ref = Module:monitor(process, Item),
+ S#state{dict = dict:store(Item, Ref, M)};
+ false -> self() ! {'DOWN', fake_ref, process, Item,
+ nodedown},
+ S
+ end
end.
monitor_all([], S) -> S; %% optimisation
@@ -76,3 +81,16 @@ erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
monitored(#state{dict = M}) -> dict:fetch_keys(M).
is_empty(#state{dict = M}) -> dict:size(M) == 0.
+
+%%----------------------------------------------------------------------------
+
+%% We check here to see if the node is alive in order to avoid trying
+%% to connect to it if it isn't - this can cause substantial
+%% slowdowns. We can't perform this shortcut if passed {Name, Node}
+%% since we would need to convert that into a pid for the fake 'DOWN'
+%% message, so we always return true here - but that's OK, it's just
+%% an optimisation.
+node_alive_shortcut(P) when is_pid(P) ->
+ lists:member(node(P), [node() | nodes()]);
+node_alive_shortcut({_Name, _Node}) ->
+ true.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 04a38b78..8f44d761 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -103,7 +103,8 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
-info_keys() -> ?INFO_KEYS.
+info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys().
+statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
%%----------------------------------------------------------------------------
@@ -824,17 +825,15 @@ i(down_slave_nodes, #q{q = #amqqueue{name = Name,
end;
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
-i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- BQ:status(BQS);
-i(Item, _) ->
- throw({bad_argument, Item}).
+i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:info(Item, BQS).
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
ExtraKs = [K || {K, _} <- Extra],
- Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State),
not lists:member(K, ExtraKs)],
rabbit_event:notify(queue_stats, Extra ++ Infos).
@@ -934,7 +933,7 @@ handle_call({init, Recover}, From,
end;
handle_call(info, _From, State) ->
- reply(infos(?INFO_KEYS, State), State);
+ reply(infos(info_keys(), State), State);
handle_call({info, Items}, _From, State) ->
try
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 8f37bf60..595a05d3 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -16,6 +16,13 @@
-module(rabbit_backing_queue).
+-export([info_keys/0]).
+
+-define(INFO_KEYS, [messages_ram, messages_ready_ram,
+ messages_unacknowledged_ram, messages_persistent,
+ message_bytes, message_bytes_ram, message_bytes_persistent,
+ backing_queue_status]).
+
-ifdef(use_specs).
%% We can't specify a per-queue ack/state with callback signatures
@@ -37,6 +44,8 @@
-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+
%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
@@ -216,9 +225,7 @@
%% inbound messages and outbound messages at the moment.
-callback msg_rates(state()) -> {float(), float()}.
-%% Exists for debugging purposes, to be able to expose state via
-%% rabbitmqctl list_queues backing_queue_status
--callback status(state()) -> [{atom(), any()}].
+-callback info(atom(), state()) -> any().
%% Passed a function to be invoked with the relevant backing queue's
%% state. Useful for when the backing queue or other components need
@@ -243,9 +250,11 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1},
- {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {info_keys, 0},
+ {infos, 2}, {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
-endif.
+
+info_keys() -> ?INFO_KEYS.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index b8b6afad..70fe10e6 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -115,6 +115,12 @@
{"Policies", rabbit_policy, list_formatted, info_keys},
{"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
+-define(COMMANDS_NOT_REQUIRING_APP,
+ [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
+ join_cluster, change_cluster_node_type, update_cluster_nodes,
+ forget_cluster_node, cluster_status, status, environment, eval,
+ force_boot]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -156,7 +162,7 @@ start() ->
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
- case catch action(Command, Node, Args, Opts, Inform) of
+ case catch do_action(Command, Node, Args, Opts, Inform) of
ok ->
case Quiet of
true -> ok;
@@ -251,6 +257,15 @@ parse_arguments(CmdLine, NodeStr) ->
%%----------------------------------------------------------------------------
+do_action(Command, Node, Args, Opts, Inform) ->
+ case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of
+ false -> case ensure_app_running(Node) of
+ ok -> action(Command, Node, Args, Opts, Inform);
+ E -> E
+ end;
+ true -> action(Command, Node, Args, Opts, Inform)
+ end.
+
action(stop, Node, Args, _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
Res = call(Node, {rabbit, stop_and_halt, []}),
@@ -744,6 +759,17 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
Normal -> Normal
end.
+ensure_app_running(Node) ->
+ case call(Node, {rabbit, is_running, []}) of
+ true -> ok;
+ false -> {error_string,
+ rabbit_misc:format(
+ "rabbit application is not running on node ~s.~n"
+ " * Suggestion: start it with \"rabbitmqctl start_app\" "
+ "and try again", [Node])};
+ Other -> Other
+ end.
+
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 24b22d4c..9bccf5dd 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
- msg_rates/1, status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, info/2, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -374,10 +374,13 @@ resume(State = #state { backing_queue = BQ,
msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:msg_rates(BQS).
-status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:status(BQS) ++
+info(backing_queue_status,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:info(backing_queue_status, BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
- {mirror_senders, sets:size(State #state.known_senders)} ].
+ {mirror_senders, sets:size(State #state.known_senders)} ];
+info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:info(Item, BQS).
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State);
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 180993a5..09355f3f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -912,8 +912,13 @@ ntoab(IP) ->
_ -> "[" ++ Str ++ "]"
end.
+%% We try to avoid reconnecting to down nodes here; this is used in a
+%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
+%% would be bad news.
is_process_alive(Pid) ->
- rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.
+ Node = node(Pid),
+ lists:member(Node, [node() | nodes()]) andalso
+ rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true.
pget(K, P) -> proplists:get_value(K, P).
pget(K, P, D) -> proplists:get_value(K, P, D).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ee97603b..c96000a9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -76,6 +76,7 @@ all_tests() ->
passed
end),
passed = test_configurable_server_properties(),
+ passed = vm_memory_monitor_tests:all_tests(),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bfd95b98..00a10530 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
+ info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -837,18 +837,24 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
{AvgIngressRate, AvgEgressRate}.
-status(#vqstate {
+info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
+ RamMsgCount;
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) ->
+ gb_trees:size(RPA);
+info(messages_ram, State) ->
+ info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
+info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
+ PersistentCount;
+info(message_bytes, #vqstate{bytes = Bytes}) ->
+ Bytes;
+info(message_bytes_ram, #vqstate{ram_msg_bytes = RamMsgBytes}) ->
+ RamMsgBytes; %% TODO this is really message_bytes_ready_ram!
+info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
+ PersistentBytes;
+info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- bytes = Bytes,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
- ram_msg_count = RamMsgCount,
- ram_msg_bytes = RamMsgBytes,
next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
- persistent_bytes = PersistentBytes,
rates = #rates { in = AvgIngressRate,
out = AvgEgressRate,
ack_in = AvgAckIngressRate,
@@ -859,20 +865,14 @@ status(#vqstate {
{delta , Delta},
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
- {len , Len},
- {bytes , Bytes},
- {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
- {ram_msg_count , RamMsgCount},
- {ram_msg_bytes , RamMsgBytes},
- {ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
- {persistent_bytes , PersistentBytes},
{avg_ingress_rate , AvgIngressRate},
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ].
+ {avg_ack_egress_rate , AvgAckEgressRate} ];
+info(Item, _) ->
+ throw({bad_argument, Item}).
invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
invoke( _, _, State) -> State.
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 5fb1e472..948956a3 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -37,6 +37,9 @@
get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1,
get_memory_limit/0]).
+%% for tests
+-export([parse_line_linux/1]).
+
-define(SERVER, ?MODULE).
-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
@@ -77,7 +80,15 @@
%% Public API
%%----------------------------------------------------------------------------
-get_total_memory() -> get_total_memory(os:type()).
+get_total_memory() ->
+ try
+ get_total_memory(os:type())
+ catch _:Error ->
+ rabbit_log:warning(
+ "Failed to get total system memory: ~n~p~n~p~n",
+ [Error, erlang:get_stacktrace()]),
+ unknown
+ end.
get_vm_limit() -> get_vm_limit(os:type()).
@@ -306,14 +317,24 @@ parse_line_mach(Line) ->
{list_to_atom(Name), list_to_integer(Value)}
end.
-%% A line looks like "FooBar: 123456 kB"
+%% A line looks like "MemTotal: 502968 kB"
+%% or (with broken OS/modules) "Readahead 123456 kB"
parse_line_linux(Line) ->
- [Name, RHS | _Rest] = string:tokens(Line, ":"),
- [Value | UnitsRest] = string:tokens(RHS, " "),
- Value1 = case UnitsRest of
- [] -> list_to_integer(Value); %% no units
- ["kB"] -> list_to_integer(Value) * 1024
- end,
+ {Name, Value, UnitRest} =
+ case string:tokens(Line, ":") of
+ %% no colon in the line
+ [S] ->
+ [K, RHS] = re:split(S, "\s", [{parts, 2}, {return, list}]),
+ [V | Unit] = string:tokens(RHS, " "),
+ {K, V, Unit};
+ [K, RHS | _Rest] ->
+ [V | Unit] = string:tokens(RHS, " "),
+ {K, V, Unit}
+ end,
+ Value1 = case UnitRest of
+ [] -> list_to_integer(Value); %% no units
+ ["kB"] -> list_to_integer(Value) * 1024
+ end,
{list_to_atom(Name), Value1}.
%% A line looks like "Memory size: 1024 Megabytes"
diff --git a/src/vm_memory_monitor_tests.erl b/src/vm_memory_monitor_tests.erl
new file mode 100644
index 00000000..1f7cea33
--- /dev/null
+++ b/src/vm_memory_monitor_tests.erl
@@ -0,0 +1,35 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(vm_memory_monitor_tests).
+
+-export([all_tests/0]).
+
+%% ---------------------------------------------------------------------------
+%% Tests
+%% ---------------------------------------------------------------------------
+
+all_tests() ->
+ lists:foreach(fun ({S, {K, V}}) ->
+ {K, V} = vm_memory_monitor:parse_line_linux(S)
+ end,
+ [{"MemTotal: 0 kB", {'MemTotal', 0}},
+ {"MemTotal: 502968 kB ", {'MemTotal', 515039232}},
+ {"MemFree: 178232 kB", {'MemFree', 182509568}},
+ {"MemTotal: 50296888", {'MemTotal', 50296888}},
+ {"MemTotal 502968 kB", {'MemTotal', 515039232}},
+ {"MemTotal 50296866 ", {'MemTotal', 50296866}}]),
+ passed.