From 096411d81b3b1e8bcf41fd910050b678eb5d2849 Mon Sep 17 00:00:00 2001 From: Jerry Kuch Date: Tue, 15 Mar 2011 14:20:12 -0700 Subject: Fix typing blunders to pass more qpid tests. --- src/mysql_helper.erl | 8 ++++---- src/rabbit_mysql_queue.erl | 21 ++++++++++++++------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl index c3a36231..1fb03d77 100644 --- a/src/mysql_helper.erl +++ b/src/mysql_helper.erl @@ -185,18 +185,18 @@ count_rows_for_queue(TableType, DbQueueName) -> {result_packet, _,_,[[Val]],_} = QueryResult, Val. -write_message_to_q(DbQueueName, Msg, IsPersistent) -> +write_message_to_q(DbQueueName, M, IsPersistent) -> emysql:execute(?RABBIT_DB_POOL_NAME, write_msg_to_q_stmt, - [DbQueueName, term_to_binary(Msg), IsPersistent]), + [DbQueueName, term_to_binary(M), IsPersistent]), ok. %% BUGBUG: Since the q table shadows is_persistent for convenience, will %% the pending acks table need to as well? -write_message_to_p(DbQueueName, SeqId, Msg) -> +write_message_to_p(DbQueueName, SeqId, M) -> emysql:execute(?RABBIT_DB_POOL_NAME, insert_p_stmt, - [SeqId, DbQueueName, term_to_binary(Msg)]), + [SeqId, DbQueueName, term_to_binary(M)]), ok. delete_message_from_p_by_seq_id(SeqId) -> diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl index b12ed520..fa01f7e8 100644 --- a/src/rabbit_mysql_queue.erl +++ b/src/rabbit_mysql_queue.erl @@ -280,7 +280,8 @@ delete_and_terminate(S = #s { queue_name = DbQueueName }) -> mysql_helper:clear_table(q, DbQueueName), mysql_helper:clear_table(p, DbQueueName), mysql_helper:commit_mysql_transaction(), - rabbit_log:info("delete_and_terminate ->~n ~p", [S]). + rabbit_log:info("delete_and_terminate ->~n ~p", [S]), + S. %%---------------------------------------------------------------------------- @@ -356,7 +357,11 @@ publish_delivered(true, rabbit_log:info("publish_delivered(true,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]), mysql_helper:begin_mysql_transaction(), - mysql_helper:write_message_to_p(DbQueueName, SeqId, Msg), + mysql_helper:write_message_to_p(DbQueueName, + SeqId, + (m(Msg, + SeqId, + Props))#m{is_delivered = true}), RS = S #s { next_seq_id = SeqId + 1 }, save(RS), mysql_helper:commit_mysql_transaction(), @@ -573,7 +578,7 @@ requeue(SeqIds, PropsF, S) -> S), save(RS), mysql_helper:commit_mysql_transaction(), - % rabbit_log:info("requeue ->~n ~p", [Result]), + % rabbit_log:info("requeue ->~n ~p", [RS]), callback([]), RS. @@ -806,10 +811,12 @@ del_ps(F, SeqIds, S = #s { queue_name = DbQueueName }) -> lists:foldl( fun( SeqId, Si) -> DbList = mysql_helper:read_p_record(DbQueueName, SeqId), - [#p_record {m = M}] = emysql_util:as_record(DbList, - p_record, - record_info(fields, - p_record)), + [#p_record {m = MBin}] = + emysql_util:as_record(DbList, + p_record, + record_info(fields, + p_record)), + M = binary_to_term(MBin), mysql_helper:delete_message_from_p_by_seq_id(SeqId), F(M, Si) end, -- cgit v1.2.1