summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-30 17:35:01 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-30 17:35:01 +0100
commitaeab61b1e893304ccc576bf7c4d4c90e4e9742be (patch)
tree1a642fb94690068a0b4f76f96ff4b3249a596d91
parent4cddf2bf0efdd35a96d6002023a172be4d1dc7b8 (diff)
downloadrabbitmq-server-aeab61b1e893304ccc576bf7c4d4c90e4e9742be.tar.gz
Cosmetics and minor refactorings
-rw-r--r--src/delegate.erl70
-rw-r--r--src/rabbit_amqqueue.erl16
2 files changed, 45 insertions, 41 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index c9826f0d..c4ff764d 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -53,9 +53,13 @@
%%----------------------------------------------------------------------------
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
start_link(Hash) ->
- gen_server2:start_link({local, server(Hash)},
- ?MODULE, [], []).
+ gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, FPid) when is_pid(Pid) ->
[{Status, Res, _}] = invoke_per_node([{node(Pid), [Pid]}], FPid),
@@ -96,11 +100,11 @@ invoke_per_node(NodePids, FPid) ->
lists:append(delegate_per_node(NodePids, FPid, fun internal_call/2)).
invoke_no_result_per_node([{Node, Pids}], FPid) when Node == node() ->
- % This is not actually async! However, in practice FPid will always be
- % something that does a gen_server:cast or similar, so I don't think
- % it's a problem unless someone misuses this function. Making this
- % *actually* async would be painful as we can't spawn at this point or we
- % break effect ordering.
+ %% This is not actually async! However, in practice FPid will
+ %% always be something that does a gen_server:cast or similar, so
+ %% I don't think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
local_delegate(Pids, FPid);
invoke_no_result_per_node(NodePids, FPid) ->
delegate_per_node(NodePids, FPid, fun internal_cast/2),
@@ -111,23 +115,22 @@ local_delegate(Pids, FPid) ->
delegate_per_node(NodePids, FPid, DelegateFun) ->
Self = self(),
- [gen_server2:cast(local_server(Node), {thunk, fun() ->
- Self ! {result, DelegateFun(Node,
- fun() -> local_delegate(Pids, FPid) end)}
- end}) || {Node, Pids} <- NodePids],
- gather_results([], length(NodePids)).
-
-gather_results(ResultsAcc, 0) ->
- ResultsAcc;
-gather_results(ResultsAcc, ToGo) ->
- receive
- {result, Result} -> gather_results([Result | ResultsAcc], ToGo - 1)
- end.
+ %% Note that this is unsafe if the FPid requires reentrancy to the
+ %% local_server. I.e. if self() == local_server(Node) then we'll
+ %% block forever.
+ [gen_server2:cast(
+ local_server(Node),
+ {thunk, fun() -> Self !
+ {result,
+ DelegateFun(
+ Node, fun() -> local_delegate(Pids, FPid) end)}
+ end}) || {Node, Pids} <- NodePids],
+ [receive {result, Result} -> Result end || _ <- NodePids].
local_server(Node) ->
case get({delegate_local_server_name, Node}) of
undefined ->
- Name = server(erlang:phash2(Node, process_count())),
+ Name = server(erlang:phash2({self(), Node}, process_count())),
put({delegate_local_server_name, Node}, Name),
Name;
Name -> Name
@@ -138,11 +141,11 @@ remote_server(Node) ->
undefined ->
case rpc:call(Node, delegate, process_count, []) of
{badrpc, _} ->
- delegate_process_1; % Have to return something, if we're
- % just casting then we don't want to
- % blow up
+ %% Have to return something, if we're just casting
+ %% then we don't want to blow up
+ server(1);
Count ->
- Name = server(erlang:phash2(self(), Count)),
+ Name = server(erlang:phash2({self(), Node}, Count)),
put({delegate_remote_server_name, Node}, Name),
Name
end;
@@ -153,14 +156,12 @@ server(Hash) ->
list_to_atom("delegate_process_" ++ integer_to_list(Hash)).
safe_invoke(FPid, Pid) ->
- % We need the catch here for the local case. In the remote case there will
- % already have been a catch in handle_ca{ll,st} below, but that's OK, catch
- % is idempotent.
+ %% We need the catch here for the local case. In the remote case
+ %% there will already have been a catch in handle_ca{ll,st} below,
+ %% but that's OK, catch is idempotent.
case catch FPid(Pid) of
- {'EXIT', Reason} ->
- {error, {'EXIT', Reason}, Pid};
- Result ->
- {ok, Result, Pid}
+ {'EXIT', Reason} -> {error, {'EXIT', Reason}, Pid};
+ Result -> {ok, Result, Pid}
end.
process_count() ->
@@ -169,14 +170,15 @@ process_count() ->
%%--------------------------------------------------------------------
init([]) ->
- {ok, no_state}.
+ {ok, no_state, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({thunk, Thunk}, _From, State) ->
- {reply, catch Thunk(), State}.
+ {reply, catch Thunk(), State, hibernate}.
handle_cast({thunk, Thunk}, State) ->
catch Thunk(),
- {noreply, State}.
+ {noreply, State, hibernate}.
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 603ab08d..2b5592c0 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -283,8 +283,8 @@ commit_all(QPids, Txn, ChPid) ->
QPids).
rollback_all(QPids, Txn, ChPid) ->
- delegate:invoke_no_result(QPids,
- fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end).
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end).
notify_down_all(QPids, ChPid) ->
safe_delegate_call_ok(
@@ -295,8 +295,10 @@ notify_down_all(QPids, ChPid) ->
QPids).
limit_all(QPids, ChPid, LimiterPid) ->
- delegate:invoke_no_result(QPids,
- fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end).
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
+ end).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
delegate_call(QPid, {claim_queue, ReaderPid}, infinity).
@@ -312,7 +314,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ infinity).
notify_sent(QPid, ChPid) ->
delegate_pcast(QPid, 7, {notify_sent, ChPid}).
@@ -321,8 +323,8 @@ unblock(QPid, ChPid) ->
delegate_pcast(QPid, 7, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
- delegate:invoke_no_result(QPids,
- fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
internal_delete(QueueName) ->
case