diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-05 14:44:31 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-05 14:44:31 +0000 |
commit | 250667f80cffb7f6a63d049e1480df42d7460b6f (patch) | |
tree | 433abcf5e4dbb75f03f060c2d12ac29cabe1ece1 | |
parent | 42fb44d8a398124333db478b23724b3a46686835 (diff) | |
download | rabbitmq-server-250667f80cffb7f6a63d049e1480df42d7460b6f.tar.gz |
Introduce a selection mechanism so that small messages go to the index, and large ones go to the store.
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 46 |
2 files changed, 34 insertions, 13 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9e5584a1..5ebef608 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -29,6 +29,7 @@ {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, + {queue_index_embed_msgs_below, 1024}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 884342d8..7a7a2900 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -989,7 +989,11 @@ beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> index_on_disk = true, msg_props = MsgProps}. -trim_msg_status(MsgStatus) -> MsgStatus.%% TODO #msg_status { msg = undefined }. +trim_msg_status(MsgStatus = #msg_status{msg_props = MsgProps}) -> + case persist_to(MsgProps) of + msg_store -> MsgStatus#msg_status{msg = undefined}; + queue_index -> MsgStatus + end. with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(MSCStateP), @@ -1325,14 +1329,15 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, + msg_props = MsgProps, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - %% Msg1 = Msg #basic_message { - %% %% don't persist any recoverable decoded properties - %% content = rabbit_binary_parser:clear_decoded_content( - %% Msg #basic_message.content)}, - %% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), - MsgStatus; %% #msg_status { msg_on_disk = true }; + case persist_to(MsgProps) of + msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, + prepare_to_store(Msg)), + MsgStatus#msg_status{msg_in_store = true}; + queue_index -> MsgStatus + end; maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. @@ -1347,13 +1352,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { is_delivered = IsDelivered, msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, IndexState1 = rabbit_queue_index:publish( - Msg1, SeqId, MsgProps, IsPersistent, IndexState), - {MsgStatus #msg_status { index_on_disk = true }, + case persist_to(MsgProps) of + msg_store -> MsgId; + queue_index -> prepare_to_store(Msg) + end, SeqId, MsgProps, IsPersistent, IndexState), + {MsgStatus#msg_status{index_on_disk = true}, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> {MsgStatus, IndexState}. @@ -1366,6 +1370,22 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), {MsgStatus2, State #vqstate { index_state = IndexState1 }}. +persist_to(#message_properties{size = Size}) -> + {ok, IndexMaxSize} = application:get_env( + rabbit, queue_index_embed_msgs_below), + %% This is >= so that you can set the env to 0 and never persist + %% to the index. + case Size >= IndexMaxSize of + true -> msg_store; + false -> queue_index + end. + +prepare_to_store(Msg) -> + Msg#basic_message{ + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}. + %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- |