diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-09-07 06:00:54 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-09-07 06:00:54 +0100 |
commit | 3fab3c99e35761a5b88dad7f7b35d03ffe2e1919 (patch) | |
tree | eb3b33cfceafa7d759ca788d8eb334c0bc137c31 | |
parent | 3d84698fca3083ee53f12c0c248bd321c668ef24 (diff) | |
download | rabbitmq-server-3fab3c99e35761a5b88dad7f7b35d03ffe2e1919.tar.gz |
perform term/binary conversion of msg body in rabbit_msg_file
thus further generalising rabbit_msg_file
-rw-r--r-- | src/rabbit_msg_file.erl | 20 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 22 |
2 files changed, 21 insertions, 21 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index d5b891b6..254d987d 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -48,7 +48,7 @@ -type(io_device() :: any()). -type(msg_id() :: any()). --type(msg() :: binary()). +-type(msg() :: any()). -type(msg_attrs() :: boolean()). -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). @@ -64,11 +64,12 @@ %%---------------------------------------------------------------------------- -append(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> - BodySize = size(MsgBody), - MsgIdBin = term_to_binary(MsgId), +append(FileHdl, MsgId, MsgBody, IsPersistent) -> + MsgBodyBin = term_to_binary(MsgBody), + BodyBinSize = size(MsgBodyBin), + MsgIdBin = term_to_binary(MsgId), MsgIdBinSize = size(MsgIdBin), - Size = BodySize + MsgIdBinSize, + Size = BodyBinSize + MsgIdBinSize, StopByte = case IsPersistent of true -> ?WRITE_OK_PERSISTENT; false -> ?WRITE_OK_TRANSIENT @@ -76,7 +77,7 @@ append(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, MsgIdBin:MsgIdBinSize/binary, - MsgBody:BodySize/binary, + MsgBodyBin:BodyBinSize/binary, StopByte:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; KO -> KO @@ -89,14 +90,15 @@ read(FileHdl, TotalSize) -> {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, Rest:SizeWriteOkBytes/binary>>} -> - BodySize = Size - MsgIdBinSize, - <<MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + BodyBinSize = Size - MsgIdBinSize, + <<MsgIdBin:MsgIdBinSize/binary, MsgBodyBin:BodyBinSize/binary, StopByte:?WRITE_OK_SIZE_BITS>> = Rest, Persistent = case StopByte of ?WRITE_OK_TRANSIENT -> false; ?WRITE_OK_PERSISTENT -> true end, - {ok, {binary_to_term(MsgId), MsgBody, Persistent}}; + {ok, {binary_to_term(MsgIdBin), binary_to_term(MsgBodyBin), + Persistent}}; KO -> KO end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e45c9a63..427a6695 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -326,8 +326,7 @@ write(MsgId, Msg, IsPersistent, [] -> %% New message, lots to do {ok, TotalSize} = rabbit_msg_file:append( - CurHdl, MsgId, term_to_binary(Msg), - IsPersistent), + CurHdl, MsgId, Msg, IsPersistent), true = dets_ets_insert_new( State, #msg_location { msg_id = MsgId, ref_count = 1, file = CurName, @@ -369,7 +368,7 @@ read(MsgId, State) -> total_size = TotalSize }] -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {{ok, {MsgId, MsgBody, _IsPersistent}}, State1} = + {{ok, {MsgId, Msg, _IsPersistent}}, State1} = with_read_handle_at( File, Offset, fun(Hdl) -> @@ -386,18 +385,17 @@ read(MsgId, State) -> end, {Offset + TotalSize, Res} end, State), - Message = binary_to_term(MsgBody), ok = if RefCount > 1 -> - insert_into_cache(MsgId, Message, State1); + insert_into_cache(MsgId, Msg, State1); true -> ok %% it's not in the cache and we %% only have one reference to the %% message. So don't bother %% putting it in the cache. end, - {Message, State1}; - {Message, _RefCount} -> - {Message, State} + {Msg, State1}; + {Msg, _RefCount} -> + {Msg, State} end end. @@ -602,9 +600,9 @@ fetch_and_increment_cache(MsgId, #msstate { message_cache = Cache }) -> case ets:lookup(Cache, MsgId) of [] -> not_found; - [{MsgId, Message, _RefCount}] -> + [{MsgId, Msg, _RefCount}] -> NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}), - {Message, NewRefCount} + {Msg, NewRefCount} end. decrement_cache(MsgId, #msstate { message_cache = Cache }) -> @@ -620,10 +618,10 @@ decrement_cache(MsgId, #msstate { message_cache = Cache }) -> end, ok. -insert_into_cache(MsgId, Message, #msstate { message_cache = Cache }) -> +insert_into_cache(MsgId, Msg, #msstate { message_cache = Cache }) -> case cache_is_full(Cache) of true -> ok; - false -> true = ets:insert_new(Cache, {MsgId, Message, 1}), + false -> true = ets:insert_new(Cache, {MsgId, Msg, 1}), ok end. |