summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl96
1 files changed, 54 insertions, 42 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 382810c3..eb076e94 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,13 +37,13 @@
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([notify_sent/2, unblock/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
--import(gen_server).
+-import(gen_server2).
-import(lists).
-import(queue).
@@ -91,15 +91,17 @@
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -130,7 +132,7 @@ recover_durable_queues() ->
%% re-created it).
case rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:match_object(
- durable_queues, RecoveredQ, read) of
+ rabbit_durable_queue, RecoveredQ, read) of
[_] -> ok = store_queue(Q),
true;
[] -> false
@@ -144,7 +146,7 @@ recover_durable_queues() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
+ <- mnesia:table(rabbit_durable_queue),
node(Pid) == Node]))
end)),
ok.
@@ -157,7 +159,7 @@ declare(QueueName, Durable, AutoDelete, Args) ->
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
+ case mnesia:wread({rabbit_queue, QueueName}) of
[] -> ok = store_queue(Q),
ok = add_default_binding(Q),
Q;
@@ -170,11 +172,11 @@ declare(QueueName, Durable, AutoDelete, Args) ->
end.
store_queue(Q = #amqqueue{durable = true}) ->
- ok = mnesia:write(durable_queues, Q, write),
- ok = mnesia:write(Q),
+ ok = mnesia:write(rabbit_durable_queue, Q, write),
+ ok = mnesia:write(rabbit_queue, Q, write),
ok;
store_queue(Q = #amqqueue{durable = false}) ->
- ok = mnesia:write(Q),
+ ok = mnesia:write(rabbit_queue, Q, write),
ok.
start_queue_process(Q) ->
@@ -188,7 +190,7 @@ add_default_binding(#amqqueue{name = QueueName}) ->
ok.
lookup(Name) ->
- rabbit_misc:dirty_read({amqqueue, Name}).
+ rabbit_misc:dirty_read({rabbit_queue, Name}).
with(Name, F, E) ->
case lookup(Name) of
@@ -205,15 +207,16 @@ with_or_die(Name, F) ->
list(VHostPath) ->
mnesia:dirty_match_object(
+ rabbit_queue,
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server:call(QPid, info).
+ gen_server2:pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server:call(QPid, {info, Items}) of
+ case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -222,82 +225,91 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
- lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)).
+ lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server:call(QPid, {delete, IfUnused, IfEmpty}).
+ gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
deliver(_IsMandatory, true, Txn, Message, QPid) ->
- gen_server:call(QPid, {deliver_immediately, Txn, Message});
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
deliver(true, _IsImmediate, Txn, Message, QPid) ->
- gen_server:call(QPid, {deliver, Txn, Message}),
+ gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
true;
deliver(false, _IsImmediate, Txn, Message, QPid) ->
- gen_server:cast(QPid, {deliver, Txn, Message}),
+ gen_server2:cast(QPid, {deliver, Txn, Message}),
true.
redeliver(QPid, Messages) ->
- gen_server:cast(QPid, {redeliver, Messages}).
+ gen_server2:cast(QPid, {redeliver, Messages}).
requeue(QPid, MsgIds, ChPid) ->
- gen_server:cast(QPid, {requeue, MsgIds, ChPid}).
+ gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
QPids).
rollback_all(QPids, Txn) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
QPids).
notify_down_all(QPids, ChPid) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
- fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
+limit_all(QPids, ChPid, LimiterPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
+ QPids).
+
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server:call(QPid, {claim_queue, ReaderPid}).
+ gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server:call(QPid, {basic_get, ChPid, NoAck}).
+ gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
+ infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
- gen_server:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
+
+unblock(QPid, ChPid) ->
+ gen_server2:cast(QPid, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
+ case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[_] ->
ok = rabbit_exchange:delete_queue_bindings(QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
- ok = mnesia:delete({durable_queues, QueueName}),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
ok
end
end).
@@ -309,12 +321,12 @@ on_node_down(Node) ->
fun (QueueName, Acc) ->
ok = rabbit_exchange:delete_transient_queue_bindings(
QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
Acc
end,
ok,
qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(amqqueue),
+ <- mnesia:table(rabbit_queue),
node(Pid) == Node]))
end).