diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 76 |
1 files changed, 34 insertions, 42 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f6278836..efa62e1b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -92,7 +92,7 @@ -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). --spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). @@ -235,10 +235,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, info, infinity). + delegate:gs2_pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of + case delegate:gs2_pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -248,7 +248,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, consumers, infinity). + delegate:gs2_pcall(QPid, 9, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -257,15 +257,16 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> delegate:gs2_call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate:gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> + delegate:gs2_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, txn = Txn, sender = ChPid, message = Message}) -> @@ -280,28 +281,26 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. redeliver(QPid, Messages) -> - gen_server2:cast(QPid, {redeliver, Messages}). + delegate:gs2_cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). + delegate:gs2_cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate:gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn, ChPid) -> - safe_pmap_ok( + safe_delegate_call_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). rollback_all(QPids, Txn, ChPid) -> - safe_pmap_ok( - fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end, - QPids). + delegate:cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end). notify_down_all(QPids, ChPid) -> - safe_pmap_ok( + safe_delegate_call_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, @@ -309,38 +308,34 @@ notify_down_all(QPids, ChPid) -> QPids). limit_all(QPids, ChPid, LimiterPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, - QPids). + delegate:cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). + delegate:gs2_call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate:gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + delegate:gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate:gs2_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). + delegate:gs2_pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {unblock, ChPid}). + delegate:gs2_pcast(QPid, 7, {unblock, ChPid}). flush_all(QPids, ChPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, - QPids). + delegate:cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). internal_delete(QueueName) -> case @@ -386,17 +381,14 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. -safe_pmap_ok(H, F, L) -> - case [R || R <- rabbit_misc:upmap( - fun (V) -> - try - rabbit_misc:with_exit_handler( - fun () -> H(V) end, - fun () -> F(V) end) - catch Class:Reason -> {Class, Reason} - end - end, L), - R =/= ok] of +safe_delegate_call_ok(H, F, Pids) -> + case [R || R = {error, _, _} <- delegate:call( + Pids, + fun (Pid) -> + rabbit_misc:with_exit_handler( + fun () -> H(Pid) end, + fun () -> F(Pid) end) + end)] of [] -> ok; Errors -> {error, Errors} end. |