summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-20 16:01:05 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-20 16:01:05 +0000
commite2645b89e948168c3c2d9595ec7f7097ad0fb3d0 (patch)
treeffaac228fdb0fd034ab79e3c39e4cb36aec3e854
parent9b01ba889cf62aea9bb5a29062610139bdecc6a2 (diff)
downloadrabbitmq-server-e2645b89e948168c3c2d9595ec7f7097ad0fb3d0.tar.gz
Deal with the possibility of a ch DOWN overtaking other messages from the channel
-rw-r--r--src/rabbit_amqqueue_process.erl40
1 files changed, 26 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 663977ba..48192dcb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -630,24 +630,36 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
ttl = TTL}) ->
- {AckTags, BQS1} = BQ:tx_commit(Txn,
- fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(TTL),
- BQS),
%% ChPid must be known here because of the participant management
- %% by the channel.
- C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
- State#q{backing_queue_state = BQS1}.
+ %% by the channel. However, in a cluster, the DOWN can overtake
+ %% the commit, and so there is a case where handle_ch_down has
+ %% already been called for ChPid.
+ case lookup_ch(ChPid) of
+ not_found ->
+ gen_server2:reply(From, ok),
+ State;
+ C = #cr{acktags = ChAckTags} ->
+ {AckTags, BQS1} = BQ:tx_commit(
+ Txn, fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(TTL), BQS),
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
+ State#q{backing_queue_state = BQS1}
+ end.
rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
- %% Iff we removed acktags from the channel record on ack+txn then
- %% we would add them back in here (would also require ChPid)
- record_current_channel_tx(ChPid, none),
- State#q{backing_queue_state = BQS1}.
+ case lookup_ch(ChPid) of
+ not_found ->
+ State;
+ #cr{} ->
+ {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ %% Iff we removed acktags from the channel record on
+ %% ack+txn then we would add them back in here (would also
+ %% require ChPid)
+ record_current_channel_tx(ChPid, none),
+ State#q{backing_queue_state = BQS1}
+ end.
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).