summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-27 13:18:37 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-27 13:18:37 +0100
commit58a35a817260cb41dec2236e52ec3e27a906cc9c (patch)
treed36a6ed3274a5719466a2661af9a7dd4d5b53c96
parentc0991485aa85e0211d16cac24b07d8fa1432acbe (diff)
parent305dc5866712547c982b4567774ce1b9910fb335 (diff)
downloadrabbitmq-server-58a35a817260cb41dec2236e52ec3e27a906cc9c.tar.gz
Merging default into bug 19844
-rw-r--r--Makefile3
-rw-r--r--src/delegate.erl194
-rw-r--r--src/delegate_sup.erl63
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue.erl74
-rw-r--r--src/rabbit_channel.erl11
-rw-r--r--src/rabbit_misc.erl1
-rw-r--r--src/rabbit_router.erl127
-rw-r--r--src/rabbit_tests.erl113
9 files changed, 430 insertions, 164 deletions
diff --git a/Makefile b/Makefile
index 2b08e071..3d39ccb0 100644
--- a/Makefile
+++ b/Makefile
@@ -167,6 +167,9 @@ start-cover: all
echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL)
echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
+start-secondary-cover: all
+ echo "rabbit_misc:start_cover([\"hare\"])." | $(ERL_CALL)
+
stop-cover: all
echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL)
cat cover/summary.txt
diff --git a/src/delegate.erl b/src/delegate.erl
new file mode 100644
index 00000000..f3c3f097
--- /dev/null
+++ b/src/delegate.erl
@@ -0,0 +1,194 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate).
+-define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
+
+-behaviour(gen_server2).
+
+-export([start_link/1, cast/2, call/2,
+ gs2_call/3, gs2_pcall/4,
+ gs2_cast/2, gs2_pcast/3,
+ server/1, process_count/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(serverref() :: atom() | {atom(), atom()} | {'global', term()} | pid()).
+
+-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
+-spec(cast/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
+-spec(call/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
+
+-spec(gs2_call/3 ::
+ (serverref(), any(), non_neg_integer() | 'infinity') -> any()).
+-spec(gs2_pcall/4 ::
+ (serverref(), number(), any(), non_neg_integer() | 'infinity') -> any()).
+-spec(gs2_cast/2 :: (serverref(), any()) -> 'ok').
+-spec(gs2_pcast/3 :: (serverref(), number(), any()) -> 'ok').
+
+-spec(server/1 :: (node() | non_neg_integer()) -> atom()).
+-spec(process_count/0 :: () -> non_neg_integer()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link(Hash) ->
+ gen_server2:start_link({local, server(Hash)},
+ ?MODULE, [], []).
+
+gs2_call(Pid, Msg, Timeout) ->
+ {_Status, Res} =
+ call(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end),
+ Res.
+
+gs2_pcall(Pid, Pri, Msg, Timeout) ->
+ {_Status, Res} =
+ call(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end),
+ Res.
+
+gs2_cast(Pid, Msg) ->
+ cast(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
+
+gs2_pcast(Pid, Pri, Msg) ->
+ cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+
+
+call(Pid, FPid) when is_pid(Pid) ->
+ [{Status, Res, _}] = call_per_node([{node(Pid), [Pid]}], FPid),
+ {Status, Res};
+
+call(Pids, FPid) when is_list(Pids) ->
+ call_per_node(split_delegate_per_node(Pids), FPid).
+
+internal_call(Node, Thunk) when is_atom(Node) ->
+ gen_server2:call({server(Node), Node}, {thunk, Thunk}, infinity).
+
+
+cast(Pid, FPid) when is_pid(Pid) ->
+ cast_per_node([{node(Pid), [Pid]}], FPid),
+ ok;
+
+cast(Pids, FPid) when is_list(Pids) ->
+ cast_per_node(split_delegate_per_node(Pids), FPid),
+ ok.
+
+internal_cast(Node, Thunk) when is_atom(Node) ->
+ gen_server2:cast({server(Node), Node}, {thunk, Thunk}).
+
+%%----------------------------------------------------------------------------
+
+split_delegate_per_node(Pids) ->
+ orddict:to_list(
+ lists:foldl(
+ fun (Pid, D) ->
+ orddict:update(node(Pid),
+ fun (Pids1) -> [Pid | Pids1] end,
+ [Pid], D)
+ end,
+ orddict:new(), Pids)).
+
+call_per_node([{Node, Pids}], FPid) when Node == node() ->
+ local_delegate(Pids, FPid);
+call_per_node(NodePids, FPid) ->
+ delegate_per_node(NodePids, FPid, fun internal_call/2).
+
+cast_per_node([{Node, Pids}], FPid) when Node == node() ->
+ local_delegate(Pids, FPid);
+cast_per_node(NodePids, FPid) ->
+ delegate_per_node(NodePids, FPid, fun internal_cast/2).
+
+local_delegate(Pids, FPid) ->
+ [safe_invoke(FPid, Pid) || Pid <- Pids].
+
+delegate_per_node(NodePids, FPid, DelegateFun) ->
+ lists:flatten(
+ [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end)
+ || {Node, Pids} <- NodePids]).
+
+server(Node) when is_atom(Node) ->
+ server(erlang:phash2(self(), process_count(Node)));
+
+server(Hash) ->
+ list_to_atom("delegate_process_" ++ integer_to_list(Hash)).
+
+safe_invoke(FPid, Pid) ->
+ case catch FPid(Pid) of
+ {'EXIT', Reason} ->
+ {error, {'EXIT', Reason}, Pid};
+ Result ->
+ {ok, Result, Pid}
+ end.
+
+process_count(Node) ->
+ case get({process_count, Node}) of
+ undefined ->
+ case rpc:call(Node, delegate, process_count, []) of
+ {badrpc, _} ->
+ 1; % Have to return something, if we're just casting then
+ % we don't want to blow up
+ Count ->
+ put({process_count, Node}, Count),
+ Count
+ end;
+ Count -> Count
+ end.
+
+process_count() ->
+ ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
+
+%%--------------------------------------------------------------------
+
+init([]) ->
+ {ok, no_state}.
+
+handle_call({thunk, Thunk}, _From, State) ->
+ {reply, Thunk(), State}.
+
+handle_cast({thunk, Thunk}, State) ->
+ catch Thunk(),
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
new file mode 100644
index 00000000..1f351406
--- /dev/null
+++ b/src/delegate_sup.erl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%--------------------------------------------------------------------
+
+init(_Args) ->
+ {ok, {{one_for_one, 10, 10},
+ [{delegate:server(Hash), {delegate, start_link, [Hash]},
+ transient, 16#ffffffff, worker, [delegate]} ||
+ Hash <- lists:seq(0, delegate:process_count() - 1)]}}.
+
+%%--------------------------------------------------------------------
diff --git a/src/rabbit.erl b/src/rabbit.erl
index bbda29c9..40f38b3b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -99,10 +99,10 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
--rabbit_boot_step({rabbit_router,
- [{description, "cluster router"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_router]}},
+-rabbit_boot_step({delegate_sup,
+ [{description, "cluster delegate"},
+ {mfa, {rabbit_sup, start_child,
+ [delegate_sup]}},
{requires, kernel_ready},
{enables, core_initialized}]}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5f045b27..23a4932a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -91,7 +91,7 @@
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
--spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
+-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
@@ -225,10 +225,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, info, infinity).
+ delegate:gs2_pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate:gs2_pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -238,7 +238,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, consumers, infinity).
+ delegate:gs2_pcall(QPid, 9, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -247,15 +247,16 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) -> delegate:gs2_call(QPid, stat, infinity).
stat_all() ->
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate:gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) ->
+ delegate:gs2_call(QPid, purge, infinity).
deliver(QPid, #delivery{immediate = true,
txn = Txn, sender = ChPid, message = Message}) ->
@@ -270,25 +271,23 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate:gs2_cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate:gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn, ChPid) ->
- safe_pmap_ok(
+ safe_delegate_call_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
rollback_all(QPids, Txn, ChPid) ->
- safe_pmap_ok(
- fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end,
- QPids).
+ delegate:cast(QPids,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end).
notify_down_all(QPids, ChPid) ->
- safe_pmap_ok(
+ safe_delegate_call_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
@@ -296,38 +295,34 @@ notify_down_all(QPids, ChPid) ->
QPids).
limit_all(QPids, ChPid, LimiterPid) ->
- safe_pmap_ok(
- fun (_) -> ok end,
- fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
- QPids).
+ delegate:cast(QPids,
+ fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
+ delegate:gs2_call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate:gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ delegate:gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate:gs2_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate:gs2_pcast(QPid, 7, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:pcast(QPid, 7, {unblock, ChPid}).
+ delegate:gs2_pcast(QPid, 7, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
- safe_pmap_ok(
- fun (_) -> ok end,
- fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
- QPids).
+ delegate:cast(QPids,
+ fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
internal_delete(QueueName) ->
case
@@ -373,17 +368,14 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-safe_pmap_ok(H, F, L) ->
- case [R || R <- rabbit_misc:upmap(
- fun (V) ->
- try
- rabbit_misc:with_exit_handler(
- fun () -> H(V) end,
- fun () -> F(V) end)
- catch Class:Reason -> {Class, Reason}
- end
- end, L),
- R =/= ok] of
+safe_delegate_call_ok(H, F, Pids) ->
+ case [R || R = {error, _, _} <- delegate:call(
+ Pids,
+ fun (Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> H(Pid) end,
+ fun () -> F(Pid) end)
+ end)] of
[] -> ok;
Errors -> {error, Errors}
end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7d3cd722..1f16ec08 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -944,13 +944,10 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
[self(),
queue:len(UAQ),
queue:len(UAMQ)]),
- case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey, self()) of
- ok -> NewUAMQ = queue:join(UAQ, UAMQ),
- new_tx(State#ch{unacked_message_q = NewUAMQ});
- {error, Errors} -> rabbit_misc:protocol_error(
- internal_error, "rollback failed: ~w", [Errors])
- end.
+ ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants),
+ TxnKey, self()),
+ NewUAMQ = queue:join(UAQ, UAMQ),
+ new_tx(State#ch{unacked_message_q = NewUAMQ}).
rollback_and_notify(State = #ch{transaction_id = none}) ->
notify_queues(State);
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2c180846..119808af 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -219,6 +219,7 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
[Kind, Name, VHostPath])).
+
enable_cover() ->
enable_cover(".").
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index a449e19e..6a886eac 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -33,100 +33,40 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--behaviour(gen_server2).
-
--export([start_link/0,
- deliver/2,
+-export([deliver/2,
match_bindings/2,
match_routing_key/2]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--define(SERVER, ?MODULE).
-
-%% cross-node routing optimisation is disabled because of bug 19758.
--define(BUG19758, true).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-
--ifdef(BUG19758).
-
-deliver(QPids, Delivery) ->
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery)).
-
--else.
-
-deliver(QPids, Delivery) ->
- %% we reduce inter-node traffic by grouping the qpids by node and
- %% only delivering one copy of the message to each node involved,
- %% which then in turn delivers it to its queues.
- deliver_per_node(
- dict:to_list(
- lists:foldl(fun (QPid, D) ->
- rabbit_misc:dict_cons(node(QPid), QPid, D)
- end, dict:new(), QPids)),
- Delivery).
-
-deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
- %% optimisation
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery));
-deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
+deliver(QPids, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver in run_bindings below will deliver the
+ %% rabbit_amqqueue:deliver will deliver the
%% message to the queue process asynchronously, and return true,
%% which means all the QPids will always be returned. It is
%% therefore safe to use a fire-and-forget cast here and return
%% the QPids - the semantics is preserved. This scales much better
%% than the non-immediate case below.
- {routed,
- lists:flatmap(
- fun ({Node, QPids}) ->
- gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}),
- QPids
- end,
- NodeQPids)};
-deliver_per_node(NodeQPids, Delivery) ->
- R = rabbit_misc:upmap(
- fun ({Node, QPids}) ->
- try gen_server2:call({?SERVER, Node},
- {deliver, QPids, Delivery},
- infinity)
- catch
- _Class:_Reason ->
- %% TODO: figure out what to log (and do!) here
- {false, []}
- end
- end,
- NodeQPids),
+ delegate:cast(QPids,
+ fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ {routed, QPids};
+
+deliver(QPids, Delivery) ->
+ Res = delegate:call(QPids,
+ fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{Routed, Handled} =
- lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) ->
- {Routed or RoutedAcc,
- %% we do the concatenation below, which
- %% should be faster
- [Handled | HandledAcc]}
- end,
- {false, []},
- R),
+ lists:foldl(fun fold_deliveries/2, {false, []}, Res),
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, lists:append(Handled)}).
-
--endif.
+ {Routed, Handled}).
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
@@ -170,44 +110,9 @@ lookup_qpids(Queues) ->
%%--------------------------------------------------------------------
-init([]) ->
- {ok, no_state}.
-
-handle_call({deliver, QPids, Delivery}, From, State) ->
- spawn(
- fun () ->
- R = run_bindings(QPids, Delivery),
- gen_server2:reply(From, R)
- end),
- {noreply, State}.
-
-handle_cast({deliver, QPids, Delivery}, State) ->
- %% in order to preserve message ordering we must not spawn here
- run_bindings(QPids, Delivery),
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-
-run_bindings(QPids, Delivery) ->
- lists:foldl(
- fun (QPid, {Routed, Handled}) ->
- case catch rabbit_amqqueue:deliver(QPid, Delivery) of
- true -> {true, [QPid | Handled]};
- false -> {true, Handled};
- {'EXIT', _Reason} -> {Routed, Handled}
- end
- end,
- {false, []},
- QPids).
+fold_deliveries({ok, true, Pid},{_, Handled}) -> {true, [Pid|Handled]};
+fold_deliveries({ok, false, _ },{_, Handled}) -> {true, Handled};
+fold_deliveries({error, _ , _ },{Routed, Handled}) -> {Routed, Handled}.
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
check_delivery(true, _ , {false, []}) -> {unroutable, []};
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d645d183..5ed7d64c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -61,7 +61,32 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
- passed = test_hooks(),
+ passed = maybe_run_cluster_dependent_tests(),
+ passed.
+
+
+maybe_run_cluster_dependent_tests() ->
+ SecondaryNode = rabbit_misc:makenode("hare"),
+
+ case net_adm:ping(SecondaryNode) of
+ pong -> passed = run_cluster_dependent_tests(SecondaryNode);
+ pang -> io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode])
+ end,
+ passed.
+
+run_cluster_dependent_tests(SecondaryNode) ->
+ SecondaryNodeS = atom_to_list(SecondaryNode),
+
+ ok = control_action(stop_app, []),
+ ok = control_action(reset, []),
+ ok = control_action(cluster, [SecondaryNodeS]),
+ ok = control_action(start_app, []),
+
+ io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
+ passed = test_delegates_async(SecondaryNode),
+ passed = test_delegates_sync(SecondaryNode),
+
passed.
test_priority_queue() ->
@@ -815,6 +840,92 @@ test_hooks() ->
end,
passed.
+test_delegates_async(SecondaryNode) ->
+ Self = self(),
+ Sender = fun(Pid) -> Pid ! {invoked, Self} end,
+
+ Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end),
+
+ ok = delegate:cast(spawn(Responder), Sender),
+ ok = delegate:cast(spawn(SecondaryNode, Responder), Sender),
+ await_response(2),
+
+ LocalPids = spawn_responders(node(), Responder, 10),
+ RemotePids = spawn_responders(SecondaryNode, Responder, 10),
+ ok = delegate:cast(LocalPids ++ RemotePids, Sender),
+ await_response(20),
+
+ passed.
+
+make_responder(FMsg) ->
+ fun() ->
+ receive
+ Msg ->
+ FMsg(Msg)
+ after 100 ->
+ throw(timeout)
+ end
+ end.
+
+spawn_responders(Node, Responder, Count) ->
+ [spawn(Node, Responder) || _ <- lists:seq(1, Count)].
+
+await_response(0) ->
+ ok;
+
+await_response(Count) ->
+ receive
+ response -> ok,
+ await_response(Count - 1)
+ after 100 ->
+ io:format("Async reply not received~n"),
+ throw(timeout)
+ end.
+
+test_delegates_sync(SecondaryNode) ->
+ Sender = fun(Pid) ->
+ gen_server2:call(Pid, invoked)
+ end,
+
+ Responder = make_responder(fun({'$gen_call', From, invoked}) ->
+ gen_server2:reply(From, response)
+ end),
+
+ BadResponder = make_responder(fun({'$gen_call', _From, invoked}) ->
+ throw(exception)
+ end),
+
+ {ok, response} = delegate:call(spawn(Responder), Sender),
+ {ok, response} = delegate:call(spawn(SecondaryNode, Responder), Sender),
+
+ {error, _} = delegate:call(spawn(BadResponder), Sender),
+ {error, _} = delegate:call(spawn(SecondaryNode, BadResponder), Sender),
+
+ LocalGoodPids = spawn_responders(node(), Responder, 2),
+ RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
+ LocalBadPids = spawn_responders(node(), BadResponder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
+
+ GoodRes = delegate:call(LocalGoodPids ++ RemoteGoodPids, Sender),
+ [{ok, response, _}, {ok, response, _},
+ {ok, response, _}, {ok, response, _}] = GoodRes,
+
+ BadRes = delegate:call(LocalBadPids ++ RemoteBadPids, Sender),
+ [{error, _, _}, {error, _, _},
+ {error, _, _}, {error, _, _}] = BadRes,
+
+ GoodResPids = [Pid || {_, _, Pid} <- GoodRes],
+ BadResPids = [Pid || {_, _, Pid} <- BadRes],
+
+ Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
+ Good = ordsets:from_list(GoodResPids),
+
+ Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
+ Bad = ordsets:from_list(BadResPids),
+
+ passed.
+
+
%---------------------------------------------------------------------
control_action(Command, Args) -> control_action(Command, node(), Args).