author    Matthias Radestock <matthias@lshift.net>    2009-10-11 14:02:44 +0100
committer Matthias Radestock <matthias@lshift.net>    2009-10-11 14:02:44 +0100
commit    f528b2da3af823b457a11e86b901eb6b98fd6f9e (patch)
tree      30958f64739468ffb00d5c2c535b858a62a34c64
parent    0bcf01d6c9dcf28709e5ce668ba11850a4bc720d (diff)
download  rabbitmq-server-bug21368.tar.gz

rollback transactions on queue termination (branch: bug21368)
That way we don't leave garbage - transactionally published, but uncommitted messages - in the message store. It also lets us get rid of the pending_commits state wart in disk_queue. That is possible because both tx commits and queue deletions are issued by the queue process and tx commits are synchronous, so there is never a chance of a pending commit being outstanding when a deletion is performed.
-rw-r--r--  src/rabbit_amqqueue_process.erl |  11
-rw-r--r--  src/rabbit_disk_queue.erl       | 128
2 files changed, 59 insertions(+), 80 deletions(-)
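
For context, a minimal sketch (not part of the commit) of the shutdown ordering this change establishes, assuming the names used in the diff below (rabbit_mixed_queue:tx_rollback/2, rabbit_mixed_queue:delete_queue/1, all_tx_record/0 and the #tx record's pending_messages field); error handling is elided:

%% Sketch only: roll back uncommitted tx publishes before deleting the
%% queue contents, so the message store is not left holding garbage.
%% Names are taken from the diff below; lists:append/1 is used here
%% to flatten the per-tx message lists.
terminate_sketch(MS) ->
    PendingMsgs = lists:append([PM || #tx { pending_messages = PM }
                                          <- all_tx_record()]),
    {ok, MS1} = rabbit_mixed_queue:tx_rollback(PendingMsgs, MS),
    %% only then delete the queue's own (committed) contents
    {ok, _MS2} = rabbit_mixed_queue:delete_queue(MS1),
    ok.
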
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5789b105..0c334bc3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -118,16 +118,23 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
{ok, start_memory_timer(State), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(_Reason, State) ->
+terminate(_Reason, State = #q{mixed_state = MS}) ->
%% FIXME: How do we cancel active subscriptions?
State1 = stop_memory_timer(State),
+ %% Ensure that any persisted tx messages are removed;
+ %% mixed_queue:delete_queue cannot do that for us since neither
+ %% mixed_queue nor disk_queue keep a record of uncommitted tx
+ %% messages.
+ {ok, MS1} = rabbit_mixed_queue:tx_rollback(
+ lists:concat([PM || #tx { pending_messages = PM } <-
+ all_tx_record()]), MS),
%% Delete from disk queue first. If we crash at this point, when a
%% durable queue, we will be recreated at startup, possibly with
%% partial content. The alternative is much worse however - if we
%% called internal_delete first, we would then have a race between
%% the disk_queue delete and a new queue with the same name being
%% created and published to.
- {ok, _MS} = rabbit_mixed_queue:delete_queue(State1 #q.mixed_state),
+ {ok, _MS} = rabbit_mixed_queue:delete_queue(MS1),
ok = rabbit_amqqueue:internal_delete(qname(State1)).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 42c4ed8b..7d44dd9d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -66,10 +66,7 @@
-define(SERVER, ?MODULE).
--record(dqstate,
- { sequences, %% next read and write for each q
- pending_commits %% dict of txns waiting for msg_store
- }).
+-record(dqstate, { sequences }). %% next read and write for each q
%%----------------------------------------------------------------------------
@@ -170,8 +167,8 @@ stop_and_obliterate() ->
%% private
-finalise_commit(TxId) ->
- gen_server2:cast(?SERVER, {finalise_commit, TxId}).
+finalise_commit(TxDetails) ->
+ gen_server2:cast(?SERVER, {finalise_commit, TxDetails}).
%%----------------------------------------------------------------------------
%% gen_server behaviour
@@ -200,7 +197,7 @@ init([]) ->
Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
ok = extract_sequence_numbers(Sequences),
- State = #dqstate { sequences = Sequences, pending_commits = dict:new() },
+ State = #dqstate { sequences = Sequences },
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -269,8 +266,8 @@ handle_cast({prefetch, Q, From}, State) ->
false -> ok
end,
noreply(State1);
-handle_cast({finalise_commit, TxId}, State) ->
- noreply(finalise_commit(TxId, State)).
+handle_cast({finalise_commit, TxDetails}, State) ->
+ noreply(finalise_commit(TxDetails, State)).
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -390,54 +387,40 @@ internal_tx_publish(Message = #basic_message { guid = MsgId,
MsgId, Message #basic_message { content = ClearedContent }),
{ok, State}.
-internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
- State = #dqstate { pending_commits = PendingCommits }) ->
+internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State) ->
+ TxDetails = {Q, PubMsgIds, AckSeqIds, From},
ok = rabbit_msg_store:sync([MsgId || {MsgId, _, _} <- PubMsgIds],
- fun () -> finalise_commit({Q, From}) end),
- PendingCommits1 = dict:store(Q, {PubMsgIds, AckSeqIds, From},
- PendingCommits),
- State #dqstate { pending_commits = PendingCommits1 }.
-
-finalise_commit({Q, From},
- State = #dqstate { sequences = Sequences,
- pending_commits = PendingCommits }) ->
- case dict:find(Q, PendingCommits) of
- {ok, {PubMsgIds, AckSeqIds, From}} ->
- {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- WriteSeqId =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(
- fun ({MsgId, IsDelivered, IsPersistent}, SeqId) ->
- ok = mnesia:write(
- rabbit_disk_queue,
- #dq_msg_loc {
- queue_and_seq_id = {Q, SeqId},
- msg_id = MsgId,
- is_delivered = IsDelivered,
- is_persistent = IsPersistent
- }, write),
- SeqId + 1
- end, InitWriteSeqId, PubMsgIds)
- end),
- {ok, State1} = remove_messages(Q, AckSeqIds, State),
- true = case PubMsgIds of
- [] -> true;
- _ -> ets:insert(Sequences,
- {Q, InitReadSeqId, WriteSeqId})
- end,
- gen_server2:reply(From, ok),
- State1 # dqstate { pending_commits =
- dict:erase(Q, PendingCommits) };
- {ok, _} ->
- %% sync notification for a deleted queue which has since
- %% been recreated
- State;
- error ->
- %% sync notification for a deleted queue
- State
- end.
+ fun () -> finalise_commit(TxDetails) end),
+ State.
+
+finalise_commit({Q, PubMsgIds, AckSeqIds, From},
+ State = #dqstate { sequences = Sequences }) ->
+ {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
+ WriteSeqId =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foldl(
+ fun ({MsgId, IsDelivered, IsPersistent}, SeqId) ->
+ ok = mnesia:write(
+ rabbit_disk_queue,
+ #dq_msg_loc {
+ queue_and_seq_id = {Q, SeqId},
+ msg_id = MsgId,
+ is_delivered = IsDelivered,
+ is_persistent = IsPersistent
+ }, write),
+ SeqId + 1
+ end, InitWriteSeqId, PubMsgIds)
+ end),
+ {ok, State1} = remove_messages(Q, AckSeqIds, State),
+ true = case PubMsgIds of
+ [] -> true;
+ _ -> ets:insert(Sequences,
+ {Q, InitReadSeqId, WriteSeqId})
+ end,
+ gen_server2:reply(From, ok),
+ State1.
internal_publish(Q, Message = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -551,31 +534,20 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
{ok, WriteSeqId - ReadSeqId, State1}
end.
-internal_delete_queue(Q,
- State = #dqstate { pending_commits = PendingCommits }) ->
- %% remove pending commits
- State1 = case dict:find(Q, PendingCommits) of
- {ok, {PubMsgIds, _, _}} ->
- ok = rabbit_msg_store:remove(
- [MsgId || {MsgId, _, _} <- PubMsgIds]),
- State # dqstate { pending_commits =
- dict:erase(Q, PendingCommits) };
- error ->
- State
- end,
+internal_delete_queue(Q, State) ->
%% remove everything undelivered
- {ok, _Count, State2 = #dqstate { sequences = Sequences }} =
- internal_purge(Q, State1),
+ {ok, _Count, State1 = #dqstate { sequences = Sequences }} =
+ internal_purge(Q, State),
true = ets:delete(Sequences, Q),
%% remove everything already delivered
- Objs = mnesia:dirty_match_object(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, '_'}, _ = '_' }),
- MsgSeqIds = lists:map(fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
- msg_id = MsgId }) ->
- {MsgId, SeqId}
- end, Objs),
- remove_messages(Q, MsgSeqIds, State2).
+ remove_messages(
+ Q, [{MsgId, SeqId} || #dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId } <-
+ mnesia:dirty_match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc {
+ queue_and_seq_id = {Q, '_'},
+ _ = '_' })], State1).
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->