diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-13 10:43:40 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-13 10:43:40 +0100 |
commit | 1c719c93d3c486925f1e4211ab2060ad9c67e55d (patch) | |
tree | 90265bd5fba82bb7784d5fcaff3ad6c0528db728 | |
parent | 0a3f8afd5e6fffa3cf509a332c957cbe194d12a1 (diff) | |
download | rabbitmq-server-1c719c93d3c486925f1e4211ab2060ad9c67e55d.tar.gz |
refactor
-rw-r--r-- | src/rabbit_queue_collector.erl | 49 |
1 files changed, 22 insertions, 27 deletions
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index f0534b9a..e3c8d671 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {queues}). +-record(state, {queues, delete_from}). -include("rabbit.hrl"). @@ -71,24 +71,32 @@ init([]) -> %%-------------------------------------------------------------------------- handle_call({register, Q}, _From, - State = #state{queues = Queues}) -> + State = #state{queues = Queues, delete_from = Deleting}) -> MonitorRef = erlang:monitor(process, Q#amqqueue.pid), - {reply, ok, - State#state{queues = dict:store(MonitorRef, Q, Queues)}}; - -handle_call(delete_all, _From, State = #state{queues = Queues}) -> - Qs = dict:to_list(Queues), - [rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> rabbit_amqqueue:delete_immediately(Q) end) - || {_MRef, Q} <- Qs], - {reply, ok, wait_DOWNs(sets:from_list([MRef || {MRef, _Q} <- Qs]), State)}. + case Deleting of + undefined -> ok; + _ -> rabbit_amqqueue:delete_immediately(Q) + end, + {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + +handle_call(delete_all, From, State = #state{queues = Queues}) -> + case dict:size(Queues) of + 0 -> {reply, ok, State#state{delete_from = From}}; + _ -> [rabbit_amqqueue:delete_immediately(Q) + || {_MRef, Q} <- dict:to_list(Queues)], + {noreply, State#state{delete_from = From}} + end. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. -handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State) -> - {noreply, erase_queue(MonitorRef, State)}. +handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, + State = #state{queues = Queues, delete_from = Deleting}) -> + case {Deleting, dict:size(Queues)} of + {undefined, _} -> ok; + {_, 1} -> gen_server:reply(Deleting, ok) + end, + {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}. terminate(_Reason, _State) -> rabbit_log:info("collector terminated~n"), @@ -96,16 +104,3 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - -wait_DOWNs(MRefs, State) -> - case sets:size(MRefs) of - 0 -> State; - _ -> receive - {'DOWN', MRef, process, _DownPid, _Reason} -> - wait_DOWNs(sets:del_element(MRef, MRefs), - erase_queue(MRef, State)) - end - end. - -erase_queue(MRef, State = #state{queues = Queues}) -> - State#state{queues = dict:erase(MRef, Queues)}. |