From 929f43f01c183a9e0fbc753952da4d9f640b08e7 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 17 Mar 2010 13:47:44 +0000 Subject: Generic delegate mechanism, similar to what the router did before. --- include/delegate.hrl | 32 +++++++++++++ src/delegate.erl | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/delegate_sup.erl | 56 ++++++++++++++++++++++ src/rabbit.erl | 8 ++++ src/rabbit_tests.erl | 90 +++++++++++++++++++++++++++++++++++ 5 files changed, 315 insertions(+) create mode 100644 include/delegate.hrl create mode 100644 src/delegate.erl create mode 100644 src/delegate_sup.erl diff --git a/include/delegate.hrl b/include/delegate.hrl new file mode 100644 index 00000000..38f8d42f --- /dev/null +++ b/include/delegate.hrl @@ -0,0 +1,32 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-define(DELEGATE_PROCESSES, 10). diff --git a/src/delegate.erl b/src/delegate.erl new file mode 100644 index 00000000..3a26c410 --- /dev/null +++ b/src/delegate.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate). +-include("delegate.hrl"). + +-behaviour(gen_server2). + +-export([start_link/1, delegate_async/2, delegate_sync/2, server/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + + +%%---------------------------------------------------------------------------- + +start_link(Hash) -> + gen_server2:start_link({local, server(Hash)}, + ?MODULE, [], []). +delegate_sync(Node, Thunk) when is_atom(Node) -> + gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); + +delegate_sync(Pid, FPid) when is_pid(Pid) -> + [[Res]] = delegate_per_node([{node(Pid), [Pid]}], + f_pid_node(fun delegate_sync/2, FPid)), + Res; + +delegate_sync(Pids, FPid) when is_list(Pids) -> + lists:flatten( + delegate_per_node(split_per_node(Pids), + f_pid_node(fun delegate_sync/2, FPid))). + +delegate_async(Node, Thunk) when is_atom(Node) -> + gen_server2:cast({server(), Node}, {thunk, Thunk}); + +delegate_async(Pid, FPid) when is_pid(Pid) -> + delegate_per_node([{node(Pid), [Pid]}], + f_pid_node(fun delegate_async/2, FPid)); + +delegate_async(Pids, FPid) when is_list(Pids) -> + delegate_per_node(split_per_node(Pids), + f_pid_node(fun delegate_async/2, FPid)). + +%%---------------------------------------------------------------------------- + +split_per_node(Pids) -> + dict:to_list( + lists:foldl( + fun (Pid, D) -> + dict:update(node(Pid), + fun (Pids1) -> [Pid | Pids1] end, + [Pid], D) + end, + dict:new(), Pids)). + +f_pid_node(DelegateFun, FPid) -> + fun(Pid, Node) -> + DelegateFun(Node, fun() -> FPid(Pid) end) + end. + +% TODO this only gets called when we are ONLY talking to the local node - can +% we improve this? +delegate_per_node([{Node, Pids}], FPidNode) when Node == node() -> + % optimisation + [[FPidNode(Pid, node()) || Pid <- Pids]]; + +delegate_per_node(NodePids, FPidNode) -> + rabbit_misc:upmap( + fun ({Node, Pids}) -> + [FPidNode(Pid, Node) || Pid <- Pids] + end, + NodePids). + +server() -> + server(erlang:phash(self(), ?DELEGATE_PROCESSES)). + +server(Hash) -> + list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))). + +%%-------------------------------------------------------------------- + +init([]) -> + {ok, no_state}. + +handle_call({thunk, Thunk}, _From, State) -> + {reply, catch Thunk(), State}. + +handle_cast({thunk, Thunk}, State) -> + catch Thunk(), + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl new file mode 100644 index 00000000..f7a1c586 --- /dev/null +++ b/src/delegate_sup.erl @@ -0,0 +1,56 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate_sup). +-include("delegate.hrl"). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%-------------------------------------------------------------------- + +init(_Args) -> + {ok, {{one_for_one, 10, 10}, + [{delegate:server(Hash), {delegate, start_link, [Hash]}, + temporary, 16#ffffffff, worker, [delegate]} || + Hash <- lists:seq(1, ?DELEGATE_PROCESSES)]}}. + +%%-------------------------------------------------------------------- diff --git a/src/rabbit.erl b/src/rabbit.erl index 700acede..693731f9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -85,6 +85,14 @@ {requires, kernel_ready}, {enables, core_initialized}]}). + +-rabbit_boot_step({delegate_sup, + [{description, "cluster delegate"}, + {mfa, {rabbit_sup, start_restartable_child, + [delegate_sup]}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + -rabbit_boot_step({rabbit_router, [{description, "cluster router"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 82f2d199..29ec7999 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -62,6 +62,8 @@ all_tests() -> passed = test_user_management(), passed = test_server_status(), passed = test_hooks(), + passed = test_delegates_async(), + passed = test_delegates_sync(), passed. test_priority_queue() -> @@ -811,6 +813,94 @@ test_hooks() -> end, passed. +test_delegates_async() -> + SecondaryNode = rabbit_misc:makenode("hare"), + + Self = self(), + Sender = fun(Pid) -> Pid ! {invoked, Self} end, + + Receiver = fun() -> + receive + {invoked, Pid} -> + Pid ! response, + ok + after 100 -> + io:format("Async message not sent~n"), + throw(timeout) + end + end, + + delegate:delegate_async(spawn(Receiver), Sender), + delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender), + await_response(2), + + LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)], + RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)], + delegate:delegate_async(LocalPids ++ RemotePids, Sender), + await_response(20), + + passed. + +await_response(0) -> + ok; + +await_response(Count) -> + receive + response -> ok, + await_response(Count - 1) + after 100 -> + io:format("Async reply not received~n"), + throw(timeout) + end. + +test_delegates_sync() -> + SecondaryNode = rabbit_misc:makenode("hare"), + "foo" = delegate:delegate_sync(node(), fun() -> "foo" end), + "bar" = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end), + + Sender = fun(Pid) -> + gen_server2:call(Pid, invoked) + end, + + Responder = fun() -> + receive + {'$gen_call', From, invoked} -> + gen_server2:reply(From, response) + after 100 -> + io:format("Sync hook not invoked~n"), + throw(timeout) + end + end, + + BadResponder = fun() -> + receive + {'$gen_call', _From, invoked} -> + throw(exception) + after 100 -> + io:format("Crashing sync hook not invoked~n"), + throw(timeout) + end + end, + + response = delegate:delegate_sync(spawn(Responder), Sender), + response = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender), + + {'EXIT', _} = delegate:delegate_sync(spawn(BadResponder), Sender), + {'EXIT', _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender), + + LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], + RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], + LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], + RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], + + [response, response, response, response] = + delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender), + [{'EXIT', _}, {'EXIT', _}, {'EXIT', _}, {'EXIT', _}] = + delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender), + + passed. + + %--------------------------------------------------------------------- control_action(Command, Args) -> control_action(Command, node(), Args). -- cgit v1.2.1 From 97d4d5e8c4907af4f902a2200877c9611d69895e Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 17 Mar 2010 18:21:05 +0000 Subject: Return statuses (and pids where appropriate). --- src/delegate.erl | 28 ++++++++++++++++++++-------- src/rabbit_tests.erl | 38 +++++++++++++++++++++++++------------- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index 3a26c410..d469e464 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -45,13 +45,14 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). + delegate_sync(Node, Thunk) when is_atom(Node) -> gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); delegate_sync(Pid, FPid) when is_pid(Pid) -> - [[Res]] = delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun delegate_sync/2, FPid)), - Res; + [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}], + f_pid_node(fun delegate_sync/2, FPid)), + {Status, Res}; delegate_sync(Pids, FPid) when is_list(Pids) -> lists:flatten( @@ -63,11 +64,13 @@ delegate_async(Node, Thunk) when is_atom(Node) -> delegate_async(Pid, FPid) when is_pid(Pid) -> delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun delegate_async/2, FPid)); + f_pid_node(fun delegate_async/2, FPid)), + ok; delegate_async(Pids, FPid) when is_list(Pids) -> delegate_per_node(split_per_node(Pids), - f_pid_node(fun delegate_async/2, FPid)). + f_pid_node(fun delegate_async/2, FPid)), + ok. %%---------------------------------------------------------------------------- @@ -90,15 +93,18 @@ f_pid_node(DelegateFun, FPid) -> % we improve this? delegate_per_node([{Node, Pids}], FPidNode) when Node == node() -> % optimisation - [[FPidNode(Pid, node()) || Pid <- Pids]]; + [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids]]; delegate_per_node(NodePids, FPidNode) -> rabbit_misc:upmap( fun ({Node, Pids}) -> - [FPidNode(Pid, Node) || Pid <- Pids] + [add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids] end, NodePids). +add_pid({Status, Result}, Pid) -> {Status, Result, Pid}; +add_pid(Status, Pid) -> {Status, Pid}. + server() -> server(erlang:phash(self(), ?DELEGATE_PROCESSES)). @@ -111,7 +117,13 @@ init([]) -> {ok, no_state}. handle_call({thunk, Thunk}, _From, State) -> - {reply, catch Thunk(), State}. + Res = case catch Thunk() of + {'EXIT', Reason} -> + {error, {'EXIT', Reason}}; + Result -> + {ok, Result} + end, + {reply, Res, State}. handle_cast({thunk, Thunk}, State) -> catch Thunk(), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 29ec7999..c4024831 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -830,13 +830,13 @@ test_delegates_async() -> end end, - delegate:delegate_async(spawn(Receiver), Sender), - delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender), + ok = delegate:delegate_async(spawn(Receiver), Sender), + ok = delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender), await_response(2), LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)], RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)], - delegate:delegate_async(LocalPids ++ RemotePids, Sender), + ok = delegate:delegate_async(LocalPids ++ RemotePids, Sender), await_response(20), passed. @@ -855,8 +855,8 @@ await_response(Count) -> test_delegates_sync() -> SecondaryNode = rabbit_misc:makenode("hare"), - "foo" = delegate:delegate_sync(node(), fun() -> "foo" end), - "bar" = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end), + {ok, "foo"} = delegate:delegate_sync(node(), fun() -> "foo" end), + {ok, "bar"} = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end), Sender = fun(Pid) -> gen_server2:call(Pid, invoked) @@ -882,21 +882,33 @@ test_delegates_sync() -> end end, - response = delegate:delegate_sync(spawn(Responder), Sender), - response = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender), + {ok, response} = delegate:delegate_sync(spawn(Responder), Sender), + {ok, response} = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender), - {'EXIT', _} = delegate:delegate_sync(spawn(BadResponder), Sender), - {'EXIT', _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender), + {error, _} = delegate:delegate_sync(spawn(BadResponder), Sender), + {error, _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender), LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], - [response, response, response, response] = - delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender), - [{'EXIT', _}, {'EXIT', _}, {'EXIT', _}, {'EXIT', _}] = - delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender), + GoodRes = delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender), + [{ok, response, _}, {ok, response, _}, + {ok, response, _}, {ok, response, _}] = GoodRes, + + BadRes = delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender), + [{error, _, _}, {error, _, _}, + {error, _, _}, {error, _, _}] = BadRes, + + GoodResPids = [Pid || {_, _, Pid} <- GoodRes], + BadResPids = [Pid || {_, _, Pid} <- BadRes], + + Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), + Good = ordsets:from_list(GoodResPids), + + Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), + Bad = ordsets:from_list(BadResPids), passed. -- cgit v1.2.1 From 087c3b638825322414cad5954ccd6a6fc86ccbac Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 18 Mar 2010 11:05:59 +0000 Subject: Reimplement rabbit_router in terms of delegate. --- src/rabbit.erl | 7 --- src/rabbit_router.erl | 131 ++++++-------------------------------------------- 2 files changed, 16 insertions(+), 122 deletions(-) diff --git a/src/rabbit.erl b/src/rabbit.erl index 693731f9..d3f81bdf 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -93,13 +93,6 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_router, - [{description, "cluster router"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_router]}}, - {requires, kernel_ready}, - {enables, core_initialized}]}). - -rabbit_boot_step({rabbit_node_monitor, [{description, "node monitor"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 884ea4ab..a8cffbdf 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -33,104 +33,40 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --behaviour(gen_server2). - --export([start_link/0, - deliver/2, +-export([deliver/2, match_bindings/2, match_routing_key/2]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% cross-node routing optimisation is disabled because of bug 19758. --define(BUG19758, true). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). - --ifdef(BUG19758). - -deliver(QPids, Delivery) -> - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)). - --else. - -deliver(QPids, Delivery) -> - %% we reduce inter-node traffic by grouping the qpids by node and - %% only delivering one copy of the message to each node involved, - %% which then in turn delivers it to its queues. - deliver_per_node( - dict:to_list( - lists:foldl( - fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) - end, - dict:new(), QPids)), - Delivery). - -deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> - %% optimisation - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)); -deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QPids, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver in run_bindings below will deliver the + %% rabbit_amqqueue:deliver will deliver the %% message to the queue process asynchronously, and return true, %% which means all the QPids will always be returned. It is %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - {routed, - lists:flatmap( - fun ({Node, QPids}) -> - gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), - QPids - end, - NodeQPids)}; -deliver_per_node(NodeQPids, Delivery) -> - R = rabbit_misc:upmap( - fun ({Node, QPids}) -> - try gen_server2:call({?SERVER, Node}, - {deliver, QPids, Delivery}, - infinity) - catch - _Class:_Reason -> - %% TODO: figure out what to log (and do!) here - {false, []} - end - end, - NodeQPids), + delegate:delegate_async(QPids, + fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + {routed, QPids}; + +deliver(QPids, Delivery) -> + Res = delegate:delegate_sync(QPids, + fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = - lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) -> - {Routed or RoutedAcc, - %% we do the concatenation below, which - %% should be faster - [Handled | HandledAcc]} - end, - {false, []}, - R), + lists:foldl(fun fold_deliveries/2, {false, []}, Res), check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, lists:append(Handled)}). - --endif. + {Routed, Handled}). %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange @@ -174,44 +110,9 @@ lookup_qpids(Queues) -> %%-------------------------------------------------------------------- -init([]) -> - {ok, no_state}. - -handle_call({deliver, QPids, Delivery}, From, State) -> - spawn( - fun () -> - R = run_bindings(QPids, Delivery), - gen_server2:reply(From, R) - end), - {noreply, State}. - -handle_cast({deliver, QPids, Delivery}, State) -> - %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Delivery), - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- - -run_bindings(QPids, Delivery) -> - lists:foldl( - fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(QPid, Delivery) of - true -> {true, [QPid | Handled]}; - false -> {true, Handled}; - {'EXIT', _Reason} -> {Routed, Handled} - end - end, - {false, []}, - QPids). +fold_deliveries({ok, true, Pid}, {_, Handled}) -> {true, [Pid | Handled]}; +fold_deliveries({ok, false, _}, {_, Handled}) -> {true, Handled}; +fold_deliveries({error, _, _}, {Routed, Handled}) -> {Routed, Handled}. %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) check_delivery(true, _ , {false, []}) -> {unroutable, []}; -- cgit v1.2.1 From ecd46b56aaf59389b98466dbe9eb6e8a265710d4 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 9 Apr 2010 14:14:39 +0100 Subject: Rename delegate_{sync,async} to delegate_{call,cast}. --- src/delegate.erl | 22 +++++++++++----------- src/rabbit_router.erl | 4 ++-- src/rabbit_tests.erl | 22 +++++++++++----------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index d469e464..c72a7e5a 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). --export([start_link/1, delegate_async/2, delegate_sync/2, server/1]). +-export([start_link/1, delegate_cast/2, delegate_call/2, server/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -46,30 +46,30 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). -delegate_sync(Node, Thunk) when is_atom(Node) -> +delegate_call(Node, Thunk) when is_atom(Node) -> gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); -delegate_sync(Pid, FPid) when is_pid(Pid) -> +delegate_call(Pid, FPid) when is_pid(Pid) -> [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun delegate_sync/2, FPid)), + f_pid_node(fun delegate_call/2, FPid)), {Status, Res}; -delegate_sync(Pids, FPid) when is_list(Pids) -> +delegate_call(Pids, FPid) when is_list(Pids) -> lists:flatten( delegate_per_node(split_per_node(Pids), - f_pid_node(fun delegate_sync/2, FPid))). + f_pid_node(fun delegate_call/2, FPid))). -delegate_async(Node, Thunk) when is_atom(Node) -> +delegate_cast(Node, Thunk) when is_atom(Node) -> gen_server2:cast({server(), Node}, {thunk, Thunk}); -delegate_async(Pid, FPid) when is_pid(Pid) -> +delegate_cast(Pid, FPid) when is_pid(Pid) -> delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun delegate_async/2, FPid)), + f_pid_node(fun delegate_cast/2, FPid)), ok; -delegate_async(Pids, FPid) when is_list(Pids) -> +delegate_cast(Pids, FPid) when is_list(Pids) -> delegate_per_node(split_per_node(Pids), - f_pid_node(fun delegate_async/2, FPid)), + f_pid_node(fun delegate_cast/2, FPid)), ok. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index a8cffbdf..8c4da5f3 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -56,12 +56,12 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - delegate:delegate_async(QPids, + delegate:delegate_cast(QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; deliver(QPids, Delivery) -> - Res = delegate:delegate_sync(QPids, + Res = delegate:delegate_call(QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Res), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c4024831..297803f9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -830,13 +830,13 @@ test_delegates_async() -> end end, - ok = delegate:delegate_async(spawn(Receiver), Sender), - ok = delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender), + ok = delegate:delegate_cast(spawn(Receiver), Sender), + ok = delegate:delegate_cast(spawn(SecondaryNode, Receiver), Sender), await_response(2), LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)], RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)], - ok = delegate:delegate_async(LocalPids ++ RemotePids, Sender), + ok = delegate:delegate_cast(LocalPids ++ RemotePids, Sender), await_response(20), passed. @@ -855,8 +855,8 @@ await_response(Count) -> test_delegates_sync() -> SecondaryNode = rabbit_misc:makenode("hare"), - {ok, "foo"} = delegate:delegate_sync(node(), fun() -> "foo" end), - {ok, "bar"} = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end), + {ok, "foo"} = delegate:delegate_call(node(), fun() -> "foo" end), + {ok, "bar"} = delegate:delegate_call(SecondaryNode, fun() -> "bar" end), Sender = fun(Pid) -> gen_server2:call(Pid, invoked) @@ -882,22 +882,22 @@ test_delegates_sync() -> end end, - {ok, response} = delegate:delegate_sync(spawn(Responder), Sender), - {ok, response} = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender), + {ok, response} = delegate:delegate_call(spawn(Responder), Sender), + {ok, response} = delegate:delegate_call(spawn(SecondaryNode, Responder), Sender), - {error, _} = delegate:delegate_sync(spawn(BadResponder), Sender), - {error, _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender), + {error, _} = delegate:delegate_call(spawn(BadResponder), Sender), + {error, _} = delegate:delegate_call(spawn(SecondaryNode, BadResponder), Sender), LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], - GoodRes = delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender), + GoodRes = delegate:delegate_call(LocalGoodPids ++ RemoteGoodPids, Sender), [{ok, response, _}, {ok, response, _}, {ok, response, _}, {ok, response, _}] = GoodRes, - BadRes = delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender), + BadRes = delegate:delegate_call(LocalBadPids ++ RemoteBadPids, Sender), [{error, _, _}, {error, _, _}, {error, _, _}, {error, _, _}] = BadRes, -- cgit v1.2.1 From b77ef427b978d2a1ccb78f619bab0e8176edef78 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 12 Apr 2010 11:37:21 +0100 Subject: Port various queue operations over to the new delegate system. Unfortunately this doesn't fix the underlying bug! --- src/delegate.erl | 25 ++++++++++++++++- src/rabbit_amqqueue.erl | 75 ++++++++++++++++++++++--------------------------- 2 files changed, 58 insertions(+), 42 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index c72a7e5a..98075428 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -34,11 +34,16 @@ -behaviour(gen_server2). --export([start_link/1, delegate_cast/2, delegate_call/2, server/1]). +-export([start_link/1, delegate_cast/2, delegate_call/2, + delegate_gs2_call/3, delegate_gs2_pcall/4, + delegate_gs2_cast/2, delegate_gs2_pcast/3, + server/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%%---------------------------------------------------------------------------- + %%---------------------------------------------------------------------------- @@ -46,6 +51,24 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). +delegate_gs2_call(Pid, Msg, Timeout) -> + {Status, Res} = + delegate_call(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), + Res. + +delegate_gs2_pcall(Pid, Pri, Msg, Timeout) -> + {Status, Res} = + delegate_call(Pid, + fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), + Res. + +delegate_gs2_cast(Pid, Msg) -> + delegate_cast(Pid, fun(P) -> gen_server2:cast(P, Msg) end). + +delegate_gs2_pcast(Pid, Pri, Msg) -> + delegate_cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + + delegate_call(Node, Thunk) when is_atom(Node) -> gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ceec00fd..f502b940 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -234,10 +234,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, info, infinity). + delegate:delegate_gs2_pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of + case delegate:delegate_gs2_pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -247,7 +247,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, consumers, infinity). + delegate:delegate_gs2_pcall(QPid, 9, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -256,15 +256,16 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> delegate:delegate_gs2_call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate:delegate_gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> + delegate:delegate_gs2_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, txn = Txn, sender = ChPid, message = Message}) -> @@ -279,28 +280,26 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. redeliver(QPid, Messages) -> - gen_server2:cast(QPid, {redeliver, Messages}). + delegate:delegate_gs2_cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). + delegate:delegate_gs2_cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate:delegate_gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> - safe_pmap_ok( + safe_delegate_call_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, QPids). rollback_all(QPids, Txn) -> - safe_pmap_ok( - fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, - QPids). + delegate:delegate_cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end). notify_down_all(QPids, ChPid) -> - safe_pmap_ok( + safe_delegate_call_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, @@ -308,38 +307,35 @@ notify_down_all(QPids, ChPid) -> QPids). limit_all(QPids, ChPid, LimiterPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, - QPids). + delegate:delegate_cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). + delegate:delegate_gs2_call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate:delegate_gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + delegate:delegate_gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate:delegate_gs2_call(QPid, + {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). + delegate:delegate_gs2_pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {unblock, ChPid}). + delegate:delegate_gs2_pcast(QPid, 7, {unblock, ChPid}). flush_all(QPids, ChPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, - QPids). + delegate:delegate_cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). internal_delete(QueueName) -> case @@ -385,17 +381,14 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. -safe_pmap_ok(H, F, L) -> - case [R || R <- rabbit_misc:upmap( - fun (V) -> - try - rabbit_misc:with_exit_handler( - fun () -> H(V) end, - fun () -> F(V) end) - catch Class:Reason -> {Class, Reason} - end - end, L), - R =/= ok] of +safe_delegate_call_ok(H, F, Pids) -> + case [R || R = {error, _, _} <- delegate:delegate_call( + Pids, + fun (Pid) -> + rabbit_misc:with_exit_handler( + fun () -> H(Pid) end, + fun () -> F(Pid) end) + end)] of [] -> ok; Errors -> {error, Errors} end. -- cgit v1.2.1 From 1bc29585c63c95708a7bc11fdb9a4b0ee3f3387d Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 12 Apr 2010 13:41:01 +0100 Subject: Remove upmap that was causing tests to fail, remove "optimisation" as it's not doing the optimisation we wanted. --- src/delegate.erl | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index 98075428..a8d4ab31 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -52,12 +52,12 @@ start_link(Hash) -> ?MODULE, [], []). delegate_gs2_call(Pid, Msg, Timeout) -> - {Status, Res} = + {_Status, Res} = delegate_call(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), Res. delegate_gs2_pcall(Pid, Pri, Msg, Timeout) -> - {Status, Res} = + {_Status, Res} = delegate_call(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), Res. @@ -69,6 +69,8 @@ delegate_gs2_pcast(Pid, Pri, Msg) -> delegate_cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). +% TODO reimplement the single-node optimisation + delegate_call(Node, Thunk) when is_atom(Node) -> gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); @@ -112,18 +114,9 @@ f_pid_node(DelegateFun, FPid) -> DelegateFun(Node, fun() -> FPid(Pid) end) end. -% TODO this only gets called when we are ONLY talking to the local node - can -% we improve this? -delegate_per_node([{Node, Pids}], FPidNode) when Node == node() -> - % optimisation - [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids]]; - delegate_per_node(NodePids, FPidNode) -> - rabbit_misc:upmap( - fun ({Node, Pids}) -> - [add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids] - end, - NodePids). + [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids] || + {Node, Pids} <- NodePids]. add_pid({Status, Result}, Pid) -> {Status, Result, Pid}; add_pid(Status, Pid) -> {Status, Pid}. -- cgit v1.2.1 From c76f074ad679220dac767aa4000dba0e217b9fdd Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 12 Apr 2010 13:52:57 +0100 Subject: Clean up exported function names. --- src/delegate.erl | 49 ++++++++++++++++++++++++------------------------- src/rabbit_amqqueue.erl | 38 +++++++++++++++++++------------------- src/rabbit_router.erl | 4 ++-- src/rabbit_tests.erl | 22 +++++++++++----------- 4 files changed, 56 insertions(+), 57 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index a8d4ab31..76fd9d72 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -34,9 +34,9 @@ -behaviour(gen_server2). --export([start_link/1, delegate_cast/2, delegate_call/2, - delegate_gs2_call/3, delegate_gs2_pcall/4, - delegate_gs2_cast/2, delegate_gs2_pcast/3, +-export([start_link/1, cast/2, call/2, + gs2_call/3, gs2_pcall/4, + gs2_cast/2, gs2_pcast/3, server/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -51,55 +51,54 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). -delegate_gs2_call(Pid, Msg, Timeout) -> +gs2_call(Pid, Msg, Timeout) -> {_Status, Res} = - delegate_call(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), + call(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), Res. -delegate_gs2_pcall(Pid, Pri, Msg, Timeout) -> +gs2_pcall(Pid, Pri, Msg, Timeout) -> {_Status, Res} = - delegate_call(Pid, - fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), + call(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), Res. -delegate_gs2_cast(Pid, Msg) -> - delegate_cast(Pid, fun(P) -> gen_server2:cast(P, Msg) end). +gs2_cast(Pid, Msg) -> + cast(Pid, fun(P) -> gen_server2:cast(P, Msg) end). -delegate_gs2_pcast(Pid, Pri, Msg) -> - delegate_cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). +gs2_pcast(Pid, Pri, Msg) -> + cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). % TODO reimplement the single-node optimisation -delegate_call(Node, Thunk) when is_atom(Node) -> +call(Node, Thunk) when is_atom(Node) -> gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); -delegate_call(Pid, FPid) when is_pid(Pid) -> +call(Pid, FPid) when is_pid(Pid) -> [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun delegate_call/2, FPid)), + f_pid_node(fun call/2, FPid)), {Status, Res}; -delegate_call(Pids, FPid) when is_list(Pids) -> +call(Pids, FPid) when is_list(Pids) -> lists:flatten( - delegate_per_node(split_per_node(Pids), - f_pid_node(fun delegate_call/2, FPid))). + delegate_per_node(split_delegate_per_node(Pids), + f_pid_node(fun call/2, FPid))). -delegate_cast(Node, Thunk) when is_atom(Node) -> +cast(Node, Thunk) when is_atom(Node) -> gen_server2:cast({server(), Node}, {thunk, Thunk}); -delegate_cast(Pid, FPid) when is_pid(Pid) -> +cast(Pid, FPid) when is_pid(Pid) -> delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun delegate_cast/2, FPid)), + f_pid_node(fun cast/2, FPid)), ok; -delegate_cast(Pids, FPid) when is_list(Pids) -> - delegate_per_node(split_per_node(Pids), - f_pid_node(fun delegate_cast/2, FPid)), +cast(Pids, FPid) when is_list(Pids) -> + delegate_per_node(split_delegate_per_node(Pids), + f_pid_node(fun cast/2, FPid)), ok. %%---------------------------------------------------------------------------- -split_per_node(Pids) -> +split_delegate_per_node(Pids) -> dict:to_list( lists:foldl( fun (Pid, D) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f502b940..8c8a0e1f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -234,10 +234,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate:delegate_gs2_pcall(QPid, 9, info, infinity). + delegate:gs2_pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case delegate:delegate_gs2_pcall(QPid, 9, {info, Items}, infinity) of + case delegate:gs2_pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -247,7 +247,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate:delegate_gs2_pcall(QPid, 9, consumers, infinity). + delegate:gs2_pcall(QPid, 9, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -256,16 +256,16 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate:delegate_gs2_call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> delegate:gs2_call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - delegate:delegate_gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate:gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity). purge(#amqqueue{ pid = QPid }) -> - delegate:delegate_gs2_call(QPid, purge, infinity). + delegate:gs2_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, txn = Txn, sender = ChPid, message = Message}) -> @@ -280,13 +280,13 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. redeliver(QPid, Messages) -> - delegate:delegate_gs2_cast(QPid, {redeliver, Messages}). + delegate:gs2_cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - delegate:delegate_gs2_cast(QPid, {requeue, MsgIds, ChPid}). + delegate:gs2_cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - delegate:delegate_gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate:gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_delegate_call_ok( @@ -295,7 +295,7 @@ commit_all(QPids, Txn) -> QPids). rollback_all(QPids, Txn) -> - delegate:delegate_cast(QPids, + delegate:cast(QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end). notify_down_all(QPids, ChPid) -> @@ -307,34 +307,34 @@ notify_down_all(QPids, ChPid) -> QPids). limit_all(QPids, ChPid, LimiterPid) -> - delegate:delegate_cast(QPids, + delegate:cast(QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - delegate:delegate_gs2_call(QPid, {claim_queue, ReaderPid}, infinity). + delegate:gs2_call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate:delegate_gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate:gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:delegate_gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + delegate:gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate:delegate_gs2_call(QPid, + ok = delegate:gs2_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, infinity). notify_sent(QPid, ChPid) -> - delegate:delegate_gs2_pcast(QPid, 7, {notify_sent, ChPid}). + delegate:gs2_pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - delegate:delegate_gs2_pcast(QPid, 7, {unblock, ChPid}). + delegate:gs2_pcast(QPid, 7, {unblock, ChPid}). flush_all(QPids, ChPid) -> - delegate:delegate_cast(QPids, + delegate:cast(QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). internal_delete(QueueName) -> @@ -382,7 +382,7 @@ pseudo_queue(QueueName, Pid) -> pid = Pid}. safe_delegate_call_ok(H, F, Pids) -> - case [R || R = {error, _, _} <- delegate:delegate_call( + case [R || R = {error, _, _} <- delegate:call( Pids, fun (Pid) -> rabbit_misc:with_exit_handler( diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 8c4da5f3..5fd5715b 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -56,12 +56,12 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - delegate:delegate_cast(QPids, + delegate:cast(QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; deliver(QPids, Delivery) -> - Res = delegate:delegate_call(QPids, + Res = delegate:call(QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Res), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 297803f9..b5efd2d4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -830,13 +830,13 @@ test_delegates_async() -> end end, - ok = delegate:delegate_cast(spawn(Receiver), Sender), - ok = delegate:delegate_cast(spawn(SecondaryNode, Receiver), Sender), + ok = delegate:cast(spawn(Receiver), Sender), + ok = delegate:cast(spawn(SecondaryNode, Receiver), Sender), await_response(2), LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)], RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)], - ok = delegate:delegate_cast(LocalPids ++ RemotePids, Sender), + ok = delegate:cast(LocalPids ++ RemotePids, Sender), await_response(20), passed. @@ -855,8 +855,8 @@ await_response(Count) -> test_delegates_sync() -> SecondaryNode = rabbit_misc:makenode("hare"), - {ok, "foo"} = delegate:delegate_call(node(), fun() -> "foo" end), - {ok, "bar"} = delegate:delegate_call(SecondaryNode, fun() -> "bar" end), + {ok, "foo"} = delegate:call(node(), fun() -> "foo" end), + {ok, "bar"} = delegate:call(SecondaryNode, fun() -> "bar" end), Sender = fun(Pid) -> gen_server2:call(Pid, invoked) @@ -882,22 +882,22 @@ test_delegates_sync() -> end end, - {ok, response} = delegate:delegate_call(spawn(Responder), Sender), - {ok, response} = delegate:delegate_call(spawn(SecondaryNode, Responder), Sender), + {ok, response} = delegate:call(spawn(Responder), Sender), + {ok, response} = delegate:call(spawn(SecondaryNode, Responder), Sender), - {error, _} = delegate:delegate_call(spawn(BadResponder), Sender), - {error, _} = delegate:delegate_call(spawn(SecondaryNode, BadResponder), Sender), + {error, _} = delegate:call(spawn(BadResponder), Sender), + {error, _} = delegate:call(spawn(SecondaryNode, BadResponder), Sender), LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], - GoodRes = delegate:delegate_call(LocalGoodPids ++ RemoteGoodPids, Sender), + GoodRes = delegate:call(LocalGoodPids ++ RemoteGoodPids, Sender), [{ok, response, _}, {ok, response, _}, {ok, response, _}, {ok, response, _}] = GoodRes, - BadRes = delegate:delegate_call(LocalBadPids ++ RemoteBadPids, Sender), + BadRes = delegate:call(LocalBadPids ++ RemoteBadPids, Sender), [{error, _, _}, {error, _, _}, {error, _, _}, {error, _, _}] = BadRes, -- cgit v1.2.1 From df35f9d75121ca86dcd0ae1e3a8aaabe124549cb Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 15 Apr 2010 10:55:20 +0100 Subject: Don't run delegate tests if there's no second node, remove use of closures since the coverage analyser breaks them. --- src/rabbit_tests.erl | 180 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 107 insertions(+), 73 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b5efd2d4..ed7b2e60 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -61,9 +61,37 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), - passed = test_hooks(), - passed = test_delegates_async(), - passed = test_delegates_sync(), + passed = maybe_run_cluster_dependent_tests(), + passed. + + +maybe_run_cluster_dependent_tests() -> + SecondaryNode = rabbit_misc:makenode("hare"), + + case net_adm:ping(SecondaryNode) of + pong -> passed = run_cluster_dependent_tests(SecondaryNode); + pang -> io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]) + end, + passed. + +run_cluster_dependent_tests(SecondaryNode) -> + SecondaryNodeS = atom_to_list(SecondaryNode), + + ok = control_action(stop_app, []), + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), + ok = control_action(start_app, []), + + io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), + passed = test_delegates_async(SecondaryNode), + passed = test_delegates_sync(SecondaryNode), + + ok = control_action(stop_app, []), + ok = control_action(reset, []), + ok = control_action(cluster, []), + ok = control_action(start_app, []), + passed. test_priority_queue() -> @@ -813,91 +841,48 @@ test_hooks() -> end, passed. -test_delegates_async() -> - SecondaryNode = rabbit_misc:makenode("hare"), - +test_delegates_async(SecondaryNode) -> Self = self(), Sender = fun(Pid) -> Pid ! {invoked, Self} end, - Receiver = fun() -> - receive - {invoked, Pid} -> - Pid ! response, - ok - after 100 -> - io:format("Async message not sent~n"), - throw(timeout) - end - end, - - ok = delegate:cast(spawn(Receiver), Sender), - ok = delegate:cast(spawn(SecondaryNode, Receiver), Sender), - await_response(2), + ok = delegate:cast(spawn(fun async_responder/0), Sender), + ok = delegate:cast(spawn(SecondaryNode, fun async_responder/0), Sender), + async_await_response(2), - LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)], - RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)], + LocalPids = [spawn(fun async_responder/0) || _ <- lists:seq(1,10)], + RemotePids = + [spawn(SecondaryNode, fun async_responder/0) || _ <- lists:seq(1,10)], ok = delegate:cast(LocalPids ++ RemotePids, Sender), - await_response(20), + async_await_response(20), passed. -await_response(0) -> - ok; - -await_response(Count) -> - receive - response -> ok, - await_response(Count - 1) - after 100 -> - io:format("Async reply not received~n"), - throw(timeout) - end. - -test_delegates_sync() -> - SecondaryNode = rabbit_misc:makenode("hare"), - {ok, "foo"} = delegate:call(node(), fun() -> "foo" end), - {ok, "bar"} = delegate:call(SecondaryNode, fun() -> "bar" end), - - Sender = fun(Pid) -> - gen_server2:call(Pid, invoked) - end, - - Responder = fun() -> - receive - {'$gen_call', From, invoked} -> - gen_server2:reply(From, response) - after 100 -> - io:format("Sync hook not invoked~n"), - throw(timeout) - end - end, +test_delegates_sync(SecondaryNode) -> + {ok, "foo"} = delegate:call(node(), fun sync_simple_result/0), + {ok, "foo"} = delegate:call(SecondaryNode, fun sync_simple_result/0), - BadResponder = fun() -> - receive - {'$gen_call', _From, invoked} -> - throw(exception) - after 100 -> - io:format("Crashing sync hook not invoked~n"), - throw(timeout) - end - end, + {ok, response} = delegate:call(spawn(fun sync_responder/0), + fun sync_sender/1), + {ok, response} = delegate:call(spawn(SecondaryNode, fun sync_responder/0), + fun sync_sender/1), - {ok, response} = delegate:call(spawn(Responder), Sender), - {ok, response} = delegate:call(spawn(SecondaryNode, Responder), Sender), + {error, _} = delegate:call(spawn(fun sync_bad_responder/0), + fun sync_sender/1), + {error, _} = delegate:call(spawn(SecondaryNode, fun sync_bad_responder/0), + fun sync_sender/1), - {error, _} = delegate:call(spawn(BadResponder), Sender), - {error, _} = delegate:call(spawn(SecondaryNode, BadResponder), Sender), + LocalGoodPids = [spawn(fun sync_responder/0) || _ <- lists:seq(1,2)], + RemoteGoodPids = [spawn(fun sync_responder/0) || _ <- lists:seq(1,2)], + LocalBadPids = + [spawn(SecondaryNode, fun sync_bad_responder/0) || _ <- lists:seq(1,2)], + RemoteBadPids = + [spawn(SecondaryNode, fun sync_bad_responder/0) || _ <- lists:seq(1,2)], - LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], - RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], - LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], - RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], - - GoodRes = delegate:call(LocalGoodPids ++ RemoteGoodPids, Sender), + GoodRes = delegate:call(LocalGoodPids ++ RemoteGoodPids, fun sync_sender/1), [{ok, response, _}, {ok, response, _}, {ok, response, _}, {ok, response, _}] = GoodRes, - BadRes = delegate:call(LocalBadPids ++ RemoteBadPids, Sender), + BadRes = delegate:call(LocalBadPids ++ RemoteBadPids, fun sync_sender/1), [{error, _, _}, {error, _, _}, {error, _, _}, {error, _, _}] = BadRes, @@ -912,6 +897,55 @@ test_delegates_sync() -> passed. +% Unfortunately we need these to be top level functions, not closures as +% something about them being closures trips up execution under the coverage +% analyser in clustered mode since the analyser only recompiles on one node and +% there appears to be some binary-level incompatibility around closures. + +async_responder() -> + receive + {invoked, Pid} -> + Pid ! response, + ok + after 100 -> + io:format("Async message not sent~n"), + throw(timeout) + end. + +async_await_response(0) -> + ok; + +async_await_response(Count) -> + receive + response -> ok, + async_await_response(Count - 1) + after 100 -> + io:format("Async reply not received~n"), + throw(timeout) + end. + +sync_simple_result() -> "foo". + +sync_sender(Pid) -> + gen_server2:call(Pid, invoked). + +sync_responder() -> + receive + {'$gen_call', From, invoked} -> + gen_server2:reply(From, response) + after 100 -> + io:format("Sync hook not invoked~n"), + throw(timeout) + end. + +sync_bad_responder() -> + receive + {'$gen_call', _From, invoked} -> + throw(exception) + after 100 -> + io:format("Crashing sync hook not invoked~n"), + throw(timeout) + end. %--------------------------------------------------------------------- -- cgit v1.2.1 From 45a4c0b02972218be9fa53e5996cbc77de7a293e Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 15 Apr 2010 12:15:08 +0100 Subject: Java tests need the cluster to be left up if there's more than one node, see bug 22613. --- src/rabbit_tests.erl | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ed7b2e60..f7060b0f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -87,11 +87,6 @@ run_cluster_dependent_tests(SecondaryNode) -> passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), - ok = control_action(stop_app, []), - ok = control_action(reset, []), - ok = control_action(cluster, []), - ok = control_action(start_app, []), - passed. test_priority_queue() -> -- cgit v1.2.1 From adbdd400e3201ca7f2f5b3e59e9aede0b2344ee8 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 15 Apr 2010 17:19:12 +0100 Subject: Allow enabling cover for secondary node. --- Makefile | 4 ++++ src/rabbit_misc.erl | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/Makefile b/Makefile index 5d43ee8a..8457cb31 100644 --- a/Makefile +++ b/Makefile @@ -151,10 +151,14 @@ stop-node: # code coverage will be created for subdirectory "ebin" of COVER_DIR COVER_DIR=. +SECONDARY_NODENAME=hare start-cover: all echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) +start-secondary-cover: + echo "rabbit_misc:enable_cover_node(\"$(SECONDARY_NODENAME)\")." | $(ERL_CALL) + stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) cat cover/summary.txt diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9abc1695..011f14d8 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -43,6 +43,7 @@ -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). +-export([enable_cover_node/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -209,6 +210,7 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). + enable_cover() -> enable_cover("."). @@ -220,6 +222,10 @@ enable_cover(Root) -> _ -> ok end. +enable_cover_node(NodeS) -> + Node = makenode(NodeS), + {ok, _} = cover:start([Node]). + report_cover() -> report_cover("."). -- cgit v1.2.1 From 185d207aad36516532d47d0ec718078e821a5064 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 15 Apr 2010 18:09:53 +0100 Subject: Make start-secondary-cover actually do something, stop Erlang tests blowing up. --- Makefile | 2 +- src/rabbit_tests.erl | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 8457cb31..690cb955 100644 --- a/Makefile +++ b/Makefile @@ -157,7 +157,7 @@ start-cover: all echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) start-secondary-cover: - echo "rabbit_misc:enable_cover_node(\"$(SECONDARY_NODENAME)\")." | $(ERL_CALL) + echo "rabbit_misc:enable_cover_node(\"$(SECONDARY_NODENAME)\")." | $(ERL_CALL) stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f7060b0f..58887f2a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -650,8 +650,12 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), %% NB: this will log an inconsistent_database error, which is harmless + %% Turning cover on / off is OK even if we're not in general using cover, + %% it just turns the engine on / off, doesn't actually log anything. + cover:stop([SecondaryNode]), true = disconnect_node(SecondaryNode), pong = net_adm:ping(SecondaryNode), + cover:start([SecondaryNode]), %% leaving a cluster as a ram node ok = control_action(reset, []), -- cgit v1.2.1 From 629d5a84d5b0586a18e9747c643fe7d435e62e06 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 15 Apr 2010 18:39:19 +0100 Subject: Go back to using closures now that we can, refactor a bit. --- src/rabbit_tests.erl | 139 ++++++++++++++++++++++----------------------------- 1 file changed, 61 insertions(+), 78 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 58887f2a..e5e57a68 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -844,44 +844,76 @@ test_delegates_async(SecondaryNode) -> Self = self(), Sender = fun(Pid) -> Pid ! {invoked, Self} end, - ok = delegate:cast(spawn(fun async_responder/0), Sender), - ok = delegate:cast(spawn(SecondaryNode, fun async_responder/0), Sender), - async_await_response(2), + Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end), - LocalPids = [spawn(fun async_responder/0) || _ <- lists:seq(1,10)], - RemotePids = - [spawn(SecondaryNode, fun async_responder/0) || _ <- lists:seq(1,10)], + ok = delegate:cast(spawn(Responder), Sender), + ok = delegate:cast(spawn(SecondaryNode, Responder), Sender), + await_response(2), + + LocalPids = spawn_responders(node(), Responder, 10), + RemotePids = spawn_responders(SecondaryNode, Responder, 10), ok = delegate:cast(LocalPids ++ RemotePids, Sender), - async_await_response(20), + await_response(20), passed. +make_responder(FMsg) -> + fun() -> + receive + Msg -> + FMsg(Msg) + after 100 -> + throw(timeout) + end + end. + +spawn_responders(Node, Responder, Count) -> + [spawn(Node, Responder) || _ <- lists:seq(1, Count)]. + +await_response(0) -> + ok; + +await_response(Count) -> + receive + response -> ok, + await_response(Count - 1) + after 100 -> + io:format("Async reply not received~n"), + throw(timeout) + end. + test_delegates_sync(SecondaryNode) -> - {ok, "foo"} = delegate:call(node(), fun sync_simple_result/0), - {ok, "foo"} = delegate:call(SecondaryNode, fun sync_simple_result/0), - - {ok, response} = delegate:call(spawn(fun sync_responder/0), - fun sync_sender/1), - {ok, response} = delegate:call(spawn(SecondaryNode, fun sync_responder/0), - fun sync_sender/1), - - {error, _} = delegate:call(spawn(fun sync_bad_responder/0), - fun sync_sender/1), - {error, _} = delegate:call(spawn(SecondaryNode, fun sync_bad_responder/0), - fun sync_sender/1), - - LocalGoodPids = [spawn(fun sync_responder/0) || _ <- lists:seq(1,2)], - RemoteGoodPids = [spawn(fun sync_responder/0) || _ <- lists:seq(1,2)], - LocalBadPids = - [spawn(SecondaryNode, fun sync_bad_responder/0) || _ <- lists:seq(1,2)], - RemoteBadPids = - [spawn(SecondaryNode, fun sync_bad_responder/0) || _ <- lists:seq(1,2)], - - GoodRes = delegate:call(LocalGoodPids ++ RemoteGoodPids, fun sync_sender/1), + {ok, "foo"} = delegate:call(node(), fun() -> "foo" end), + {ok, "bar"} = delegate:call(SecondaryNode, fun() -> "bar" end), + + Sender = fun(Pid) -> + gen_server2:call(Pid, invoked) + end, + + Responder = make_responder(fun({'$gen_call', From, invoked}) -> + gen_server2:reply(From, response) + end), + + BadResponder = make_responder(fun({'$gen_call', _From, invoked}) -> + throw(exception) + end), + + {ok, response} = delegate:call(spawn(Responder), Sender), + {ok, response} = delegate:call(spawn(SecondaryNode, Responder), Sender), + + {error, _} = delegate:call(spawn(BadResponder), Sender), + {error, _} = delegate:call(spawn(SecondaryNode, BadResponder), Sender), + + LocalGoodPids = spawn_responders(node(), Responder, 2), + RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2), + LocalBadPids = spawn_responders(node(), BadResponder, 2), + RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2), + + GoodRes = delegate:call(LocalGoodPids ++ RemoteGoodPids, Sender), [{ok, response, _}, {ok, response, _}, {ok, response, _}, {ok, response, _}] = GoodRes, - BadRes = delegate:call(LocalBadPids ++ RemoteBadPids, fun sync_sender/1), + BadRes = delegate:call(LocalBadPids ++ RemoteBadPids, Sender), [{error, _, _}, {error, _, _}, {error, _, _}, {error, _, _}] = BadRes, @@ -896,55 +928,6 @@ test_delegates_sync(SecondaryNode) -> passed. -% Unfortunately we need these to be top level functions, not closures as -% something about them being closures trips up execution under the coverage -% analyser in clustered mode since the analyser only recompiles on one node and -% there appears to be some binary-level incompatibility around closures. - -async_responder() -> - receive - {invoked, Pid} -> - Pid ! response, - ok - after 100 -> - io:format("Async message not sent~n"), - throw(timeout) - end. - -async_await_response(0) -> - ok; - -async_await_response(Count) -> - receive - response -> ok, - async_await_response(Count - 1) - after 100 -> - io:format("Async reply not received~n"), - throw(timeout) - end. - -sync_simple_result() -> "foo". - -sync_sender(Pid) -> - gen_server2:call(Pid, invoked). - -sync_responder() -> - receive - {'$gen_call', From, invoked} -> - gen_server2:reply(From, response) - after 100 -> - io:format("Sync hook not invoked~n"), - throw(timeout) - end. - -sync_bad_responder() -> - receive - {'$gen_call', _From, invoked} -> - throw(exception) - after 100 -> - io:format("Crashing sync hook not invoked~n"), - throw(timeout) - end. %--------------------------------------------------------------------- -- cgit v1.2.1 From 2bfed7413f3678c46d7f01524f596d8098b2f3b5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 16 Apr 2010 15:16:15 +0100 Subject: Reimplement the local optimistion, refactor quite a bit, fix delegate to actually do fanout properly (oops). --- src/delegate.erl | 64 +++++++++++++++++++++++++++------------------------- src/rabbit_tests.erl | 3 --- 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index 76fd9d72..a7020d9b 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -68,34 +68,29 @@ gs2_pcast(Pid, Pri, Msg) -> cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). -% TODO reimplement the single-node optimisation - -call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); - call(Pid, FPid) when is_pid(Pid) -> - [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun call/2, FPid)), + [[{Status, Res, _}]] = call_per_node([{node(Pid), [Pid]}], FPid), {Status, Res}; call(Pids, FPid) when is_list(Pids) -> lists:flatten( - delegate_per_node(split_delegate_per_node(Pids), - f_pid_node(fun call/2, FPid))). + call_per_node(split_delegate_per_node(Pids), FPid)). + +internal_call(Node, Thunk) when is_atom(Node) -> + gen_server2:call({server(), Node}, {thunk, Thunk}, infinity). -cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({server(), Node}, {thunk, Thunk}); cast(Pid, FPid) when is_pid(Pid) -> - delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun cast/2, FPid)), + cast_per_node([{node(Pid), [Pid]}], FPid), ok; cast(Pids, FPid) when is_list(Pids) -> - delegate_per_node(split_delegate_per_node(Pids), - f_pid_node(fun cast/2, FPid)), + cast_per_node(split_delegate_per_node(Pids), FPid), ok. +internal_cast(Node, Thunk) when is_atom(Node) -> + gen_server2:cast({server(), Node}, {thunk, Thunk}). + %%---------------------------------------------------------------------------- split_delegate_per_node(Pids) -> @@ -108,17 +103,22 @@ split_delegate_per_node(Pids) -> end, dict:new(), Pids)). -f_pid_node(DelegateFun, FPid) -> - fun(Pid, Node) -> - DelegateFun(Node, fun() -> FPid(Pid) end) - end. +call_per_node([{Node, Pids}], FPid) when Node == node() -> + local_delegate(Pids, FPid); +call_per_node(NodePids, FPid) -> + delegate_per_node(NodePids, FPid, fun internal_call/2). + +cast_per_node([{Node, Pids}], FPid) when Node == node() -> + local_delegate(Pids, FPid); +cast_per_node(NodePids, FPid) -> + delegate_per_node(NodePids, FPid, fun internal_cast/2). -delegate_per_node(NodePids, FPidNode) -> - [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids] || - {Node, Pids} <- NodePids]. +local_delegate(Pids, FPid) -> + [[safe_invoke(FPid, Pid) || Pid <- Pids]]. -add_pid({Status, Result}, Pid) -> {Status, Result, Pid}; -add_pid(Status, Pid) -> {Status, Pid}. +delegate_per_node(NodePids, FPid, DelegateFun) -> + [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) || + {Node, Pids} <- NodePids]. server() -> server(erlang:phash(self(), ?DELEGATE_PROCESSES)). @@ -126,19 +126,21 @@ server() -> server(Hash) -> list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))). +safe_invoke(FPid, Pid) -> + case catch FPid(Pid) of + {'EXIT', Reason} -> + {error, {'EXIT', Reason}, Pid}; + Result -> + {ok, Result, Pid} + end. + %%-------------------------------------------------------------------- init([]) -> {ok, no_state}. handle_call({thunk, Thunk}, _From, State) -> - Res = case catch Thunk() of - {'EXIT', Reason} -> - {error, {'EXIT', Reason}}; - Result -> - {ok, Result} - end, - {reply, Res, State}. + {reply, Thunk(), State}. handle_cast({thunk, Thunk}, State) -> catch Thunk(), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e5e57a68..5ed7d64c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -883,9 +883,6 @@ await_response(Count) -> end. test_delegates_sync(SecondaryNode) -> - {ok, "foo"} = delegate:call(node(), fun() -> "foo" end), - {ok, "bar"} = delegate:call(SecondaryNode, fun() -> "bar" end), - Sender = fun(Pid) -> gen_server2:call(Pid, invoked) end, -- cgit v1.2.1 From 0ee57907ea7a0352fc6e13896e9f10e7b5268fa4 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 16 Apr 2010 15:25:18 +0100 Subject: Don't pass around lists of lists for no reason. --- src/delegate.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index a7020d9b..517f29f2 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -69,12 +69,11 @@ gs2_pcast(Pid, Pri, Msg) -> call(Pid, FPid) when is_pid(Pid) -> - [[{Status, Res, _}]] = call_per_node([{node(Pid), [Pid]}], FPid), + [{Status, Res, _}] = call_per_node([{node(Pid), [Pid]}], FPid), {Status, Res}; call(Pids, FPid) when is_list(Pids) -> - lists:flatten( - call_per_node(split_delegate_per_node(Pids), FPid)). + call_per_node(split_delegate_per_node(Pids), FPid). internal_call(Node, Thunk) when is_atom(Node) -> gen_server2:call({server(), Node}, {thunk, Thunk}, infinity). @@ -114,11 +113,12 @@ cast_per_node(NodePids, FPid) -> delegate_per_node(NodePids, FPid, fun internal_cast/2). local_delegate(Pids, FPid) -> - [[safe_invoke(FPid, Pid) || Pid <- Pids]]. + [safe_invoke(FPid, Pid) || Pid <- Pids]. delegate_per_node(NodePids, FPid, DelegateFun) -> - [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) || - {Node, Pids} <- NodePids]. + lists:flatten( + [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) + || {Node, Pids} <- NodePids]). server() -> server(erlang:phash(self(), ?DELEGATE_PROCESSES)). -- cgit v1.2.1 From c43c070c98705ae9c4d36d8bca83ccd12629604f Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 19 Apr 2010 13:37:32 +0100 Subject: orddict is much faster given we only have a tiny dict. --- src/delegate.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index 517f29f2..5a2a011a 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -93,14 +93,14 @@ internal_cast(Node, Thunk) when is_atom(Node) -> %%---------------------------------------------------------------------------- split_delegate_per_node(Pids) -> - dict:to_list( + orddict:to_list( lists:foldl( fun (Pid, D) -> - dict:update(node(Pid), + orddict:update(node(Pid), fun (Pids1) -> [Pid | Pids1] end, [Pid], D) end, - dict:new(), Pids)). + orddict:new(), Pids)). call_per_node([{Node, Pids}], FPid) when Node == node() -> local_delegate(Pids, FPid); -- cgit v1.2.1 From 8b540eccf13988d118603895fcb5aa0062b38c36 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 21 Apr 2010 17:43:49 +0100 Subject: Cosmetics --- src/rabbit_amqqueue.erl | 5 ++--- src/rabbit_router.erl | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 791a55fe..1dd92403 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -324,9 +324,8 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate:gs2_call(QPid, - {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate:gs2_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> delegate:gs2_pcast(QPid, 7, {notify_sent, ChPid}). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 5fd5715b..6a886eac 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -110,9 +110,9 @@ lookup_qpids(Queues) -> %%-------------------------------------------------------------------- -fold_deliveries({ok, true, Pid}, {_, Handled}) -> {true, [Pid | Handled]}; -fold_deliveries({ok, false, _}, {_, Handled}) -> {true, Handled}; -fold_deliveries({error, _, _}, {Routed, Handled}) -> {Routed, Handled}. +fold_deliveries({ok, true, Pid},{_, Handled}) -> {true, [Pid|Handled]}; +fold_deliveries({ok, false, _ },{_, Handled}) -> {true, Handled}; +fold_deliveries({error, _ , _ },{Routed, Handled}) -> {Routed, Handled}. %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) check_delivery(true, _ , {false, []}) -> {unroutable, []}; -- cgit v1.2.1 From 8786ed508ec2d10373a4ac36147080a9fc3537ad Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 22 Apr 2010 13:07:13 +0100 Subject: Remove delegate_sup_sup, make delegates restartable on error. --- src/delegate_sup.erl | 2 +- src/rabbit.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index f7a1c586..d9722a11 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -50,7 +50,7 @@ start_link() -> init(_Args) -> {ok, {{one_for_one, 10, 10}, [{delegate:server(Hash), {delegate, start_link, [Hash]}, - temporary, 16#ffffffff, worker, [delegate]} || + transient, 16#ffffffff, worker, [delegate]} || Hash <- lists:seq(1, ?DELEGATE_PROCESSES)]}}. %%-------------------------------------------------------------------- diff --git a/src/rabbit.erl b/src/rabbit.erl index c3698759..e7bff270 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -93,7 +93,7 @@ -rabbit_boot_step({delegate_sup, [{description, "cluster delegate"}, - {mfa, {rabbit_sup, start_restartable_child, + {mfa, {rabbit_sup, start_child, [delegate_sup]}}, {requires, kernel_ready}, {enables, core_initialized}]}). -- cgit v1.2.1 From 44ecde08790dcdc2f9fee001f728394cbb72c119 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 22 Apr 2010 13:10:16 +0100 Subject: Use phash2/2, remove string:concat. --- src/delegate.erl | 4 ++-- src/delegate_sup.erl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index 5a2a011a..2724736e 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -121,10 +121,10 @@ delegate_per_node(NodePids, FPid, DelegateFun) -> || {Node, Pids} <- NodePids]). server() -> - server(erlang:phash(self(), ?DELEGATE_PROCESSES)). + server(erlang:phash2(self(), ?DELEGATE_PROCESSES)). server(Hash) -> - list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))). + list_to_atom("delegate_process_" ++ integer_to_list(Hash)). safe_invoke(FPid, Pid) -> case catch FPid(Pid) of diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index d9722a11..99a5e4d3 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -51,6 +51,6 @@ init(_Args) -> {ok, {{one_for_one, 10, 10}, [{delegate:server(Hash), {delegate, start_link, [Hash]}, transient, 16#ffffffff, worker, [delegate]} || - Hash <- lists:seq(1, ?DELEGATE_PROCESSES)]}}. + Hash <- lists:seq(0, ?DELEGATE_PROCESSES - 1)]}}. %%-------------------------------------------------------------------- -- cgit v1.2.1 From 38848c3c625f000e92352f4996a8e45c6806f2b1 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 22 Apr 2010 16:41:43 +0100 Subject: Use 2x CPUs as the number of delegate processes. --- include/delegate.hrl | 32 -------------------------------- src/delegate.erl | 24 ++++++++++++++++++------ src/delegate_sup.erl | 3 +-- 3 files changed, 19 insertions(+), 40 deletions(-) delete mode 100644 include/delegate.hrl diff --git a/include/delegate.hrl b/include/delegate.hrl deleted file mode 100644 index 38f8d42f..00000000 --- a/include/delegate.hrl +++ /dev/null @@ -1,32 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --define(DELEGATE_PROCESSES, 10). diff --git a/src/delegate.erl b/src/delegate.erl index 2724736e..03dd06ac 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -30,14 +30,14 @@ %% -module(delegate). --include("delegate.hrl"). +-define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). -behaviour(gen_server2). -export([start_link/1, cast/2, call/2, gs2_call/3, gs2_pcall/4, gs2_cast/2, gs2_pcast/3, - server/1]). + server/1, process_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -76,7 +76,7 @@ call(Pids, FPid) when is_list(Pids) -> call_per_node(split_delegate_per_node(Pids), FPid). internal_call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({server(), Node}, {thunk, Thunk}, infinity). + gen_server2:call({server(Node), Node}, {thunk, Thunk}, infinity). cast(Pid, FPid) when is_pid(Pid) -> @@ -88,7 +88,7 @@ cast(Pids, FPid) when is_list(Pids) -> ok. internal_cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({server(), Node}, {thunk, Thunk}). + gen_server2:cast({server(Node), Node}, {thunk, Thunk}). %%---------------------------------------------------------------------------- @@ -120,8 +120,8 @@ delegate_per_node(NodePids, FPid, DelegateFun) -> [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) || {Node, Pids} <- NodePids]). -server() -> - server(erlang:phash2(self(), ?DELEGATE_PROCESSES)). +server(Node) when is_atom(Node) -> + server(erlang:phash2(self(), process_count(Node))); server(Hash) -> list_to_atom("delegate_process_" ++ integer_to_list(Hash)). @@ -134,6 +134,18 @@ safe_invoke(FPid, Pid) -> {ok, Result, Pid} end. +process_count(Node) -> + case get({process_count, Node}) of + undefined -> + Count = rpc:call(Node, delegate, process_count, []), + put({process_count, Node}, Count), + Count; + Count -> Count + end. + +process_count() -> + ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). + %%-------------------------------------------------------------------- init([]) -> diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 99a5e4d3..dd3d0eef 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -30,7 +30,6 @@ %% -module(delegate_sup). --include("delegate.hrl"). -behaviour(supervisor). @@ -51,6 +50,6 @@ init(_Args) -> {ok, {{one_for_one, 10, 10}, [{delegate:server(Hash), {delegate, start_link, [Hash]}, transient, 16#ffffffff, worker, [delegate]} || - Hash <- lists:seq(0, ?DELEGATE_PROCESSES - 1)]}}. + Hash <- lists:seq(0, delegate:process_count() - 1)]}}. %%-------------------------------------------------------------------- -- cgit v1.2.1 From 4e763135f0b562a9b7356ff4d62a10a1d9adc0ef Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 22 Apr 2010 16:56:42 +0100 Subject: If the other node is down and we're only going to delegate:cast anyway we don't want to log an error, so handle badrpc by ignoring it. --- src/delegate.erl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index 03dd06ac..68739324 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -137,9 +137,14 @@ safe_invoke(FPid, Pid) -> process_count(Node) -> case get({process_count, Node}) of undefined -> - Count = rpc:call(Node, delegate, process_count, []), - put({process_count, Node}, Count), - Count; + case rpc:call(Node, delegate, process_count, []) of + {badrpc, _} -> + 1; % Have to return something, if we're just casting then + % we don't want to blow up + Count -> + put({process_count, Node}, Count), + Count + end; Count -> Count end. -- cgit v1.2.1 From 05f24d96759eeb8f1c54996e8106d203dddf9b69 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 22 Apr 2010 18:29:25 +0100 Subject: Add / fix specs, and remove some unreachable code. --- src/delegate.erl | 19 +++++++++++++++++++ src/delegate_sup.erl | 8 ++++++++ src/rabbit_amqqueue.erl | 2 +- src/rabbit_channel.erl | 11 ++++------- 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/delegate.erl b/src/delegate.erl index 68739324..f3c3f097 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -44,6 +44,25 @@ %%---------------------------------------------------------------------------- +-ifdef(use_specs). + +-type(serverref() :: atom() | {atom(), atom()} | {'global', term()} | pid()). + +-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). +-spec(cast/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). +-spec(call/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). + +-spec(gs2_call/3 :: + (serverref(), any(), non_neg_integer() | 'infinity') -> any()). +-spec(gs2_pcall/4 :: + (serverref(), number(), any(), non_neg_integer() | 'infinity') -> any()). +-spec(gs2_cast/2 :: (serverref(), any()) -> 'ok'). +-spec(gs2_pcast/3 :: (serverref(), number(), any()) -> 'ok'). + +-spec(server/1 :: (node() | non_neg_integer()) -> atom()). +-spec(process_count/0 :: () -> non_neg_integer()). + +-endif. %%---------------------------------------------------------------------------- diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index dd3d0eef..1f351406 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -41,6 +41,14 @@ %%---------------------------------------------------------------------------- +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1dd92403..efa62e1b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -92,7 +92,7 @@ -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). --spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7d3cd722..1f16ec08 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -944,13 +944,10 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey, self()) of - ok -> NewUAMQ = queue:join(UAQ, UAMQ), - new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> rabbit_misc:protocol_error( - internal_error, "rollback failed: ~w", [Errors]) - end. + ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey, self()), + NewUAMQ = queue:join(UAQ, UAMQ), + new_tx(State#ch{unacked_message_q = NewUAMQ}). rollback_and_notify(State = #ch{transaction_id = none}) -> notify_queues(State); -- cgit v1.2.1