summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-07 06:00:54 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-07 06:00:54 +0100
commit3fab3c99e35761a5b88dad7f7b35d03ffe2e1919 (patch)
treeeb3b33cfceafa7d759ca788d8eb334c0bc137c31
parent3d84698fca3083ee53f12c0c248bd321c668ef24 (diff)
downloadrabbitmq-server-3fab3c99e35761a5b88dad7f7b35d03ffe2e1919.tar.gz
perform term/binary conversion of msg body in rabbit_msg_file
thus further generalising rabbit_msg_file
-rw-r--r--src/rabbit_msg_file.erl20
-rw-r--r--src/rabbit_msg_store.erl22
2 files changed, 21 insertions, 21 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index d5b891b6..254d987d 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -48,7 +48,7 @@
-type(io_device() :: any()).
-type(msg_id() :: any()).
--type(msg() :: binary()).
+-type(msg() :: any()).
-type(msg_attrs() :: boolean()).
-type(position() :: non_neg_integer()).
-type(msg_size() :: non_neg_integer()).
@@ -64,11 +64,12 @@
%%----------------------------------------------------------------------------
-append(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
- BodySize = size(MsgBody),
- MsgIdBin = term_to_binary(MsgId),
+append(FileHdl, MsgId, MsgBody, IsPersistent) ->
+ MsgBodyBin = term_to_binary(MsgBody),
+ BodyBinSize = size(MsgBodyBin),
+ MsgIdBin = term_to_binary(MsgId),
MsgIdBinSize = size(MsgIdBin),
- Size = BodySize + MsgIdBinSize,
+ Size = BodyBinSize + MsgIdBinSize,
StopByte = case IsPersistent of
true -> ?WRITE_OK_PERSISTENT;
false -> ?WRITE_OK_TRANSIENT
@@ -76,7 +77,7 @@ append(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
MsgIdBin:MsgIdBinSize/binary,
- MsgBody:BodySize/binary,
+ MsgBodyBin:BodyBinSize/binary,
StopByte:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
@@ -89,14 +90,15 @@ read(FileHdl, TotalSize) ->
{ok, <<Size:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
Rest:SizeWriteOkBytes/binary>>} ->
- BodySize = Size - MsgIdBinSize,
- <<MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ BodyBinSize = Size - MsgIdBinSize,
+ <<MsgIdBin:MsgIdBinSize/binary, MsgBodyBin:BodyBinSize/binary,
StopByte:?WRITE_OK_SIZE_BITS>> = Rest,
Persistent = case StopByte of
?WRITE_OK_TRANSIENT -> false;
?WRITE_OK_PERSISTENT -> true
end,
- {ok, {binary_to_term(MsgId), MsgBody, Persistent}};
+ {ok, {binary_to_term(MsgIdBin), binary_to_term(MsgBodyBin),
+ Persistent}};
KO -> KO
end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e45c9a63..427a6695 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -326,8 +326,7 @@ write(MsgId, Msg, IsPersistent,
[] ->
%% New message, lots to do
{ok, TotalSize} = rabbit_msg_file:append(
- CurHdl, MsgId, term_to_binary(Msg),
- IsPersistent),
+ CurHdl, MsgId, Msg, IsPersistent),
true = dets_ets_insert_new(
State, #msg_location {
msg_id = MsgId, ref_count = 1, file = CurName,
@@ -369,7 +368,7 @@ read(MsgId, State) ->
total_size = TotalSize }] ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {{ok, {MsgId, MsgBody, _IsPersistent}}, State1} =
+ {{ok, {MsgId, Msg, _IsPersistent}}, State1} =
with_read_handle_at(
File, Offset,
fun(Hdl) ->
@@ -386,18 +385,17 @@ read(MsgId, State) ->
end,
{Offset + TotalSize, Res}
end, State),
- Message = binary_to_term(MsgBody),
ok = if RefCount > 1 ->
- insert_into_cache(MsgId, Message, State1);
+ insert_into_cache(MsgId, Msg, State1);
true -> ok
%% it's not in the cache and we
%% only have one reference to the
%% message. So don't bother
%% putting it in the cache.
end,
- {Message, State1};
- {Message, _RefCount} ->
- {Message, State}
+ {Msg, State1};
+ {Msg, _RefCount} ->
+ {Msg, State}
end
end.
@@ -602,9 +600,9 @@ fetch_and_increment_cache(MsgId, #msstate { message_cache = Cache }) ->
case ets:lookup(Cache, MsgId) of
[] ->
not_found;
- [{MsgId, Message, _RefCount}] ->
+ [{MsgId, Msg, _RefCount}] ->
NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}),
- {Message, NewRefCount}
+ {Msg, NewRefCount}
end.
decrement_cache(MsgId, #msstate { message_cache = Cache }) ->
@@ -620,10 +618,10 @@ decrement_cache(MsgId, #msstate { message_cache = Cache }) ->
end,
ok.
-insert_into_cache(MsgId, Message, #msstate { message_cache = Cache }) ->
+insert_into_cache(MsgId, Msg, #msstate { message_cache = Cache }) ->
case cache_is_full(Cache) of
true -> ok;
- false -> true = ets:insert_new(Cache, {MsgId, Message, 1}),
+ false -> true = ets:insert_new(Cache, {MsgId, Msg, 1}),
ok
end.