summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-24 15:52:01 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-24 15:52:01 +0100
commit4040998294bd2d7e3e5e7af892c088af77343801 (patch)
tree36623a90601d2d7c2122c1243cf759eb440bedc2
parent0157f0ccfc43f7b82f0791e6d3d68b9af632a634 (diff)
downloadrabbitmq-server-4040998294bd2d7e3e5e7af892c088af77343801.tar.gz
migrated amqqueue to prioritisers
-rw-r--r--src/rabbit_amqqueue.erl36
-rw-r--r--src/rabbit_amqqueue_process.erl26
2 files changed, 40 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0cdb4fff..abcb87d7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -331,10 +331,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_pcall(QPid, 9, info, infinity).
+ delegate_call(QPid, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -344,7 +344,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_pcall(QPid, 9, consumers, infinity).
+ delegate_call(QPid, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -356,7 +356,7 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
- delegate_pcast(QPid, 7, emit_stats).
+ delegate_cast(QPid, emit_stats).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -379,10 +379,10 @@ requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
- delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}).
+ delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
@@ -418,10 +418,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- delegate_pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate_cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- delegate_pcast(QPid, 7, {unblock, ChPid}).
+ delegate_cast(QPid, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
@@ -451,20 +451,19 @@ internal_delete(QueueName) ->
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun},
- infinity).
+ gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
update_ram_duration(QPid) ->
- gen_server2:pcast(QPid, 8, update_ram_duration).
+ gen_server2:cast(QPid, update_ram_duration).
set_ram_duration_target(QPid, Duration) ->
- gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}).
+ gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
set_maximum_since_use(QPid, Age) ->
- gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
maybe_expire(QPid) ->
- gen_server2:pcast(QPid, 8, maybe_expire).
+ gen_server2:cast(QPid, maybe_expire).
on_node_down(Node) ->
[Hook() ||
@@ -504,11 +503,6 @@ safe_delegate_call_ok(F, Pids) ->
delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
-delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid,
- fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-
-delegate_pcast(Pid, Pri, Msg) ->
- delegate:invoke_no_result(Pid,
- fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
+delegate_cast(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2cab7136..97a264d7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,8 @@
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
-import(queue).
-import(erlang).
@@ -592,6 +593,29 @@ emit_stats(State) ->
%---------------------------------------------------------------------------
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ _ -> 0
+ end.
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);