summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-03 17:27:16 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-03 17:27:16 +0000
commitf9cf13987aecfe7756db2de53bfd89149ad59aab (patch)
treef5b169b745f9397e7812688ee1f191efea689f32
parented6171b1940055da68709789cb265e0e920be173 (diff)
downloadrabbitmq-server-f9cf13987aecfe7756db2de53bfd89149ad59aab.tar.gz
Fix confirms
-rw-r--r--src/rabbit_queue_index.erl75
-rw-r--r--src/rabbit_variable_queue.erl31
-rw-r--r--test/src/rabbit_tests.erl2
3 files changed, 69 insertions, 39 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 90729e33..a78dacec 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_index).
--export([erase/1, init/2, recover/5,
+-export([erase/1, init/3, recover/6,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
@@ -173,10 +173,11 @@
%%----------------------------------------------------------------------------
--record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unconfirmed }).
+-record(qistate, {dir, segments, journal_handle, dirty_count,
+ max_journal_entries, on_sync, on_sync_msg,
+ unconfirmed, unconfirmed_msg}).
--record(segment, { num, path, journal_entries, unacked }).
+-record(segment, {num, path, journal_entries, unacked}).
-include("rabbit.hrl").
@@ -204,7 +205,9 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unconfirmed :: gb_sets:set()
+ on_sync_msg :: on_sync_fun(),
+ unconfirmed :: gb_sets:set(),
+ unconfirmed_msg :: gb_sets:set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -212,9 +215,11 @@
-type(shutdown_terms() :: [term()] | 'non_clean_shutdown').
-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok').
--spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
--spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- contains_predicate(), on_sync_fun()) ->
+-spec(init/3 :: (rabbit_amqqueue:name(),
+ on_sync_fun(), on_sync_fun()) -> qistate()).
+-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ contains_predicate(),
+ on_sync_fun(), on_sync_fun()) ->
{'undefined' | non_neg_integer(),
'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
@@ -253,14 +258,17 @@ erase(Name) ->
false -> ok
end.
-init(Name, OnSyncFun) ->
+init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
- State #qistate { on_sync = OnSyncFun }.
+ State#qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
+recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
+ OnSyncFun, OnSyncMsgFun) ->
State = blank_state(Name),
- State1 = State #qistate { on_sync = OnSyncFun },
+ State1 = State #qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun},
CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
@@ -280,7 +288,8 @@ delete_and_terminate(State) ->
State1.
publish(MsgOrId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unconfirmed = Unconfirmed }) ->
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
MsgId = case MsgOrId of
#basic_message{id = Id} -> Id;
Id when is_binary(Id) -> Id
@@ -288,10 +297,12 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent,
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- case MsgProps#message_properties.needs_confirming of
- true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
- State #qistate { unconfirmed = Unconfirmed1 };
- false -> State
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
end),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
@@ -317,10 +328,12 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
-needs_sync(#qistate { journal_handle = undefined }) ->
+needs_sync(#qistate{journal_handle = undefined}) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
- case gb_sets:is_empty(UC) of
+needs_sync(#qistate{journal_handle = JournalHdl,
+ unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of
true -> case file_handle_cache:needs_sync(JournalHdl) of
true -> other;
false -> false
@@ -425,7 +438,9 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unconfirmed = gb_sets:new() }.
+ on_sync_msg = fun (_) -> ok end,
+ unconfirmed = gb_sets:new(),
+ unconfirmed_msg = gb_sets:new() }.
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -781,11 +796,19 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
- case gb_sets:is_empty(UC) of
- true -> State;
- false -> OnSyncFun(UC),
- State #qistate { unconfirmed = gb_sets:new() }
+notify_sync(State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}) ->
+ State1 = case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State#qistate{unconfirmed = gb_sets:new()}
+ end,
+ case gb_sets:is_empty(UCM) of
+ true -> State1;
+ false -> OnSyncMsgFun(UCM),
+ State1#qistate{unconfirmed_msg = gb_sets:new()}
end.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 21c955db..d63378eb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -28,7 +28,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/6]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -362,7 +362,7 @@
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
- msgs_on_disk :: gb_sets:set(), %% TODO fix confirms!
+ msgs_on_disk :: gb_sets:set(),
msg_indices_on_disk :: gb_sets:set(),
unconfirmed :: gb_sets:set(),
confirmed :: gb_sets:set(),
@@ -426,16 +426,19 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(Queue, Recover, AsyncCallback) ->
- init(Queue, Recover, AsyncCallback,
- fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
- end,
- fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
+init(Queue, Recover, Callback) ->
+ init(
+ Queue, Recover, Callback,
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(Callback, MsgIds, ActionTaken)
+ end,
+ fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
+ fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
+ IndexState = rabbit_queue_index:init(QueueName,
+ MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
@@ -446,7 +449,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
{PersistentClient, ContainsCheckFun} =
case IsDurable of
@@ -461,7 +464,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- ContainsCheckFun, MsgIdxOnDiskFun),
+ ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -1479,6 +1482,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
gb_sets:union(MIOD, Confirmed) })
end).
+msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
+
%%----------------------------------------------------------------------------
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index dcbec8f6..1fc708df 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -2422,7 +2422,7 @@ variable_queue_init(Q, Recover) ->
Q, case Recover of
true -> non_clean_shutdown;
false -> new
- end, fun nop/2, fun nop/2, fun nop/1).
+ end, fun nop/2, fun nop/2, fun nop/1, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).