summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-06-03 15:27:59 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-06-03 15:27:59 +0100
commit4bfbabac01f9f790d91f241e5db6fcdc03fbe8e7 (patch)
tree66b2631953ed9a878216cc502682073173d4da6e
parent9c31f2a89f7e6ae55030892ea39fba85647db019 (diff)
parent4013096a30291bd1f98b6016e018f405c7dbe0f8 (diff)
downloadrabbitmq-server-4bfbabac01f9f790d91f241e5db6fcdc03fbe8e7.tar.gz
merge bug22953 into default
some minor tweaks and bug fixes to the broker needed by rabbitmq-federation: - make rabbit_binding:list_for_* usable in a transaction context - add rabbit_binding:peek_serial/1 to retrieve the exchange event serial w/o incrementing it - correct exchange type 'create' callback invocation in recover
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/gm_soak_test.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl6
7 files changed, 32 insertions, 23 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1c2b94e2..295d9039 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -32,8 +32,8 @@
-spec(stop/0 :: () -> 'ok').
-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
async_callback(), sync_callback()) -> state()).
--spec(terminate/1 :: (state()) -> state()).
--spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(terminate/2 :: (any(), state()) -> state()).
+-spec(delete_and_terminate/2 :: (any(), state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/4 :: (rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state()) ->
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index dae42ac7..5e5a3a5a 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -80,12 +80,12 @@ handle_msg([], From, {test_msg, Num}) ->
{ok, Num} -> ok;
{ok, Num1} when Num < Num1 ->
exit({{from, From},
- {duplicate_delivery_of, Num1},
- {expecting, Num}});
+ {duplicate_delivery_of, Num},
+ {expecting, Num1}});
{ok, Num1} ->
exit({{from, From},
- {missing_delivery_of, Num},
- {received_early, Num1}});
+ {received_early, Num},
+ {expecting, Num1}});
error ->
exit({{from, From},
{received_premature_delivery, Num}})
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8091e2c2..07a24af8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -97,12 +97,11 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
+ backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -115,16 +114,16 @@ init(Q) ->
msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(shutdown, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate(_Reason, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate(Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
- BQS1 = BQ:delete_and_terminate(BQS),
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
rabbit_amqqueue:internal_delete(qname(State)),
@@ -226,6 +225,10 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
+backing_queue_module(#amqqueue{}) ->
+ {ok, BQM} = application:get_env(backing_queue_module),
+ BQM.
+
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index addaabc5..217ad3eb 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -49,11 +49,11 @@ behaviour_info(callbacks) ->
{init, 4},
%% Called on queue shutdown when queue isn't being deleted.
- {terminate, 1},
+ {terminate, 2},
%% Called when the queue is terminating and needs to delete all
%% its content.
- {delete_and_terminate, 1},
+ {delete_and_terminate, 2},
%% Remove all messages in the queue, but not messages which have
%% been fetched and are pending acks.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8172f804..8e0a2a53 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -372,6 +372,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item([T | _] = Value)
+ when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
+ is_list(T) ->
+ "[" ++
+ lists:nthtail(2, lists:append(
+ [", " ++ format_info_item(E) || E <- Value])) ++ "]";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1a37cdff..3f4aa54e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) ->
{delta, {delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
+ _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
test_variable_queue() ->
@@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2336,7 +2336,7 @@ test_queue_recover() ->
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
- _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
+ _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8ac3ad43..a167cca0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
-terminate(State) ->
+terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
@@ -473,7 +473,7 @@ terminate(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(State) ->
+delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'remove_pending_ack', other than
%% deleting it.