From 66beecac50e410a83441807f8800ff32721b9c00 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 2 Jun 2011 15:09:00 +0100 Subject: Reduce diff from bug23554: minor tweaks to gm_soak_test (correction of error messages) --- src/gm_soak_test.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index dae42ac7..5e5a3a5a 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -80,12 +80,12 @@ handle_msg([], From, {test_msg, Num}) -> {ok, Num} -> ok; {ok, Num1} when Num < Num1 -> exit({{from, From}, - {duplicate_delivery_of, Num1}, - {expecting, Num}}); + {duplicate_delivery_of, Num}, + {expecting, Num1}}); {ok, Num1} -> exit({{from, From}, - {missing_delivery_of, Num}, - {received_early, Num1}}); + {received_early, Num}, + {expecting, Num1}}); error -> exit({{from, From}, {received_premature_delivery, Num}}) -- cgit v1.2.1 From 8dada8a4ccb33e7a36ac3c1592379ae4ac904df4 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 2 Jun 2011 15:25:42 +0100 Subject: Reduce diff from bug23554: Allow formatting of nested info items --- src/rabbit_control.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 8172f804..8e0a2a53 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -372,6 +372,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = Value) when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item([T | _] = Value) + when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse + is_list(T) -> + "[" ++ + lists:nthtail(2, lists:append( + [", " ++ format_info_item(E) || E <- Value])) ++ "]"; format_info_item(Value) -> io_lib:format("~w", [Value]). -- cgit v1.2.1 From 3b57619d1e4b2ed349390f82e68ba8ae2517c425 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 2 Jun 2011 15:32:33 +0100 Subject: Reduce diff from bug23554: Scaffolding for programmatically deciding backing queue --- src/rabbit_amqqueue_process.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8091e2c2..f7b710a4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -97,12 +97,11 @@ info_keys() -> ?INFO_KEYS. init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - backing_queue = BQ, + backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -226,6 +225,10 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. +backing_queue_module(#amqqueue{}) -> + {ok, BQM} = application:get_env(backing_queue_module), + BQM. + ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), -- cgit v1.2.1 From 4013096a30291bd1f98b6016e018f405c7dbe0f8 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 2 Jun 2011 15:40:26 +0100 Subject: Reduce diff from bug23554: Extend backing queue api to present reason for termination --- include/rabbit_backing_queue_spec.hrl | 4 ++-- src/rabbit_amqqueue_process.erl | 12 ++++++------ src/rabbit_backing_queue.erl | 4 ++-- src/rabbit_tests.erl | 8 ++++---- src/rabbit_variable_queue.erl | 6 +++--- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 1c2b94e2..295d9039 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -32,8 +32,8 @@ -spec(stop/0 :: () -> 'ok'). -spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(), async_callback(), sync_callback()) -> state()). --spec(terminate/1 :: (state()) -> state()). --spec(delete_and_terminate/1 :: (state()) -> state()). +-spec(terminate/2 :: (any(), state()) -> state()). +-spec(delete_and_terminate/2 :: (any(), state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). -spec(publish/4 :: (rabbit_types:basic_message(), rabbit_types:message_properties(), pid(), state()) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f7b710a4..07a24af8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -114,16 +114,16 @@ init(Q) -> msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(shutdown, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); -terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); -terminate(_Reason, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate(Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> rabbit_event:notify( queue_deleted, [{pid, self()}]), - BQS1 = BQ:delete_and_terminate(BQS), + BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. rabbit_amqqueue:internal_delete(qname(State)), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index addaabc5..217ad3eb 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -49,11 +49,11 @@ behaviour_info(callbacks) -> {init, 4}, %% Called on queue shutdown when queue isn't being deleted. - {terminate, 1}, + {terminate, 2}, %% Called when the queue is terminating and needs to delete all %% its content. - {delete_and_terminate, 1}, + {delete_and_terminate, 2}, %% Remove all messages in the queue, but not messages which have %% been fetched and are pending acks. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1a37cdff..3f4aa54e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) -> {delta, {delta, undefined, 0, undefined}}, {q3, 0}, {q4, 0}, {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)), + _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. test_variable_queue() -> @@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count + Count, VQ3), {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), - _VQ6 = rabbit_variable_queue:terminate(VQ5), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), @@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> {_Guids, VQ4} = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:timeout(VQ4), - _VQ6 = rabbit_variable_queue:terminate(VQ5), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2336,7 +2336,7 @@ test_queue_recover() -> VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), + _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8ac3ad43..a167cca0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,7 @@ -module(rabbit_variable_queue). --export([init/4, terminate/1, delete_and_terminate/1, +-export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, @@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, PersistentClient, TransientClient). -terminate(State) -> +terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = @@ -473,7 +473,7 @@ terminate(State) -> %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. -delete_and_terminate(State) -> +delete_and_terminate(_Reason, State) -> %% TODO: there is no need to interact with qi at all - which we do %% as part of 'purge' and 'remove_pending_ack', other than %% deleting it. -- cgit v1.2.1