diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 36 |
1 files changed, 15 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0cdb4fff..abcb87d7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -331,10 +331,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, info, infinity). + delegate_call(QPid, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_pcall(QPid, 9, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -344,7 +344,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, consumers, infinity). + delegate_call(QPid, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -356,7 +356,7 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> - delegate_pcast(QPid, 7, emit_stats). + delegate_cast(QPid, emit_stats). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -379,10 +379,10 @@ requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> - delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> - delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( @@ -418,10 +418,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - delegate_pcast(QPid, 7, {notify_sent, ChPid}). + delegate_cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - delegate_pcast(QPid, 7, {unblock, ChPid}). + delegate_cast(QPid, {unblock, ChPid}). flush_all(QPids, ChPid) -> delegate:invoke_no_result( @@ -451,20 +451,19 @@ internal_delete(QueueName) -> end. maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun}, - infinity). + gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). update_ram_duration(QPid) -> - gen_server2:pcast(QPid, 8, update_ram_duration). + gen_server2:cast(QPid, update_ram_duration). set_ram_duration_target(QPid, Duration) -> - gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). + gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> - gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(QPid, {set_maximum_since_use, Age}). maybe_expire(QPid) -> - gen_server2:pcast(QPid, 8, maybe_expire). + gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> [Hook() || @@ -504,11 +503,6 @@ safe_delegate_call_ok(F, Pids) -> delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). -delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, - fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). - -delegate_pcast(Pid, Pri, Msg) -> - delegate:invoke_no_result(Pid, - fun (P) -> gen_server2:pcast(P, Pri, Msg) end). +delegate_cast(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). |