diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-12 11:41:09 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-12 11:41:09 +0000 |
commit | c35a4b43ad993897b7bb6fe70eb364d357758f94 (patch) | |
tree | 135cb3a9e1af290badc263e28c5573f05dfa534b | |
parent | d77a0d5ee5bd3f29e19b566018f439c959c89f06 (diff) | |
download | rabbitmq-server-c35a4b43ad993897b7bb6fe70eb364d357758f94.tar.gz |
Handle read failure in read_pub_record_body
-rw-r--r-- | src/rabbit_queue_index.erl | 40 |
1 files changed, 25 insertions, 15 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f087e761..65118c1a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -628,12 +628,12 @@ read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, end, size = Size}, case IndexSize of - 0 -> {MsgId, Props}; + 0 -> {ok, MsgId, Props}; _ -> case file_handle_cache:read(Hdl, IndexSize) of {ok, MsgBin} -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin), - {Msg, Props}; - _ -> exit(could_not_read) %% TODO + {ok, Msg, Props}; + Else -> Else end end. @@ -769,14 +769,20 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> State; {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> - {MsgOrId, Props} = read_pub_record_body(Bin, Hdl), - IsPersistent = case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end, - load_journal_entries( - add_to_journal( - SeqId, {MsgOrId, Props, IsPersistent}, State)); + case read_pub_record_body(Bin, Hdl) of + {ok, MsgOrId, Props} -> + IsPersistent = + case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end, + load_journal_entries( + add_to_journal( + SeqId, {MsgOrId, Props, IsPersistent}, + State)); + _ErrOrEoF -> + State + end; _ErrOrEoF -> %% err, we've lost at least a publish State end @@ -963,10 +969,14 @@ load_segment_entries(Hdl, KeepAcked, Acc) -> load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) -> case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} -> - {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl), - Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - {SegEntries1, Unacked + 1}; + case read_pub_record_body(PubRecordBody, Hdl) of + {ok, MsgOrId, MsgProps} -> + Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + {SegEntries1, Unacked + 1}; + _ -> + {SegEntries, Unacked} + end; _ -> {SegEntries, Unacked} end. |