diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-06 18:30:24 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-06 18:30:24 +0000 |
commit | f869f99612facf92219be406ef62ec7c836644ae (patch) | |
tree | c9bdc1311fdcda05e888d416bb0415d3eab3ee8c | |
parent | c35a4b43ad993897b7bb6fe70eb364d357758f94 (diff) | |
download | rabbitmq-server-f869f99612facf92219be406ef62ec7c836644ae.tar.gz |
Decide where to persist to based on the encoded size of the message.
-rw-r--r-- | src/rabbit_variable_queue.erl | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 2d7a8bef..a0f14b0a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -306,6 +306,7 @@ is_delivered, msg_in_store, index_on_disk, + persist_to, msg_props }). @@ -989,6 +990,7 @@ msg_status(IsPersistent, IsDelivered, SeqId, is_delivered = IsDelivered, msg_in_store = false, index_on_disk = false, + persist_to = determine_persist_to(Msg, MsgProps), msg_props = MsgProps}. beta_msg_status({Msg = #basic_message{id = MsgId}, @@ -996,12 +998,14 @@ beta_msg_status({Msg = #basic_message{id = MsgId}, MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), MS0#msg_status{msg_id = MsgId, msg = Msg, + persist_to = queue_index, msg_in_store = false}; beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), MS0#msg_status{msg_id = MsgId, msg = undefined, + persist_to = msg_store, msg_in_store = true}. beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> @@ -1012,8 +1016,8 @@ beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> index_on_disk = true, msg_props = MsgProps}. -trim_msg_status(MsgStatus = #msg_status{msg_props = MsgProps}) -> - case persist_to(MsgProps) of +trim_msg_status(MsgStatus) -> + case persist_to(MsgStatus) of msg_store -> MsgStatus#msg_status{msg = undefined}; queue_index -> MsgStatus end. @@ -1360,10 +1364,9 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, - msg_props = MsgProps, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - case persist_to(MsgProps) of + case persist_to(MsgStatus) of msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, prepare_to_store(Msg)), MsgStatus#msg_status{msg_in_store = true}; @@ -1384,7 +1387,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> IndexState1 = rabbit_queue_index:publish( - case persist_to(MsgProps) of + case persist_to(MsgStatus) of msg_store -> MsgId; queue_index -> prepare_to_store(Msg) end, SeqId, MsgProps, IsPersistent, IndexState), @@ -1401,16 +1404,24 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), {MsgStatus2, State #vqstate { index_state = IndexState1 }}. -persist_to(#message_properties{size = Size}) -> +determine_persist_to(Msg, #message_properties{size = Size}) -> {ok, IndexMaxSize} = application:get_env( rabbit, queue_index_embed_msgs_below), - %% This is >= so that you can set the env to 0 and never persist + %% The >= is so that you can set the env to 0 and never persist %% to the index. + %% + %% We avoid invoking term_to_binary/1 if the message is large + %% anyway. case Size >= IndexMaxSize of true -> msg_store; - false -> queue_index + false -> case size(term_to_binary(Msg)) >= IndexMaxSize of + true -> msg_store; + false -> queue_index + end end. +persist_to(#msg_status{persist_to = To}) -> To. + prepare_to_store(Msg) -> Msg#basic_message{ %% don't persist any recoverable decoded properties @@ -1421,15 +1432,14 @@ prepare_to_store(Msg) -> %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { seq_id = SeqId, - msg_props = MsgProps } = MsgStatus, +record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA, ack_in_counter = AckInCount}) -> Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, {RPA1, DPA1, QPA1} = - case {msg_in_ram(MsgStatus), persist_to(MsgProps)} of + case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of {false, _} -> {RPA, Insert(DPA), QPA}; {_, queue_index} -> {RPA, DPA, Insert(QPA)}; {_, msg_store} -> {Insert(RPA), DPA, QPA} |