author     Matthias Radestock <matthias@lshift.net>    2009-09-06 23:12:45 +0100
committer  Matthias Radestock <matthias@lshift.net>    2009-09-06 23:12:45 +0100
commit     55e453260ca60fa2d439e9f9af3de9f0cd4b861a
tree       63ece80e124e2aaa761e27186acd1692bfac9b32
parent     326341bc00df7e74268eb827d42b3fa2424fbaae
big refactoring: extract msg_store from disk_queue
The msg_store knows nothing about queues, or message structure.
-rw-r--r--   src/rabbit_disk_queue.erl   1344
-rw-r--r--   src/rabbit_msg_store.erl    1128
2 files changed, 1315 insertions, 1157 deletions
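
Only the rabbit_disk_queue.erl half of the change is shown below; the new rabbit_msg_store.erl is not included in this view. Pieced together from the calls introduced in the diff, the disk queue now drives the extracted store roughly as in the following sketch. This is an illustration only: the argument order of init/6 and the return shapes are taken from the call sites below, while the module name msg_store_usage_sketch, the directory and the message ids are made-up placeholders.

%% Sketch (not part of this commit): exercises the rabbit_msg_store calls
%% that appear in the diff below, in the order the disk queue uses them.
-module(msg_store_usage_sketch).
-export([example/0]).

example() ->
    %% init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun, EtsBPR)
    Store0 = rabbit_msg_store:init(ram_disk, "/tmp/msg_store_sketch",
                                   256 * 1024 * 1024, 256,
                                   fun (_MsgId) -> 1 end, undefined),
    %% publish side: write the message, syncing only if a commit needs it
    Store1 = rabbit_msg_store:write(a_msg_id, a_message, false, Store0),
    Store2 = case rabbit_msg_store:needs_sync([a_msg_id], Store1) of
                 true  -> rabbit_msg_store:sync(Store1);
                 false -> Store1
             end,
    %% fetch side: read the body back, then remove it on ack
    {_Message, Store3} = rabbit_msg_store:read(a_msg_id, Store2),
    Store4 = rabbit_msg_store:remove([a_msg_id], Store3),
    rabbit_msg_store:cleanup(Store4).
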
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 538b08d8..ad5d8fb1 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -54,23 +54,13 @@
-include("rabbit.hrl").
--define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
--define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(MAX_READ_FILE_HANDLES, 256).
+-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+
-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
--define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
--define(FILE_EXTENSION, ".rdq").
--define(FILE_EXTENSION_TMP, ".rdt").
--define(FILE_EXTENSION_DETS, ".dets").
-define(BATCH_SIZE, 10000).
--define(CACHE_MAX_SIZE, 10485760).
--define(MAX_READ_FILE_HANDLES, 256).
--define(FILE_SIZE_LIMIT, (256*1024*1024)).
-define(DISK_ONLY_MODE_FILE, "disk_only_stats.dat").
--define(BINARY_MODE, [raw, binary]).
--define(READ_MODE, [read, read_ahead]).
--define(WRITE_MODE, [write, delayed_write]).
-
-define(SHUTDOWN_MESSAGE_KEY, {internal_token, shutdown}).
-define(SHUTDOWN_MESSAGE,
#dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY,
@@ -86,175 +76,15 @@
-define(SERVER, ?MODULE).
-record(dqstate,
- {msg_location_dets, %% where are messages?
- msg_location_ets, %% as above, but for ets version
- operation_mode, %% ram_disk | disk_only
- file_summary, %% what's in the files?
+ {operation_mode, %% ram_disk | disk_only
+ store, %% message store
sequences, %% next read and write for each q
- current_file_num, %% current file name as number
- current_file_name, %% current file name
- current_file_handle, %% current file handle
- current_offset, %% current offset within current file
- current_dirty, %% has the current file been written to
- %% since the last fsync?
- file_size_limit, %% how big can our files get?
- read_file_handle_cache, %% file handle cache for reading
on_sync_txns, %% list of committers to run on sync (reversed)
commit_timer_ref, %% TRef for our interval timer
- last_sync_offset, %% current_offset at the last time we sync'd
- message_cache, %% ets message cache
memory_report_timer_ref, %% TRef for the memory report timer
- wordsize, %% bytes in a word on this platform
- mnesia_bytes_per_record, %% bytes per record in mnesia in ram_disk mode
- ets_bytes_per_record %% bytes per record in msg_location_ets
+ mnesia_bytes_per_record %% bytes per record in mnesia in ram_disk mode
}).
--record(msg_location,
- {msg_id, ref_count, file, offset, total_size, is_persistent}).
-
--record(file_summary,
- {file, valid_total_size, contiguous_top, left, right}).
-
-%% The components:
-%%
-%% MsgLocation: this is a (d)ets table which contains:
-%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent}
-%% FileSummary: this is an ets table which contains:
-%% {File, ValidTotalSize, ContiguousTop, Left, Right}
-%% Sequences: this is an ets table which contains:
-%% {Q, ReadSeqId, WriteSeqId}
-%% rabbit_disk_queue: this is an mnesia table which contains:
-%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
-%% is_delivered = IsDelivered,
-%% msg_id = MsgId
-%% }
-%%
-
-%% The basic idea is that messages are appended to the current file up
-%% until that file becomes too big (> file_size_limit). At that point,
-%% the file is closed and a new file is created on the _right_ of the
-%% old file which is used for new messages. Files are named
-%% numerically ascending, thus the file with the lowest name is the
-%% eldest file.
-%%
-%% We need to keep track of which messages are in which files (this is
-%% the MsgLocation table); how much useful data is in each file and
-%% which files are on the left and right of each other. This is the
-%% purpose of the FileSummary table.
-%%
-%% As messages are removed from files, holes appear in these
-%% files. The field ValidTotalSize contains the total amount of useful
-%% data left in the file, whilst ContiguousTop contains the amount of
-%% valid data right at the start of each file. These are needed for
-%% garbage collection.
-%%
-%% On publish, we write the message to disk, record the changes to
-%% FileSummary and MsgLocation, and, should this be either a plain
-%% publish, or followed by a tx_commit, we record the message in the
-%% mnesia table. Sequences exists to enforce ordering of messages as
-%% they are published within a queue.
-%%
-%% On delivery, we read the next message to be read from disk
-%% (according to the ReadSeqId for the given queue) and record in the
-%% mnesia table that the message has been delivered.
-%%
-%% On ack we remove the relevant entry from MsgLocation, update
-%% FileSummary and delete from the mnesia table.
-%%
-%% In order to avoid extra mnesia searching, we return the SeqId
-%% during delivery which must be returned in ack - it is not possible
-%% to ack from MsgId alone.
-
-%% As messages are ack'd, holes develop in the files. When we discover
-%% that either a file is now empty or that it can be combined with the
-%% useful data in either its left or right file, we compact the two
-%% files together. This keeps disk utilisation high and aids
-%% performance.
-%%
-%% Given the compaction between two files, the left file is considered
-%% the ultimate destination for the good data in the right file. If
-%% necessary, the good data in the left file which is fragmented
-%% throughout the file is written out to a temporary file, then read
-%% back in to form a contiguous chunk of good data at the start of the
-%% left file. Thus the left file is garbage collected and
-%% compacted. Then the good data from the right file is copied onto
-%% the end of the left file. MsgLocation and FileSummary tables are
-%% updated.
-%%
-%% On startup, we scan the files we discover, dealing with the
-%% possibilities of a crash having occurred during a compaction (this
-%% consists of tidyup - the compaction is deliberately designed such
-%% that data is duplicated on disk rather than risking it being lost),
-%% and rebuild the dets and ets tables (MsgLocation, FileSummary,
-%% Sequences) from what we find. We ensure that the messages we have
-%% discovered on disk match exactly with the messages recorded in the
-%% mnesia table.
-
-%% MsgLocation is deliberately a dets table, and the mnesia table is
-%% set to be a disk_only_table in order to ensure that we are not RAM
-%% constrained. However, for performance reasons, it is possible to
-%% call to_ram_disk_mode/0 which will alter the mnesia table to
-%% disc_copies and convert MsgLocation to an ets table. This results
-%% in a massive performance improvement, at the expense of greater RAM
-%% usage. The idea is that when memory gets tight, we switch to
-%% disk_only mode but otherwise try to run in ram_disk mode.
-
-%% So, with this design, messages move to the left. Eventually, they
-%% should end up in a contiguous block on the left and are then never
-%% rewritten. But this isn't quite the case. If in a file there is one
-%% message that is being ignored, for some reason, and messages in the
-%% file to the right and in the current block are being read all the
-%% time then it will repeatedly be the case that the good data from
-%% both files can be combined and will be written out to a new
-%% file. Whenever this happens, our shunned message will be rewritten.
-%%
-%% So, provided that we combine messages in the right order,
-%% (i.e. left file, bottom to top, right file, bottom to top),
-%% eventually our shunned message will end up at the bottom of the
-%% left file. The compaction/combining algorithm is smart enough to
-%% read in good data from the left file that is scattered throughout
-%% (i.e. C and D in the below diagram), then truncate the file to just
-%% above B (i.e. truncate to the limit of the good contiguous region
-%% at the start of the file), then write C and D on top and then write
-%% E, F and G from the right file on top. Thus contiguous blocks of
-%% good data at the bottom of files are not rewritten (yes, this is
-%% the data the size of which is tracked by the ContiguousTop
-%% variable. Judicious use of a mirror is required).
-%%
-%% +-------+ +-------+ +-------+
-%% | X | | G | | G |
-%% +-------+ +-------+ +-------+
-%% | D | | X | | F |
-%% +-------+ +-------+ +-------+
-%% | X | | X | | E |
-%% +-------+ +-------+ +-------+
-%% | C | | F | ===> | D |
-%% +-------+ +-------+ +-------+
-%% | X | | X | | C |
-%% +-------+ +-------+ +-------+
-%% | B | | X | | B |
-%% +-------+ +-------+ +-------+
-%% | A | | E | | A |
-%% +-------+ +-------+ +-------+
-%% left right left
-%%
-%% From this reasoning, we do have a bound on the number of times the
-%% message is rewritten. From when it is inserted, there can be no
-%% files inserted between it and the head of the queue, and the worst
-%% case is that every time it is rewritten, it moves one position lower
-%% in the file (for it to stay at the same position requires that
-%% there are no holes beneath it, which means truncate would be used
-%% and so it would not be rewritten at all). Thus this seems to
-%% suggest the limit is the number of messages ahead of it in the
-%% queue, though it's likely that that's pessimistic, given the
-%% requirements for compaction/combination of files.
-%%
-%% The other property that we have is the bound on the lowest
-%% utilisation, which should be 50% - worst case is that all files are
-%% fractionally over half full and can't be combined (equivalent is
-%% alternating full files and files with only one tiny message in
-%% them).
-
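
As a concrete instance of the 50% worst case described above (a sketch, not part of this commit): the merge decision boils down to the FileSizeLimit >= TotalValidData test that adjust_meta_and_combine performs further down in this diff, so two neighbouring files each holding just over half a file's worth of valid data can never be combined, and both sit just above 50% utilisation.

%% Sketch: the combine test behind the utilisation bound.
can_combine(LeftValid, RightValid, FileSizeLimit) ->
    FileSizeLimit >= LeftValid + RightValid.

%% With the 256MB file size limit used here:
%% can_combine(129 * 1024 * 1024, 129 * 1024 * 1024, 256 * 1024 * 1024)
%% evaluates to false, so both ~50.4%-full files are left as they are.
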
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -414,69 +244,35 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
ok = detect_shutdown_state_and_adjust_delivered_flags(),
- file:delete(msg_location_dets_file()),
-
- {ok, MsgLocationDets} =
- dets:open_file(?MSG_LOC_NAME,
- [{file, msg_location_dets_file()},
- {min_no_slots, 1024*1024},
- %% man says this should be <= 32M. But it works...
- {max_no_slots, 30*1024*1024},
- {type, set},
- {keypos, 2}
- ]),
-
- %% it would be better to have this as private, but dets:from_ets/2
- %% seems to blow up if it is set private - see bug21489
- MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected, {keypos, 2}]),
-
- InitName = "0" ++ ?FILE_EXTENSION,
- HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit,
- ?BINARY_MODE ++ [read]),
+ ok = add_index(),
+ Store = rabbit_msg_store:init(Mode, base_directory(),
+ FileSizeLimit, ReadFileHandlesLimit,
+ fun ref_count/1, EtsBPR),
+ Store1 = prune_mnesia(Store),
+ ok = del_index(),
+
+ Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
+ ok = extract_sequence_numbers(Sequences),
+
State =
- #dqstate { msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts,
- operation_mode = Mode,
- file_summary = ets:new(
- ?FILE_SUMMARY_ETS_NAME,
- [set, private, {keypos, 2}]),
- sequences = ets:new(?SEQUENCE_ETS_NAME,
- [set, private]),
- current_file_num = 0,
- current_file_name = InitName,
- current_file_handle = undefined,
- current_offset = 0,
- current_dirty = false,
- file_size_limit = FileSizeLimit,
- read_file_handle_cache = HandleCache,
+ #dqstate { operation_mode = Mode,
+ store = Store1,
+ sequences = Sequences,
on_sync_txns = [],
commit_timer_ref = undefined,
- last_sync_offset = 0,
- message_cache = ets:new(?CACHE_ETS_NAME,
- [set, private]),
memory_report_timer_ref = undefined,
- wordsize = erlang:system_info(wordsize),
- mnesia_bytes_per_record = MnesiaBPR,
- ets_bytes_per_record = EtsBPR
+ mnesia_bytes_per_record = MnesiaBPR
},
- {ok, State1 = #dqstate { current_file_name = CurrentName,
- current_offset = Offset } } =
- load_from_disk(State),
- %% read is only needed so that we can seek
- {ok, FileHdl} = open_file(CurrentName, ?WRITE_MODE ++ [read]),
- {ok, Offset} = file:position(FileHdl, Offset),
- State2 = State1 #dqstate { current_file_handle = FileHdl },
- {ok, start_memory_timer(State2), hibernate,
+ {ok, start_memory_timer(State), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({fetch, Q}, _From, State) ->
- {ok, Result, State1} =
+ {Result, State1} =
internal_fetch_body(Q, record_delivery, pop_queue, State),
reply(Result, State1);
handle_call({phantom_fetch, Q}, _From, State) ->
- {ok, Result, State1} =
- internal_fetch_attributes(Q, record_delivery, pop_queue, State),
- reply(Result, State1);
+ Result = internal_fetch_attributes(Q, record_delivery, pop_queue, State),
+ reply(Result, State);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
State1 =
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State),
@@ -485,7 +281,7 @@ handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
handle_call(filesync, _From, State) ->
- reply(ok, sync_current_file_handle(State));
+ reply(ok, sync(State));
handle_call({delete_queue, Q}, From, State) ->
gen_server2:reply(From, ok),
{ok, State1} = internal_delete_queue(Q, State),
@@ -515,8 +311,8 @@ handle_call(to_ram_disk_mode, _From, State) ->
handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
{ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
reply(ok, State1);
-handle_call(cache_info, _From, State = #dqstate { message_cache = Cache }) ->
- reply(ets:info(Cache), State).
+handle_call(cache_info, _From, State = #dqstate { store = Store }) ->
+ reply(rabbit_msg_store:cache_info(Store), State).
handle_cast({publish, Q, Message, IsDelivered}, State) ->
{ok, _MsgSeqId, State1} = internal_publish(Q, Message, IsDelivered, State),
@@ -542,25 +338,19 @@ handle_cast({set_mode, Mode}, State) ->
liberated -> fun to_ram_disk_mode/1
end)(State));
handle_cast({prefetch, Q, From}, State) ->
- {ok, Result, State1} =
+ {Result, State1} =
internal_fetch_body(Q, record_delivery, peek_queue, State),
- Cont = rabbit_misc:with_exit_handler(
- fun () -> false end,
- fun () ->
- ok = rabbit_queue_prefetcher:publish(From, Result),
- true
- end),
- State3 =
- case Cont of
- true ->
- case internal_fetch_attributes(
- Q, ignore_delivery, pop_queue, State1) of
- {ok, empty, State2} -> State2;
- {ok, _, State2} -> State2
- end;
- false -> State1
- end,
- noreply(State3).
+ case rabbit_misc:with_exit_handler(
+ fun () -> false end,
+ fun () ->
+ ok = rabbit_queue_prefetcher:publish(From, Result),
+ true
+ end) of
+ true ->
+ internal_fetch_attributes(Q, ignore_delivery, pop_queue, State1);
+ false -> ok
+ end,
+ noreply(State1).
handle_info(report_memory, State) ->
%% call noreply1/2, not noreply/1/2, as we don't want to restart the
@@ -571,7 +361,7 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(timeout, State) ->
%% must have commit_timer set, so timeout was 0, and we're not hibernating
- noreply(sync_current_file_handle(State)).
+ noreply(sync(State)).
handle_pre_hibernate(State) ->
%% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
@@ -585,33 +375,11 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { sequences = undefined }) ->
State;
-shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts,
- file_summary = FileSummary,
- sequences = Sequences,
- current_file_handle = FileHdl,
- read_file_handle_cache = HC
- }) ->
+shutdown(State = #dqstate { sequences = Sequences, store = Store }) ->
State1 = stop_commit_timer(stop_memory_timer(State)),
- case FileHdl of
- undefined -> ok;
- _ -> sync_current_file_handle(State1),
- file:close(FileHdl)
- end,
- HC1 = rabbit_file_handle_cache:close_all(HC),
- dets:close(MsgLocationDets),
- file:delete(msg_location_dets_file()),
- ets:delete(MsgLocationEts),
- ets:delete(FileSummary),
+ Store1 = rabbit_msg_store:cleanup(Store),
ets:delete(Sequences),
- State1 #dqstate { msg_location_dets = undefined,
- msg_location_ets = undefined,
- file_summary = undefined,
- sequences = undefined,
- current_file_handle = undefined,
- current_dirty = false,
- read_file_handle_cache = HC1
- }.
+ State1 #dqstate { sequences = undefined, store = Store1 }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -643,168 +411,59 @@ report_memory(Hibernating, State) ->
rabbit_memory_manager:report_memory(self(), trunc(2.5 * Bytes),
Hibernating).
-memory_use(#dqstate { operation_mode = ram_disk,
- file_summary = FileSummary,
- sequences = Sequences,
- msg_location_ets = MsgLocationEts,
- message_cache = Cache,
- wordsize = WordSize
- }) ->
- WordSize * (mnesia:table_info(rabbit_disk_queue, memory) +
- lists:sum([ets:info(Table, memory)
- || Table <- [MsgLocationEts, FileSummary, Cache,
- Sequences]]));
+memory_use(#dqstate { operation_mode = ram_disk,
+ store = Store,
+ sequences = Sequences }) ->
+ WordSize = erlang:system_info(wordsize),
+ rabbit_msg_store:memory(Store) +
+ WordSize * ets:info(Sequences, memory) +
+ WordSize * mnesia:table_info(rabbit_disk_queue, memory);
memory_use(#dqstate { operation_mode = disk_only,
- file_summary = FileSummary,
+ store = Store,
sequences = Sequences,
- msg_location_dets = MsgLocationDets,
- message_cache = Cache,
- wordsize = WordSize,
- mnesia_bytes_per_record = MnesiaBytesPerRecord,
- ets_bytes_per_record = EtsBytesPerRecord }) ->
- (WordSize * (lists:sum([ets:info(Table, memory)
- || Table <- [FileSummary, Cache, Sequences]]))) +
- rabbit_misc:ceil(
- mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord) +
+ mnesia_bytes_per_record = MnesiaBytesPerRecord }) ->
+ WordSize = erlang:system_info(wordsize),
+ rabbit_msg_store:memory(Store) +
+ WordSize * ets:info(Sequences, memory) +
rabbit_misc:ceil(
- dets:info(MsgLocationDets, size) * EtsBytesPerRecord).
+ mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord).
to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) ->
State;
-to_disk_only_mode(State = #dqstate { operation_mode = ram_disk,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts,
- wordsize = WordSize }) ->
+to_disk_only_mode(State = #dqstate { operation_mode = ram_disk,
+ store = Store }) ->
rabbit_log:info("Converting disk queue to disk only mode~n", []),
- MnesiaMemBytes = WordSize * mnesia:table_info(rabbit_disk_queue, memory),
- EtsMemBytes = WordSize * ets:info(MsgLocationEts, memory),
- MnesiaSize = lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]),
- EtsSize = lists:max([1, ets:info(MsgLocationEts, size)]),
+ MnesiaBPR = erlang:system_info(wordsize) *
+ mnesia:table_info(rabbit_disk_queue, memory) /
+ lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]),
+ EtsBPR = rabbit_msg_store:ets_bpr(Store),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_only_copies),
- MnesiaBPR = MnesiaMemBytes / MnesiaSize,
- EtsBPR = EtsMemBytes / EtsSize,
+ Store1 = rabbit_msg_store:to_disk_only_mode(Store),
Path = form_filename(?DISK_ONLY_MODE_FILE),
case rabbit_misc:write_term_file(Path, [{MnesiaBPR, EtsBPR}]) of
ok -> ok;
{error, Reason} ->
throw({error, {cannot_create_disk_only_mode_file, Path, Reason}})
end,
- ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
- true = ets:delete_all_objects(MsgLocationEts),
garbage_collect(),
State #dqstate { operation_mode = disk_only,
- mnesia_bytes_per_record = MnesiaBPR,
- ets_bytes_per_record = EtsBPR }.
+ store = Store1,
+ mnesia_bytes_per_record = MnesiaBPR }.
to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) ->
State;
-to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts }) ->
+to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
+ store = Store }) ->
rabbit_log:info("Converting disk queue to ram disk mode~n", []),
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
disc_copies),
+ Store1 = rabbit_msg_store:to_ram_disk_mode(Store),
ok = file:delete(form_filename(?DISK_ONLY_MODE_FILE)),
- true = ets:from_dets(MsgLocationEts, MsgLocationDets),
- ok = dets:delete_all_objects(MsgLocationDets),
garbage_collect(),
State #dqstate { operation_mode = ram_disk,
- mnesia_bytes_per_record = undefined,
- ets_bytes_per_record = undefined }.
-
-%%----------------------------------------------------------------------------
-%% message cache helper functions
-%%----------------------------------------------------------------------------
-
-%% The purpose of the cache is not especially performance, though it
-%% can help there too. The main purpose is to ensure that individual
-%% messages that are sent to multiple queues, and then to disk, are
-%% read back as the same binary object rather than multiples of
-%% identical binary objects. This prevents memory explosion.
-%%
-%% We limit the cache in size. If we didn't, then we could have two
-%% queues coming off the same exchange, receiving the same millions of
-%% messages, then one queue gets drained, which would pull the entire
-%% queue into the cache, which would potentially explode memory.
-
-remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) ->
- true = ets:delete(Cache, MsgId),
- ok.
-
-fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
- case ets:lookup(Cache, MsgId) of
- [] ->
- not_found;
- [{MsgId, Message, _RefCount}] ->
- NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}),
- {Message, NewRefCount}
- end.
-
-decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
- true = try case ets:update_counter(Cache, MsgId, {3, -1}) of
- N when N =< 0 -> true = ets:delete(Cache, MsgId);
- _N -> true
- end
- catch error:badarg ->
- %% MsgId is not in there because although it's been
- %% delivered, it's never actually been read (think:
- %% persistent message in mixed queue)
- true
- end,
- ok.
-
-insert_into_cache(Message = #basic_message { guid = MsgId },
- #dqstate { message_cache = Cache }) ->
- case cache_is_full(Cache) of
- true -> ok;
- false -> true = ets:insert_new(Cache, {MsgId, Message, 1}),
- ok
- end.
-
-cache_is_full(Cache) ->
- ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
-
-%%----------------------------------------------------------------------------
-%% dets/ets agnosticism
-%%----------------------------------------------------------------------------
-
-dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Key) ->
- dets:lookup(MsgLocationDets, Key);
-dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Key) ->
- ets:lookup(MsgLocationEts, Key).
-
-dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Key) ->
- ok = dets:delete(MsgLocationDets, Key);
-dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Key) ->
- true = ets:delete(MsgLocationEts, Key),
- ok.
-
-dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Obj) ->
- ok = dets:insert(MsgLocationDets, Obj);
-dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Obj) ->
- true = ets:insert(MsgLocationEts, Obj),
- ok.
-
-dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Obj) ->
- true = dets:insert_new(MsgLocationDets, Obj);
-dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Obj) ->
- true = ets:insert_new(MsgLocationEts, Obj).
-
-dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Obj) ->
- dets:match_object(MsgLocationDets, Obj);
-dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Obj) ->
- ets:match_object(MsgLocationEts, Obj).
+ store = Store1,
+ mnesia_bytes_per_record = undefined }.
%%----------------------------------------------------------------------------
%% general helper functions
@@ -840,50 +499,10 @@ form_filename(Name) ->
base_directory() ->
filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/").
-msg_location_dets_file() ->
- form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS).
-
-open_file(File, Mode) -> file:open(form_filename(File), ?BINARY_MODE ++ Mode).
-
-with_read_handle_at(File, Offset, Fun, State =
- #dqstate { read_file_handle_cache = HC,
- current_file_name = CurName,
- current_dirty = IsDirty,
- last_sync_offset = SyncOffset
- }) ->
- State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset ->
- sync_current_file_handle(State);
- true -> State
- end,
- FilePath = form_filename(File),
- {Result, HC1} =
- rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC),
- {Result, State1 #dqstate { read_file_handle_cache = HC1 }}.
-
-sync_current_file_handle(State = #dqstate { current_dirty = false,
- on_sync_txns = [] }) ->
- State;
-sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
- current_dirty = IsDirty,
- current_offset = CurOffset,
- on_sync_txns = Txns,
- last_sync_offset = SyncOffset
- }) ->
- SyncOffset1 = case IsDirty of
- true -> ok = file:sync(CurHdl),
- CurOffset;
- false -> SyncOffset
- end,
- State1 = lists:foldl(fun internal_do_tx_commit/2, State, lists:reverse(Txns)),
- State1 #dqstate { current_dirty = false, on_sync_txns = [],
- last_sync_offset = SyncOffset1 }.
-
sequence_lookup(Sequences, Q) ->
case ets:lookup(Sequences, Q) of
- [] ->
- {0, 0};
- [{Q, ReadSeqId, WriteSeqId}] ->
- {ReadSeqId, WriteSeqId}
+ [] -> {0, 0};
+ [{_, ReadSeqId, WriteSeqId}] -> {ReadSeqId, WriteSeqId}
end.
start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
@@ -896,91 +515,54 @@ stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #dqstate { commit_timer_ref = undefined }.
-msg_to_bin(Msg = #basic_message { content = Content }) ->
- ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
- term_to_binary(Msg #basic_message { content = ClearedContent }).
-
-bin_to_msg(MsgBin) ->
- binary_to_term(MsgBin).
+sync(State = #dqstate { store = Store, on_sync_txns = Txns }) ->
+ State1 = State #dqstate { store = rabbit_msg_store:sync(Store) },
+ case Txns of
+ [] -> State1;
+ _ -> lists:foldl(fun internal_do_tx_commit/2,
+ State1 #dqstate { on_sync_txns = [] },
+ lists:reverse(Txns))
+ end.
%%----------------------------------------------------------------------------
%% internal functions
%%----------------------------------------------------------------------------
-internal_fetch_body(Q, MarkDelivered, Advance, State) ->
- case queue_head(Q, MarkDelivered, Advance, State) of
- E = {ok, empty, _} -> E;
- {ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} ->
- {Message, State2} = read_stored_message(StoreEntry, State1),
- {ok, {Message, IsDelivered, AckTag, Remaining}, State2}
+internal_fetch_body(Q, MarkDelivered, Advance,
+ State = #dqstate { store = Store }) ->
+ case next(Q, MarkDelivered, Advance, State) of
+ empty -> {empty, State};
+ {MsgId, IsDelivered, AckTag, Remaining} ->
+ {Message, Store1} = rabbit_msg_store:read(MsgId, Store),
+ State1 = State #dqstate { store = Store1 },
+ {{Message, IsDelivered, AckTag, Remaining}, State1}
end.
-internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
- case queue_head(Q, MarkDelivered, Advance, State) of
- E = {ok, empty, _} -> E;
- {ok, AckTag, IsDelivered,
- #msg_location { msg_id = MsgId, is_persistent = IsPersistent },
- Remaining, State1} ->
- {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1}
+internal_fetch_attributes(Q, MarkDelivered, Advance,
+ State = #dqstate { store = Store }) ->
+ case next(Q, MarkDelivered, Advance, State) of
+ empty -> empty;
+ {MsgId, IsDelivered, AckTag, Remaining} ->
+ IsPersistent = rabbit_msg_store:is_persistent(MsgId, Store),
+ {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}
end.
-queue_head(Q, MarkDelivered, Advance,
- State = #dqstate { sequences = Sequences }) ->
+next(Q, MarkDelivered, Advance, #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
- {SeqId, SeqId} -> {ok, empty, State};
+ {SeqId, SeqId} -> empty;
{ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId ->
Remaining = WriteSeqId - ReadSeqId - 1,
- {AckTag, IsDelivered, StoreEntry} =
- update_message_attributes(Q, ReadSeqId, MarkDelivered, State),
+ {MsgId, IsDelivered} =
+ update_message_attributes(Q, ReadSeqId, MarkDelivered),
ok = maybe_advance(Advance, Sequences, Q, ReadSeqId, WriteSeqId),
- {ok, AckTag, IsDelivered, StoreEntry, Remaining, State}
+ AckTag = {MsgId, ReadSeqId},
+ {MsgId, IsDelivered, AckTag, Remaining}
end.
-maybe_advance(peek_queue, _, _, _, _) ->
- ok;
-maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) ->
- true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
- ok.
-
-read_stored_message(#msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize }, State) ->
- case fetch_and_increment_cache(MsgId, State) of
- not_found ->
- {{ok, {MsgId, MsgBody, _IsPersistent, _BodySize}}, State1} =
- with_read_handle_at(
- File, Offset,
- fun(Hdl) ->
- Res = case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {MsgId, _, _, _}} = Obj -> Obj;
- {ok, Rest} ->
- throw({error,
- {misread, [{old_state, State},
- {file, File},
- {offset, Offset},
- {read, Rest}]}})
- end,
- {Offset + TotalSize, Res}
- end, State),
- Message = #basic_message {} = bin_to_msg(MsgBody),
- ok = if RefCount > 1 ->
- insert_into_cache(Message, State1);
- true -> ok
- %% it's not in the cache and we only have
- %% 1 queue with the message. So don't
- %% bother putting it in the cache.
- end,
- {Message, State1};
- {Message, _RefCount} ->
- {Message, State}
- end.
-
-update_message_attributes(Q, SeqId, MarkDelivered, State) ->
+update_message_attributes(Q, SeqId, MarkDelivered) ->
[Obj =
#dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
- [StoreEntry = #msg_location { msg_id = MsgId }] =
- dets_ets_lookup(State, MsgId),
ok = case {IsDelivered, MarkDelivered} of
{true, _} -> ok;
{false, ignore_delivery} -> ok;
@@ -988,130 +570,62 @@ update_message_attributes(Q, SeqId, MarkDelivered, State) ->
mnesia:dirty_write(rabbit_disk_queue,
Obj #dq_msg_loc {is_delivered = true})
end,
- {{MsgId, SeqId}, IsDelivered, StoreEntry}.
+ {MsgId, IsDelivered}.
+
+maybe_advance(peek_queue, _, _, _, _) ->
+ ok;
+maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) ->
+ true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
+ ok.
internal_foldl(Q, Fun, Init, State) ->
- State1 = #dqstate { sequences = Sequences } =
- sync_current_file_handle(State),
+ State1 = #dqstate { sequences = Sequences } = sync(State),
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId).
internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
{ok, Acc, State};
-internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
- {AckTag, IsDelivered, StoreEntry} =
- update_message_attributes(Q, ReadSeqId, ignore_delivery, State),
- {Message, State1} = read_stored_message(StoreEntry, State),
- Acc1 = Fun(Message, AckTag, IsDelivered, Acc),
- internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1).
+internal_foldl(Q, WriteSeqId, Fun, State = #dqstate { store = Store },
+ Acc, ReadSeqId) ->
+ [#dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
+ {Message, Store1} = rabbit_msg_store:read(MsgId, Store),
+ Acc1 = Fun(Message, {MsgId, ReadSeqId}, IsDelivered, Acc),
+ internal_foldl(Q, WriteSeqId, Fun, State #dqstate { store = Store1 },
+ Acc1, ReadSeqId + 1).
internal_ack(Q, MsgSeqIds, State) ->
remove_messages(Q, MsgSeqIds, true, State).
%% Q is only needed if MnesiaDelete /= false
-remove_messages(Q, MsgSeqIds, MnesiaDelete, State) ->
- Files =
- lists:foldl(
- fun ({MsgId, SeqId}, Files1) ->
- Files2 = remove_message(MsgId, Files1, State),
- ok = case MnesiaDelete of
- true -> mnesia:dirty_delete(rabbit_disk_queue,
- {Q, SeqId});
- _ -> ok
- end,
- Files2
- end, sets:new(), MsgSeqIds),
- State1 = compact(Files, State),
- {ok, State1}.
-
-remove_message(MsgId, Files,
- State = #dqstate { file_summary = FileSummary,
- current_file_name = CurName
- }) ->
- [StoreEntry =
- #msg_location { msg_id = MsgId, ref_count = RefCount, file = File,
- offset = Offset, total_size = TotalSize }] =
- dets_ets_lookup(State, MsgId),
- case RefCount of
- 1 ->
- ok = dets_ets_delete(State, MsgId),
- ok = remove_cache_entry(MsgId, State),
- [FSEntry = #file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop }] =
- ets:lookup(FileSummary, File),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- ValidTotalSize1 = ValidTotalSize - TotalSize,
- true = ets:insert(FileSummary, FSEntry #file_summary {
- valid_total_size = ValidTotalSize1,
- contiguous_top = ContiguousTop1 }),
- if CurName =:= File -> Files;
- true -> sets:add_element(File, Files)
- end;
- _ when 1 < RefCount ->
- ok = decrement_cache(MsgId, State),
- ok = dets_ets_insert(State, StoreEntry #msg_location {
- ref_count = RefCount - 1 }),
- Files
- end.
+remove_messages(Q, MsgSeqIds, MnesiaDelete,
+ State = #dqstate { store = Store } ) ->
+ MsgIds = lists:foldl(
+ fun ({MsgId, SeqId}, MsgIdAcc) ->
+ ok = case MnesiaDelete of
+ true -> mnesia:dirty_delete(rabbit_disk_queue,
+ {Q, SeqId});
+ _ -> ok
+ end,
+ [MsgId | MsgIdAcc]
+ end, [], MsgSeqIds),
+ Store1 = rabbit_msg_store:remove(MsgIds, Store),
+ {ok, State #dqstate { store = Store1}}.
internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
- guid = MsgId },
- State = #dqstate { current_file_handle = CurHdl,
- current_file_name = CurName,
- current_offset = CurOffset,
- file_summary = FileSummary
- }) ->
- case dets_ets_lookup(State, MsgId) of
- [] ->
- %% New message, lots to do
- {ok, TotalSize} = rabbit_msg_file:append(
- CurHdl, MsgId, msg_to_bin(Message),
- IsPersistent),
- true = dets_ets_insert_new(
- State, #msg_location {
- msg_id = MsgId, ref_count = 1, file = CurName,
- offset = CurOffset, total_size = TotalSize,
- is_persistent = IsPersistent }),
- [FSEntry = #file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
- right = undefined }] =
- ets:lookup(FileSummary, CurName),
- ValidTotalSize1 = ValidTotalSize + TotalSize,
- ContiguousTop1 = if CurOffset =:= ContiguousTop ->
- %% can't be any holes in this file
- ValidTotalSize1;
- true -> ContiguousTop
- end,
- true = ets:insert(FileSummary, FSEntry #file_summary {
- valid_total_size = ValidTotalSize1,
- contiguous_top = ContiguousTop1 }),
- NextOffset = CurOffset + TotalSize,
- maybe_roll_to_new_file(
- NextOffset, State #dqstate {current_offset = NextOffset,
- current_dirty = true});
- [StoreEntry =
- #msg_location { msg_id = MsgId, ref_count = RefCount }] ->
- %% We already know about it, just update counter
- ok = dets_ets_insert(State, StoreEntry #msg_location {
- ref_count = RefCount + 1 }),
- {ok, State}
- end.
+ guid = MsgId,
+ content = Content },
+ State = #dqstate { store = Store }) ->
+ ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
+ Message1 = Message #basic_message { content = ClearedContent },
+ Store1 = rabbit_msg_store:write(MsgId, Message1, IsPersistent, Store),
+ {ok, State #dqstate { store = Store1 }}.
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
- State = #dqstate { current_file_name = CurFile,
- current_dirty = IsDirty,
- on_sync_txns = Txns,
- last_sync_offset = SyncOffset
- }) ->
- NeedsSync = IsDirty andalso
- lists:any(fun ({MsgId, _IsDelivered}) ->
- [#msg_location { msg_id = MsgId, file = File,
- offset = Offset }] =
- dets_ets_lookup(State, MsgId),
- File =:= CurFile andalso Offset >= SyncOffset
- end, PubMsgIds),
+ State = #dqstate { store = Store, on_sync_txns = Txns }) ->
TxnDetails = {Q, PubMsgIds, AckSeqIds, From},
- case NeedsSync of
+ case rabbit_msg_store:needs_sync(
+ [MsgId || {MsgId, _IsDelivered} <- PubMsgIds], Store) of
true -> Txns1 = [TxnDetails | Txns],
State #dqstate { on_sync_txns = Txns1 };
false -> internal_do_tx_commit(TxnDetails, State)
@@ -1165,13 +679,14 @@ internal_tx_rollback(MsgIds, State) ->
internal_requeue(_Q, [], State) ->
{ok, State};
-internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
+internal_requeue(Q, MsgSeqIds, State = #dqstate { store = Store,
+ sequences = Sequences }) ->
%% We know that every seq_id in here is less than the ReadSeqId
%% you'll get if you look up this queue in Sequences (i.e. they've
%% already been delivered). We also know that the rows for these
%% messages are still in rabbit_disk_queue (i.e. they've not been
%% ack'd).
-
+ %%
%% Now, it would be nice if we could adjust the sequence ids in
%% rabbit_disk_queue (mnesia) to create a contiguous block and
%% then drop the ReadSeqId for the queue by the corresponding
@@ -1180,13 +695,14 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
%% which are not being requeued. As such, moving things about in
%% rabbit_disk_queue _under_ the current ReadSeqId would result in
%% such sequence ids referring to the wrong messages.
-
+ %%
%% Therefore, the only solution is to take these messages, and to
%% reenqueue them at the top of the queue. Usefully, this only
%% affects the Sequences and rabbit_disk_queue structures - there
%% is no need to physically move the messages about on disk, so
- %% MsgLocation and FileSummary stay put (which makes further sense
- %% as they have no concept of sequence id anyway).
+ %% the message store remains unaffected, except we need to tell it
+ %% about the ids of the requeued messages so it can remove them
+ %% from its message cache if necessary.
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
{WriteSeqId1, Q, MsgIds} =
@@ -1197,8 +713,8 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
MsgSeqIds)
end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}),
- lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
- {ok, State}.
+ Store1 = rabbit_msg_store:release(MsgIds, Store),
+ {ok, State #dqstate { store = Store1 }}.
requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
@@ -1212,7 +728,8 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
{WriteSeqId + 1, Q, [MsgId | Acc]}.
%% move the next N messages from the front of the queue to the back.
-internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
+internal_requeue_next_n(Q, N, State = #dqstate { store = Store,
+ sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
if N >= (WriteSeqId - ReadSeqId) -> {ok, State};
true ->
@@ -1224,8 +741,8 @@ internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
end
),
true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}),
- lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
- {ok, State}
+ Store1 = rabbit_msg_store:release(MsgIds, Store),
+ {ok, State #dqstate { store = Store1 }}
end.
requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) ->
@@ -1257,7 +774,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
end.
internal_delete_queue(Q, State) ->
- State1 = sync_current_file_handle(State),
+ State1 = sync(State),
{ok, _Count, State2 = #dqstate { sequences = Sequences }} =
internal_purge(Q, State1), %% remove everything undelivered
true = ets:delete(Sequences, Q),
@@ -1283,269 +800,6 @@ internal_delete_non_durable_queues(
end, {ok, State}, Sequences).
%%----------------------------------------------------------------------------
-%% garbage collection / compaction / aggregation
-%%----------------------------------------------------------------------------
-
-maybe_roll_to_new_file(Offset,
- State = #dqstate { file_size_limit = FileSizeLimit,
- current_file_name = CurName,
- current_file_handle = CurHdl,
- current_file_num = CurNum,
- file_summary = FileSummary
- }
- ) when Offset >= FileSizeLimit ->
- State1 = sync_current_file_handle(State),
- ok = file:close(CurHdl),
- NextNum = CurNum + 1,
- NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
- {ok, NextHdl} = open_file(NextName, ?WRITE_MODE),
- true = ets:update_element(FileSummary, CurName,
- {#file_summary.right, NextName}),
- true = ets:insert_new(
- FileSummary, #file_summary {
- file = NextName, valid_total_size = 0, contiguous_top = 0,
- left = CurName, right = undefined }),
- State2 = State1 #dqstate { current_file_name = NextName,
- current_file_handle = NextHdl,
- current_file_num = NextNum,
- current_offset = 0,
- last_sync_offset = 0
- },
- {ok, compact(sets:from_list([CurName]), State2)};
-maybe_roll_to_new_file(_, State) ->
- {ok, State}.
-
-compact(FilesSet, State) ->
- %% smallest number, hence eldest, hence left-most, first
- Files = lists:sort(fun file_name_sort/2, sets:to_list(FilesSet)),
- %% foldl reverses, so now youngest/right-most first
- RemainingFiles = lists:foldl(fun (File, Acc) ->
- delete_empty_files(File, Acc, State)
- end, [], Files),
- lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
-
-%% At this stage, we simply know that the file has had msgs removed
-%% from it. However, we don't know if we need to merge it left (which
-%% is what we would prefer), or merge it right. If we merge left, then
-%% this file is the source, and the left file is the destination. If
-%% we merge right then this file is the destination and the right file
-%% is the source.
-combine_file(File, State = #dqstate { file_summary = FileSummary,
- current_file_name = CurName
- }) ->
- %% the file we're looking at may no longer exist as it may have
- %% been deleted within the current GC run
- case ets:lookup(FileSummary, File) of
- [] -> State;
- [FSEntry = #file_summary { left = Left, right = Right }] ->
- GoRight =
- fun() ->
- case Right of
- undefined -> State;
- _ when not (CurName == Right) ->
- [FSRight] = ets:lookup(FileSummary, Right),
- {_, State1} = adjust_meta_and_combine(
- FSEntry, FSRight, State),
- State1;
- _ -> State
- end
- end,
- case Left of
- undefined ->
- GoRight();
- _ -> [FSLeft] = ets:lookup(FileSummary, Left),
- case adjust_meta_and_combine(FSLeft, FSEntry, State) of
- {true, State1} -> State1;
- {false, State} -> GoRight()
- end
- end
- end.
-
-adjust_meta_and_combine(
- LeftObj = #file_summary {
- file = LeftFile, valid_total_size = LeftValidData, right = RightFile },
- RightObj = #file_summary {
- file = RightFile, valid_total_size = RightValidData, left = LeftFile,
- right = RightRight },
- State = #dqstate { file_size_limit = FileSizeLimit,
- file_summary = FileSummary }) ->
- TotalValidData = LeftValidData + RightValidData,
- if FileSizeLimit >= TotalValidData ->
- State1 = combine_files(RightObj, LeftObj, State),
- %% this could fail if RightRight is undefined
- ets:update_element(FileSummary, RightRight,
- {#file_summary.left, LeftFile}),
- true = ets:insert(FileSummary, LeftObj #file_summary {
- valid_total_size = TotalValidData,
- contiguous_top = TotalValidData,
- right = RightRight }),
- true = ets:delete(FileSummary, RightFile),
- {true, State1};
- true -> {false, State}
- end.
-
-sort_msg_locations_by_offset(Dir, List) ->
- Comp = case Dir of
- asc -> fun erlang:'<'/2;
- desc -> fun erlang:'>'/2
- end,
- lists:sort(fun (#msg_location { offset = OffA },
- #msg_location { offset = OffB }) ->
- Comp(OffA, OffB)
- end, List).
-
-preallocate(Hdl, FileSizeLimit, FinalPos) ->
- {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit),
- ok = file:truncate(Hdl),
- {ok, FinalPos} = file:position(Hdl, FinalPos),
- ok.
-
-truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
- {ok, Lowpoint} = file:position(FileHdl, Lowpoint),
- ok = file:truncate(FileHdl),
- ok = preallocate(FileHdl, Highpoint, Lowpoint).
-
-combine_files(#file_summary { file = Source,
- valid_total_size = SourceValid,
- left = Destination },
- #file_summary { file = Destination,
- valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
- right = Source },
- State) ->
- State1 = close_file(Source, close_file(Destination, State)),
- {ok, SourceHdl} = open_file(Source, ?READ_MODE),
- {ok, DestinationHdl} = open_file(Destination, ?READ_MODE ++ ?WRITE_MODE),
- ExpectedSize = SourceValid + DestinationValid,
- %% if DestinationValid =:= DestinationContiguousTop then we don't
- %% need a tmp file
- %% if they're not equal, then we need to write out everything past
- %% the DestinationContiguousTop to a tmp file then truncate,
- %% copy back in, and then copy over from Source
- %% otherwise we just truncate straight away and copy over from Source
- if DestinationContiguousTop =:= DestinationValid ->
- ok = truncate_and_extend_file(DestinationHdl,
- DestinationValid, ExpectedSize);
- true ->
- Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Tmp, ?READ_MODE ++ ?WRITE_MODE),
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset ==
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- %% Given expected access patterns, I suspect
- %% that the list should be naturally sorted
- %% as we require, however, we need to
- %% enforce it anyway
- end, sort_msg_locations_by_offset(
- asc, dets_ets_match_object(
- State1, #msg_location {
- file = Destination, _ = '_' }))),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State1),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and MsgLocationDets has been updated to
- %% reflect compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file:sync(DestinationHdl),
- ok = file:close(TmpHdl),
- ok = file:delete(form_filename(Tmp))
- end,
- SourceWorkList =
- sort_msg_locations_by_offset(
- asc, dets_ets_match_object(State1, #msg_location {
- file = Source, _ = '_' })),
- ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
- SourceHdl, DestinationHdl, Destination, State1),
- %% tidy up
- ok = file:close(SourceHdl),
- ok = file:close(DestinationHdl),
- ok = file:delete(form_filename(Source)),
- State1.
-
-copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, State) ->
- {FinalOffset, BlockStart1, BlockEnd1} =
- lists:foldl(
- fun (StoreEntry = #msg_location { offset = Offset,
- total_size = TotalSize },
- {CurOffset, BlockStart, BlockEnd}) ->
- %% CurOffset is in the DestinationFile.
- %% Offset, BlockStart and BlockEnd are in the SourceFile
- %% update MsgLocationDets to reflect change of file and offset
- ok = dets_ets_insert(State, StoreEntry #msg_location {
- file = Destination,
- offset = CurOffset }),
- NextOffset = CurOffset + TotalSize,
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {NextOffset, Offset, Offset + TotalSize};
- Offset =:= BlockEnd ->
- %% extend the current block because the next
- %% msg follows straight on
- {NextOffset, BlockStart, BlockEnd + TotalSize};
- true ->
- %% found a gap, so actually do the work for
- %% the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file:position(SourceHdl, BlockStart),
- {ok, BSize} =
- file:copy(SourceHdl, DestinationHdl, BSize),
- {NextOffset, Offset, Offset + TotalSize}
- end
- end, {InitOffset, undefined, undefined}, WorkList),
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} = file:position(SourceHdl, BlockStart1),
- {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1),
- ok = file:sync(DestinationHdl),
- ok.
-
-close_file(File, State = #dqstate { read_file_handle_cache = HC }) ->
- HC1 = rabbit_file_handle_cache:close_file(form_filename(File), HC),
- State #dqstate { read_file_handle_cache = HC1 }.
-
-delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
- [#file_summary { valid_total_size = ValidData,
- left = Left, right = Right }] =
- ets:lookup(FileSummary, File),
- case ValidData of
- %% we should NEVER find the current file in here hence right
- %% should always be a file, not undefined
- 0 ->
- case {Left, Right} of
- {undefined, _} when not is_atom(Right) ->
- %% the eldest file is empty.
- true = ets:update_element(
- FileSummary, Right,
- {#file_summary.left, undefined});
- {_, _} when not (is_atom(Right)) ->
- true = ets:update_element(FileSummary, Right,
- {#file_summary.left, Left}),
- true =
- ets:update_element(FileSummary, Left,
- {#file_summary.right, Right})
- end,
- true = ets:delete(FileSummary, File),
- ok = file:delete(form_filename(File)),
- Acc;
- _ -> [File|Acc]
- end.
-
-%%----------------------------------------------------------------------------
%% recovery
%%----------------------------------------------------------------------------
@@ -1630,55 +884,35 @@ del_index() ->
E1 -> E1
end.
-load_from_disk(State) ->
- %% sorted so that smallest number is first. which also means
- %% eldest file (left-most) first
- ok = add_index(),
- {Files, TmpFiles} = get_disk_queue_files(),
- ok = recover_crashed_compactions(Files, TmpFiles),
- %% There should be no more tmp files now, so go ahead and load the
- %% whole lot
- Files1 = case Files of
- [] -> [State #dqstate.current_file_name];
- _ -> Files
- end,
- State1 = load_messages(undefined, Files1, State),
- %% Finally, check there is nothing in mnesia which we haven't
- %% loaded
- Key = mnesia:dirty_first(rabbit_disk_queue),
- {ok, AlteredFiles} = prune_mnesia(State1, Key, sets:new(), [], 0),
- State2 = compact(AlteredFiles, State1),
- ok = extract_sequence_numbers(State2 #dqstate.sequences),
- ok = del_index(),
- {ok, State2}.
-
-prune_mnesia_flush_batch(DeleteAcc) ->
+prune_mnesia_flush_batch(DeleteAcc, RemoveAcc, Store) ->
lists:foldl(fun (Key, ok) ->
mnesia:dirty_delete(rabbit_disk_queue, Key)
- end, ok, DeleteAcc).
-
-prune_mnesia(_State, '$end_of_table', Files, _DeleteAcc, 0) ->
- {ok, Files};
-prune_mnesia(_State, '$end_of_table', Files, DeleteAcc, _Len) ->
- ok = prune_mnesia_flush_batch(DeleteAcc),
- {ok, Files};
-prune_mnesia(State, Key, Files, DeleteAcc, Len) ->
+ end, ok, DeleteAcc),
+ rabbit_msg_store:remove(RemoveAcc, Store).
+
+prune_mnesia(Store) ->
+ prune_mnesia(Store, mnesia:dirty_first(rabbit_disk_queue), [], [], 0).
+
+prune_mnesia(Store, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) ->
+ Store;
+prune_mnesia(Store, '$end_of_table', DeleteAcc, RemoveAcc, _Len) ->
+ prune_mnesia_flush_batch(DeleteAcc, RemoveAcc, Store);
+prune_mnesia(Store, Key, DeleteAcc, RemoveAcc, Len) ->
[#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] =
mnesia:dirty_read(rabbit_disk_queue, Key),
- {DeleteAcc1, Files1, Len1} =
- case dets_ets_lookup(State, MsgId) of
- [] ->
+ {DeleteAcc1, RemoveAcc1, Len1} =
+ case rabbit_msg_store:is_persistent(MsgId, Store) of
+ not_found ->
%% msg hasn't been found on disk, delete it
- {[{Q, SeqId} | DeleteAcc], Files, Len + 1};
- [#msg_location { msg_id = MsgId, is_persistent = true }] ->
+ {[{Q, SeqId} | DeleteAcc], RemoveAcc, Len + 1};
+ true ->
%% msg is persistent, keep it
- {DeleteAcc, Files, Len};
- [#msg_location { msg_id = MsgId, is_persistent = false}] ->
+ {DeleteAcc, RemoveAcc, Len};
+ false ->
%% msg is not persistent, delete it
- Files2 = remove_message(MsgId, Files, State),
- {[{Q, SeqId} | DeleteAcc], Files2, Len + 1}
+ {[{Q, SeqId} | DeleteAcc], [MsgId | RemoveAcc], Len + 1}
end,
- {Key1, DeleteAcc2, Len2} =
+ {Store1, Key1, DeleteAcc2, RemoveAcc2, Len2} =
if
Len1 >= ?BATCH_SIZE ->
%% We have no way of knowing how flushing the batch
@@ -1686,14 +920,15 @@ prune_mnesia(State, Key, Files, DeleteAcc, Len) ->
%% so have no choice but to start again. Although this
%% will make recovery slower for large queues, we
%% guarantee we can start up in constant memory
- ok = prune_mnesia_flush_batch(DeleteAcc1),
+ Store2 = prune_mnesia_flush_batch(DeleteAcc1, RemoveAcc1,
+ Store),
Key2 = mnesia:dirty_first(rabbit_disk_queue),
- {Key2, [], 0};
+ {Store2, Key2, [], [], 0};
true ->
Key2 = mnesia:dirty_next(rabbit_disk_queue, Key),
- {Key2, DeleteAcc1, Len1}
+ {Store, Key2, DeleteAcc1, RemoveAcc1, Len1}
end,
- prune_mnesia(State, Key1, Files1, DeleteAcc2, Len2).
+ prune_mnesia(Store1, Key1, DeleteAcc2, RemoveAcc2, Len2).
extract_sequence_numbers(Sequences) ->
true =
@@ -1712,7 +947,7 @@ extract_sequence_numbers(Sequences) ->
case ets:lookup(Sequences, Q) of
[] -> ets:insert_new(Sequences,
{Q, SeqId, NextWrite});
- [Orig = {Q, Read, Write}] ->
+ [Orig = {_, Read, Write}] ->
Repl = {Q, lists:min([Read, SeqId]),
lists:max([Write, NextWrite])},
case Orig == Repl of
@@ -1767,213 +1002,8 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
end,
shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
-load_messages(Left, [], State) ->
- Num = list_to_integer(filename:rootname(Left)),
- Offset =
- case dets_ets_match_object(State, #msg_location {
- file = Left, _ = '_' }) of
- [] -> 0;
- L ->
- [ #msg_location { file = Left,
- offset = MaxOffset,
- total_size = TotalSize} | _ ] =
- sort_msg_locations_by_offset(desc, L),
- MaxOffset + TotalSize
- end,
- State #dqstate { current_file_num = Num, current_file_name = Left,
- current_offset = Offset };
-load_messages(Left, [File|Files],
- State = #dqstate { file_summary = FileSummary }) ->
- {ok, Messages} = scan_file_for_valid_messages(File),
- {ValidMessages, ValidTotalSize} = lists:foldl(
- fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId, _ = '_' },
- msg_id)) of
- 0 -> {VMAcc, VTSAcc};
- RefCount ->
- true = dets_ets_insert_new(
- State, #msg_location {
- msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize,
- is_persistent = IsPersistent }),
- {[Obj | VMAcc], VTSAcc + TotalSize}
- end
- end, {[], 0}, Messages),
- %% foldl reverses lists, find_contiguous_block_prefix needs
- %% msgs eldest first, so, ValidMessages is the right way round
- {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
- Right = case Files of
- [] -> undefined;
- [F|_] -> F
- end,
- true = ets:insert_new(FileSummary, #file_summary {
- file = File, valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
- left = Left, right = Right }),
- load_messages(File, Files, State).
-
-recover_crashed_compactions(Files, TmpFiles) ->
- lists:foreach(fun (TmpFile) ->
- ok = recover_crashed_compactions1(Files, TmpFile) end,
- TmpFiles),
- ok.
-
-verify_messages_in_mnesia(MsgIds) ->
- lists:foreach(
- fun (MsgId) ->
- true = 0 < length(mnesia:dirty_index_match_object(
- rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId, _ = '_' },
- msg_id))
- end, MsgIds).
-
-scan_file_for_valid_messages_msg_ids(File) ->
- {ok, Messages} = scan_file_for_valid_messages(File),
- {ok, Messages,
- [MsgId || {MsgId, _IsPersistent, _TotalSize, _FileOffset} <- Messages]}.
-
-recover_crashed_compactions1(Files, TmpFile) ->
- NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
- true = lists:member(NonTmpRelatedFile, Files),
- {ok, UncorruptedMessagesTmp, MsgIdsTmp} =
- scan_file_for_valid_messages_msg_ids(TmpFile),
- %% all of these messages should appear in the mnesia table,
- %% otherwise they wouldn't have been copied out
- verify_messages_in_mnesia(MsgIdsTmp),
- {ok, UncorruptedMessages, MsgIds} =
- scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile),
- %% 1) It's possible that everything in the tmp file is also in the
- %% main file such that the main file is (prefix ++
- %% tmpfile). This means that compaction failed immediately
- %% prior to the final step of deleting the tmp file. Plan: just
- %% delete the tmp file
- %% 2) It's possible that everything in the tmp file is also in the
- %% main file but with holes throughout (or just somthing like
- %% main = (prefix ++ hole ++ tmpfile)). This means that
- %% compaction wrote out the tmp file successfully and then
- %% failed. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 3) It's possible that everything in the tmp file is also in the
- %% main file but such that the main file does not end with tmp
- %% file (and there are valid messages in the suffix; main =
- %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
- %% means that compaction failed as we were writing out the tmp
- %% file. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 4) It's possible that there are messages in the tmp file which
- %% are not in the main file. This means that writing out the
- %% tmp file succeeded, but then we failed as we were copying
- %% them back over to the main file, after truncating the main
- %% file. As the main file has already been truncated, it should
- %% consist only of valid messages. Plan: Truncate the main file
- %% back to before any of the files in the tmp file and copy
- %% them over again
- TmpPath = form_filename(TmpFile),
- case is_sublist(MsgIdsTmp, MsgIds) of
- true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
- %% note this also catches the case when the tmp file
- %% is empty
- ok = file:delete(TmpPath);
- false ->
- %% We're in case 4 above. We only care about the inital
- %% msgs in main file that are not in the tmp file. If
- %% there are no msgs in the tmp file then we would be in
- %% the 'true' branch of this case, so we know the
- %% lists:last call is safe.
- EldestTmpMsgId = lists:last(MsgIdsTmp),
- {MsgIds1, UncorruptedMessages1}
- = case lists:splitwith(
- fun (MsgId) -> MsgId /= EldestTmpMsgId end, MsgIds) of
- {_MsgIds, []} -> %% no msgs from tmp in main
- {MsgIds, UncorruptedMessages};
- {Dropped, [EldestTmpMsgId | Rest]} ->
- %% Msgs in Dropped are in tmp, so forget them.
- %% *cry*. Lists indexed from 1.
- {Rest, lists:sublist(UncorruptedMessages,
- 2 + length(Dropped),
- length(Rest))}
- end,
- %% Check that everything in the main file prefix is a
- %% valid message in mnesia
- verify_messages_in_mnesia(MsgIds1),
- %% The main file prefix should be contiguous
- {Top, MsgIds1} = find_contiguous_block_prefix(
- lists:reverse(UncorruptedMessages1)),
- %% we should have that none of the messages in the prefix
- %% are in the tmp file
- true = is_disjoint(MsgIds1, MsgIdsTmp),
- %% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(NonTmpRelatedFile, ?WRITE_MODE ++ [read]),
- %% Wipe out any rubbish at the end of the file. Remember
- %% the head of the list will be the highest entry in the
- %% file.
- [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize,
- %% Extend the main file as big as necessary in a single
- %% move. If we run out of disk space, this truncate could
- %% fail, but we still aren't risking losing data
- ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(TmpFile, ?READ_MODE),
- {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
- ok = file:sync(MainHdl),
- ok = file:close(MainHdl),
- ok = file:close(TmpHdl),
- ok = file:delete(TmpPath),
-
- {ok, _MainMessages, MsgIdsMain} =
- scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile),
- %% check that everything in MsgIds1 is in MsgIdsMain
- true = is_sublist(MsgIds1, MsgIdsMain),
- %% check that everything in MsgIdsTmp is in MsgIdsMain
- true = is_sublist(MsgIdsTmp, MsgIdsMain)
- end,
- ok.
-
-is_sublist(SmallerList, BiggerList) ->
- lists:all(fun (Item) -> lists:member(Item, BiggerList) end, SmallerList).
-
-is_disjoint(SmallerList, BiggerList) ->
- lists:all(fun (Item) -> not lists:member(Item, BiggerList) end, SmallerList).
-
-%% Takes the list in *ascending* order (i.e. eldest message
-%% first). This is the opposite of what scan_file_for_valid_messages
-%% produces. The list of msgs that is produced is youngest first.
-find_contiguous_block_prefix([]) -> {0, []};
-find_contiguous_block_prefix(List) ->
- find_contiguous_block_prefix(List, 0, []).
-
-find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
- {ExpectedOffset, MsgIds};
-find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset}
- | Tail], ExpectedOffset, MsgIds) ->
- ExpectedOffset1 = ExpectedOffset + TotalSize,
- find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
-find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
- {ExpectedOffset, MsgIds}.
-
-file_name_sort(A, B) ->
- ANum = list_to_integer(filename:rootname(A)),
- BNum = list_to_integer(filename:rootname(B)),
- ANum < BNum.
-
-get_disk_queue_files() ->
- DQFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION, base_directory()),
- DQFilesSorted = lists:sort(fun file_name_sort/2, DQFiles),
- DQTFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, base_directory()),
- DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles),
- {DQFilesSorted, DQTFilesSorted}.
-
-scan_file_for_valid_messages(File) ->
- case open_file(File, ?READ_MODE) of
- {ok, Hdl} ->
- Valid = rabbit_msg_file:scan(Hdl),
- %% if something really bad's happened, the close could fail,
- %% but ignore
- file:close(Hdl),
- Valid;
- {error, enoent} -> {ok, []};
- {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}})
- end.
+ref_count(MsgId) ->
+ length(mnesia:dirty_index_match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId, _ = '_' },
+ msg_id)).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
new file mode 100644
index 00000000..e4ccc1df
--- /dev/null
+++ b/src/rabbit_msg_store.erl
@@ -0,0 +1,1128 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_msg_store).
+
+-export([init/6, write/4, read/2, is_persistent/2, remove/2, release/2,
+ needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1,
+ ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]).
+
+%%----------------------------------------------------------------------------
+
+-record(msstate,
+ {operation_mode, %% ram_disk | disk_only
+ dir, %% store directory
+ msg_location_dets, %% where are messages?
+ msg_location_ets, %% as above, but for ets version
+ file_summary, %% what's in the files?
+ current_file_num, %% current file name as number
+ current_file_name, %% current file name
+ current_file_handle, %% current file handle
+ current_offset, %% current offset within current file
+ current_dirty, %% has the current file been written to
+ %% since the last fsync?
+ file_size_limit, %% how big can our files get?
+ read_file_handle_cache, %% file handle cache for reading
+ last_sync_offset, %% current_offset at the last time we sync'd
+ message_cache, %% ets message cache
+ ets_bytes_per_record %% bytes per record in msg_location_ets
+ }).
+
+-record(msg_location,
+ {msg_id, ref_count, file, offset, total_size, is_persistent}).
+
+-record(file_summary,
+ {file, valid_total_size, contiguous_top, left, right}).
+
+-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_EXTENSION_DETS, ".dets").
+
+-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+-define(CACHE_MAX_SIZE, 10485760).
+
+-define(BINARY_MODE, [raw, binary]).
+-define(READ_MODE, [read, read_ahead]).
+-define(WRITE_MODE, [write, delayed_write]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(mode() :: 'ram_disk' | 'disk_only').
+-type(dets_table() :: any()).
+-type(ets_table() :: any()).
+-type(msg_id() :: any()).
+-type(msg() :: any()).
+-type(file_path() :: any()).
+-type(io_device() :: any()).
+
+-type(msstate() :: #msstate {
+ operation_mode :: mode(),
+ dir :: file_path(),
+ msg_location_dets :: dets_table(),
+ msg_location_ets :: ets_table(),
+ file_summary :: ets_table(),
+ current_file_num :: non_neg_integer(),
+ current_file_name :: file_path(),
+ current_file_handle :: io_device(),
+ current_offset :: non_neg_integer(),
+ current_dirty :: boolean(),
+ file_size_limit :: non_neg_integer(),
+ read_file_handle_cache :: any(),
+ last_sync_offset :: non_neg_integer(),
+ message_cache :: ets_table(),
+ ets_bytes_per_record :: non_neg_integer()
+ }).
+
+-spec(init/6 :: ('ram_disk' | 'disk_only', file_path(),
+ non_neg_integer(), non_neg_integer(),
+ fun ((msg_id()) -> non_neg_integer()), non_neg_integer()) ->
+ msstate()).
+-spec(write/4 :: (msg_id(), msg(), boolean(), msstate()) -> msstate()).
+-spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found').
+-spec(is_persistent/2 :: (msg_id(), msstate()) -> boolean() | 'not_found').
+-spec(remove/2 :: ([msg_id()], msstate()) -> msstate()).
+-spec(release/2 :: ([msg_id()], msstate()) -> msstate()).
+-spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()).
+-spec(sync/1 :: (msstate()) -> msstate()).
+-spec(cleanup/1 :: (msstate()) -> msstate()).
+-spec(cache_info/1 :: (msstate()) -> [{atom(), term()}]).
+-spec(memory/1 :: (msstate()) -> non_neg_integer()).
+-spec(ets_bpr/1 :: (msstate()) -> non_neg_integer()).
+-spec(to_disk_only_mode/1 :: (msstate()) -> msstate()).
+-spec(to_ram_disk_mode/1 :: (msstate()) -> msstate()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% The components:
+%%
+%% MsgLocation: this is a (d)ets table which contains:
+%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent}
+%% FileSummary: this is an ets table which contains:
+%% {File, ValidTotalSize, ContiguousTop, Left, Right}
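+%%
+%% For illustration (a hypothetical snapshot, not data produced
+%% anywhere in this patch), entries in the two tables might look like:
+%%
+%%   #msg_location { msg_id = <<"m1">>, ref_count = 2, file = "0.rdq",
+%%                   offset = 0, total_size = 120, is_persistent = true }
+%%   #file_summary { file = "0.rdq", valid_total_size = 120,
+%%                   contiguous_top = 120, left = undefined,
+%%                   right = "1.rdq" }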
+%%
+%% The basic idea is that messages are appended to the current file up
+%% until that file becomes too big (> file_size_limit). At that point,
+%% the file is closed and a new file is created on the _right_ of the
+%% old file which is used for new messages. Files are named
+%% numerically ascending, thus the file with the lowest name is the
+%% eldest file.
+%%
+%% We need to keep track of which messages are in which files (this is
+%% the MsgLocation table); how much useful data is in each file and
+%% which files are on the left and right of each other. This is the
+%% purpose of the FileSummary table.
+%%
+%% As messages are removed from files, holes appear in these
+%% files. The field ValidTotalSize contains the total amount of useful
+%% data left in the file, whilst ContiguousTop contains the amount of
+%% valid data right at the start of each file. These are needed for
+%% garbage collection.
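+%%
+%% As a worked example (sizes invented purely for illustration): if a
+%% file holds valid messages occupying bytes 0-99 and 100-249, a hole
+%% at 250-399, and a further valid message at 400-499, then
+%% ValidTotalSize is 100 + 150 + 100 = 350 and ContiguousTop is 250,
+%% since only the first two messages form an unbroken run from the
+%% start of the file.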
+%%
+%% When we discover that either a file is now empty or that it can be
+%% combined with the useful data in either its left or right file, we
+%% compact the two files together. This keeps disk utilisation high
+%% and aids performance.
+%%
+%% Given the compaction between two files, the left file is considered
+%% the ultimate destination for the good data in the right file. If
+%% necessary, the good data in the left file which is fragmented
+%% throughout the file is written out to a temporary file, then read
+%% back in to form a contiguous chunk of good data at the start of the
+%% left file. Thus the left file is garbage collected and
+%% compacted. Then the good data from the right file is copied onto
+%% the end of the left file. MsgLocation and FileSummary tables are
+%% updated.
+%%
+%% On startup, we scan the files we discover, dealing with the
+%% possibility that a crash occurred during a compaction (this
+%% consists of tidying up - the compaction is deliberately designed
+%% such that data is duplicated on disk rather than risking it being
+%% lost), and rebuild the dets and ets tables (MsgLocation, FileSummary).
+%%
+%% MsgLocation is deliberately a dets table in order to ensure that we
+%% are not RAM constrained. However, for performance reasons, it is
+%% possible to call to_ram_disk_mode/1 which will convert MsgLocation
+%% to an ets table. This results in a massive performance improvement,
+%% at the expense of greater RAM usage. The idea is that when memory
+%% gets tight, we switch to disk_only mode but otherwise try to run in
+%% ram_disk mode.
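+%%
+%% A minimal sketch of such a switch (illustrative only; the caller
+%% and the threshold are assumptions, not part of this module):
+%%
+%%   maybe_shrink(Threshold, Store) ->
+%%       case memory(Store) > Threshold of
+%%           true  -> to_disk_only_mode(Store);
+%%           false -> Store
+%%       end.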
+%%
+%% So, with this design, messages move to the left. Eventually, they
+%% should end up in a contiguous block on the left and are then never
+%% rewritten. But this isn't quite the case. If in a file there is one
+%% message that is being ignored, for some reason, and messages in the
+%% file to the right and in the current block are being read all the
+%% time then it will repeatedly be the case that the good data from
+%% both files can be combined and will be written out to a new
+%% file. Whenever this happens, our shunned message will be rewritten.
+%%
+%% So, provided that we combine messages in the right order,
+%% (i.e. left file, bottom to top, right file, bottom to top),
+%% eventually our shunned message will end up at the bottom of the
+%% left file. The compaction/combining algorithm is smart enough to
+%% read in good data from the left file that is scattered throughout
+%% (i.e. C and D in the below diagram), then truncate the file to just
+%% above B (i.e. truncate to the limit of the good contiguous region
+%% at the start of the file), then write C and D on top and then write
+%% E, F and G from the right file on top. Thus contiguous blocks of
+%% good data at the bottom of files are not rewritten (yes, this is
+%% the data whose size is tracked by the ContiguousTop variable;
+%% judicious use of a mirror is required).
+%%
+%% +-------+ +-------+ +-------+
+%% | X | | G | | G |
+%% +-------+ +-------+ +-------+
+%% | D | | X | | F |
+%% +-------+ +-------+ +-------+
+%% | X | | X | | E |
+%% +-------+ +-------+ +-------+
+%% | C | | F | ===> | D |
+%% +-------+ +-------+ +-------+
+%% | X | | X | | C |
+%% +-------+ +-------+ +-------+
+%% | B | | X | | B |
+%% +-------+ +-------+ +-------+
+%% | A | | E | | A |
+%% +-------+ +-------+ +-------+
+%% left right left
+%%
+%% From this reasoning, we do have a bound on the number of times the
+%% message is rewritten. From when it is inserted, there can be no
+%% files inserted between it and the head of the queue, and the worst
+%% case is that every time it is rewritten, it moves one position lower
+%% in the file (for it to stay at the same position requires that
+%% there are no holes beneath it, which means truncate would be used
+%% and so it would not be rewritten at all). Thus this seems to
+%% suggest the limit is the number of messages ahead of it in the
+%% queue, though it's likely that that's pessimistic, given the
+%% requirements for compaction/combination of files.
+%%
+%% The other property we have is a bound on the lowest
+%% utilisation, which should be 50% - worst case is that all files are
+%% fractionally over half full and can't be combined (equivalent is
+%% alternating full files and files with only one tiny message in
+%% them).
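+%%
+%% For example (illustrative arithmetic, assuming a 256MB file size
+%% limit): alternating files each holding just over 128MB of valid
+%% data can never be combined, since any adjacent pair exceeds the
+%% limit, leaving utilisation just above 50%.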
+%%
+%% Messages are reference-counted. When a message with the same id is
+%% written several times we only store it once, and only remove it
+%% from the store when it has been removed the same number of times.
+%%
+%% The reference counts do not persist. Therefore the initialisation
+%% function must be provided with a function that determines the
+%% initial reference count of any (recovered) message.
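+%%
+%% Such a function simply maps a message id to its initial reference
+%% count. A trivial, purely illustrative example for a store in which
+%% every recovered message is referenced exactly once would be:
+%%
+%%   fun (_MsgId) -> 1 end
+%%
+%% The mnesia-backed ref_count/1 added to the disk queue in this patch
+%% is the kind of function intended here.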
+%%
+%% Messages read with a reference count greater than one are entered
+%% into a message cache. The purpose of the cache is not primarily
+%% performance, though it can help there too, but the prevention of
+%% memory explosion: it ensures that as messages with a high reference
+%% count are read by several processes they are read back as the same
+%% binary object rather than as multiple identical binary objects.
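+%%
+%% Putting the API together, a minimal usage sketch (the directory,
+%% limits, message id and payload below are illustrative assumptions,
+%% not values used elsewhere in this patch):
+%%
+%%   example() ->
+%%       RefCountFun = fun (_MsgId) -> 1 end,
+%%       Store0 = init(ram_disk, "/tmp/msg_store", 256 * 1024 * 1024,
+%%                     256, RefCountFun, 0),
+%%       Store1 = write(<<"m1">>, my_message, true, Store0),
+%%       Store2 = case needs_sync([<<"m1">>], Store1) of
+%%                    true  -> sync(Store1);
+%%                    false -> Store1
+%%                end,
+%%       {my_message, Store3} = read(<<"m1">>, Store2),
+%%       cleanup(remove([<<"m1">>], Store3)).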
+
+%%----------------------------------------------------------------------------
+%% public API
+%%----------------------------------------------------------------------------
+
+init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
+ EtsBytesPerRecord) ->
+
+ file:delete(msg_location_dets_file(Dir)),
+
+ {ok, MsgLocationDets} =
+ dets:open_file(?MSG_LOC_NAME,
+ [{file, msg_location_dets_file(Dir)},
+ {min_no_slots, 1024*1024},
+ %% man says this should be <= 32M. But it works...
+ {max_no_slots, 30*1024*1024},
+ {type, set},
+ {keypos, 2}
+ ]),
+
+ %% it would be better to have this as private, but dets:from_ets/2
+ %% seems to blow up if it is set private - see bug21489
+ MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected, {keypos, 2}]),
+
+ InitName = "0" ++ ?FILE_EXTENSION,
+ HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit,
+ ?BINARY_MODE ++ [read]),
+ State =
+ #msstate { operation_mode = Mode,
+ dir = Dir,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ file_summary = ets:new(
+ ?FILE_SUMMARY_ETS_NAME,
+ [set, private, {keypos, 2}]),
+ current_file_num = 0,
+ current_file_name = InitName,
+ current_file_handle = undefined,
+ current_offset = 0,
+ current_dirty = false,
+ file_size_limit = FileSizeLimit,
+ read_file_handle_cache = HandleCache,
+ last_sync_offset = 0,
+ message_cache = ets:new(?CACHE_ETS_NAME,
+ [set, private]),
+ ets_bytes_per_record = EtsBytesPerRecord
+ },
+
+ Files =
+ sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
+ TmpFiles =
+ sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
+ ok = recover_crashed_compactions(RefCountFun, Dir, Files, TmpFiles),
+ %% There should be no more tmp files now, so go ahead and load the
+ %% whole lot
+ State1 = #msstate { current_file_name = CurrentName,
+ current_offset = Offset } =
+ load_messages(RefCountFun, Files, State),
+
+ %% read is only needed so that we can seek
+ {ok, FileHdl} = open_file(Dir, CurrentName, ?WRITE_MODE ++ [read]),
+ {ok, Offset} = file:position(FileHdl, Offset),
+
+ State1 #msstate { current_file_handle = FileHdl }.
+
+write(MsgId, Msg, IsPersistent,
+ State = #msstate { current_file_handle = CurHdl,
+ current_file_name = CurName,
+ current_offset = CurOffset,
+ file_summary = FileSummary }) ->
+ case dets_ets_lookup(State, MsgId) of
+ [] ->
+ %% New message, lots to do
+ {ok, TotalSize} = rabbit_msg_file:append(
+ CurHdl, MsgId, term_to_binary(Msg),
+ IsPersistent),
+ true = dets_ets_insert_new(
+ State, #msg_location {
+ msg_id = MsgId, ref_count = 1, file = CurName,
+ offset = CurOffset, total_size = TotalSize,
+ is_persistent = IsPersistent }),
+ [FSEntry = #file_summary { valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop,
+ right = undefined }] =
+ ets:lookup(FileSummary, CurName),
+ ValidTotalSize1 = ValidTotalSize + TotalSize,
+ ContiguousTop1 = if CurOffset =:= ContiguousTop ->
+ %% can't be any holes in this file
+ ValidTotalSize1;
+ true -> ContiguousTop
+ end,
+ true = ets:insert(FileSummary, FSEntry #file_summary {
+ valid_total_size = ValidTotalSize1,
+ contiguous_top = ContiguousTop1 }),
+ NextOffset = CurOffset + TotalSize,
+ maybe_roll_to_new_file(
+ NextOffset, State #msstate {current_offset = NextOffset,
+ current_dirty = true});
+ [StoreEntry =
+ #msg_location { msg_id = MsgId, ref_count = RefCount }] ->
+ %% We already know about it, just update counter
+ ok = dets_ets_insert(State, StoreEntry #msg_location {
+ ref_count = RefCount + 1 }),
+ State
+ end.
+
+read(MsgId, State) ->
+ Objs = dets_ets_lookup(State, MsgId),
+ case Objs of
+ [] ->
+ not_found;
+ [#msg_location { ref_count = RefCount,
+ file = File,
+ offset = Offset,
+ total_size = TotalSize }] ->
+ case fetch_and_increment_cache(MsgId, State) of
+ not_found ->
+ {{ok, {MsgId, MsgBody, _IsPersistent, _BodySize}}, State1} =
+ with_read_handle_at(
+ File, Offset,
+ fun(Hdl) ->
+ Res = case rabbit_msg_file:read(
+ Hdl, TotalSize) of
+ {ok, {MsgId, _, _, _}} = Obj -> Obj;
+ {ok, Rest} ->
+ throw({error,
+ {misread,
+ [{old_state, State},
+ {file, File},
+ {offset, Offset},
+ {read, Rest}]}})
+ end,
+ {Offset + TotalSize, Res}
+ end, State),
+ Message = binary_to_term(MsgBody),
+ ok = if RefCount > 1 ->
+ insert_into_cache(MsgId, Message, State1);
+ true -> ok
+ %% it's not in the cache and we
+ %% only have one reference to the
+ %% message. So don't bother
+ %% putting it in the cache.
+ end,
+ {Message, State1};
+ {Message, _RefCount} ->
+ {Message, State}
+ end
+ end.
+
+is_persistent(MsgId, State) ->
+ Objs = dets_ets_lookup(State, MsgId),
+ case Objs of
+ [] ->
+ not_found;
+ [#msg_location { msg_id = MsgId, is_persistent = IsPersistent }] ->
+ IsPersistent
+ end.
+
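+%% Remove each of MsgIds once: a message whose reference count drops
+%% to zero is deleted from the index, and any file (other than the
+%% current one) left with a hole is handed to compact/2.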
+remove(MsgIds, State = #msstate { current_file_name = CurName }) ->
+ compact(sets:to_list(
+ lists:foldl(
+ fun (MsgId, Files1) ->
+ case remove_message(MsgId, State) of
+ {compact, File} ->
+ if CurName =:= File -> Files1;
+ true -> sets:add_element(File, Files1)
+ end;
+ no_compact -> Files1
+ end
+ end, sets:new(), MsgIds)),
+ State).
+
+release(MsgIds, State) ->
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
+ State.
+
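+%% A sync is needed only if the current file is dirty and one of
+%% MsgIds lives in it at or beyond the offset of the last sync.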
+needs_sync(_MsgIds, #msstate { current_dirty = false }) ->
+ false;
+needs_sync(MsgIds, State = #msstate { current_file_name = CurFile,
+ last_sync_offset = SyncOffset }) ->
+ lists:any(fun (MsgId) ->
+ [#msg_location { msg_id = MsgId, file = File,
+ offset = Offset }] =
+ dets_ets_lookup(State, MsgId),
+ File =:= CurFile andalso Offset >= SyncOffset
+ end, MsgIds).
+
+sync(State = #msstate { current_dirty = false }) ->
+ State;
+sync(State = #msstate { current_file_handle = CurHdl,
+ current_offset = CurOffset }) ->
+ ok = file:sync(CurHdl),
+ State #msstate { current_dirty = false, last_sync_offset = CurOffset }.
+
+cleanup(State = #msstate { dir = Dir,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ file_summary = FileSummary,
+ current_file_handle = FileHdl,
+ read_file_handle_cache = HC }) ->
+ State1 = case FileHdl of
+ undefined -> State;
+ _ -> State2 = sync(State),
+ file:close(FileHdl),
+ State2
+ end,
+ HC1 = rabbit_file_handle_cache:close_all(HC),
+ dets:close(MsgLocationDets),
+ file:delete(msg_location_dets_file(Dir)),
+ ets:delete(MsgLocationEts),
+ ets:delete(FileSummary),
+ State1 #msstate { msg_location_dets = undefined,
+ msg_location_ets = undefined,
+ file_summary = undefined,
+ current_file_handle = undefined,
+ current_dirty = false,
+ read_file_handle_cache = HC1
+ }.
+
+cache_info(#msstate { message_cache = Cache }) ->
+ ets:info(Cache).
+
+memory(#msstate { operation_mode = ram_disk,
+ file_summary = FileSummary,
+ msg_location_ets = MsgLocationEts,
+ message_cache = Cache }) ->
+ erlang:system_info(wordsize) *
+ lists:sum([ets:info(Table, memory) ||
+ Table <- [FileSummary, MsgLocationEts, Cache]]);
+memory(#msstate { operation_mode = disk_only,
+ file_summary = FileSummary,
+ msg_location_dets = MsgLocationDets,
+ message_cache = Cache,
+ ets_bytes_per_record = EtsBytesPerRecord }) ->
+ erlang:system_info(wordsize) *
+ lists:sum([ets:info(Table, memory) ||
+ Table <- [FileSummary, Cache]]) +
+ rabbit_misc:ceil(dets:info(MsgLocationDets, size) * EtsBytesPerRecord).
+
+ets_bpr(#msstate { operation_mode = disk_only,
+ ets_bytes_per_record = EtsBytesPerRecord }) ->
+ EtsBytesPerRecord;
+ets_bpr(#msstate { operation_mode = ram_disk,
+ msg_location_ets = MsgLocationEts }) ->
+ erlang:system_info(wordsize) * ets:info(MsgLocationEts, memory) /
+ lists:max([1, ets:info(MsgLocationEts, size)]).
+
+to_disk_only_mode(State = #msstate { operation_mode = disk_only }) ->
+ State;
+to_disk_only_mode(State = #msstate { operation_mode = ram_disk,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
+ true = ets:delete_all_objects(MsgLocationEts),
+ State #msstate { operation_mode = disk_only,
+ ets_bytes_per_record = ets_bpr(State) }.
+
+to_ram_disk_mode(State = #msstate { operation_mode = ram_disk }) ->
+ State;
+to_ram_disk_mode(State = #msstate { operation_mode = disk_only,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ true = ets:from_dets(MsgLocationEts, MsgLocationDets),
+ ok = dets:delete_all_objects(MsgLocationDets),
+ State #msstate { operation_mode = ram_disk,
+ ets_bytes_per_record = undefined }.
+
+%%----------------------------------------------------------------------------
+%% general helper functions
+%%----------------------------------------------------------------------------
+
+form_filename(Dir, Name) ->
+ filename:join(Dir, Name).
+
+msg_location_dets_file(Dir) ->
+ form_filename(Dir, atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS).
+
+open_file(Dir, File, Mode) ->
+ file:open(form_filename(Dir, File), ?BINARY_MODE ++ Mode).
+
+sort_file_names(Files) ->
+ lists:sort(fun (A, B) ->
+ ANum = list_to_integer(filename:rootname(A)),
+ BNum = list_to_integer(filename:rootname(B)),
+ ANum < BNum
+ end, Files).
+
+preallocate(Hdl, FileSizeLimit, FinalPos) ->
+ {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit),
+ ok = file:truncate(Hdl),
+ {ok, FinalPos} = file:position(Hdl, FinalPos),
+ ok.
+
+truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
+ {ok, Lowpoint} = file:position(FileHdl, Lowpoint),
+ ok = file:truncate(FileHdl),
+ ok = preallocate(FileHdl, Highpoint, Lowpoint).
+
+with_read_handle_at(File, Offset, Fun,
+ State = #msstate { dir = Dir,
+ read_file_handle_cache = HC,
+ current_file_name = CurName,
+ current_dirty = IsDirty,
+ last_sync_offset = SyncOffset }) ->
+ State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset ->
+ sync(State);
+ true -> State
+ end,
+ FilePath = form_filename(Dir, File),
+ {Result, HC1} =
+ rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC),
+ {Result, State1 #msstate { read_file_handle_cache = HC1 }}.
+
+remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
+ [StoreEntry =
+ #msg_location { msg_id = MsgId, ref_count = RefCount, file = File,
+ offset = Offset, total_size = TotalSize }] =
+ dets_ets_lookup(State, MsgId),
+ case RefCount of
+ 1 ->
+ ok = dets_ets_delete(State, MsgId),
+ ok = remove_cache_entry(MsgId, State),
+ [FSEntry = #file_summary { valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop }] =
+ ets:lookup(FileSummary, File),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ ValidTotalSize1 = ValidTotalSize - TotalSize,
+ true = ets:insert(FileSummary, FSEntry #file_summary {
+ valid_total_size = ValidTotalSize1,
+ contiguous_top = ContiguousTop1 }),
+ {compact, File};
+ _ when 1 < RefCount ->
+ ok = decrement_cache(MsgId, State),
+ ok = dets_ets_insert(State, StoreEntry #msg_location {
+ ref_count = RefCount - 1 }),
+ no_compact
+ end.
+
+%%----------------------------------------------------------------------------
+%% message cache helper functions
+%%----------------------------------------------------------------------------
+
+remove_cache_entry(MsgId, #msstate { message_cache = Cache }) ->
+ true = ets:delete(Cache, MsgId),
+ ok.
+
+fetch_and_increment_cache(MsgId, #msstate { message_cache = Cache }) ->
+ case ets:lookup(Cache, MsgId) of
+ [] ->
+ not_found;
+ [{MsgId, Message, _RefCount}] ->
+ NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}),
+ {Message, NewRefCount}
+ end.
+
+decrement_cache(MsgId, #msstate { message_cache = Cache }) ->
+ true = try case ets:update_counter(Cache, MsgId, {3, -1}) of
+ N when N =< 0 -> true = ets:delete(Cache, MsgId);
+ _N -> true
+ end
+ catch error:badarg ->
+ %% MsgId is not in there because although it's been
+ %% delivered, it's never actually been read (think:
+ %% persistent message in mixed queue)
+ true
+ end,
+ ok.
+
+insert_into_cache(MsgId, Message, #msstate { message_cache = Cache }) ->
+ case cache_is_full(Cache) of
+ true -> ok;
+ false -> true = ets:insert_new(Cache, {MsgId, Message, 1}),
+ ok
+ end.
+
+cache_is_full(Cache) ->
+ ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
+
+%%----------------------------------------------------------------------------
+%% dets/ets agnosticism
+%%----------------------------------------------------------------------------
+
+dets_ets_lookup(#msstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Key) ->
+ dets:lookup(MsgLocationDets, Key);
+dets_ets_lookup(#msstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Key) ->
+ ets:lookup(MsgLocationEts, Key).
+
+dets_ets_delete(#msstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Key) ->
+ ok = dets:delete(MsgLocationDets, Key);
+dets_ets_delete(#msstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Key) ->
+ true = ets:delete(MsgLocationEts, Key),
+ ok.
+
+dets_ets_insert(#msstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Obj) ->
+ ok = dets:insert(MsgLocationDets, Obj);
+dets_ets_insert(#msstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Obj) ->
+ true = ets:insert(MsgLocationEts, Obj),
+ ok.
+
+dets_ets_insert_new(#msstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Obj) ->
+ true = dets:insert_new(MsgLocationDets, Obj);
+dets_ets_insert_new(#msstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Obj) ->
+ true = ets:insert_new(MsgLocationEts, Obj).
+
+dets_ets_match_object(#msstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Obj) ->
+ dets:match_object(MsgLocationDets, Obj);
+dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Obj) ->
+ ets:match_object(MsgLocationEts, Obj).
+
+%%----------------------------------------------------------------------------
+%% recovery
+%%----------------------------------------------------------------------------
+
+recover_crashed_compactions(RefCountFun, Dir, Files, TmpFiles) ->
+ lists:foreach(fun (TmpFile) ->
+ ok = recover_crashed_compactions1(
+ RefCountFun, Dir, Files, TmpFile)
+ end,
+ TmpFiles),
+ ok.
+
+recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) ->
+ NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
+ true = lists:member(NonTmpRelatedFile, Files),
+ {ok, UncorruptedMessagesTmp, MsgIdsTmp} =
+ scan_file_for_valid_messages_msg_ids(Dir, TmpFile),
+ %% all of these messages should be referenced
+ %% otherwise they wouldn't have been copied out
+ verify_messages_referenced(RefCountFun, MsgIdsTmp),
+ {ok, UncorruptedMessages, MsgIds} =
+ scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFile),
+ %% 1) It's possible that everything in the tmp file is also in the
+ %% main file such that the main file is (prefix ++
+ %% tmpfile). This means that compaction failed immediately
+ %% prior to the final step of deleting the tmp file. Plan: just
+ %% delete the tmp file
+ %% 2) It's possible that everything in the tmp file is also in the
+%%    main file but with holes throughout (or just something like
+ %% main = (prefix ++ hole ++ tmpfile)). This means that
+ %% compaction wrote out the tmp file successfully and then
+ %% failed. Plan: just delete the tmp file and allow the
+ %% compaction to eventually be triggered later
+ %% 3) It's possible that everything in the tmp file is also in the
+ %% main file but such that the main file does not end with tmp
+ %% file (and there are valid messages in the suffix; main =
+ %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
+ %% means that compaction failed as we were writing out the tmp
+ %% file. Plan: just delete the tmp file and allow the
+ %% compaction to eventually be triggered later
+ %% 4) It's possible that there are messages in the tmp file which
+ %% are not in the main file. This means that writing out the
+ %% tmp file succeeded, but then we failed as we were copying
+ %% them back over to the main file, after truncating the main
+ %% file. As the main file has already been truncated, it should
+ %% consist only of valid messages. Plan: Truncate the main file
+ %% back to before any of the files in the tmp file and copy
+ %% them over again
+ TmpPath = form_filename(Dir, TmpFile),
+ case is_sublist(MsgIdsTmp, MsgIds) of
+ true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
+ %% note this also catches the case when the tmp file
+ %% is empty
+ ok = file:delete(TmpPath);
+ false ->
+            %% We're in case 4 above. We only care about the initial
+ %% msgs in main file that are not in the tmp file. If
+ %% there are no msgs in the tmp file then we would be in
+ %% the 'true' branch of this case, so we know the
+ %% lists:last call is safe.
+ EldestTmpMsgId = lists:last(MsgIdsTmp),
+ {MsgIds1, UncorruptedMessages1}
+ = case lists:splitwith(
+ fun (MsgId) -> MsgId /= EldestTmpMsgId end, MsgIds) of
+ {_MsgIds, []} -> %% no msgs from tmp in main
+ {MsgIds, UncorruptedMessages};
+ {Dropped, [EldestTmpMsgId | Rest]} ->
+ %% Msgs in Dropped are in tmp, so forget them.
+ %% *cry*. Lists indexed from 1.
+ {Rest, lists:sublist(UncorruptedMessages,
+ 2 + length(Dropped),
+ length(Rest))}
+ end,
+ %% Check that everything in the main file prefix is referenced
+ verify_messages_referenced(RefCountFun, MsgIds1),
+ %% The main file prefix should be contiguous
+ {Top, MsgIds1} = find_contiguous_block_prefix(
+ lists:reverse(UncorruptedMessages1)),
+ %% we should have that none of the messages in the prefix
+ %% are in the tmp file
+ true = is_disjoint(MsgIds1, MsgIdsTmp),
+ %% must open with read flag, otherwise will stomp over contents
+ {ok, MainHdl} = open_file(Dir, NonTmpRelatedFile,
+ ?WRITE_MODE ++ [read]),
+ %% Wipe out any rubbish at the end of the file. Remember
+ %% the head of the list will be the highest entry in the
+ %% file.
+ [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
+ TmpSize = TmpTopOffset + TmpTopTotalSize,
+ %% Extend the main file as big as necessary in a single
+ %% move. If we run out of disk space, this truncate could
+ %% fail, but we still aren't risking losing data
+ ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
+ {ok, TmpHdl} = open_file(Dir, TmpFile, ?READ_MODE),
+ {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
+ ok = file:sync(MainHdl),
+ ok = file:close(MainHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(TmpPath),
+
+ {ok, _MainMessages, MsgIdsMain} =
+ scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFile),
+ %% check that everything in MsgIds1 is in MsgIdsMain
+ true = is_sublist(MsgIds1, MsgIdsMain),
+ %% check that everything in MsgIdsTmp is in MsgIdsMain
+ true = is_sublist(MsgIdsTmp, MsgIdsMain)
+ end,
+ ok.
+
+is_sublist(SmallerL, BiggerL) ->
+ lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL).
+
+is_disjoint(SmallerL, BiggerL) ->
+ lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
+
+verify_messages_referenced(RefCountFun, MsgIds) ->
+ lists:foreach(fun (MsgId) -> false = RefCountFun(MsgId) == 0 end, MsgIds).
+
+scan_file_for_valid_messages_msg_ids(Dir, File) ->
+ {ok, Messages} = scan_file_for_valid_messages(Dir, File),
+ {ok, Messages,
+ [MsgId || {MsgId, _IsPersistent, _TotalSize, _FileOffset} <- Messages]}.
+
+scan_file_for_valid_messages(Dir, File) ->
+ case open_file(Dir, File, ?READ_MODE) of
+ {ok, Hdl} ->
+ Valid = rabbit_msg_file:scan(Hdl),
+ %% if something really bad's happened, the close could fail,
+ %% but ignore
+ file:close(Hdl),
+ Valid;
+ {error, enoent} -> {ok, []};
+ {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}})
+ end.
+
+%% Takes the list in *ascending* order (i.e. eldest message
+%% first). This is the opposite of what scan_file_for_valid_messages
+%% produces. The list of msgs that is produced is youngest first.
+find_contiguous_block_prefix([]) -> {0, []};
+find_contiguous_block_prefix(List) ->
+ find_contiguous_block_prefix(List, 0, []).
+
+find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
+ {ExpectedOffset, MsgIds};
+find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset}
+ | Tail], ExpectedOffset, MsgIds) ->
+ ExpectedOffset1 = ExpectedOffset + TotalSize,
+ find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
+find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
+ {ExpectedOffset, MsgIds}.
+
+load_messages(RefCountFun, [], State) ->
+ CurrentFile = State #msstate.current_file_name,
+ load_messages(RefCountFun, undefined, [CurrentFile], State);
+load_messages(RefCountFun, Files, State) ->
+ load_messages(RefCountFun, undefined, Files, State).
+
+load_messages(_RefCountFun, Left, [], State) ->
+ Num = list_to_integer(filename:rootname(Left)),
+ Offset =
+ case dets_ets_match_object(State, #msg_location {
+ file = Left, _ = '_' }) of
+ [] -> 0;
+ L ->
+ [ #msg_location { file = Left,
+ offset = MaxOffset,
+ total_size = TotalSize} | _ ] =
+ sort_msg_locations_by_offset(desc, L),
+ MaxOffset + TotalSize
+ end,
+ State #msstate { current_file_num = Num, current_file_name = Left,
+ current_offset = Offset };
+load_messages(RefCountFun, Left, [File|Files],
+ State = #msstate { dir = Dir, file_summary = FileSummary }) ->
+ {ok, Messages} = scan_file_for_valid_messages(Dir, File),
+ {ValidMessages, ValidTotalSize} = lists:foldl(
+ fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ case RefCountFun(MsgId) of
+ 0 -> {VMAcc, VTSAcc};
+ RefCount ->
+ true = dets_ets_insert_new(
+ State, #msg_location {
+ msg_id = MsgId, ref_count = RefCount,
+ file = File, offset = Offset,
+ total_size = TotalSize,
+ is_persistent = IsPersistent }),
+ {[Obj | VMAcc], VTSAcc + TotalSize}
+ end
+ end, {[], 0}, Messages),
+ %% foldl reverses lists, find_contiguous_block_prefix needs
+ %% msgs eldest first, so, ValidMessages is the right way round
+ {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
+ Right = case Files of
+ [] -> undefined;
+ [F|_] -> F
+ end,
+ true = ets:insert_new(FileSummary, #file_summary {
+ file = File, valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop,
+ left = Left, right = Right }),
+ load_messages(RefCountFun, File, Files, State).
+
+%%----------------------------------------------------------------------------
+%% garbage collection / compaction / aggregation
+%%----------------------------------------------------------------------------
+
+maybe_roll_to_new_file(Offset,
+ State = #msstate { dir = Dir,
+ file_size_limit = FileSizeLimit,
+ current_file_name = CurName,
+ current_file_handle = CurHdl,
+ current_file_num = CurNum,
+ file_summary = FileSummary
+ }
+ ) when Offset >= FileSizeLimit ->
+ State1 = sync(State),
+ ok = file:close(CurHdl),
+ NextNum = CurNum + 1,
+ NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
+ {ok, NextHdl} = open_file(Dir, NextName, ?WRITE_MODE),
+ true = ets:update_element(FileSummary, CurName,
+ {#file_summary.right, NextName}),
+ true = ets:insert_new(
+ FileSummary, #file_summary {
+ file = NextName, valid_total_size = 0, contiguous_top = 0,
+ left = CurName, right = undefined }),
+ State2 = State1 #msstate { current_file_name = NextName,
+ current_file_handle = NextHdl,
+ current_file_num = NextNum,
+ current_offset = 0,
+ last_sync_offset = 0
+ },
+ compact([CurName], State2);
+maybe_roll_to_new_file(_, State) ->
+ State.
+
+compact(Files, State) ->
+ %% smallest number, hence eldest, hence left-most, first
+ SortedFiles = sort_file_names(Files),
+ %% foldl reverses, so now youngest/right-most first
+ RemainingFiles = lists:foldl(fun (File, Acc) ->
+ delete_empty_files(File, Acc, State)
+ end, [], SortedFiles),
+ lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
+
+%% At this stage, we simply know that the file has had msgs removed
+%% from it. However, we don't know if we need to merge it left (which
+%% is what we would prefer), or merge it right. If we merge left, then
+%% this file is the source, and the left file is the destination. If
+%% we merge right then this file is the destination and the right file
+%% is the source.
+combine_file(File, State = #msstate { file_summary = FileSummary,
+ current_file_name = CurName }) ->
+ %% the file we're looking at may no longer exist as it may have
+ %% been deleted within the current GC run
+ case ets:lookup(FileSummary, File) of
+ [] -> State;
+ [FSEntry = #file_summary { left = Left, right = Right }] ->
+ GoRight =
+ fun() ->
+ case Right of
+ undefined -> State;
+ _ when not (CurName == Right) ->
+ [FSRight] = ets:lookup(FileSummary, Right),
+ {_, State1} = adjust_meta_and_combine(
+ FSEntry, FSRight, State),
+ State1;
+ _ -> State
+ end
+ end,
+ case Left of
+ undefined ->
+ GoRight();
+ _ -> [FSLeft] = ets:lookup(FileSummary, Left),
+ case adjust_meta_and_combine(FSLeft, FSEntry, State) of
+ {true, State1} -> State1;
+ {false, State} -> GoRight()
+ end
+ end
+ end.
+
+adjust_meta_and_combine(
+ LeftObj = #file_summary {
+ file = LeftFile, valid_total_size = LeftValidData, right = RightFile },
+ RightObj = #file_summary {
+ file = RightFile, valid_total_size = RightValidData, left = LeftFile,
+ right = RightRight },
+ State = #msstate { file_size_limit = FileSizeLimit,
+ file_summary = FileSummary }) ->
+ TotalValidData = LeftValidData + RightValidData,
+ if FileSizeLimit >= TotalValidData ->
+ State1 = combine_files(RightObj, LeftObj, State),
+ %% this could fail if RightRight is undefined
+ ets:update_element(FileSummary, RightRight,
+ {#file_summary.left, LeftFile}),
+ true = ets:insert(FileSummary, LeftObj #file_summary {
+ valid_total_size = TotalValidData,
+ contiguous_top = TotalValidData,
+ right = RightRight }),
+ true = ets:delete(FileSummary, RightFile),
+ {true, State1};
+ true -> {false, State}
+ end.
+
+sort_msg_locations_by_offset(Dir, List) ->
+ Comp = case Dir of
+ asc -> fun erlang:'<'/2;
+ desc -> fun erlang:'>'/2
+ end,
+ lists:sort(fun (#msg_location { offset = OffA },
+ #msg_location { offset = OffB }) ->
+ Comp(OffA, OffB)
+ end, List).
+
+combine_files(#file_summary { file = Source,
+ valid_total_size = SourceValid,
+ left = Destination },
+ #file_summary { file = Destination,
+ valid_total_size = DestinationValid,
+ contiguous_top = DestinationContiguousTop,
+ right = Source },
+ State = #msstate { dir = Dir }) ->
+ State1 = close_file(Source, close_file(Destination, State)),
+ {ok, SourceHdl} = open_file(Dir, Source, ?READ_MODE),
+ {ok, DestinationHdl} = open_file(Dir, Destination,
+ ?READ_MODE ++ ?WRITE_MODE),
+ ExpectedSize = SourceValid + DestinationValid,
+ %% if DestinationValid =:= DestinationContiguousTop then we don't
+ %% need a tmp file
+ %% if they're not equal, then we need to write out everything past
+ %% the DestinationContiguousTop to a tmp file then truncate,
+ %% copy back in, and then copy over from Source
+ %% otherwise we just truncate straight away and copy over from Source
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = truncate_and_extend_file(DestinationHdl,
+ DestinationValid, ExpectedSize);
+ true ->
+ Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_MODE ++ ?WRITE_MODE),
+ Worklist =
+ lists:dropwhile(
+ fun (#msg_location { offset = Offset })
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if it
+ %% was then DestinationContiguousTop would
+ %% have been extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I suspect
+ %% that the list should be naturally sorted
+ %% as we require, however, we need to
+ %% enforce it anyway
+ end, sort_msg_locations_by_offset(
+ asc, dets_ets_match_object(
+ State1, #msg_location {
+ file = Destination, _ = '_' }))),
+ ok = copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State1),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and MsgLocationDets has been updated to
+ %% reflect compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file:sync(DestinationHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(form_filename(Dir, Tmp))
+ end,
+ SourceWorkList =
+ sort_msg_locations_by_offset(
+ asc, dets_ets_match_object(State1, #msg_location {
+ file = Source, _ = '_' })),
+ ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination, State1),
+ %% tidy up
+ ok = file:close(SourceHdl),
+ ok = file:close(DestinationHdl),
+ ok = file:delete(form_filename(Dir, Source)),
+ State1.
+
+copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
+ Destination, State) ->
+ {FinalOffset, BlockStart1, BlockEnd1} =
+ lists:foldl(
+ fun (StoreEntry = #msg_location { offset = Offset,
+ total_size = TotalSize },
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ %% update MsgLocationDets to reflect change of file and offset
+ ok = dets_ets_insert(State, StoreEntry #msg_location {
+ file = Destination,
+ offset = CurOffset }),
+ NextOffset = CurOffset + TotalSize,
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {NextOffset, Offset, Offset + TotalSize};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the next
+ %% msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + TotalSize};
+ true ->
+ %% found a gap, so actually do the work for
+ %% the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file:position(SourceHdl, BlockStart),
+ {ok, BSize} =
+ file:copy(SourceHdl, DestinationHdl, BSize),
+ {NextOffset, Offset, Offset + TotalSize}
+ end
+ end, {InitOffset, undefined, undefined}, WorkList),
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} = file:position(SourceHdl, BlockStart1),
+ {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file:sync(DestinationHdl),
+ ok.
+
+close_file(File, State = #msstate { dir = Dir, read_file_handle_cache = HC }) ->
+ HC1 = rabbit_file_handle_cache:close_file(form_filename(Dir, File), HC),
+ State #msstate { read_file_handle_cache = HC1 }.
+
+delete_empty_files(File, Acc,
+ #msstate { dir = Dir, file_summary = FileSummary }) ->
+ [#file_summary { valid_total_size = ValidData,
+ left = Left, right = Right }] =
+ ets:lookup(FileSummary, File),
+ case ValidData of
+ %% we should NEVER find the current file in here hence right
+ %% should always be a file, not undefined
+ 0 ->
+ case {Left, Right} of
+ {undefined, _} when not is_atom(Right) ->
+ %% the eldest file is empty.
+ true = ets:update_element(
+ FileSummary, Right,
+ {#file_summary.left, undefined});
+ {_, _} when not (is_atom(Right)) ->
+ true = ets:update_element(FileSummary, Right,
+ {#file_summary.left, Left}),
+ true =
+ ets:update_element(FileSummary, Left,
+ {#file_summary.right, Right})
+ end,
+ true = ets:delete(FileSummary, File),
+ ok = file:delete(form_filename(Dir, File)),
+ Acc;
+ _ -> [File|Acc]
+ end.