summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-13 10:43:40 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-13 10:43:40 +0100
commit1c719c93d3c486925f1e4211ab2060ad9c67e55d (patch)
tree90265bd5fba82bb7784d5fcaff3ad6c0528db728
parent0a3f8afd5e6fffa3cf509a332c957cbe194d12a1 (diff)
downloadrabbitmq-server-1c719c93d3c486925f1e4211ab2060ad9c67e55d.tar.gz
refactor
-rw-r--r--src/rabbit_queue_collector.erl49
1 files changed, 22 insertions, 27 deletions
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index f0534b9a..e3c8d671 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {queues}).
+-record(state, {queues, delete_from}).
-include("rabbit.hrl").
@@ -71,24 +71,32 @@ init([]) ->
%%--------------------------------------------------------------------------
handle_call({register, Q}, _From,
- State = #state{queues = Queues}) ->
+ State = #state{queues = Queues, delete_from = Deleting}) ->
MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
- {reply, ok,
- State#state{queues = dict:store(MonitorRef, Q, Queues)}};
-
-handle_call(delete_all, _From, State = #state{queues = Queues}) ->
- Qs = dict:to_list(Queues),
- [rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () -> rabbit_amqqueue:delete_immediately(Q) end)
- || {_MRef, Q} <- Qs],
- {reply, ok, wait_DOWNs(sets:from_list([MRef || {MRef, _Q} <- Qs]), State)}.
+ case Deleting of
+ undefined -> ok;
+ _ -> rabbit_amqqueue:delete_immediately(Q)
+ end,
+ {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}};
+
+handle_call(delete_all, From, State = #state{queues = Queues}) ->
+ case dict:size(Queues) of
+ 0 -> {reply, ok, State#state{delete_from = From}};
+ _ -> [rabbit_amqqueue:delete_immediately(Q)
+ || {_MRef, Q} <- dict:to_list(Queues)],
+ {noreply, State#state{delete_from = From}}
+ end.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
-handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State) ->
- {noreply, erase_queue(MonitorRef, State)}.
+handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
+ State = #state{queues = Queues, delete_from = Deleting}) ->
+ case {Deleting, dict:size(Queues)} of
+ {undefined, _} -> ok;
+ {_, 1} -> gen_server:reply(Deleting, ok)
+ end,
+ {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}.
terminate(_Reason, _State) ->
rabbit_log:info("collector terminated~n"),
@@ -96,16 +104,3 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-wait_DOWNs(MRefs, State) ->
- case sets:size(MRefs) of
- 0 -> State;
- _ -> receive
- {'DOWN', MRef, process, _DownPid, _Reason} ->
- wait_DOWNs(sets:del_element(MRef, MRefs),
- erase_queue(MRef, State))
- end
- end.
-
-erase_queue(MRef, State = #state{queues = Queues}) ->
- State#state{queues = dict:erase(MRef, Queues)}.