summaryrefslogtreecommitdiff
path: root/src/rabbit_writer.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_writer.erl')
-rw-r--r--src/rabbit_writer.erl59
1 files changed, 43 insertions, 16 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 50bca390..068ac186 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -34,12 +34,11 @@
-include("rabbit_framing.hrl").
-export([start/5, start_link/5, mainloop/2, mainloop1/2]).
--export([send_command/2, send_command/3, send_command_sync/2,
- send_command_sync/3, send_command_and_notify/5]).
+-export([send_command/2, send_command/3,
+ send_command_sync/2, send_command_sync/3,
+ send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--import(gen_tcp).
-
-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).
@@ -66,6 +65,9 @@
-spec(send_command_sync/3 ::
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
-> 'ok').
+-spec(send_command_and_notify/4 ::
+ (pid(), pid(), pid(), rabbit_framing:amqp_method_record())
+ -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
@@ -130,6 +132,10 @@ handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
ok = internal_send_command_async(MethodRecord, Content, State),
gen_server:reply(From, ok),
State;
+handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
+ rabbit_amqqueue:notify_sent(QPid, ChPid),
+ State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State) ->
ok = internal_send_command_async(MethodRecord, Content, State),
@@ -158,6 +164,10 @@ send_command_sync(W, MethodRecord) ->
send_command_sync(W, MethodRecord, Content) ->
call(W, {send_command_sync, MethodRecord, Content}).
+send_command_and_notify(W, Q, ChPid, MethodRecord) ->
+ W ! {send_command_and_notify, Q, ChPid, MethodRecord},
+ ok.
+
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
@@ -170,7 +180,7 @@ call(Pid, Msg) ->
%---------------------------------------------------------------------------
-assemble_frames(Channel, MethodRecord, Protocol) ->
+assemble_frame(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
@@ -185,17 +195,34 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
+%% We optimise delivery of small messages. Content-bearing methods
+%% require at least three frames. Small messages always fit into
+%% that. We hand their frames to the Erlang network functions in one
+%% go, which may lead to somewhat more efficient processing in the
+%% runtime and a greater chance of coalescing into fewer TCP packets.
+%%
+%% By contrast, for larger messages, split across many frames, we want
+%% to allow interleaving of frames on different channels. Hence we
+%% hand them to the Erlang network functions one frame at a time.
+send_frames(Fun, Sock, Frames) when length(Frames) =< 3 ->
+ Fun(Sock, Frames);
+send_frames(Fun, Sock, Frames) ->
+ lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame);
+ (_Frame, Other) -> Other
+ end, ok, Frames).
+
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)).
+ ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
Protocol) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
+ ok = send_frames(fun tcp_send/2, Sock,
+ assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -219,19 +246,19 @@ internal_send_command_async(MethodRecord,
#wstate{sock = Sock,
channel = Channel,
protocol = Protocol}) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
- ok.
+ ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
internal_send_command_async(MethodRecord, Content,
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)),
- ok.
+ ok = send_frames(fun port_cmd/2, Sock,
+ assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
port_cmd(Sock, Data) ->
- try rabbit_net:port_command(Sock, Data)
- catch error:Error -> exit({writer, send_failed, Error})
- end.
+ true = try rabbit_net:port_command(Sock, Data)
+ catch error:Error -> exit({writer, send_failed, Error})
+ end,
+ ok.