diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-06-05 08:10:51 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-06-05 08:10:51 +0100 |
commit | 351e31e7af73877f683ebcf13dc711d29d10254b (patch) | |
tree | 8c4e12ad24bfa610bbe096d3ae09a838972aee67 | |
parent | a63a9d4971813a81df09d276e68c9048a2489683 (diff) | |
download | rabbitmq-server-bug20782.tar.gz |
clean up tx records in queues when a transaction's channel diesbug20782
Previously queues were only monitoring channels with subscribers or to
which ack-requiring messages had been delivered. Now queues also
monitor channels from which they have received transactional
publishes.
Queues record the last tx id they have seen from a channel. This then
makes it easy and efficient to find the associated tx record in the
queue's process dictionary when a channel process dies - the
alternative would be to scan the tx records for matching channel pids
- and perform the required rollback activities for the tx.
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 2 |
4 files changed, 43 insertions, 23 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5ebc82a2..784c21b3 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -64,7 +64,7 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). --record(delivery, {mandatory, immediate, txn, message}). +-record(delivery, {mandatory, immediate, txn, sender, message}). %%---------------------------------------------------------------------------- @@ -140,6 +140,7 @@ #delivery{mandatory :: bool(), immediate :: bool(), txn :: maybe(txn()), + sender :: pid(), message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 64f078bd..198e2782 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -241,14 +241,16 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). -deliver(QPid, #delivery{immediate = true, txn = Txn, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}, +deliver(QPid, #delivery{immediate = true, + txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, infinity); -deliver(QPid, #delivery{mandatory = true, txn = Txn, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message}, infinity), +deliver(QPid, #delivery{mandatory = true, + txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), true; -deliver(QPid, #delivery{txn = Txn, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message}), +deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), true. redeliver(QPid, Messages) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c390b2b7..6027c9c0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -66,6 +66,7 @@ monitor_ref, unacked_messages, is_limit_active, + txn, unsent_message_count}). -define(INFO_KEYS, @@ -133,6 +134,7 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, unacked_messages = dict:new(), is_limit_active = false, + txn = none, unsent_message_count = 0}, put(Key, C), C; @@ -156,6 +158,11 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. +record_current_channel_tx(ChPid, Txn) -> + %% as a side effect this also starts monitoring the channel (if + %% that wasn't happening already) + store_ch_record((ch_record(ChPid))#cr{txn = Txn}). + deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, @@ -198,7 +205,7 @@ deliver_immediately(Message, Delivered, {not_offered, State} end. -attempt_delivery(none, Message, State) -> +attempt_delivery(none, _ChPid, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> {true, State1}; @@ -209,13 +216,13 @@ attempt_delivery(none, Message, State) -> {not_offered, State1} -> {false, State1} end; -attempt_delivery(Txn, Message, State) -> +attempt_delivery(Txn, ChPid, Message, State) -> persist_message(Txn, qname(State), Message), - record_pending_message(Txn, Message), + record_pending_message(Txn, ChPid, Message), {true, State}. -deliver_or_enqueue(Txn, Message, State) -> - case attempt_delivery(Txn, Message, State) of +deliver_or_enqueue(Txn, ChPid, Message, State) -> + case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> @@ -295,10 +302,16 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of not_found -> noreply(State); - #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> + #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, + unacked_messages = UAM} -> NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), erase({ch, ChPid}), + case Txn of + none -> ok; + _ -> ok = rollback_work(Txn, qname(State)), + erase_tx(Txn) + end, case check_auto_delete( deliver_or_enqueue_n( [{Message, true} || @@ -456,13 +469,17 @@ is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. -record_pending_message(Txn, Message) -> +record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). + record_current_channel_tx(ChPid, Txn), + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], + ch_pid = ChPid}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). + record_current_channel_tx(ChPid, Txn), + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], + ch_pid = ChPid}). process_pending(Txn, State) -> #tx{ch_pid = ChPid, @@ -541,7 +558,7 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call({deliver_immediately, Txn, Message}, _From, State) -> +handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -555,12 +572,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, Message, State), + {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({deliver, Txn, Message}, _From, State) -> +handle_call({deliver, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> @@ -711,9 +728,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(locked, State) end. -handle_cast({deliver, Txn, Message}, State) -> +handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index ce4f818d..761b3863 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -61,7 +61,7 @@ publish(Delivery = #delivery{ delivery(Mandatory, Immediate, Txn, Message) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - message = Message}. + sender = self(), message = Message}. message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), |