diff options
Diffstat (limited to 'src/rabbit_writer.erl')
-rw-r--r-- | src/rabbit_writer.erl | 59 |
1 files changed, 43 insertions, 16 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 50bca390..068ac186 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -34,12 +34,11 @@ -include("rabbit_framing.hrl"). -export([start/5, start_link/5, mainloop/2, mainloop1/2]). --export([send_command/2, send_command/3, send_command_sync/2, - send_command_sync/3, send_command_and_notify/5]). +-export([send_command/2, send_command/3, + send_command_sync/2, send_command_sync/3, + send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --import(gen_tcp). - -record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -66,6 +65,9 @@ -spec(send_command_sync/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(send_command_and_notify/4 :: + (pid(), pid(), pid(), rabbit_framing:amqp_method_record()) + -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) @@ -130,6 +132,10 @@ handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, ok = internal_send_command_async(MethodRecord, Content, State), gen_server:reply(From, ok), State; +handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> + ok = internal_send_command_async(MethodRecord, State), + rabbit_amqqueue:notify_sent(QPid, ChPid), + State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State) -> ok = internal_send_command_async(MethodRecord, Content, State), @@ -158,6 +164,10 @@ send_command_sync(W, MethodRecord) -> send_command_sync(W, MethodRecord, Content) -> call(W, {send_command_sync, MethodRecord, Content}). +send_command_and_notify(W, Q, ChPid, MethodRecord) -> + W ! {send_command_and_notify, Q, ChPid, MethodRecord}, + ok. + send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. @@ -170,7 +180,7 @@ call(Pid, Msg) -> %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord, Protocol) -> +assemble_frame(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), rabbit_binary_generator:build_simple_method_frame( Channel, MethodRecord, Protocol). @@ -185,17 +195,34 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. +%% We optimise delivery of small messages. Content-bearing methods +%% require at least three frames. Small messages always fit into +%% that. We hand their frames to the Erlang network functions in one +%% go, which may lead to somewhat more efficient processing in the +%% runtime and a greater chance of coalescing into fewer TCP packets. +%% +%% By contrast, for larger messages, split across many frames, we want +%% to allow interleaving of frames on different channels. Hence we +%% hand them to the Erlang network functions one frame at a time. +send_frames(Fun, Sock, Frames) when length(Frames) =< 3 -> + Fun(Sock, Frames); +send_frames(Fun, Sock, Frames) -> + lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame); + (_Frame, Other) -> Other + end, ok, Frames). + tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord, Protocol) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). + ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, Protocol) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). + ok = send_frames(fun tcp_send/2, Sock, + assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -219,19 +246,19 @@ internal_send_command_async(MethodRecord, #wstate{sock = Sock, channel = Channel, protocol = Protocol}) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), - ok. + ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)). internal_send_command_async(MethodRecord, Content, #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)), - ok. + ok = send_frames(fun port_cmd/2, Sock, + assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). port_cmd(Sock, Data) -> - try rabbit_net:port_command(Sock, Data) - catch error:Error -> exit({writer, send_failed, Error}) - end. + true = try rabbit_net:port_command(Sock, Data) + catch error:Error -> exit({writer, send_failed, Error}) + end, + ok. |