summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-10-11 14:02:44 +0100
committerMatthias Radestock <matthias@lshift.net>2009-10-11 14:02:44 +0100
commitf528b2da3af823b457a11e86b901eb6b98fd6f9e (patch)
tree30958f64739468ffb00d5c2c535b858a62a34c64 /src/rabbit_amqqueue_process.erl
parent0bcf01d6c9dcf28709e5ce668ba11850a4bc720d (diff)
downloadrabbitmq-server-f528b2da3af823b457a11e86b901eb6b98fd6f9e.tar.gz
rollback transactions on queue terminationbug21368
That way we don't leave garbage - transactionally published, but uncommitted messages - in the message store. Also, we we can get rid of the pending_commits state wart in disk_queue. That is possible because both tx commits and queue deletions are issued by the queue process and tx commits are synchronous, so there is never a chance of there being a pending commit when doing a deletion.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl11
1 files changed, 9 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5789b105..0c334bc3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -118,16 +118,23 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
{ok, start_memory_timer(State), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(_Reason, State) ->
+terminate(_Reason, State = #q{mixed_state = MS}) ->
%% FIXME: How do we cancel active subscriptions?
State1 = stop_memory_timer(State),
+ %% Ensure that any persisted tx messages are removed;
+ %% mixed_queue:delete_queue cannot do that for us since neither
+ %% mixed_queue nor disk_queue keep a record of uncommitted tx
+ %% messages.
+ {ok, MS1} = rabbit_mixed_queue:tx_rollback(
+ lists:concat([PM || #tx { pending_messages = PM } <-
+ all_tx_record()]), MS),
%% Delete from disk queue first. If we crash at this point, when a
%% durable queue, we will be recreated at startup, possibly with
%% partial content. The alternative is much worse however - if we
%% called internal_delete first, we would then have a race between
%% the disk_queue delete and a new queue with the same name being
%% created and published to.
- {ok, _MS} = rabbit_mixed_queue:delete_queue(State1 #q.mixed_state),
+ {ok, _MS} = rabbit_mixed_queue:delete_queue(MS1),
ok = rabbit_amqqueue:internal_delete(qname(State1)).
code_change(_OldVsn, State, _Extra) ->