diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-14 15:27:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-14 15:27:31 +0100 |
commit | ca6ac54cee5679f3efdfb4853dcb8783dcf5b29a (patch) | |
tree | e4296e1585969b79b3262eb8952b2d205695d5d2 | |
parent | ef341f575b7a96c4c0c958e9e3400dcf73991597 (diff) | |
download | rabbitmq-server-bug24903.tar.gz |
Get slaves to emit statistics on memory use and idling.bug24903
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 45 |
1 files changed, 39 insertions, 6 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 964c3e24..279005c2 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -56,6 +56,11 @@ is_synchronised ]). +-define(STATISTICS_KEYS, + [pid, + memory + ]). + -define(INFO_KEYS, ?CREATION_EVENT_KEYS). -define(SYNC_INTERVAL, 25). %% milliseconds @@ -77,7 +82,8 @@ msg_id_status, known_senders, - synchronised + synchronised, + stats_timer }). start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). @@ -134,7 +140,10 @@ init(#amqqueue { name = QueueName } = Q) -> rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), ok = gm:broadcast(GM, request_length), - {ok, State, hibernate, + State1 = rabbit_event:init_stats_timer(State, #state.stats_timer), + rabbit_event:if_enabled(State1, #state.stats_timer, + fun() -> emit_stats(State1) end), + {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; {stale, StalePid} -> @@ -265,6 +274,12 @@ handle_info(sync_timeout, State) -> handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); +handle_info(emit_stats, State) -> + %% Do not invoke noreply as it would see no timer and create a new one. + emit_stats(State), + State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer), + {noreply, State1, hibernate}; + handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, State = #state { gm = GM, master_pid = MPid }) -> ok = gm:broadcast(GM, {process_death, MPid}), @@ -319,7 +334,12 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), BQS3 = BQ:handle_pre_hibernate(BQS2), - {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. + rabbit_event:if_enabled( + State, #state.stats_timer, + fun () -> emit_stats(State, [{idle_since, now()}]) end), + State1 = rabbit_event:stop_stats_timer( + State#state{backing_queue_state = BQS3}, #state.stats_timer), + {hibernate, stop_rate_timer(State1 #state { backing_queue_state = BQS3 })}. prioritise_call(Msg, _From, _State) -> case Msg of @@ -386,8 +406,17 @@ i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; i(master_pid, #state { master_pid = MPid }) -> MPid; i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(memory, _State) -> {memory, M} = process_info(self(), memory), + M; i(Item, _State) -> throw({bad_argument, Item}). +emit_stats(State) -> + emit_stats(State, []). + +emit_stats(State, Extra) -> + rabbit_event:notify( + queue_slave_stats, Extra ++ infos(?STATISTICS_KEYS, State)). + bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, @@ -569,9 +598,10 @@ reply(Reply, State) -> next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = ensure_rate_timer( - confirm_messages(MsgIds, State #state { - backing_queue_state = BQS1 })), + State1 = ensure_stats_timer( + ensure_rate_timer( + confirm_messages(MsgIds, State #state { + backing_queue_state = BQS1 }))), case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -610,6 +640,9 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> erlang:cancel_timer(TRef), State #state { rate_timer_ref = undefined }. +ensure_stats_timer(State) -> + rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats). + ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> State #state { known_senders = pmon:monitor(ChPid, KS) }. |