diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-10-11 14:02:44 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-10-11 14:02:44 +0100 |
commit | f528b2da3af823b457a11e86b901eb6b98fd6f9e (patch) | |
tree | 30958f64739468ffb00d5c2c535b858a62a34c64 /src/rabbit_amqqueue_process.erl | |
parent | 0bcf01d6c9dcf28709e5ce668ba11850a4bc720d (diff) | |
download | rabbitmq-server-f528b2da3af823b457a11e86b901eb6b98fd6f9e.tar.gz |
rollback transactions on queue terminationbug21368
That way we don't leave garbage - transactionally published, but
uncommitted messages - in the message store.
Also, we we can get rid of the pending_commits state wart in
disk_queue. That is possible because both tx commits and queue
deletions are issued by the queue process and tx commits are
synchronous, so there is never a chance of there being a pending commit
when doing a deletion.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5789b105..0c334bc3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -118,16 +118,23 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> {ok, start_memory_timer(State), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(_Reason, State) -> +terminate(_Reason, State = #q{mixed_state = MS}) -> %% FIXME: How do we cancel active subscriptions? State1 = stop_memory_timer(State), + %% Ensure that any persisted tx messages are removed; + %% mixed_queue:delete_queue cannot do that for us since neither + %% mixed_queue nor disk_queue keep a record of uncommitted tx + %% messages. + {ok, MS1} = rabbit_mixed_queue:tx_rollback( + lists:concat([PM || #tx { pending_messages = PM } <- + all_tx_record()]), MS), %% Delete from disk queue first. If we crash at this point, when a %% durable queue, we will be recreated at startup, possibly with %% partial content. The alternative is much worse however - if we %% called internal_delete first, we would then have a race between %% the disk_queue delete and a new queue with the same name being %% created and published to. - {ok, _MS} = rabbit_mixed_queue:delete_queue(State1 #q.mixed_state), + {ok, _MS} = rabbit_mixed_queue:delete_queue(MS1), ok = rabbit_amqqueue:internal_delete(qname(State1)). code_change(_OldVsn, State, _Extra) -> |