diff options
Diffstat (limited to 'src/rabbit_msg_file.erl')
-rw-r--r-- | src/rabbit_msg_file.erl | 43 |
1 files changed, 23 insertions, 20 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index ea7cf80c..b7de27d4 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -27,8 +27,8 @@ -define(WRITE_OK_SIZE_BITS, 8). -define(WRITE_OK_MARKER, 255). -define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)). --define(GUID_SIZE_BYTES, 16). --define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)). +-define(MSG_ID_SIZE_BYTES, 16). +-define(MSG_ID_SIZE_BITS, (8 * ?MSG_ID_SIZE_BYTES)). -define(SCAN_BLOCK_SIZE, 4194304). %% 4MB %%---------------------------------------------------------------------------- @@ -39,28 +39,30 @@ -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). -type(file_size() :: non_neg_integer()). +-type(message_accumulator(A) :: + fun (({rabbit_types:msg_id(), msg_size(), position(), binary()}, A) -> + A)). --spec(append/3 :: (io_device(), rabbit_guid:guid(), msg()) -> +-spec(append/3 :: (io_device(), rabbit_types:msg_id(), msg()) -> rabbit_types:ok_or_error2(msg_size(), any())). -spec(read/2 :: (io_device(), msg_size()) -> - rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()}, + rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()}, any())). --spec(scan/4 :: (io_device(), file_size(), - fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A), - A) -> {'ok', A, position()}). +-spec(scan/4 :: (io_device(), file_size(), message_accumulator(A), A) -> + {'ok', A, position()}). -endif. %%---------------------------------------------------------------------------- -append(FileHdl, Guid, MsgBody) - when is_binary(Guid) andalso size(Guid) =:= ?GUID_SIZE_BYTES -> +append(FileHdl, MsgId, MsgBody) + when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES -> MsgBodyBin = term_to_binary(MsgBody), MsgBodyBinSize = size(MsgBodyBin), - Size = MsgBodyBinSize + ?GUID_SIZE_BYTES, + Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES, case file_handle_cache:append(FileHdl, <<Size:?INTEGER_SIZE_BITS, - Guid:?GUID_SIZE_BYTES/binary, + MsgId:?MSG_ID_SIZE_BYTES/binary, MsgBodyBin:MsgBodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; @@ -69,13 +71,13 @@ append(FileHdl, Guid, MsgBody) read(FileHdl, TotalSize) -> Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, - BodyBinSize = Size - ?GUID_SIZE_BYTES, + BodyBinSize = Size - ?MSG_ID_SIZE_BYTES, case file_handle_cache:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, - Guid:?GUID_SIZE_BYTES/binary, + MsgId:?MSG_ID_SIZE_BYTES/binary, MsgBodyBin:BodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> - {ok, {Guid, binary_to_term(MsgBodyBin)}}; + {ok, {MsgId, binary_to_term(MsgBodyBin)}}; KO -> KO end. @@ -100,21 +102,22 @@ scanner(<<>>, Offset, _Fun, Acc) -> {<<>>, Acc, Offset}; scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) -> {<<>>, Acc, Offset}; %% Nothing to do other than stop. -scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, +scanner(<<Size:?INTEGER_SIZE_BITS, MsgIdAndMsg:Size/binary, WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, case WriteMarker of ?WRITE_OK_MARKER -> %% Here we take option 5 from %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in - %% which we read the Guid as a number, and then convert it + %% which we read the MsgId as a number, and then convert it %% back to a binary in order to work around bugs in %% Erlang's GC. - <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> = - <<GuidAndMsg:Size/binary>>, - <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, + <<MsgIdNum:?MSG_ID_SIZE_BITS, Msg/binary>> = + <<MsgIdAndMsg:Size/binary>>, + <<MsgId:?MSG_ID_SIZE_BYTES/binary>> = + <<MsgIdNum:?MSG_ID_SIZE_BITS>>, scanner(Rest, Offset + TotalSize, Fun, - Fun({Guid, TotalSize, Offset, Msg}, Acc)); + Fun({MsgId, TotalSize, Offset, Msg}, Acc)); _ -> scanner(Rest, Offset + TotalSize, Fun, Acc) end; |