diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-05 15:18:45 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-05 15:18:45 +0100 |
commit | ff22f91a975741e87d8d1469a9dc3d6f9cfeb631 (patch) | |
tree | 731418b65fc93effa6dd095cdf4f4504b6e3cda5 | |
parent | d94eaeb471aeabaf54a5623f1c4cdf52312c28c3 (diff) | |
download | rabbitmq-server-bug20980.tar.gz |
Removed some transactions and made all transaction bodies idempotent. They were actually fine before: a) the rabbit_disk_queue table is local_content and b) only one process ever accesses that table - thus there is no reason why any transaction will ever retry. However, this change is probably still beneficial. The only slight loss is that tx-commit is no longer atomic (ref counting of messages in ets, not mnesia, was resulting in non idempotency, so moved outside the transaction). This means that you could have msgs in a tx committed, but the acks not enforced, in the event of power failure or other catastrophic event.bug20980
All tests pass.
-rw-r--r-- | src/rabbit_disk_queue.erl | 130 |
1 files changed, 56 insertions, 74 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index fe8c433c..75892f68 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -980,11 +980,6 @@ internal_ack(Q, MsgSeqIds, State) -> remove_messages(Q, MsgSeqIds, true, State). %% Q is only needed if MnesiaDelete /= false -%% called from ack with MnesiaDelete = true -%% called from tx_commit with MnesiaDelete = txn -%% called from tx_cancel with MnesiaDelete = false -%% called from purge with MnesiaDelete = txn -%% called from delete_queue with MnesiaDelete = txn remove_messages(Q, MsgSeqIds, MnesiaDelete, State = #dqstate { file_summary = FileSummary, current_file_name = CurName @@ -1092,8 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, State = #dqstate { sequences = Sequences }) -> {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), - {atomic, {WriteSeqId, State1}} = - mnesia:transaction( + WriteSeqId = + rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), {ok, WriteSeqId1} = @@ -1107,9 +1102,9 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, }, write), SeqId + 1} end, {ok, InitWriteSeqId}, PubMsgIds), - {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), - {WriteSeqId1, State2} + WriteSeqId1 end), + {ok, State1} = remove_messages(Q, AckSeqIds, true, State), true = case PubMsgIds of [] -> true; _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId}) @@ -1162,17 +1157,18 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> %% as they have no concept of sequence id anyway). {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - {atomic, {WriteSeqId1, Q, State}} = - mnesia:transaction( + {WriteSeqId1, Q, MsgIds} = + rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State}, + lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []}, MsgSeqIds) end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}), + lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), {ok, State}. -requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) -> +requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) -> [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), ok = mnesia:write(rabbit_disk_queue, @@ -1181,57 +1177,50 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) -> }, write), ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), - decrement_cache(MsgId, State), - {WriteSeqId + 1, Q, State}. + {WriteSeqId + 1, Q, [MsgId | Acc]}. %% move the next N messages from the front of the queue to the back. internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), if N >= (WriteSeqId - ReadSeqId) -> {ok, State}; true -> - {atomic, {ReadSeqIdN, WriteSeqIdN}} = - mnesia:transaction( + {ReadSeqIdN, WriteSeqIdN, MsgIds} = + rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - requeue_next_messages(Q, State, N, ReadSeqId, WriteSeqId) + requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, []) end ), true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}), + lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), {ok, State} end. -requeue_next_messages(_Q, _State, 0, ReadSeq, WriteSeq) -> - {ReadSeq, WriteSeq}; -requeue_next_messages(Q, State, N, ReadSeq, WriteSeq) -> +requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) -> + {ReadSeq, WriteSeq, Acc}; +requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) -> [Obj = #dq_msg_loc { msg_id = MsgId }] = mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write), ok = mnesia:write(rabbit_disk_queue, Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}}, write), ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write), - decrement_cache(MsgId, State), - requeue_next_messages(Q, State, N - 1, ReadSeq + 1, WriteSeq + 1). + requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]). internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, 0, State}; {ReadSeqId, WriteSeqId} -> - {atomic, {ok, State1}} = - mnesia:transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - {MsgSeqIds, WriteSeqId} = - rabbit_misc:unfold( - fun (SeqId) when SeqId == WriteSeqId -> false; - (SeqId) -> - [#dq_msg_loc { msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, - {Q, SeqId}, write), - {true, {MsgId, SeqId}, SeqId + 1} - end, ReadSeqId), - remove_messages(Q, MsgSeqIds, txn, State) - end), + {MsgSeqIds, WriteSeqId} = + rabbit_misc:unfold( + fun (SeqId) when SeqId == WriteSeqId -> false; + (SeqId) -> + [#dq_msg_loc { msg_id = MsgId }] = + mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), + {true, {MsgId, SeqId}, SeqId + 1} + end, ReadSeqId), true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), + {ok, State1} = remove_messages(Q, MsgSeqIds, true, State), {ok, WriteSeqId - ReadSeqId, State1} end. @@ -1239,26 +1228,19 @@ internal_delete_queue(Q, State) -> {ok, _Count, State1 = #dqstate { sequences = Sequences }} = internal_purge(Q, State), %% remove everything undelivered true = ets:delete(Sequences, Q), - {atomic, {ok, State2}} = - mnesia:transaction( - fun() -> %% now remove everything already delivered - ok = mnesia:write_lock_table(rabbit_disk_queue), - Objs = - mnesia:match_object( - rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, '_'}, - msg_id = '_', - is_delivered = '_' - }, - write), - MsgSeqIds = - lists:map( - fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, - msg_id = MsgId }) -> - {MsgId, SeqId} end, Objs), - remove_messages(Q, MsgSeqIds, txn, State1) - end), - {ok, State2}. + %% now remove everything already delivered + Objs = mnesia:dirty_match_object( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, '_'}, + msg_id = '_', + is_delivered = '_' + }), + MsgSeqIds = + lists:map( + fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, + msg_id = MsgId }) -> + {MsgId, SeqId} end, Objs), + remove_messages(Q, MsgSeqIds, true, State1). internal_delete_non_durable_queues( DurableQueues, State = #dqstate { sequences = Sequences }) -> @@ -1563,8 +1545,8 @@ load_from_disk(State) -> State1 = load_messages(undefined, Files, State), %% Finally, check there is nothing in mnesia which we haven't %% loaded - {atomic, State2} = - mnesia:transaction( + State2 = + rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), {State6, FinalQ, MsgSeqIds2, _Len} = @@ -1605,7 +1587,7 @@ load_from_disk(State) -> {ok, State8}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> - {atomic, true} = mnesia:transaction( + true = rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), mnesia:foldl( @@ -1624,7 +1606,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> end end, true, rabbit_disk_queue) end), - remove_gaps_in_sequences(State), + ok = remove_gaps_in_sequences(State), State. remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> @@ -1637,18 +1619,18 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> %% we could shuffle downwards. However, I think there's greater %% likelihood of gaps being at the bottom rather than the top of %% the queue, so shuffling up should be the better bet. - {atomic, _} = - mnesia:transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foreach( - fun ({Q, ReadSeqId, WriteSeqId}) -> - Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), - ReadSeqId1 = ReadSeqId + Gap, - true = ets:insert(Sequences, - {Q, ReadSeqId1, WriteSeqId}) - end, ets:match_object(Sequences, '_')) - end). + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foreach( + fun ({Q, ReadSeqId, WriteSeqId}) -> + Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), + ReadSeqId1 = ReadSeqId + Gap, + true = ets:insert(Sequences, + {Q, ReadSeqId1, WriteSeqId}) + end, ets:match_object(Sequences, '_')) + end), + ok. shuffle_up(_Q, SeqId, SeqId, Gap) -> Gap; |