author     Matthias Radestock <matthias@rabbitmq.com>  2012-03-20 16:13:43 +0000
committer  Matthias Radestock <matthias@rabbitmq.com>  2012-03-20 16:13:43 +0000
commit     6cba6e937e940ec6d68e78efcf8727a93dc29d04 (patch)
tree       93d617250c2c23da0b2a4c86619740a015bd65e5
parent     3ae5428b60aa7c44d207245adf47397dce6df57d (diff)
download   rabbitmq-server-bug24764.tar.gz

reduce number of system calls by batching socket writes (bug24764)

-rw-r--r--  src/rabbit_writer.erl  110
1 file changed, 60 insertions(+), 50 deletions(-)
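For orientation before the diff: the writer now accumulates assembled frames in a new pending field of its state and writes them to the socket in batches, flushing either when the buffer reaches a size threshold or when its mailbox runs dry. A minimal, self-contained Erlang sketch of that pattern, with hypothetical names and plain gen_tcp rather than RabbitMQ's rabbit_net wrapper:

-module(batch_writer_sketch).
-export([loop/2]).

-define(FLUSH_THRESHOLD, 1414). %% same threshold the patch uses

%% With nothing buffered, block until the next request arrives.
loop(Sock, []) ->
    receive
        Msg -> loop(Sock, handle(Sock, Msg, []))
    end;
%% With frames buffered, keep serving requests, but flush as soon as
%% the mailbox is empty ('after 0' fires only when nothing is queued).
loop(Sock, Pending) ->
    receive
        Msg -> loop(Sock, handle(Sock, Msg, Pending))
    after 0 ->
        loop(Sock, flush(Sock, Pending))
    end.

handle(Sock, {send, Frame}, Pending) ->
    Pending1 = [Frame | Pending],
    %% Flush early once the batch is big enough for one TCP segment.
    case iolist_size(Pending1) >= ?FLUSH_THRESHOLD of
        true  -> flush(Sock, Pending1);
        false -> Pending1
    end.

flush(_Sock, []) -> [];
flush(Sock, Pending) ->
    %% Frames were prepended, so reverse before the single write.
    ok = gen_tcp:send(Sock, lists:reverse(Pending)),
    [].

In the real module the frames come from assemble_frame/3 and assemble_frames/5 and the write goes through port_command, but the buffering and flush policy match the patch below.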
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index dc74b2f5..f3a8cacf 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -24,7 +24,7 @@
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--record(wstate, {sock, channel, frame_max, protocol}).
+-record(wstate, {sock, channel, frame_max, protocol, pending}).
-define(HIBERNATE_AFTER, 5000).
@@ -80,7 +80,8 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
- protocol = Protocol}])}.
+ protocol = Protocol,
+ pending = []}])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
{ok,
@@ -88,7 +89,8 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
- protocol = Protocol}])}.
+ protocol = Protocol,
+ pending = []}])}.
mainloop(ReaderPid, State) ->
try
@@ -98,37 +100,41 @@ mainloop(ReaderPid, State) ->
end,
done.
-mainloop1(ReaderPid, State) ->
+mainloop1(ReaderPid, State = #wstate{pending = []}) ->
receive
Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
after ?HIBERNATE_AFTER ->
erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
+ end;
+mainloop1(ReaderPid, State) ->
+ receive
+ Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ after 0 ->
+ ?MODULE:mainloop1(ReaderPid, flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
- ok = internal_send_command_async(MethodRecord, State),
- State;
+ internal_send_command_async(MethodRecord, State);
handle_message({send_command, MethodRecord, Content}, State) ->
- ok = internal_send_command_async(MethodRecord, Content, State),
- State;
+ internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
- ok = internal_send_command_async(MethodRecord, State),
+ State1 = flush(internal_send_command_async(MethodRecord, State)),
gen_server:reply(From, ok),
- State;
+ State1;
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
State) ->
- ok = internal_send_command_async(MethodRecord, Content, State),
+ State1 = flush(internal_send_command_async(MethodRecord, Content, State)),
gen_server:reply(From, ok),
- State;
+ State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
- ok = internal_send_command_async(MethodRecord, State),
+ State1 = internal_send_command_async(MethodRecord, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
- State;
+ State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State) ->
- ok = internal_send_command_async(MethodRecord, Content, State),
+ State1 = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
- State;
+ State1;
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
@@ -184,22 +190,6 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
-%% We optimise delivery of small messages. Content-bearing methods
-%% require at least three frames. Small messages always fit into
-%% that. We hand their frames to the Erlang network functions in one
-%% go, which may lead to somewhat more efficient processing in the
-%% runtime and a greater chance of coalescing into fewer TCP packets.
-%%
-%% By contrast, for larger messages, split across many frames, we want
-%% to allow interleaving of frames on different channels. Hence we
-%% hand them to the Erlang network functions one frame at a time.
-send_frames(Fun, Sock, Frames) when length(Frames) =< 3 ->
- Fun(Sock, Frames);
-send_frames(Fun, Sock, Frames) ->
- lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame);
- (_Frame, Other) -> Other
- end, ok, Frames).
-
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
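The comment removed above described the old policy: hand small (at most three-frame) messages to the network functions in one call, but send larger messages one frame at a time so frames from different channels could interleave. On the writer's asynchronous path that per-frame sending goes away; frames are coalesced in the pending buffer and written in batches. Roughly, the difference in write calls looks like this (plain gen_tcp, illustrative only):

%% Old path for large content: one send per frame.
send_each(Sock, Frames) ->
    lists:foreach(fun (Frame) -> ok = gen_tcp:send(Sock, Frame) end, Frames).

%% New path: the whole batch goes out in a single call, since
%% gen_tcp:send/2 accepts an arbitrarily nested iolist.
send_batched(Sock, Frames) ->
    ok = gen_tcp:send(Sock, Frames).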
@@ -209,9 +199,44 @@ internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
Protocol) ->
- ok = send_frames(fun tcp_send/2, Sock,
- assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
+ ok = lists:foldl(fun (Frame, ok) -> tcp_send(Sock, Frame);
+ (_Frame, Other) -> Other
+ end, ok, assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
+
+internal_send_command_async(MethodRecord,
+ State = #wstate{channel = Channel,
+ protocol = Protocol,
+ pending = Pending}) ->
+ Frame = assemble_frame(Channel, MethodRecord, Protocol),
+ maybe_flush(State#wstate{pending = [Frame | Pending]}).
+
+internal_send_command_async(MethodRecord, Content,
+ State = #wstate{channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ pending = Pending}) ->
+ Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax,
+ Protocol),
+ maybe_flush(State#wstate{pending = [Frames | Pending]}).
+
+%% This magic number is the tcp-over-ethernet MSS (1460) minus the
+%% minimum size of a AMQP basic.deliver method frame (24) plus basic
+%% content header (22). The idea is that we want to flush just before
+%% exceeding the MSS.
+-define(FLUSH_THRESHOLD, 1414).
+
+maybe_flush(State = #wstate{pending = Pending}) ->
+ case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
+ true -> flush(State);
+ false -> State
+ end.
+
+flush(State = #wstate{pending = []}) ->
+ State;
+flush(State = #wstate{sock = Sock, pending = Pending}) ->
+ ok = port_cmd(Sock, lists:reverse(Pending)),
+ State#wstate{pending = []}.
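Worked out, the threshold comment above is 1460 - (24 + 22) = 1414: a buffer just under the threshold can still absorb a minimal basic.deliver frame (24 bytes) plus content header (22 bytes) without the resulting write exceeding one MSS-sized TCP segment, so the flush goes out just before the batch would spill into a second segment.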
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -231,21 +256,6 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(MethodRecord,
- #wstate{sock = Sock,
- channel = Channel,
- protocol = Protocol}) ->
- ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
-
-internal_send_command_async(MethodRecord, Content,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = send_frames(fun port_cmd/2, Sock,
- assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
-
port_cmd(Sock, Data) ->
true = try rabbit_net:port_command(Sock, Data)
catch error:Error -> exit({writer, send_failed, Error})
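The comment retained above explains the port_command trick: gen_tcp:send/2 is essentially port_command plus a selective receive for {inet_reply, Sock, Status}, and that selective receive is costly for a process with a busy mailbox. Calling port_command directly defers the reply to the writer's ordinary message loop, while the port's bounded buffers still provide back-pressure. A hedged sketch of the pattern, assuming a classic inet-driver socket (i.e. a port) and hypothetical function names:

-module(port_send_sketch).
-export([async_send/2, handle_reply/1]).

%% Fire the write without waiting; the driver later sends
%% {inet_reply, Sock, Status} to the calling process.
async_send(Sock, Data) ->
    true = erlang:port_command(Sock, Data),
    ok.

%% The reply is then handled in the writer's normal receive loop,
%% not inline at the call site as gen_tcp:send/2 would do.
handle_reply({inet_reply, _Sock, ok})     -> ok;
handle_reply({inet_reply, _Sock, Status}) -> exit({writer, send_failed, Status}).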