diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-14 23:42:49 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-14 23:42:49 +0000 |
commit | 3b6abf573736adc7e205ffbc5d8ab5f3c6206ac7 (patch) | |
tree | dc959c23a13d6f9e80713a15ae1d654dada1d807 | |
parent | dc873bed6f3634945a8881a88d0d52091018b33d (diff) | |
parent | 0b06bcaf2eed27870dcdf16e538c88751eee7527 (diff) | |
download | rabbitmq-server-3b6abf573736adc7e205ffbc5d8ab5f3c6206ac7.tar.gz |
Merging bug23948 to bug23942
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 7 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 13 | ||||
-rw-r--r-- | src/gm.erl | 13 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 25 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 98 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 41 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 187 |
9 files changed, 237 insertions, 184 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index accb2c0e..b2bf6bbb 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,11 +25,13 @@ -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). +-type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')). +-type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> - state()). +-spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(), + async_callback(), sync_callback()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). @@ -41,6 +43,7 @@ (false, rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> {undefined, state()}). +-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). -spec(dropwhile/2 :: (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index eed62729..7c3ba742 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -1147,11 +1147,14 @@ notify_age(CStates, AverageAge) -> end, CStates). notify_age0(Clients, CStates, Required) -> - Notifications = - [CState || CState <- CStates, CState#cstate.callback =/= undefined], - {L1, L2} = lists:split(random:uniform(length(Notifications)), - Notifications), - notify(Clients, Required, L2 ++ L1). + case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of + [] -> + ok; + Notifications -> + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Clients, Required, L2 ++ L1) + end. notify(_Clients, _Required, []) -> ok; @@ -931,6 +931,12 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> prune_or_create_group(Self, GroupName)); Alive -> Left = lists:nth(random:uniform(length(Alive)), Alive), + Handler = + fun () -> + join_group( + Self, GroupName, + record_dead_member_in_group(Left, GroupName)) + end, try case gen_server2:call( Left, {add_on_right, Self}, infinity) of @@ -940,9 +946,10 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> catch exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown -> - join_group( - Self, GroupName, - record_dead_member_in_group(Left, GroupName)) + Handler(); + exit:{{R, _}, _} + when R =:= nodedown; R =:= shutdown -> + Handler() end end end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3aa20821..c7391965 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,23 +17,24 @@ -module(rabbit_amqqueue). -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1, - maybe_run_queue_via_backing_queue/2, - maybe_run_queue_via_backing_queue_async/2, - sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/4, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([emit_stats/1]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +%% internal +-export([internal_declare/2, internal_delete/1, + run_backing_queue/2, run_backing_queue_async/2, + sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, + set_maximum_since_use/2, maybe_expire/1, drop_expired/1, + emit_stats/1]). + -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -140,9 +141,9 @@ rabbit_types:connection_exit() | fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). --spec(maybe_run_queue_via_backing_queue/2 :: +-spec(run_backing_queue/2 :: (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(maybe_run_queue_via_backing_queue_async/2 :: +-spec(run_backing_queue_async/2 :: (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). @@ -438,11 +439,11 @@ internal_delete(QueueName) -> end end). -maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). +run_backing_queue(QPid, Fun) -> + gen_server2:call(QPid, {run_backing_queue, Fun}, infinity). -maybe_run_queue_via_backing_queue_async(QPid, Fun) -> - gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). +run_backing_queue_async(QPid, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 54c92dc7..7c4b5190 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -149,7 +149,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = BQ:init(QName, IsDurable, Recover), + BQS = bq_init(BQ, QName, IsDurable, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -159,6 +159,20 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. +bq_init(BQ, QName, IsDurable, Recover) -> + Self = self(), + BQ:init(QName, IsDurable, Recover, + fun (Fun) -> + rabbit_amqqueue:run_backing_queue_async(Self, Fun) + end, + fun (Fun) -> + rabbit_misc:with_exit_handler( + fun () -> error end, + fun () -> + rabbit_amqqueue:run_backing_queue(Self, Fun) + end) + end). + process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> lists:foldl(fun({Arg, Fun}, State1) -> case rabbit_misc:table_lookup(Arguments, Arg) of @@ -201,13 +215,15 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, NewState1, Timeout}. -next_state(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - ensure_rate_timer(State), - State2 = ensure_stats_timer(State1), - case BQ:needs_idle_timeout(BQS) of - true -> {ensure_sync_timer(State2), 0}; - false -> {stop_sync_timer(State2), hibernate} +next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + State1 = ensure_stats_timer( + ensure_rate_timer( + confirm_messages(MsgIds, State#q{ + backing_queue_state = BQS1}))), + case BQ:needs_idle_timeout(BQS1) of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -399,6 +415,8 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. +confirm_messages([], State) -> + State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> @@ -496,10 +514,9 @@ deliver_or_enqueue(Delivery, State) -> end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> - maybe_run_queue_via_backing_queue( - fun (BQS) -> - {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)} - end, State). + run_backing_queue( + fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, + State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -602,13 +619,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> - maybe_run_queue_via_backing_queue( - fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State). -maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {MsgIds, BQS1} = Fun(BQS), - run_message_queue( - confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})). +run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> + run_message_queue(State#q{backing_queue_state = Fun(BQS)}). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, @@ -738,29 +752,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {maybe_run_queue_via_backing_queue, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {run_backing_queue, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _AckTags, _ChPid} -> 7; - {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - {maybe_run_queue_via_backing_queue, _Fun} -> 6; - sync_timeout -> 6; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _AckTags, _ChPid} -> 7; + {reject, _AckTags, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {run_backing_queue, _Fun} -> 6; + sync_timeout -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -784,7 +798,7 @@ handle_call({init, Recover}, From, _ -> rabbit_log:warning( "Queue ~p exclusive owner went away~n", [QName]) end, - BQS = BQ:init(QName, IsDurable, Recover), + BQS = bq_init(BQ, QName, IsDurable, Recover), %% Rely on terminate to delete the queue. {stop, normal, State#q{backing_queue_state = BQS}} end; @@ -972,12 +986,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> - reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). +handle_call({run_backing_queue, Fun}, _From, State) -> + reply(ok, run_backing_queue(Fun, State)). -handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> - noreply(maybe_run_queue_via_backing_queue(Fun, State)); +handle_cast({run_backing_queue, Fun}, State) -> + noreply(run_backing_queue(Fun, State)); handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 03c1fdd1..a15ff846 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -33,7 +33,22 @@ behaviour_info(callbacks) -> {stop, 0}, %% Initialise the backing queue and its state. - {init, 3}, + %% + %% Takes + %% 1. the queue name + %% 2. a boolean indicating whether the queue is durable + %% 3. a boolean indicating whether the queue is an existing queue + %% that should be recovered + %% 4. an asynchronous callback which accepts a function from + %% state to state and invokes it with the current backing + %% queue state. This is useful for handling events, e.g. when + %% the backing queue does not have its own process to receive + %% such events, or when the processing of an event results in + %% a state transition the queue logic needs to know about + %% (such as messages getting confirmed). + %% 5. a synchronous callback. Same as the asynchronous callback + %% but waits for completion and returns 'error' on error. + {init, 5}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, @@ -54,6 +69,10 @@ behaviour_info(callbacks) -> %% (i.e. saves the round trip through the backing queue). {publish_delivered, 4}, + %% Return ids of messages which have been confirmed since + %% the last invocation of this function (or initialisation). + {drain_confirmed, 1}, + %% Drop messages from the head of the queue while the supplied %% predicate returns true. {dropwhile, 2}, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 4f5d2411..a08bbd70 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -646,6 +646,15 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), + {ok, GCPid} = rabbit_msg_store_gc:start_link( + #gc_state { dir = Dir, + index_module = IndexModule, + index_state = IndexState, + file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, + msg_store = self() + }), + State = #msstate { dir = Dir, index_module = IndexModule, index_state = IndexState, @@ -657,7 +666,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> sum_valid_data = 0, sum_file_size = 0, pending_gc_completion = orddict:new(), - gc_pid = undefined, + gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -680,17 +689,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {ok, Offset} = file_handle_cache:position(CurHdl, Offset), ok = file_handle_cache:truncate(CurHdl), - {ok, GCPid} = rabbit_msg_store_gc:start_link( - #gc_state { dir = Dir, - index_module = IndexModule, - index_state = IndexState, - file_summary_ets = FileSummaryEts, - file_handles_ets = FileHandlesEts, - msg_store = self() - }), - - {ok, maybe_compact( - State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), + {ok, maybe_compact(State1 #msstate { current_file_handle = CurHdl }), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -716,15 +715,15 @@ handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, - State = #msstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts, - clients = Clients, - gc_pid = GCPid }) -> + State = #msstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + clients = Clients, + gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c2ed3fb0..d5956c4c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2095,6 +2095,10 @@ test_queue_index() -> passed. +variable_queue_init(QName, IsDurable, Recover) -> + rabbit_variable_queue:init(QName, IsDurable, Recover, + fun nop/1, fun nop/1, fun nop/2, fun nop/1). + variable_queue_publish(IsPersistent, Count, VQ) -> lists:foldl( fun (_N, VQN) -> @@ -2125,8 +2129,7 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false, - fun nop/2, fun nop/1), + VQ = variable_queue_init(test_queue(), true, false), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2301,8 +2304,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/2, fun nop/1), + VQ7 = variable_queue_init(test_queue(), true, true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2318,8 +2320,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/2, fun nop/1), + VQ7 = variable_queue_init(test_queue(), true, true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2350,8 +2351,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true, - fun nop/2, fun nop/1), + VQ1 = variable_queue_init(QName, true, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index be6691e9..1b29756b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,9 +16,9 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, - tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/5, terminate/1, delete_and_terminate/1, + purge/1, publish/3, publish_delivered/4, drain_confirmed/1, + fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, @@ -27,7 +27,7 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/7]). %%---------------------------------------------------------------------------- %% Definitions: @@ -238,6 +238,9 @@ durable, transient_threshold, + async_callback, + sync_callback, + len, persistent_count, @@ -252,6 +255,7 @@ msgs_on_disk, msg_indices_on_disk, unconfirmed, + confirmed, ack_out_counter, ack_in_counter, ack_rates @@ -332,11 +336,14 @@ {any(), binary()}}, on_sync :: sync(), durable :: boolean(), + transient_threshold :: non_neg_integer(), + + async_callback :: async_callback(), + sync_callback :: sync_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), - transient_threshold :: non_neg_integer(), target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), @@ -347,6 +354,7 @@ msgs_on_disk :: gb_set(), msg_indices_on_disk :: gb_set(), unconfirmed :: gb_set(), + confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). @@ -397,25 +405,26 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> - Self = self(), - init(QueueName, IsDurable, Recover, +init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) -> + init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback, fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(Self, MsgIds, ActionTaken) + msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, - fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end). + fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). -init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, + MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], + init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun); + MsgOnDiskFun, AsyncCallback); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(QueueName, true, true, AsyncCallback, SyncCallback, + MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -425,9 +434,9 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun), + MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef, - undefined), + undefined, AsyncCallback), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, @@ -436,7 +445,7 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, + init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, PersistentClient, TransientClient). terminate(State) -> @@ -511,9 +520,9 @@ publish(Msg, MsgProps, State) -> publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { len = 0 }) -> + State = #vqstate { async_callback = Callback, len = 0 }) -> case NeedsConfirming of - true -> blind_confirm(self(), gb_sets:singleton(MsgId)); + true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); false -> ok end, {undefined, a(State)}; @@ -542,6 +551,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount1, unconfirmed = UC1 }))}. +drain_confirmed(State = #vqstate { confirmed = C }) -> + {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. + dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), a(State1). @@ -685,6 +697,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable, tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable, + async_callback = AsyncCallback, + sync_callback = SyncCallback, msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), @@ -693,10 +707,13 @@ tx_commit(Txn, Fun, MsgPropsFun, HasPersistentPubs = PersistentMsgIds =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of - true -> ok = msg_store_sync( - MSCState, true, PersistentMsgIds, - msg_store_callback(PersistentMsgIds, Pubs, AckTags1, - Fun, MsgPropsFun)), + true -> MsgStoreCallback = + fun () -> msg_store_callback( + PersistentMsgIds, Pubs, AckTags1, Fun, + MsgPropsFun, AsyncCallback, SyncCallback) + end, + ok = msg_store_sync(MSCState, true, PersistentMsgIds, + fun () -> spawn(MsgStoreCallback) end), State; false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, Fun, MsgPropsFun, State) @@ -929,13 +946,13 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun) -> - msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> + msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) -> +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> + CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), rabbit_msg_store:client_init( - MsgStore, Ref, MsgOnDiskFun, - msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)). + MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -968,15 +985,9 @@ msg_store_close_fds(MSCState, IsPersistent) -> fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). msg_store_close_fds_fun(IsPersistent) -> - Self = self(), - fun () -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - Self, - fun (State = #vqstate { msg_store_clients = MSCState }) -> - {ok, MSCState1} = - msg_store_close_fds(MSCState, IsPersistent), - {[], State #vqstate { msg_store_clients = MSCState1 }} - end) + fun (State = #vqstate { msg_store_clients = MSCState }) -> + {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), + State #vqstate { msg_store_clients = MSCState1 } end. maybe_write_delivered(false, _SeqId, IndexState) -> @@ -1062,7 +1073,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, Terms, - PersistentClient, TransientClient) -> + AsyncCallback, SyncCallback, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1088,6 +1099,9 @@ init(IsDurable, IndexState, DeltaCount, Terms, durable = IsDurable, transient_threshold = NextSeqId, + async_callback = AsyncCallback, + sync_callback = SyncCallback, + len = DeltaCount1, persistent_count = DeltaCount1, @@ -1102,6 +1116,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), + confirmed = gb_sets:new(), ack_out_counter = 0, ack_in_counter = 0, ack_rates = blank_rate(Now, 0) }, @@ -1114,23 +1129,19 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) -> - Self = self(), - F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> {[], tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN)} - end) - end, - fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> remove_persistent_messages( - PersistentMsgIds) - end, F) - end) +msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, + AsyncCallback, SyncCallback) -> + case SyncCallback(fun (StateN) -> + tx_commit_post_msg_store(true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) + end) of + ok -> ok; + error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback) end. -remove_persistent_messages(MsgIds) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined), +remove_persistent_messages(MsgIds, AsyncCallback) -> + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, + undefined, AsyncCallback), ok = rabbit_msg_store:remove(MsgIds, PersistentClient), rabbit_msg_store:client_delete_and_terminate(PersistentClient). @@ -1417,12 +1428,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) -> false -> State end. -remove_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> +record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC, + confirmed = C }) -> State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet), msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet), - unconfirmed = gb_sets:difference(UC, MsgIdSet) }. + unconfirmed = gb_sets:difference(UC, MsgIdSet), + confirmed = gb_sets:union (C, MsgIdSet) }. needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1439,38 +1452,32 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, %% subtraction. not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). -msgs_confirmed(MsgIdSet, State) -> - {gb_sets:to_list(MsgIdSet), remove_confirms(MsgIdSet, State)}. - -blind_confirm(QPid, MsgIdSet) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State) -> msgs_confirmed(MsgIdSet, State) end). - -msgs_written_to_disk(QPid, MsgIdSet, removed) -> - blind_confirm(QPid, MsgIdSet); -msgs_written_to_disk(QPid, MsgIdSet, written) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Written = gb_sets:intersection(UC, MsgIdSet), - msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union(MOD, Written) }) - end). - -msg_indices_written_to_disk(QPid, MsgIdSet) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Written = gb_sets:intersection(UC, MsgIdSet), - msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union(MIOD, Written) }) - end). +blind_confirm(Callback, MsgIdSet) -> + Callback(fun (State) -> record_confirms(MsgIdSet, State) end). + +msgs_written_to_disk(Callback, MsgIdSet, removed) -> + blind_confirm(Callback, MsgIdSet); +msgs_written_to_disk(Callback, MsgIdSet, written) -> + Callback(fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + Confirmed = gb_sets:intersection(UC, MsgIdSet), + record_confirms(gb_sets:intersection(MsgIdSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:union(MOD, Confirmed) }) + end). + +msg_indices_written_to_disk(Callback, MsgIdSet) -> + Callback(fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + Confirmed = gb_sets:intersection(UC, MsgIdSet), + record_confirms(gb_sets:intersection(MsgIdSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:union(MIOD, Confirmed) }) + end). %%---------------------------------------------------------------------------- %% Phase changes |