summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-14 15:27:31 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-14 15:27:31 +0100
commitca6ac54cee5679f3efdfb4853dcb8783dcf5b29a (patch)
treee4296e1585969b79b3262eb8952b2d205695d5d2
parentef341f575b7a96c4c0c958e9e3400dcf73991597 (diff)
downloadrabbitmq-server-bug24903.tar.gz
Get slaves to emit statistics on memory use and idling.bug24903
-rw-r--r--src/rabbit_mirror_queue_slave.erl45
1 files changed, 39 insertions, 6 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 964c3e24..279005c2 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -56,6 +56,11 @@
is_synchronised
]).
+-define(STATISTICS_KEYS,
+ [pid,
+ memory
+ ]).
+
-define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-define(SYNC_INTERVAL, 25). %% milliseconds
@@ -77,7 +82,8 @@
msg_id_status,
known_senders,
- synchronised
+ synchronised,
+ stats_timer
}).
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
@@ -134,7 +140,10 @@ init(#amqqueue { name = QueueName } = Q) ->
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_length),
- {ok, State, hibernate,
+ State1 = rabbit_event:init_stats_timer(State, #state.stats_timer),
+ rabbit_event:if_enabled(State1, #state.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {ok, State1, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
{stale, StalePid} ->
@@ -265,6 +274,12 @@ handle_info(sync_timeout, State) ->
handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
+handle_info(emit_stats, State) ->
+ %% Do not invoke noreply as it would see no timer and create a new one.
+ emit_stats(State),
+ State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
+ {noreply, State1, hibernate};
+
handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
State = #state { gm = GM, master_pid = MPid }) ->
ok = gm:broadcast(GM, {process_death, MPid}),
@@ -319,7 +334,12 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
BQS3 = BQ:handle_pre_hibernate(BQS2),
- {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
+ rabbit_event:if_enabled(
+ State, #state.stats_timer,
+ fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ State1 = rabbit_event:stop_stats_timer(
+ State#state{backing_queue_state = BQS3}, #state.stats_timer),
+ {hibernate, stop_rate_timer(State1 #state { backing_queue_state = BQS3 })}.
prioritise_call(Msg, _From, _State) ->
case Msg of
@@ -386,8 +406,17 @@ i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
i(master_pid, #state { master_pid = MPid }) -> MPid;
i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(memory, _State) -> {memory, M} = process_info(self(), memory),
+ M;
i(Item, _State) -> throw({bad_argument, Item}).
+emit_stats(State) ->
+ emit_stats(State, []).
+
+emit_stats(State, Extra) ->
+ rabbit_event:notify(
+ queue_slave_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
@@ -569,9 +598,10 @@ reply(Reply, State) ->
next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = ensure_rate_timer(
- confirm_messages(MsgIds, State #state {
- backing_queue_state = BQS1 })),
+ State1 = ensure_stats_timer(
+ ensure_rate_timer(
+ confirm_messages(MsgIds, State #state {
+ backing_queue_state = BQS1 }))),
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -610,6 +640,9 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
erlang:cancel_timer(TRef),
State #state { rate_timer_ref = undefined }.
+ensure_stats_timer(State) ->
+ rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats).
+
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.