diff options
author | Jerry Kuch <jerryk@vmware.com> | 2011-03-15 17:35:08 -0700 |
---|---|---|
committer | Jerry Kuch <jerryk@vmware.com> | 2011-03-15 17:35:08 -0700 |
commit | 234846f7878d13573178fbc1e71d9629c04bf640 (patch) | |
tree | 4266a720b1522f43d19a83946c33eaa4e2382e00 | |
parent | 096411d81b3b1e8bcf41fd910050b678eb5d2849 (diff) | |
download | rabbitmq-server-bug23577.tar.gz |
Fix transactions. All qpid tests run.bug23577
-rw-r--r-- | sql_scripts/setup_all.sql | 3 | ||||
-rw-r--r-- | src/mysql_helper.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mysql_queue.erl | 86 |
3 files changed, 41 insertions, 60 deletions
diff --git a/sql_scripts/setup_all.sql b/sql_scripts/setup_all.sql index 01208f5e..877fdd85 100644 --- a/sql_scripts/setup_all.sql +++ b/sql_scripts/setup_all.sql @@ -44,8 +44,7 @@ CREATE TABLE IF NOT EXISTS CREATE TABLE IF NOT EXISTS p (seq_id BIGINT UNSIGNED NOT NULL, queue_name VARCHAR(256) NOT NULL, -- (BUGBUG: max size?) - m MEDIUMBLOB, - PRIMARY KEY(seq_id)) + m MEDIUMBLOB) ENGINE=InnoDB; CREATE INDEX p_seq_id_index ON p(seq_id); CREATE INDEX p_queue_name_index ON p(queue_name); diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl index 1fb03d77..94d516de 100644 --- a/src/mysql_helper.erl +++ b/src/mysql_helper.erl @@ -89,7 +89,7 @@ prepare_mysql_statements() -> <<"INSERT INTO p(seq_id, queue_name, m) VALUES(?,?,?)">>}, {delete_p_stmt,<<"DELETE FROM p WHERE queue_name = ?">>}, {delete_p_by_seq_id_stmt, - <<"DELETE FROM p WHERE seq_id = ?">>}, + <<"DELETE FROM p WHERE seq_id = ? AND queue_name = ?">>}, {clear_p_stmt, <<"DELETE FROM p WHERE queue_name = ?">>}, {count_p_stmt, @@ -199,16 +199,16 @@ write_message_to_p(DbQueueName, SeqId, M) -> [SeqId, DbQueueName, term_to_binary(M)]), ok. -delete_message_from_p_by_seq_id(SeqId) -> +delete_message_from_p_by_seq_id(SeqId, DbQueueName) -> emysql:execute(?RABBIT_DB_POOL_NAME, delete_p_by_seq_id_stmt, - [SeqId]), + [SeqId, DbQueueName]), ok. read_p_record(DbQueueName, SeqId) -> - emysql:execute(?RABBIT_DB_POOL_NAME, - read_p_stmt, - [DbQueueName, SeqId]). + Result = emysql:execute(?RABBIT_DB_POOL_NAME, + read_p_stmt, + [DbQueueName, SeqId]). q_peek(DbQueueName) -> emysql:execute(?RABBIT_DB_POOL_NAME, diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl index fa01f7e8..44441225 100644 --- a/src/rabbit_mysql_queue.erl +++ b/src/rabbit_mysql_queue.erl @@ -453,8 +453,8 @@ ack(SeqIds, S) -> %% msg and its properties in the to_pub field of the txn, waiting to %% be committed. %% -%% tx_publish/4 creates an Mnesia transaction to run in, and therefore -%% may not be called from inside another Mnesia transaction. +%% tx_publish/4 creates a MySQL transaction to run in, and therefore +%% may not be called from inside another MySQL transaction. %% %% -spec(tx_publish/4 :: %% (rabbit_types:txn(), @@ -465,17 +465,13 @@ ack(SeqIds, S) -> tx_publish(Txn, Msg, Props, S) -> % rabbit_log:info("tx_publish(~n ~p,~n ~p,~n ~p,~n ~p) ->", [Txn, Msg, Props, S]), - {atomic, Result} = - mnesia:transaction( - fun () -> Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S), - RS = store_tx(Txn, - Tx #tx { to_pub = [{Msg, Props} | Pubs] }, - S), - save(RS), - RS - end), + Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S), + RS = store_tx(Txn, + Tx #tx { to_pub = [{Msg, Props} | Pubs] }, + S), + save(RS), + RS. % rabbit_log:info("tx_publish ->~n ~p", [Result]), - Result. %%---------------------------------------------------------------------------- %% tx_ack/3 acks within an AMQP transaction. It stores the seq_id in @@ -488,18 +484,14 @@ tx_publish(Txn, Msg, Props, S) -> tx_ack(Txn, SeqIds, S) -> % rabbit_log:info("tx_ack(~n ~p,~n ~p,~n ~p) ->", [Txn, SeqIds, S]), - {atomic, Result} = - mnesia:transaction( - fun () -> Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S), - RS = store_tx(Txn, - Tx #tx { - to_ack = lists:append(SeqIds, SeqIds0) }, - S), - save(RS), - RS - end), - % rabbit_log:info("tx_ack ->~n ~p", [Result]), - Result. + Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S), + RS = store_tx(Txn, + Tx #tx { + to_ack = lists:append(SeqIds, SeqIds0) }, + S), + save(RS), + % rabbit_log:info("tx_ack ->~n ~p", [RS]), + RS. %%---------------------------------------------------------------------------- %% tx_rollback/2 aborts an AMQP transaction. @@ -511,25 +503,17 @@ tx_ack(Txn, SeqIds, S) -> tx_rollback(Txn, S) -> % rabbit_log:info("tx_rollback(~n ~p,~n ~p) ->", [Txn, S]), - {atomic, Result} = - mnesia:transaction(fun () -> - #tx { to_ack = SeqIds } = lookup_tx(Txn, S), - RS = erase_tx(Txn, S), - save(RS), - {SeqIds, RS} - end), - % rabbit_log:info("tx_rollback ->~n ~p", [Result]), - Result. + #tx { to_ack = SeqIds } = lookup_tx(Txn, S), + RS = erase_tx(Txn, S), + save(RS), + % rabbit_log:info("tx_rollback ->~n ~p", [RS]), + {SeqIds, RS}. %%---------------------------------------------------------------------------- %% tx_commit/4 commits an AMQP transaction. The F passed in is called %% once the msgs have really been commited. This CPS permits the %% possibility of commit coalescing. %% -%% tx_commit/4 creates an Mnesia transaction to run in, and therefore -%% may not be called from inside another Mnesia transaction. However, -%% the supplied F is called outside the transaction. -%% %% -spec(tx_commit/4 :: %% (rabbit_types:txn(), %% fun (() -> any()), @@ -539,15 +523,10 @@ tx_rollback(Txn, S) -> tx_commit(Txn, F, PropsF, S) -> % rabbit_log:info("tx_commit(~n ~p,~n ~p,~n ~p,~n ~p) ->", [Txn, F, PropsF, S]), - {atomic, {Result, Pubs}} = - mnesia:transaction( - fun () -> - #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S), - RS = - tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S)), - save(RS), - {{SeqIds, RS}, Pubs} - end), + #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S), + RS = tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S)), + save(RS), + {Result = {SeqIds, RS}, Pubs}, F(), % rabbit_log:info("tx_commit ->~n ~p", [Result]), callback(Pubs), @@ -811,13 +790,16 @@ del_ps(F, SeqIds, S = #s { queue_name = DbQueueName }) -> lists:foldl( fun( SeqId, Si) -> DbList = mysql_helper:read_p_record(DbQueueName, SeqId), - [#p_record {m = MBin}] = - emysql_util:as_record(DbList, - p_record, - record_info(fields, - p_record)), + rabbit_log:info(">>>>> DbQueueName = ~p~n", [DbQueueName]), + rabbit_log:info(">>>>> DbList = ~p~n", [DbList]), + Temp = emysql_util:as_record(DbList, + p_record, + record_info(fields, + p_record)), + rabbit_log:info(">>>>> Temp = ~p~n", [Temp]), + [#p_record {m = MBin}] = Temp, M = binary_to_term(MBin), - mysql_helper:delete_message_from_p_by_seq_id(SeqId), + mysql_helper:delete_message_from_p_by_seq_id(SeqId, DbQueueName), F(M, Si) end, S, |