diff options
-rw-r--r-- | src/rabbit_writer.erl | 110 |
1 files changed, 60 insertions, 50 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index dc74b2f5..f3a8cacf 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -24,7 +24,7 @@ send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --record(wstate, {sock, channel, frame_max, protocol}). +-record(wstate, {sock, channel, frame_max, protocol, pending}). -define(HIBERNATE_AFTER, 5000). @@ -80,7 +80,8 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, - protocol = Protocol}])}. + protocol = Protocol, + pending = []}])}. start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> {ok, @@ -88,7 +89,8 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, - protocol = Protocol}])}. + protocol = Protocol, + pending = []}])}. mainloop(ReaderPid, State) -> try @@ -98,37 +100,41 @@ mainloop(ReaderPid, State) -> end, done. -mainloop1(ReaderPid, State) -> +mainloop1(ReaderPid, State = #wstate{pending = []}) -> receive Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) + end; +mainloop1(ReaderPid, State) -> + receive + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + after 0 -> + ?MODULE:mainloop1(ReaderPid, flush(State)) end. handle_message({send_command, MethodRecord}, State) -> - ok = internal_send_command_async(MethodRecord, State), - State; + internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), - State; + internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - ok = internal_send_command_async(MethodRecord, State), + State1 = flush(internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), - State; + State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), + State1 = flush(internal_send_command_async(MethodRecord, Content, State)), gen_server:reply(From, ok), - State; + State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> - ok = internal_send_command_async(MethodRecord, State), + State1 = internal_send_command_async(MethodRecord, State), rabbit_amqqueue:notify_sent(QPid, ChPid), - State; + State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State) -> - ok = internal_send_command_async(MethodRecord, Content, State), + State1 = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), - State; + State1; handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QPid), State; @@ -184,22 +190,6 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. -%% We optimise delivery of small messages. Content-bearing methods -%% require at least three frames. Small messages always fit into -%% that. We hand their frames to the Erlang network functions in one -%% go, which may lead to somewhat more efficient processing in the -%% runtime and a greater chance of coalescing into fewer TCP packets. -%% -%% By contrast, for larger messages, split across many frames, we want -%% to allow interleaving of frames on different channels. Hence we -%% hand them to the Erlang network functions one frame at a time. -send_frames(Fun, Sock, Frames) when length(Frames) =< 3 -> - Fun(Sock, Frames); -send_frames(Fun, Sock, Frames) -> - lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame); - (_Frame, Other) -> Other - end, ok, Frames). - tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). @@ -209,9 +199,44 @@ internal_send_command(Sock, Channel, MethodRecord, Protocol) -> internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, Protocol) -> - ok = send_frames(fun tcp_send/2, Sock, - assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). + ok = lists:foldl(fun (Frame, ok) -> tcp_send(Sock, Frame); + (_Frame, Other) -> Other + end, ok, assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). + +internal_send_command_async(MethodRecord, + State = #wstate{channel = Channel, + protocol = Protocol, + pending = Pending}) -> + Frame = assemble_frame(Channel, MethodRecord, Protocol), + maybe_flush(State#wstate{pending = [Frame | Pending]}). + +internal_send_command_async(MethodRecord, Content, + State = #wstate{channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + pending = Pending}) -> + Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax, + Protocol), + maybe_flush(State#wstate{pending = [Frames | Pending]}). + +%% This magic number is the tcp-over-ethernet MSS (1460) minus the +%% minimum size of a AMQP basic.deliver method frame (24) plus basic +%% content header (22). The idea is that we want to flush just before +%% exceeding the MSS. +-define(FLUSH_THRESHOLD, 1414). + +maybe_flush(State = #wstate{pending = Pending}) -> + case iolist_size(Pending) >= ?FLUSH_THRESHOLD of + true -> flush(State); + false -> State + end. + +flush(State = #wstate{pending = []}) -> + State; +flush(State = #wstate{sock = Sock, pending = Pending}) -> + ok = port_cmd(Sock, lists:reverse(Pending)), + State#wstate{pending = []}. %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -231,21 +256,6 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(MethodRecord, - #wstate{sock = Sock, - channel = Channel, - protocol = Protocol}) -> - ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)). - -internal_send_command_async(MethodRecord, Content, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = send_frames(fun port_cmd/2, Sock, - assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). - port_cmd(Sock, Data) -> true = try rabbit_net:port_command(Sock, Data) catch error:Error -> exit({writer, send_failed, Error}) |