summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-04-28 12:38:27 +0100
committerSimon MacMullen <simon@lshift.net>2010-04-28 12:38:27 +0100
commited59c8af0c29c90c8310c621555965c88d534d10 (patch)
tree39c0ad24d91d8305c85077174f84be4095cb6061
parent0a60ed986a73c267a7c09a2297dc4329e6fbfa88 (diff)
downloadrabbitmq-server-ed59c8af0c29c90c8310c621555965c88d534d10.tar.gz
Parallelise communication with multiple nodes.
-rw-r--r--src/delegate.erl24
1 files changed, 17 insertions, 7 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 71287496..88abb20b 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -44,8 +44,6 @@
-ifdef(use_specs).
--type(serverref() :: atom() | {atom(), atom()} | {'global', term()} | pid()).
-
-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
-spec(invoke_async/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
-spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
@@ -97,20 +95,32 @@ split_delegate_per_node(Pids) ->
invoke_per_node([{Node, Pids}], FPid) when Node == node() ->
local_delegate(Pids, FPid);
invoke_per_node(NodePids, FPid) ->
- delegate_per_node(NodePids, FPid, fun internal_call/2).
+ lists:append(delegate_per_node(NodePids, FPid, fun internal_call/2)).
invoke_async_per_node([{Node, Pids}], FPid) when Node == node() ->
local_delegate(Pids, FPid);
invoke_async_per_node(NodePids, FPid) ->
- delegate_per_node(NodePids, FPid, fun internal_cast/2).
+ delegate_per_node(NodePids, FPid, fun internal_cast/2),
+ ok.
local_delegate(Pids, FPid) ->
[safe_invoke(FPid, Pid) || Pid <- Pids].
delegate_per_node(NodePids, FPid, DelegateFun) ->
- lists:flatten(
- [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end)
- || {Node, Pids} <- NodePids]).
+ Self = self(),
+ [spawn(fun() ->
+ Self ! {result, DelegateFun(Node,
+ fun() -> local_delegate(Pids, FPid) end)}
+ end) || {Node, Pids} <- NodePids],
+ gather_results([], length(NodePids)).
+
+gather_results(ResultsAcc, 0) ->
+ ResultsAcc;
+
+gather_results(ResultsAcc, ToGo) ->
+ receive {result, Result} ->
+ gather_results([Result | ResultsAcc], ToGo - 1)
+ end.
server(Node) when is_atom(Node) ->
case get({delegate_server_name, Node}) of