diff options
-rw-r--r-- | src/rabbit_writer.erl | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index bf6964d8..34dd3d3b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -25,6 +25,7 @@ -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5, + send_command_flow/2, send_command_flow/3, flush/1]). -export([internal_send_command/4, internal_send_command/6]). @@ -78,6 +79,11 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(send_command_flow/2 :: + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(send_command_flow/3 :: + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), @@ -165,6 +171,12 @@ handle_message({send_command, MethodRecord}, State) -> internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); +handle_message({send_command_flow, MethodRecord, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, State); +handle_message({send_command_flow, MethodRecord, Content, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> State1 = internal_flush( internal_send_command_async(MethodRecord, State)), @@ -212,6 +224,16 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_flow(W, MethodRecord) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, self()}, + ok. + +send_command_flow(W, MethodRecord, Content) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, Content, self()}, + ok. + send_command_sync(W, MethodRecord) -> call(W, {send_command_sync, MethodRecord}). |