summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-16 17:38:01 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-16 17:38:01 +0100
commit39fdd3039cf23a323df9136f94c4266cc602922f (patch)
treee934ebc9cf130f92d5f54dc11cdcc5ca06715d86
parentf48db4a0bbbe15ab62922b309a2ac89af3e9a2fb (diff)
downloadrabbitmq-server-39fdd3039cf23a323df9136f94c4266cc602922f.tar.gz
tweak tx API on amqqueue
commit_all and rollback_all take the channel pid as an additional arg. This brings these functions in line with deliver and ack, which also take both a txn and ch_pid. In the queue process this saves us some book keeping. Also, in the queue process we need to clear the txn field in the ch record on commit/rollback, since otherwise a subsequent channel 'DOWN' results in some suprising and unnecessary, though perfectly safe, control flow. Finally, there is no need to check that a commit relates to a channel the queue process knows about - this is always guaranteed to be the case. All these changes were cherry-picked from the bug21673 branch and bring default closer to that branch.
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl38
-rw-r--r--src/rabbit_channel.erl4
3 files changed, 25 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 97b5ce46..f6278836 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,7 +41,7 @@
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
+-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
@@ -91,8 +91,8 @@
-spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
--spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
+-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
@@ -288,16 +288,16 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
-commit_all(QPids, Txn) ->
+commit_all(QPids, Txn, ChPid) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
-rollback_all(QPids, Txn) ->
+rollback_all(QPids, Txn, ChPid) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end,
QPids).
notify_down_all(QPids, ChPid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ba41f550..449e79ea 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -57,7 +57,7 @@
-record(consumer, {tag, ack_required}).
--record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
+-record(tx, {is_persistent, pending_messages, pending_acks}).
%% These are held in our process dictionary
-record(cr, {consumer_count,
@@ -431,8 +431,7 @@ do_if_persistent(F, Txn, QName) ->
lookup_tx(Txn) ->
case get({txn, Txn}) of
- undefined -> #tx{ch_pid = none,
- is_persistent = false,
+ undefined -> #tx{is_persistent = false,
pending_messages = [],
pending_acks = []};
V -> V
@@ -461,26 +460,19 @@ is_tx_persistent(Txn) ->
record_pending_message(Txn, ChPid, Message) ->
Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
- ch_pid = ChPid}).
+ store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
- ch_pid = ChPid}).
-
-process_pending(Txn, State) ->
- #tx{ch_pid = ChPid,
- pending_messages = PendingMessages,
- pending_acks = PendingAcks} = lookup_tx(Txn),
- case lookup_ch(ChPid) of
- not_found -> ok;
- C = #cr{unacked_messages = UAM} ->
- {_Acked, Remaining} =
- collect_messages(lists:append(PendingAcks), UAM),
- store_ch_record(C#cr{unacked_messages = Remaining})
- end,
+ store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}).
+
+process_pending(Txn, ChPid, State) ->
+ #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} =
+ lookup_tx(Txn),
+ C = #cr{unacked_messages = UAM} = lookup_ch(ChPid),
+ {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
deliver_or_enqueue_n(lists:reverse(PendingMessages), State).
collect_messages(MsgIds, UAM) ->
@@ -589,12 +581,13 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
{Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
reply(Delivered, NewState);
-handle_call({commit, Txn}, From, State) ->
+handle_call({commit, Txn, ChPid}, From, State) ->
ok = commit_work(Txn, qname(State)),
%% optimisation: we reply straight away so the sender can continue
gen_server2:reply(From, ok),
- NewState = process_pending(Txn, State),
+ NewState = process_pending(Txn, ChPid, State),
erase_tx(Txn),
+ record_current_channel_tx(ChPid, none),
noreply(NewState);
handle_call({notify_down, ChPid}, _From, State) ->
@@ -776,9 +769,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
noreply(State)
end;
-handle_cast({rollback, Txn}, State) ->
+handle_cast({rollback, Txn, ChPid}, State) ->
ok = rollback_work(Txn, qname(State)),
erase_tx(Txn),
+ record_current_channel_tx(ChPid, none),
noreply(State);
handle_cast({redeliver, Messages}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9aeb4623..7d3cd722 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -928,7 +928,7 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
- TxnKey) of
+ TxnKey, self()) of
ok -> ok = notify_limiter(State#ch.limiter_pid,
State#ch.uncommitted_ack_q),
new_tx(State);
@@ -945,7 +945,7 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
queue:len(UAQ),
queue:len(UAMQ)]),
case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey) of
+ TxnKey, self()) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
{error, Errors} -> rabbit_misc:protocol_error(