summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-03-15 17:35:08 -0700
committerJerry Kuch <jerryk@vmware.com>2011-03-15 17:35:08 -0700
commit234846f7878d13573178fbc1e71d9629c04bf640 (patch)
tree4266a720b1522f43d19a83946c33eaa4e2382e00
parent096411d81b3b1e8bcf41fd910050b678eb5d2849 (diff)
downloadrabbitmq-server-bug23577.tar.gz
Fix transactions. All qpid tests run.bug23577
-rw-r--r--sql_scripts/setup_all.sql3
-rw-r--r--src/mysql_helper.erl12
-rw-r--r--src/rabbit_mysql_queue.erl86
3 files changed, 41 insertions, 60 deletions
diff --git a/sql_scripts/setup_all.sql b/sql_scripts/setup_all.sql
index 01208f5e..877fdd85 100644
--- a/sql_scripts/setup_all.sql
+++ b/sql_scripts/setup_all.sql
@@ -44,8 +44,7 @@ CREATE TABLE IF NOT EXISTS
CREATE TABLE IF NOT EXISTS
p (seq_id BIGINT UNSIGNED NOT NULL,
queue_name VARCHAR(256) NOT NULL, -- (BUGBUG: max size?)
- m MEDIUMBLOB,
- PRIMARY KEY(seq_id))
+ m MEDIUMBLOB)
ENGINE=InnoDB;
CREATE INDEX p_seq_id_index ON p(seq_id);
CREATE INDEX p_queue_name_index ON p(queue_name);
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl
index 1fb03d77..94d516de 100644
--- a/src/mysql_helper.erl
+++ b/src/mysql_helper.erl
@@ -89,7 +89,7 @@ prepare_mysql_statements() ->
<<"INSERT INTO p(seq_id, queue_name, m) VALUES(?,?,?)">>},
{delete_p_stmt,<<"DELETE FROM p WHERE queue_name = ?">>},
{delete_p_by_seq_id_stmt,
- <<"DELETE FROM p WHERE seq_id = ?">>},
+ <<"DELETE FROM p WHERE seq_id = ? AND queue_name = ?">>},
{clear_p_stmt,
<<"DELETE FROM p WHERE queue_name = ?">>},
{count_p_stmt,
@@ -199,16 +199,16 @@ write_message_to_p(DbQueueName, SeqId, M) ->
[SeqId, DbQueueName, term_to_binary(M)]),
ok.
-delete_message_from_p_by_seq_id(SeqId) ->
+delete_message_from_p_by_seq_id(SeqId, DbQueueName) ->
emysql:execute(?RABBIT_DB_POOL_NAME,
delete_p_by_seq_id_stmt,
- [SeqId]),
+ [SeqId, DbQueueName]),
ok.
read_p_record(DbQueueName, SeqId) ->
- emysql:execute(?RABBIT_DB_POOL_NAME,
- read_p_stmt,
- [DbQueueName, SeqId]).
+ Result = emysql:execute(?RABBIT_DB_POOL_NAME,
+ read_p_stmt,
+ [DbQueueName, SeqId]).
q_peek(DbQueueName) ->
emysql:execute(?RABBIT_DB_POOL_NAME,
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl
index fa01f7e8..44441225 100644
--- a/src/rabbit_mysql_queue.erl
+++ b/src/rabbit_mysql_queue.erl
@@ -453,8 +453,8 @@ ack(SeqIds, S) ->
%% msg and its properties in the to_pub field of the txn, waiting to
%% be committed.
%%
-%% tx_publish/4 creates an Mnesia transaction to run in, and therefore
-%% may not be called from inside another Mnesia transaction.
+%% tx_publish/4 creates a MySQL transaction to run in, and therefore
+%% may not be called from inside another MySQL transaction.
%%
%% -spec(tx_publish/4 ::
%% (rabbit_types:txn(),
@@ -465,17 +465,13 @@ ack(SeqIds, S) ->
tx_publish(Txn, Msg, Props, S) ->
% rabbit_log:info("tx_publish(~n ~p,~n ~p,~n ~p,~n ~p) ->", [Txn, Msg, Props, S]),
- {atomic, Result} =
- mnesia:transaction(
- fun () -> Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S),
- RS = store_tx(Txn,
- Tx #tx { to_pub = [{Msg, Props} | Pubs] },
- S),
- save(RS),
- RS
- end),
+ Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S),
+ RS = store_tx(Txn,
+ Tx #tx { to_pub = [{Msg, Props} | Pubs] },
+ S),
+ save(RS),
+ RS.
% rabbit_log:info("tx_publish ->~n ~p", [Result]),
- Result.
%%----------------------------------------------------------------------------
%% tx_ack/3 acks within an AMQP transaction. It stores the seq_id in
@@ -488,18 +484,14 @@ tx_publish(Txn, Msg, Props, S) ->
tx_ack(Txn, SeqIds, S) ->
% rabbit_log:info("tx_ack(~n ~p,~n ~p,~n ~p) ->", [Txn, SeqIds, S]),
- {atomic, Result} =
- mnesia:transaction(
- fun () -> Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S),
- RS = store_tx(Txn,
- Tx #tx {
- to_ack = lists:append(SeqIds, SeqIds0) },
- S),
- save(RS),
- RS
- end),
- % rabbit_log:info("tx_ack ->~n ~p", [Result]),
- Result.
+ Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S),
+ RS = store_tx(Txn,
+ Tx #tx {
+ to_ack = lists:append(SeqIds, SeqIds0) },
+ S),
+ save(RS),
+ % rabbit_log:info("tx_ack ->~n ~p", [RS]),
+ RS.
%%----------------------------------------------------------------------------
%% tx_rollback/2 aborts an AMQP transaction.
@@ -511,25 +503,17 @@ tx_ack(Txn, SeqIds, S) ->
tx_rollback(Txn, S) ->
% rabbit_log:info("tx_rollback(~n ~p,~n ~p) ->", [Txn, S]),
- {atomic, Result} =
- mnesia:transaction(fun () ->
- #tx { to_ack = SeqIds } = lookup_tx(Txn, S),
- RS = erase_tx(Txn, S),
- save(RS),
- {SeqIds, RS}
- end),
- % rabbit_log:info("tx_rollback ->~n ~p", [Result]),
- Result.
+ #tx { to_ack = SeqIds } = lookup_tx(Txn, S),
+ RS = erase_tx(Txn, S),
+ save(RS),
+ % rabbit_log:info("tx_rollback ->~n ~p", [RS]),
+ {SeqIds, RS}.
%%----------------------------------------------------------------------------
%% tx_commit/4 commits an AMQP transaction. The F passed in is called
%% once the msgs have really been commited. This CPS permits the
%% possibility of commit coalescing.
%%
-%% tx_commit/4 creates an Mnesia transaction to run in, and therefore
-%% may not be called from inside another Mnesia transaction. However,
-%% the supplied F is called outside the transaction.
-%%
%% -spec(tx_commit/4 ::
%% (rabbit_types:txn(),
%% fun (() -> any()),
@@ -539,15 +523,10 @@ tx_rollback(Txn, S) ->
tx_commit(Txn, F, PropsF, S) ->
% rabbit_log:info("tx_commit(~n ~p,~n ~p,~n ~p,~n ~p) ->", [Txn, F, PropsF, S]),
- {atomic, {Result, Pubs}} =
- mnesia:transaction(
- fun () ->
- #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S),
- RS =
- tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S)),
- save(RS),
- {{SeqIds, RS}, Pubs}
- end),
+ #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S),
+ RS = tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S)),
+ save(RS),
+ {Result = {SeqIds, RS}, Pubs},
F(),
% rabbit_log:info("tx_commit ->~n ~p", [Result]),
callback(Pubs),
@@ -811,13 +790,16 @@ del_ps(F, SeqIds, S = #s { queue_name = DbQueueName }) ->
lists:foldl(
fun( SeqId, Si) ->
DbList = mysql_helper:read_p_record(DbQueueName, SeqId),
- [#p_record {m = MBin}] =
- emysql_util:as_record(DbList,
- p_record,
- record_info(fields,
- p_record)),
+ rabbit_log:info(">>>>> DbQueueName = ~p~n", [DbQueueName]),
+ rabbit_log:info(">>>>> DbList = ~p~n", [DbList]),
+ Temp = emysql_util:as_record(DbList,
+ p_record,
+ record_info(fields,
+ p_record)),
+ rabbit_log:info(">>>>> Temp = ~p~n", [Temp]),
+ [#p_record {m = MBin}] = Temp,
M = binary_to_term(MBin),
- mysql_helper:delete_message_from_p_by_seq_id(SeqId),
+ mysql_helper:delete_message_from_p_by_seq_id(SeqId, DbQueueName),
F(M, Si)
end,
S,