summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-14 23:42:49 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-14 23:42:49 +0000
commit3b6abf573736adc7e205ffbc5d8ab5f3c6206ac7 (patch)
treedc959c23a13d6f9e80713a15ae1d654dada1d807
parentdc873bed6f3634945a8881a88d0d52091018b33d (diff)
parent0b06bcaf2eed27870dcdf16e538c88751eee7527 (diff)
downloadrabbitmq-server-3b6abf573736adc7e205ffbc5d8ab5f3c6206ac7.tar.gz
Merging bug23948 to bug23942
-rw-r--r--include/rabbit_backing_queue_spec.hrl7
-rw-r--r--src/file_handle_cache.erl13
-rw-r--r--src/gm.erl13
-rw-r--r--src/rabbit_amqqueue.erl25
-rw-r--r--src/rabbit_amqqueue_process.erl98
-rw-r--r--src/rabbit_backing_queue.erl21
-rw-r--r--src/rabbit_msg_store.erl41
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl187
9 files changed, 237 insertions, 184 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index accb2c0e..b2bf6bbb 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -25,11 +25,13 @@
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
+-type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')).
+-type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) ->
- state()).
+-spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(),
+ async_callback(), sync_callback()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
@@ -41,6 +43,7 @@
(false, rabbit_types:basic_message(),
rabbit_types:message_properties(), state())
-> {undefined, state()}).
+-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
-spec(dropwhile/2 ::
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index eed62729..7c3ba742 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -1147,11 +1147,14 @@ notify_age(CStates, AverageAge) ->
end, CStates).
notify_age0(Clients, CStates, Required) ->
- Notifications =
- [CState || CState <- CStates, CState#cstate.callback =/= undefined],
- {L1, L2} = lists:split(random:uniform(length(Notifications)),
- Notifications),
- notify(Clients, Required, L2 ++ L1).
+ case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of
+ [] ->
+ ok;
+ Notifications ->
+ {L1, L2} = lists:split(random:uniform(length(Notifications)),
+ Notifications),
+ notify(Clients, Required, L2 ++ L1)
+ end.
notify(_Clients, _Required, []) ->
ok;
diff --git a/src/gm.erl b/src/gm.erl
index fd8d9b77..8cf22581 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -931,6 +931,12 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
prune_or_create_group(Self, GroupName));
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
+ Handler =
+ fun () ->
+ join_group(
+ Self, GroupName,
+ record_dead_member_in_group(Left, GroupName))
+ end,
try
case gen_server2:call(
Left, {add_on_right, Self}, infinity) of
@@ -940,9 +946,10 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
catch
exit:{R, _}
when R =:= noproc; R =:= normal; R =:= shutdown ->
- join_group(
- Self, GroupName,
- record_dead_member_in_group(Left, GroupName))
+ Handler();
+ exit:{{R, _}, _}
+ when R =:= nodedown; R =:= shutdown ->
+ Handler()
end
end
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3aa20821..c7391965 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -17,23 +17,24 @@
-module(rabbit_amqqueue).
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
--export([internal_declare/2, internal_delete/1,
- maybe_run_queue_via_backing_queue/2,
- maybe_run_queue_via_backing_queue_async/2,
- sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([emit_stats/1]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+%% internal
+-export([internal_declare/2, internal_delete/1,
+ run_backing_queue/2, run_backing_queue_async/2,
+ sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
+ set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
+ emit_stats/1]).
+
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -140,9 +141,9 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(maybe_run_queue_via_backing_queue/2 ::
+-spec(run_backing_queue/2 ::
(pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(maybe_run_queue_via_backing_queue_async/2 ::
+-spec(run_backing_queue_async/2 ::
(pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
@@ -438,11 +439,11 @@ internal_delete(QueueName) ->
end
end).
-maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
+run_backing_queue(QPid, Fun) ->
+ gen_server2:call(QPid, {run_backing_queue, Fun}, infinity).
-maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+run_backing_queue_async(QPid, Fun) ->
+ gen_server2:cast(QPid, {run_backing_queue, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 54c92dc7..7c4b5190 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -149,7 +149,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = bq_init(BQ, QName, IsDurable, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -159,6 +159,20 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
+bq_init(BQ, QName, IsDurable, Recover) ->
+ Self = self(),
+ BQ:init(QName, IsDurable, Recover,
+ fun (Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Fun)
+ end,
+ fun (Fun) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> error end,
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(Self, Fun)
+ end)
+ end).
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(fun({Arg, Fun}, State1) ->
case rabbit_misc:table_lookup(Arguments, Arg) of
@@ -201,13 +215,15 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- ensure_rate_timer(State),
- State2 = ensure_stats_timer(State1),
- case BQ:needs_idle_timeout(BQS) of
- true -> {ensure_sync_timer(State2), 0};
- false -> {stop_sync_timer(State2), hibernate}
+next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ State1 = ensure_stats_timer(
+ ensure_rate_timer(
+ confirm_messages(MsgIds, State#q{
+ backing_queue_state = BQS1}))),
+ case BQ:needs_idle_timeout(BQS1) of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
@@ -399,6 +415,8 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
+confirm_messages([], State) ->
+ State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC1} = lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
@@ -496,10 +514,9 @@ deliver_or_enqueue(Delivery, State) ->
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
- maybe_run_queue_via_backing_queue(
- fun (BQS) ->
- {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
- end, State).
+ run_backing_queue(
+ fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end,
+ State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -602,13 +619,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
- maybe_run_queue_via_backing_queue(
- fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+ run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State).
-maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {MsgIds, BQS1} = Fun(BQS),
- run_message_queue(
- confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})).
+run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
+ run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
State = #q{backing_queue = BQ,
@@ -738,29 +752,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _AckTags, _ChPid} -> 7;
- {reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {run_backing_queue, _Fun} -> 6;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -784,7 +798,7 @@ handle_call({init, Recover}, From,
_ -> rabbit_log:warning(
"Queue ~p exclusive owner went away~n", [QName])
end,
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = bq_init(BQ, QName, IsDurable, Recover),
%% Rely on terminate to delete the queue.
{stop, normal, State#q{backing_queue_state = BQS}}
end;
@@ -972,12 +986,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+handle_call({run_backing_queue, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Fun, State)).
-handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Fun, State));
+handle_cast({run_backing_queue, Fun}, State) ->
+ noreply(run_backing_queue(Fun, State));
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 03c1fdd1..a15ff846 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -33,7 +33,22 @@ behaviour_info(callbacks) ->
{stop, 0},
%% Initialise the backing queue and its state.
- {init, 3},
+ %%
+ %% Takes
+ %% 1. the queue name
+ %% 2. a boolean indicating whether the queue is durable
+ %% 3. a boolean indicating whether the queue is an existing queue
+ %% that should be recovered
+ %% 4. an asynchronous callback which accepts a function from
+ %% state to state and invokes it with the current backing
+ %% queue state. This is useful for handling events, e.g. when
+ %% the backing queue does not have its own process to receive
+ %% such events, or when the processing of an event results in
+ %% a state transition the queue logic needs to know about
+ %% (such as messages getting confirmed).
+ %% 5. a synchronous callback. Same as the asynchronous callback
+ %% but waits for completion and returns 'error' on error.
+ {init, 5},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
@@ -54,6 +69,10 @@ behaviour_info(callbacks) ->
%% (i.e. saves the round trip through the backing queue).
{publish_delivered, 4},
+ %% Return ids of messages which have been confirmed since
+ %% the last invocation of this function (or initialisation).
+ {drain_confirmed, 1},
+
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
{dropwhile, 2},
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 4f5d2411..a08bbd70 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -646,6 +646,15 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
+ {ok, GCPid} = rabbit_msg_store_gc:start_link(
+ #gc_state { dir = Dir,
+ index_module = IndexModule,
+ index_state = IndexState,
+ file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
+ msg_store = self()
+ }),
+
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
@@ -657,7 +666,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
sum_valid_data = 0,
sum_file_size = 0,
pending_gc_completion = orddict:new(),
- gc_pid = undefined,
+ gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
@@ -680,17 +689,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{ok, Offset} = file_handle_cache:position(CurHdl, Offset),
ok = file_handle_cache:truncate(CurHdl),
- {ok, GCPid} = rabbit_msg_store_gc:start_link(
- #gc_state { dir = Dir,
- index_module = IndexModule,
- index_state = IndexState,
- file_summary_ets = FileSummaryEts,
- file_handles_ets = FileHandlesEts,
- msg_store = self()
- }),
-
- {ok, maybe_compact(
- State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
+ {ok, maybe_compact(State1 #msstate { current_file_handle = CurHdl }),
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -716,15 +715,15 @@ handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
- State = #msstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts,
- clients = Clients,
- gc_pid = GCPid }) ->
+ State = #msstate { dir = Dir,
+ index_state = IndexState,
+ index_module = IndexModule,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ clients = Clients,
+ gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c2ed3fb0..d5956c4c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2095,6 +2095,10 @@ test_queue_index() ->
passed.
+variable_queue_init(QName, IsDurable, Recover) ->
+ rabbit_variable_queue:init(QName, IsDurable, Recover,
+ fun nop/1, fun nop/1, fun nop/2, fun nop/1).
+
variable_queue_publish(IsPersistent, Count, VQ) ->
lists:foldl(
fun (_N, VQN) ->
@@ -2125,8 +2129,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/2, fun nop/1),
+ VQ = variable_queue_init(test_queue(), true, false),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -2301,8 +2304,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/2, fun nop/1),
+ VQ7 = variable_queue_init(test_queue(), true, true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2318,8 +2320,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/2, fun nop/1),
+ VQ7 = variable_queue_init(test_queue(), true, true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2350,8 +2351,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/2, fun nop/1),
+ VQ1 = variable_queue_init(QName, true, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index be6691e9..1b29756b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,9 +16,9 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/5, terminate/1, delete_and_terminate/1,
+ purge/1, publish/3, publish_delivered/4, drain_confirmed/1,
+ fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
@@ -27,7 +27,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/7]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -238,6 +238,9 @@
durable,
transient_threshold,
+ async_callback,
+ sync_callback,
+
len,
persistent_count,
@@ -252,6 +255,7 @@
msgs_on_disk,
msg_indices_on_disk,
unconfirmed,
+ confirmed,
ack_out_counter,
ack_in_counter,
ack_rates
@@ -332,11 +336,14 @@
{any(), binary()}},
on_sync :: sync(),
durable :: boolean(),
+ transient_threshold :: non_neg_integer(),
+
+ async_callback :: async_callback(),
+ sync_callback :: sync_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
- transient_threshold :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
@@ -347,6 +354,7 @@
msgs_on_disk :: gb_set(),
msg_indices_on_disk :: gb_set(),
unconfirmed :: gb_set(),
+ confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
@@ -397,25 +405,26 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
- Self = self(),
- init(QueueName, IsDurable, Recover,
+init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) ->
+ init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback,
fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(Self, MsgIds, ActionTaken)
+ msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
end,
- fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end).
+ fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(QueueName, IsDurable, false, AsyncCallback, SyncCallback,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun);
+ MsgOnDiskFun, AsyncCallback);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(QueueName, true, true, AsyncCallback, SyncCallback,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -425,9 +434,9 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun),
+ MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef,
- undefined),
+ undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
@@ -436,7 +445,7 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
+ init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
terminate(State) ->
@@ -511,9 +520,9 @@ publish(Msg, MsgProps, State) ->
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
- State = #vqstate { len = 0 }) ->
+ State = #vqstate { async_callback = Callback, len = 0 }) ->
case NeedsConfirming of
- true -> blind_confirm(self(), gb_sets:singleton(MsgId));
+ true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
false -> ok
end,
{undefined, a(State)};
@@ -542,6 +551,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+drain_confirmed(State = #vqstate { confirmed = C }) ->
+ {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
a(State1).
@@ -685,6 +697,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable,
tx_commit(Txn, Fun, MsgPropsFun,
State = #vqstate { durable = IsDurable,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
@@ -693,10 +707,13 @@ tx_commit(Txn, Fun, MsgPropsFun,
HasPersistentPubs = PersistentMsgIds =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
- true -> ok = msg_store_sync(
- MSCState, true, PersistentMsgIds,
- msg_store_callback(PersistentMsgIds, Pubs, AckTags1,
- Fun, MsgPropsFun)),
+ true -> MsgStoreCallback =
+ fun () -> msg_store_callback(
+ PersistentMsgIds, Pubs, AckTags1, Fun,
+ MsgPropsFun, AsyncCallback, SyncCallback)
+ end,
+ ok = msg_store_sync(MSCState, true, PersistentMsgIds,
+ fun () -> spawn(MsgStoreCallback) end),
State;
false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
Fun, MsgPropsFun, State)
@@ -929,13 +946,13 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+ msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) ->
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
rabbit_msg_store:client_init(
- MsgStore, Ref, MsgOnDiskFun,
- msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
+ MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -968,15 +985,9 @@ msg_store_close_fds(MSCState, IsPersistent) ->
fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
msg_store_close_fds_fun(IsPersistent) ->
- Self = self(),
- fun () ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self,
- fun (State = #vqstate { msg_store_clients = MSCState }) ->
- {ok, MSCState1} =
- msg_store_close_fds(MSCState, IsPersistent),
- {[], State #vqstate { msg_store_clients = MSCState1 }}
- end)
+ fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
+ State #vqstate { msg_store_clients = MSCState1 }
end.
maybe_write_delivered(false, _SeqId, IndexState) ->
@@ -1062,7 +1073,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, Terms,
- PersistentClient, TransientClient) ->
+ AsyncCallback, SyncCallback, PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1088,6 +1099,9 @@ init(IsDurable, IndexState, DeltaCount, Terms,
durable = IsDurable,
transient_threshold = NextSeqId,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
+
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1102,6 +1116,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
ack_out_counter = 0,
ack_in_counter = 0,
ack_rates = blank_rate(Now, 0) },
@@ -1114,23 +1129,19 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) ->
- Self = self(),
- F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> {[], tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)}
- end)
- end,
- fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
- fun () -> remove_persistent_messages(
- PersistentMsgIds)
- end, F)
- end)
+msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
+ AsyncCallback, SyncCallback) ->
+ case SyncCallback(fun (StateN) ->
+ tx_commit_post_msg_store(true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
+ end) of
+ ok -> ok;
+ error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback)
end.
-remove_persistent_messages(MsgIds) ->
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined),
+remove_persistent_messages(MsgIds, AsyncCallback) ->
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE,
+ undefined, AsyncCallback),
ok = rabbit_msg_store:remove(MsgIds, PersistentClient),
rabbit_msg_store:client_delete_and_terminate(PersistentClient).
@@ -1417,12 +1428,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
false -> State
end.
-remove_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC,
+ confirmed = C }) ->
State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
- unconfirmed = gb_sets:difference(UC, MsgIdSet) }.
+ unconfirmed = gb_sets:difference(UC, MsgIdSet),
+ confirmed = gb_sets:union (C, MsgIdSet) }.
needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1439,38 +1452,32 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-msgs_confirmed(MsgIdSet, State) ->
- {gb_sets:to_list(MsgIdSet), remove_confirms(MsgIdSet, State)}.
-
-blind_confirm(QPid, MsgIdSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(MsgIdSet, State) end).
-
-msgs_written_to_disk(QPid, MsgIdSet, removed) ->
- blind_confirm(QPid, MsgIdSet);
-msgs_written_to_disk(QPid, MsgIdSet, written) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Written = gb_sets:intersection(UC, MsgIdSet),
- msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(MOD, Written) })
- end).
-
-msg_indices_written_to_disk(QPid, MsgIdSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Written = gb_sets:intersection(UC, MsgIdSet),
- msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(MIOD, Written) })
- end).
+blind_confirm(Callback, MsgIdSet) ->
+ Callback(fun (State) -> record_confirms(MsgIdSet, State) end).
+
+msgs_written_to_disk(Callback, MsgIdSet, removed) ->
+ blind_confirm(Callback, MsgIdSet);
+msgs_written_to_disk(Callback, MsgIdSet, written) ->
+ Callback(fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(MOD, Confirmed) })
+ end).
+
+msg_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(MIOD, Confirmed) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes