summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-06 18:30:24 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-06 18:30:24 +0000
commitf869f99612facf92219be406ef62ec7c836644ae (patch)
treec9bdc1311fdcda05e888d416bb0415d3eab3ee8c
parentc35a4b43ad993897b7bb6fe70eb364d357758f94 (diff)
downloadrabbitmq-server-f869f99612facf92219be406ef62ec7c836644ae.tar.gz
Decide where to persist to based on the encoded size of the message.
-rw-r--r--src/rabbit_variable_queue.erl32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 2d7a8bef..a0f14b0a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -306,6 +306,7 @@
is_delivered,
msg_in_store,
index_on_disk,
+ persist_to,
msg_props
}).
@@ -989,6 +990,7 @@ msg_status(IsPersistent, IsDelivered, SeqId,
is_delivered = IsDelivered,
msg_in_store = false,
index_on_disk = false,
+ persist_to = determine_persist_to(Msg, MsgProps),
msg_props = MsgProps}.
beta_msg_status({Msg = #basic_message{id = MsgId},
@@ -996,12 +998,14 @@ beta_msg_status({Msg = #basic_message{id = MsgId},
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
MS0#msg_status{msg_id = MsgId,
msg = Msg,
+ persist_to = queue_index,
msg_in_store = false};
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
MS0#msg_status{msg_id = MsgId,
msg = undefined,
+ persist_to = msg_store,
msg_in_store = true}.
beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
@@ -1012,8 +1016,8 @@ beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
index_on_disk = true,
msg_props = MsgProps}.
-trim_msg_status(MsgStatus = #msg_status{msg_props = MsgProps}) ->
- case persist_to(MsgProps) of
+trim_msg_status(MsgStatus) ->
+ case persist_to(MsgStatus) of
msg_store -> MsgStatus#msg_status{msg = undefined};
queue_index -> MsgStatus
end.
@@ -1360,10 +1364,9 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
- msg_props = MsgProps,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- case persist_to(MsgProps) of
+ case persist_to(MsgStatus) of
msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
prepare_to_store(Msg)),
MsgStatus#msg_status{msg_in_store = true};
@@ -1384,7 +1387,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
IndexState1 = rabbit_queue_index:publish(
- case persist_to(MsgProps) of
+ case persist_to(MsgStatus) of
msg_store -> MsgId;
queue_index -> prepare_to_store(Msg)
end, SeqId, MsgProps, IsPersistent, IndexState),
@@ -1401,16 +1404,24 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
{MsgStatus2, State #vqstate { index_state = IndexState1 }}.
-persist_to(#message_properties{size = Size}) ->
+determine_persist_to(Msg, #message_properties{size = Size}) ->
{ok, IndexMaxSize} = application:get_env(
rabbit, queue_index_embed_msgs_below),
- %% This is >= so that you can set the env to 0 and never persist
+ %% The >= is so that you can set the env to 0 and never persist
%% to the index.
+ %%
+ %% We avoid invoking term_to_binary/1 if the message is large
+ %% anyway.
case Size >= IndexMaxSize of
true -> msg_store;
- false -> queue_index
+ false -> case size(term_to_binary(Msg)) >= IndexMaxSize of
+ true -> msg_store;
+ false -> queue_index
+ end
end.
+persist_to(#msg_status{persist_to = To}) -> To.
+
prepare_to_store(Msg) ->
Msg#basic_message{
%% don't persist any recoverable decoded properties
@@ -1421,15 +1432,14 @@ prepare_to_store(Msg) ->
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId,
- msg_props = MsgProps } = MsgStatus,
+record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA,
ack_in_counter = AckInCount}) ->
Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
{RPA1, DPA1, QPA1} =
- case {msg_in_ram(MsgStatus), persist_to(MsgProps)} of
+ case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
{false, _} -> {RPA, Insert(DPA), QPA};
{_, queue_index} -> {RPA, DPA, Insert(QPA)};
{_, msg_store} -> {Insert(RPA), DPA, QPA}