From e2645b89e948168c3c2d9595ec7f7097ad0fb3d0 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 20 Jan 2011 16:01:05 +0000 Subject: Deal with the possibility of a ch DOWN overtaking other messages from the channel --- src/rabbit_amqqueue_process.erl | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 663977ba..48192dcb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -630,24 +630,36 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS, ttl = TTL}) -> - {AckTags, BQS1} = BQ:tx_commit(Txn, - fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), - BQS), %% ChPid must be known here because of the participant management - %% by the channel. - C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), - State#q{backing_queue_state = BQS1}. + %% by the channel. However, in a cluster, the DOWN can overtake + %% the commit, and so there is a case where handle_ch_down has + %% already been called for ChPid. + case lookup_ch(ChPid) of + not_found -> + gen_server2:reply(From, ok), + State; + C = #cr{acktags = ChAckTags} -> + {AckTags, BQS1} = BQ:tx_commit( + Txn, fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(TTL), BQS), + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + State#q{backing_queue_state = BQS1} + end. rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - %% Iff we removed acktags from the channel record on ack+txn then - %% we would add them back in here (would also require ChPid) - record_current_channel_tx(ChPid, none), - State#q{backing_queue_state = BQS1}. + case lookup_ch(ChPid) of + not_found -> + State; + #cr{} -> + {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + %% Iff we removed acktags from the channel record on + %% ack+txn then we would add them back in here (would also + %% require ChPid) + record_current_channel_tx(ChPid, none), + State#q{backing_queue_state = BQS1} + end. subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). -- cgit v1.2.1