From ed59c8af0c29c90c8310c621555965c88d534d10 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 28 Apr 2010 12:38:27 +0100 Subject: Parallelise communication with multiple nodes. --- src/delegate.erl | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index 71287496..88abb20b 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -44,8 +44,6 @@ -ifdef(use_specs). --type(serverref() :: atom() | {atom(), atom()} | {'global', term()} | pid()). - -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). -spec(invoke_async/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). @@ -97,20 +95,32 @@ split_delegate_per_node(Pids) -> invoke_per_node([{Node, Pids}], FPid) when Node == node() -> local_delegate(Pids, FPid); invoke_per_node(NodePids, FPid) -> - delegate_per_node(NodePids, FPid, fun internal_call/2). + lists:append(delegate_per_node(NodePids, FPid, fun internal_call/2)). invoke_async_per_node([{Node, Pids}], FPid) when Node == node() -> local_delegate(Pids, FPid); invoke_async_per_node(NodePids, FPid) -> - delegate_per_node(NodePids, FPid, fun internal_cast/2). + delegate_per_node(NodePids, FPid, fun internal_cast/2), + ok. local_delegate(Pids, FPid) -> [safe_invoke(FPid, Pid) || Pid <- Pids]. delegate_per_node(NodePids, FPid, DelegateFun) -> - lists:flatten( - [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) - || {Node, Pids} <- NodePids]). + Self = self(), + [spawn(fun() -> + Self ! {result, DelegateFun(Node, + fun() -> local_delegate(Pids, FPid) end)} + end) || {Node, Pids} <- NodePids], + gather_results([], length(NodePids)). + +gather_results(ResultsAcc, 0) -> + ResultsAcc; + +gather_results(ResultsAcc, ToGo) -> + receive {result, Result} -> + gather_results([Result | ResultsAcc], ToGo - 1) + end. server(Node) when is_atom(Node) -> case get({delegate_server_name, Node}) of -- cgit v1.2.1