summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-03-14 17:10:58 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-03-14 17:10:58 +0000
commit00f507239680640c8f858456cce6942a15de8209 (patch)
tree4d9a97af40c5ef6a1ed7a8bee81bd4dff99764b4
parent8dd3a08a840c85e80891f2c46d486ac26d348d95 (diff)
downloadrabbitmq-server-00f507239680640c8f858456cce6942a15de8209.tar.gz
Scatter-gather.
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_misc.erl28
2 files changed, 33 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 43d65aea..977d302f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -433,10 +433,11 @@ force_event_refresh([]) ->
force_event_refresh(QNames) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
- Results = [catch gen_server2:call(Q#amqqueue.pid, force_event_refresh) ||
- Q <- Qs],
- Failed = [QName || {QName, {'EXIT', _}} <- lists:zip(QNames, Results)],
- io:format("Failed: ~p~n", [Failed]),
+ {_, Bad} = rabbit_misc:multi_call(
+ [Q#amqqueue.pid || Q <- Qs], force_event_refresh),
+ FailedPids = [Pid || {Pid, _Reason} <- Bad],
+ Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
+ lists:member(Pid, FailedPids)],
timer:sleep(100),
force_event_refresh(Failed),
ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index dca3bead..196d6da0 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -58,6 +58,7 @@
-export([pget/2, pget/3, pget_or_die/2]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
+-export([multi_call/2]).
-export([quit/1]).
%%----------------------------------------------------------------------------
@@ -200,6 +201,8 @@
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
+-spec(multi_call/2 ::
+ ([pid()], any()) -> [{[{pid(), any()}], [{pid(), any()}]}]).
-spec(quit/1 :: (integer() | string()) -> no_return()).
-endif.
@@ -880,6 +883,31 @@ append_rpc_all_nodes(Nodes, M, F, A) ->
_ -> Res
end || Res <- ResL]).
+%% A simplified version of gen_server:multi_call/2 with a sane
+%% API. This is not in gen_server2 as there is no useful
+%% infrastructure there to share.
+multi_call(Pids, Req) ->
+ MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids],
+ receive_multi_call(MonitorPids, [], []).
+
+start_multi_call(Pid, Req) ->
+ Mref = erlang:monitor(process, Pid),
+ Pid ! {'$gen_call', {self(), Mref}, Req},
+ {Mref, Pid}.
+
+receive_multi_call([], Good, Bad) ->
+ {Good, Bad};
+receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
+ receive
+ {Mref, Reply} ->
+ erlang:demonitor(Mref, [flush]),
+ receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad);
+ {'DOWN', Mref, _, _, noconnection} ->
+ receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]);
+ {'DOWN', Mref, _, _, Reason} ->
+ receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
+ end.
+
%% the slower shutdown on windows required to flush stdout
quit(Status) ->
case os:type() of