diff options
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 40 |
1 files changed, 25 insertions, 15 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f087e761..65118c1a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -628,12 +628,12 @@ read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, end, size = Size}, case IndexSize of - 0 -> {MsgId, Props}; + 0 -> {ok, MsgId, Props}; _ -> case file_handle_cache:read(Hdl, IndexSize) of {ok, MsgBin} -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin), - {Msg, Props}; - _ -> exit(could_not_read) %% TODO + {ok, Msg, Props}; + Else -> Else end end. @@ -769,14 +769,20 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> State; {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> - {MsgOrId, Props} = read_pub_record_body(Bin, Hdl), - IsPersistent = case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end, - load_journal_entries( - add_to_journal( - SeqId, {MsgOrId, Props, IsPersistent}, State)); + case read_pub_record_body(Bin, Hdl) of + {ok, MsgOrId, Props} -> + IsPersistent = + case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end, + load_journal_entries( + add_to_journal( + SeqId, {MsgOrId, Props, IsPersistent}, + State)); + _ErrOrEoF -> + State + end; _ErrOrEoF -> %% err, we've lost at least a publish State end @@ -963,10 +969,14 @@ load_segment_entries(Hdl, KeepAcked, Acc) -> load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) -> case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} -> - {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl), - Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - {SegEntries1, Unacked + 1}; + case read_pub_record_body(PubRecordBody, Hdl) of + {ok, MsgOrId, MsgProps} -> + Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + {SegEntries1, Unacked + 1}; + _ -> + {SegEntries, Unacked} + end; _ -> {SegEntries, Unacked} end. |