summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-12 11:41:09 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-12 11:41:09 +0000
commitc35a4b43ad993897b7bb6fe70eb364d357758f94 (patch)
tree135cb3a9e1af290badc263e28c5573f05dfa534b
parentd77a0d5ee5bd3f29e19b566018f439c959c89f06 (diff)
downloadrabbitmq-server-c35a4b43ad993897b7bb6fe70eb364d357758f94.tar.gz
Handle read failure in read_pub_record_body
-rw-r--r--src/rabbit_queue_index.erl40
1 files changed, 25 insertions, 15 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f087e761..65118c1a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -628,12 +628,12 @@ read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
end,
size = Size},
case IndexSize of
- 0 -> {MsgId, Props};
+ 0 -> {ok, MsgId, Props};
_ -> case file_handle_cache:read(Hdl, IndexSize) of
{ok, MsgBin} -> Msg = #basic_message{id = MsgId} =
binary_to_term(MsgBin),
- {Msg, Props};
- _ -> exit(could_not_read) %% TODO
+ {ok, Msg, Props};
+ Else -> Else
end
end.
@@ -769,14 +769,20 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
{ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
State;
{ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
- {MsgOrId, Props} = read_pub_record_body(Bin, Hdl),
- IsPersistent = case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end,
- load_journal_entries(
- add_to_journal(
- SeqId, {MsgOrId, Props, IsPersistent}, State));
+ case read_pub_record_body(Bin, Hdl) of
+ {ok, MsgOrId, Props} ->
+ IsPersistent =
+ case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end,
+ load_journal_entries(
+ add_to_journal(
+ SeqId, {MsgOrId, Props, IsPersistent},
+ State));
+ _ErrOrEoF ->
+ State
+ end;
_ErrOrEoF -> %% err, we've lost at least a publish
State
end
@@ -963,10 +969,14 @@ load_segment_entries(Hdl, KeepAcked, Acc) ->
load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
{ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} ->
- {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl),
- Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- {SegEntries1, Unacked + 1};
+ case read_pub_record_body(PubRecordBody, Hdl) of
+ {ok, MsgOrId, MsgProps} ->
+ Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ {SegEntries1, Unacked + 1};
+ _ ->
+ {SegEntries, Unacked}
+ end;
_ ->
{SegEntries, Unacked}
end.