author     Matthew Sackman <matthew@lshift.net>    2009-08-05 15:18:45 +0100
committer  Matthew Sackman <matthew@lshift.net>    2009-08-05 15:18:45 +0100
commit     ff22f91a975741e87d8d1469a9dc3d6f9cfeb631 (patch)
tree       731418b65fc93effa6dd095cdf4f4504b6e3cda5
parent     d94eaeb471aeabaf54a5623f1c4cdf52312c28c3 (diff)
download   rabbitmq-server-bug20980.tar.gz
Removed some transactions and made all transaction bodies idempotent. They were actually fine before: (a) the rabbit_disk_queue table is local_content, and (b) only one process ever accesses that table, so there is no reason why any transaction would ever retry. However, this change is probably still beneficial. The only slight loss is that tx-commit is no longer atomic (ref-counting of messages in ets, not mnesia, was causing non-idempotency, so it was moved outside the transaction). This means that, in the event of a power failure or other catastrophic event, the messages of a tx could be committed without the corresponding acks being enforced.
bug20980
All tests pass.
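The recurring change below is replacing direct mnesia:transaction/1 calls, which return {atomic, Result}, with rabbit_misc:execute_mnesia_transaction/1, and hoisting the non-idempotent side effects (ets inserts, cache ref-count decrements) out of the transaction fun. A minimal sketch of that pattern follows; the wrapper body is an assumption, since execute_mnesia_transaction/1 itself is not part of this diff, only its call sites are.

    %% Sketch only: execute_mnesia_transaction/1 is not shown in this diff,
    %% so this body is an assumption; the call sites below only require that
    %% it runs the fun in a transaction and returns the fun's result unwrapped.
    execute_mnesia_transaction(TxFun) when is_function(TxFun, 0) ->
        case mnesia:sync_transaction(TxFun) of
            {atomic, Result}  -> Result;
            {aborted, Reason} -> throw({error, Reason})
        end.

    %% Illustration of the idempotency pattern used throughout the patch:
    %% keep pure mnesia reads/writes inside the fun, so a retry re-runs them
    %% harmlessly; collect the message ids and perform the ets/cache side
    %% effects exactly once, after the transaction has returned.
    requeue_sketch(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
        {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
        {WriteSeqId1, Q, MsgIds} =
            rabbit_misc:execute_mnesia_transaction(
              fun () ->
                      ok = mnesia:write_lock_table(rabbit_disk_queue),
                      lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []},
                                  MsgSeqIds)
              end),
        true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}),
        lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
        {ok, State}.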
-rw-r--r--   src/rabbit_disk_queue.erl   130
1 file changed, 56 insertions(+), 74 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index fe8c433c..75892f68 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -980,11 +980,6 @@ internal_ack(Q, MsgSeqIds, State) ->
remove_messages(Q, MsgSeqIds, true, State).
%% Q is only needed if MnesiaDelete /= false
-%% called from ack with MnesiaDelete = true
-%% called from tx_commit with MnesiaDelete = txn
-%% called from tx_cancel with MnesiaDelete = false
-%% called from purge with MnesiaDelete = txn
-%% called from delete_queue with MnesiaDelete = txn
remove_messages(Q, MsgSeqIds, MnesiaDelete,
State = #dqstate { file_summary = FileSummary,
current_file_name = CurName
@@ -1092,8 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
State = #dqstate { sequences = Sequences }) ->
{InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- {atomic, {WriteSeqId, State1}} =
- mnesia:transaction(
+ WriteSeqId =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
{ok, WriteSeqId1} =
@@ -1107,9 +1102,9 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
}, write),
SeqId + 1}
end, {ok, InitWriteSeqId}, PubMsgIds),
- {ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
- {WriteSeqId1, State2}
+ WriteSeqId1
end),
+ {ok, State1} = remove_messages(Q, AckSeqIds, true, State),
true = case PubMsgIds of
[] -> true;
_ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
@@ -1162,17 +1157,18 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
%% as they have no concept of sequence id anyway).
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- {atomic, {WriteSeqId1, Q, State}} =
- mnesia:transaction(
+ {WriteSeqId1, Q, MsgIds} =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State},
+ lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []},
MsgSeqIds)
end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}),
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
{ok, State}.
-requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) ->
+requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
ok = mnesia:write(rabbit_disk_queue,
@@ -1181,57 +1177,50 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) ->
},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
- decrement_cache(MsgId, State),
- {WriteSeqId + 1, Q, State}.
+ {WriteSeqId + 1, Q, [MsgId | Acc]}.
%% move the next N messages from the front of the queue to the back.
internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
if N >= (WriteSeqId - ReadSeqId) -> {ok, State};
true ->
- {atomic, {ReadSeqIdN, WriteSeqIdN}} =
- mnesia:transaction(
+ {ReadSeqIdN, WriteSeqIdN, MsgIds} =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- requeue_next_messages(Q, State, N, ReadSeqId, WriteSeqId)
+ requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, [])
end
),
true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}),
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
{ok, State}
end.
-requeue_next_messages(_Q, _State, 0, ReadSeq, WriteSeq) ->
- {ReadSeq, WriteSeq};
-requeue_next_messages(Q, State, N, ReadSeq, WriteSeq) ->
+requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) ->
+ {ReadSeq, WriteSeq, Acc};
+requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) ->
[Obj = #dq_msg_loc { msg_id = MsgId }] =
mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write),
ok = mnesia:write(rabbit_disk_queue,
Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write),
- decrement_cache(MsgId, State),
- requeue_next_messages(Q, State, N - 1, ReadSeq + 1, WriteSeq + 1).
+ requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]).
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, 0, State};
{ReadSeqId, WriteSeqId} ->
- {atomic, {ok, State1}} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- {MsgSeqIds, WriteSeqId} =
- rabbit_misc:unfold(
- fun (SeqId) when SeqId == WriteSeqId -> false;
- (SeqId) ->
- [#dq_msg_loc { msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue,
- {Q, SeqId}, write),
- {true, {MsgId, SeqId}, SeqId + 1}
- end, ReadSeqId),
- remove_messages(Q, MsgSeqIds, txn, State)
- end),
+ {MsgSeqIds, WriteSeqId} =
+ rabbit_misc:unfold(
+ fun (SeqId) when SeqId == WriteSeqId -> false;
+ (SeqId) ->
+ [#dq_msg_loc { msg_id = MsgId }] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
+ {true, {MsgId, SeqId}, SeqId + 1}
+ end, ReadSeqId),
true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
+ {ok, State1} = remove_messages(Q, MsgSeqIds, true, State),
{ok, WriteSeqId - ReadSeqId, State1}
end.
@@ -1239,26 +1228,19 @@ internal_delete_queue(Q, State) ->
{ok, _Count, State1 = #dqstate { sequences = Sequences }} =
internal_purge(Q, State), %% remove everything undelivered
true = ets:delete(Sequences, Q),
- {atomic, {ok, State2}} =
- mnesia:transaction(
- fun() -> %% now remove everything already delivered
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- Objs =
- mnesia:match_object(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, '_'},
- msg_id = '_',
- is_delivered = '_'
- },
- write),
- MsgSeqIds =
- lists:map(
- fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
- msg_id = MsgId }) ->
- {MsgId, SeqId} end, Objs),
- remove_messages(Q, MsgSeqIds, txn, State1)
- end),
- {ok, State2}.
+ %% now remove everything already delivered
+ Objs = mnesia:dirty_match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, '_'},
+ msg_id = '_',
+ is_delivered = '_'
+ }),
+ MsgSeqIds =
+ lists:map(
+ fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId }) ->
+ {MsgId, SeqId} end, Objs),
+ remove_messages(Q, MsgSeqIds, true, State1).
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->
@@ -1563,8 +1545,8 @@ load_from_disk(State) ->
State1 = load_messages(undefined, Files, State),
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
- {atomic, State2} =
- mnesia:transaction(
+ State2 =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
{State6, FinalQ, MsgSeqIds2, _Len} =
@@ -1605,7 +1587,7 @@ load_from_disk(State) ->
{ok, State8}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
- {atomic, true} = mnesia:transaction(
+ true = rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(
@@ -1624,7 +1606,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
end
end, true, rabbit_disk_queue)
end),
- remove_gaps_in_sequences(State),
+ ok = remove_gaps_in_sequences(State),
State.
remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
@@ -1637,18 +1619,18 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
%% we could shuffle downwards. However, I think there's greater
%% likelihood of gaps being at the bottom rather than the top of
%% the queue, so shuffling up should be the better bet.
- {atomic, _} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foreach(
- fun ({Q, ReadSeqId, WriteSeqId}) ->
- Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
- ReadSeqId1 = ReadSeqId + Gap,
- true = ets:insert(Sequences,
- {Q, ReadSeqId1, WriteSeqId})
- end, ets:match_object(Sequences, '_'))
- end).
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foreach(
+ fun ({Q, ReadSeqId, WriteSeqId}) ->
+ Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
+ ReadSeqId1 = ReadSeqId + Gap,
+ true = ets:insert(Sequences,
+ {Q, ReadSeqId1, WriteSeqId})
+ end, ets:match_object(Sequences, '_'))
+ end),
+ ok.
shuffle_up(_Q, SeqId, SeqId, Gap) ->
Gap;