summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-05 14:44:31 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-05 14:44:31 +0000
commit250667f80cffb7f6a63d049e1480df42d7460b6f (patch)
tree433abcf5e4dbb75f03f060c2d12ac29cabe1ece1
parent42fb44d8a398124333db478b23724b3a46686835 (diff)
downloadrabbitmq-server-250667f80cffb7f6a63d049e1480df42d7460b6f.tar.gz
Introduce a selection mechanism so that small messages go to the index, and large ones go to the store.
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--src/rabbit_variable_queue.erl46
2 files changed, 34 insertions, 13 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9e5584a1..5ebef608 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -29,6 +29,7 @@
{heartbeat, 580},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 65536},
+ {queue_index_embed_msgs_below, 1024},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_user_tags, [administrator]},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 884342d8..7a7a2900 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -989,7 +989,11 @@ beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
index_on_disk = true,
msg_props = MsgProps}.
-trim_msg_status(MsgStatus) -> MsgStatus.%% TODO #msg_status { msg = undefined }.
+trim_msg_status(MsgStatus = #msg_status{msg_props = MsgProps}) ->
+ case persist_to(MsgProps) of
+ msg_store -> MsgStatus#msg_status{msg = undefined};
+ queue_index -> MsgStatus
+ end.
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
@@ -1325,14 +1329,15 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
+ msg_props = MsgProps,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- %% Msg1 = Msg #basic_message {
- %% %% don't persist any recoverable decoded properties
- %% content = rabbit_binary_parser:clear_decoded_content(
- %% Msg #basic_message.content)},
- %% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
- MsgStatus; %% #msg_status { msg_on_disk = true };
+ case persist_to(MsgProps) of
+ msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
+ prepare_to_store(Msg)),
+ MsgStatus#msg_status{msg_in_store = true};
+ queue_index -> MsgStatus
+ end;
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
@@ -1347,13 +1352,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
is_delivered = IsDelivered,
msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
IndexState1 = rabbit_queue_index:publish(
- Msg1, SeqId, MsgProps, IsPersistent, IndexState),
- {MsgStatus #msg_status { index_on_disk = true },
+ case persist_to(MsgProps) of
+ msg_store -> MsgId;
+ queue_index -> prepare_to_store(Msg)
+ end, SeqId, MsgProps, IsPersistent, IndexState),
+ {MsgStatus#msg_status{index_on_disk = true},
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
{MsgStatus, IndexState}.
@@ -1366,6 +1370,22 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
{MsgStatus2, State #vqstate { index_state = IndexState1 }}.
+persist_to(#message_properties{size = Size}) ->
+ {ok, IndexMaxSize} = application:get_env(
+ rabbit, queue_index_embed_msgs_below),
+ %% This is >= so that you can set the env to 0 and never persist
+ %% to the index.
+ case Size >= IndexMaxSize of
+ true -> msg_store;
+ false -> queue_index
+ end.
+
+prepare_to_store(Msg) ->
+ Msg#basic_message{
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)}.
+
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------