diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 96 |
1 files changed, 54 insertions, 42 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 382810c3..eb076e94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,13 +37,13 @@ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([notify_sent/2, unblock/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). --import(gen_server). +-import(gen_server2). -import(lists). -import(queue). @@ -91,15 +91,17 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -130,7 +132,7 @@ recover_durable_queues() -> %% re-created it). case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:match_object( - durable_queues, RecoveredQ, read) of + rabbit_durable_queue, RecoveredQ, read) of [_] -> ok = store_queue(Q), true; [] -> false @@ -144,7 +146,7 @@ recover_durable_queues() -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(durable_queues), + <- mnesia:table(rabbit_durable_queue), node(Pid) == Node])) end)), ok. @@ -157,7 +159,7 @@ declare(QueueName, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> ok = store_queue(Q), ok = add_default_binding(Q), Q; @@ -170,11 +172,11 @@ declare(QueueName, Durable, AutoDelete, Args) -> end. store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(durable_queues, Q, write), - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_durable_queue, Q, write), + ok = mnesia:write(rabbit_queue, Q, write), ok; store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_queue, Q, write), ok. start_queue_process(Q) -> @@ -188,7 +190,7 @@ add_default_binding(#amqqueue{name = QueueName}) -> ok. lookup(Name) -> - rabbit_misc:dirty_read({amqqueue, Name}). + rabbit_misc:dirty_read({rabbit_queue, Name}). with(Name, F, E) -> case lookup(Name) of @@ -205,15 +207,16 @@ with_or_die(Name, F) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_queue, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server:call(QPid, info). + gen_server2:pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server:call(QPid, {info, Items}) of + case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -222,82 +225,91 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). + lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server:cast(QPid, {deliver, Txn, Message}), + gen_server2:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> - gen_server:cast(QPid, {redeliver, Messages}). + gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, + infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> - gen_server:cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). + +unblock(QPid, ChPid) -> + gen_server2:cast(QPid, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; [_] -> ok = rabbit_exchange:delete_queue_bindings(QueueName), - ok = mnesia:delete({amqqueue, QueueName}), - ok = mnesia:delete({durable_queues, QueueName}), + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), ok end end). @@ -309,12 +321,12 @@ on_node_down(Node) -> fun (QueueName, Acc) -> ok = rabbit_exchange:delete_transient_queue_bindings( QueueName), - ok = mnesia:delete({amqqueue, QueueName}), + ok = mnesia:delete({rabbit_queue, QueueName}), Acc end, ok, qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(amqqueue), + <- mnesia:table(rabbit_queue), node(Pid) == Node])) end). |