summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-06 10:13:22 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-06 10:13:22 +0100
commitef6335a97b6a3555a78ee0220e54e58ed270839f (patch)
tree7c885196abb5540a75b5edce3669d0c7cf9d0b98
parent37f21f92299df57edf05c8c2291538cc69e07ec9 (diff)
parent5e62212375a4292f311f045f3287c6cf7f3dab9c (diff)
downloadrabbitmq-server-ef6335a97b6a3555a78ee0220e54e58ed270839f.tar.gz
merge default into bug20284
-rw-r--r--include/rabbit.hrl3
-rw-r--r--include/rabbit_backing_queue_spec.hrl11
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl148
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_basic.erl14
-rw-r--r--src/rabbit_channel.erl225
-rw-r--r--src/rabbit_invariable_queue.erl12
-rw-r--r--src/rabbit_msg_store.erl174
-rw-r--r--src/rabbit_queue_index.erl43
-rw-r--r--src/rabbit_router.erl22
-rw-r--r--src/rabbit_tests.erl35
-rw-r--r--src/rabbit_variable_queue.erl156
13 files changed, 632 insertions, 236 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 24aa8d98..5770cb95 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -69,7 +69,8 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, txn, sender, message}).
+-record(delivery, {mandatory, immediate, txn, sender, message,
+ origin, msg_seq_no}).
-record(amqp_error, {name, explanation, method = none}).
-record(event, {type, props, timestamp}).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 38c6f939..9f3a0227 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -37,6 +37,7 @@
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(ack_required() :: boolean()).
+-type(confirm_required() :: boolean()).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
@@ -45,12 +46,12 @@
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()).
--spec(publish_delivered/3 ::
- (ack_required(), rabbit_types:basic_message(), state()) ->
- {ack(), state()}).
+-spec(publish/3 :: (rabbit_types:basic_message(),
+ confirm_required(), state()) -> state()).
+-spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(),
+ confirm_required(), state()) -> {ack(), state()}).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
--spec(ack/2 :: ([ack()], state()) -> state()).
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(),
state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 42bddc5e..e1dc73e1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,6 +34,7 @@
-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
+ maybe_run_queue_via_backing_queue_async/2,
update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1]).
-export([pseudo_queue/2]).
@@ -159,6 +160,8 @@
rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
(pid(), (fun ((A) -> A))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue_async/2 ::
+ (pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -370,16 +373,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
-deliver(QPid, #delivery{immediate = true,
- txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid},
- infinity);
-deliver(QPid, #delivery{mandatory = true,
- txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity),
+deliver(QPid, Delivery = #delivery{immediate = true}) ->
+ gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
+deliver(QPid, Delivery = #delivery{mandatory = true}) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity),
true;
-deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
+deliver(QPid, Delivery) ->
+ gen_server2:cast(QPid, {deliver, Delivery}),
true.
requeue(QPid, MsgIds, ChPid) ->
@@ -460,6 +460,9 @@ internal_delete(QueueName) ->
maybe_run_queue_via_backing_queue(QPid, Fun) ->
gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
+maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
+ gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+
update_ram_duration(QPid) ->
gen_server2:cast(QPid, update_ram_duration).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 61204deb..453f342e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -61,7 +61,8 @@
sync_timer_ref,
rate_timer_ref,
expiry_timer_ref,
- stats_timer
+ stats_timer,
+ guid_to_channel
}).
-record(consumer, {tag, ack_required}).
@@ -123,7 +124,8 @@ init(Q) ->
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
- stats_timer = rabbit_event:init_stats_timer()}, hibernate,
+ stats_timer = rabbit_event:init_stats_timer(),
+ guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -342,11 +344,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 = case AckRequired of
- true -> sets:add_element(
- AckTag, ChAckTags);
- false -> ChAckTags
- end,
+ {State2, ChAckTags1} =
+ case AckRequired of
+ true -> {State1,
+ sets:add_element(AckTag, ChAckTags)};
+ false -> {confirm_message(Message, State1),
+ ChAckTags}
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
store_ch_record(NewC),
@@ -362,10 +366,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State2 = State1#q{
+ State3 = State2#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State2);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State3);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
@@ -394,7 +398,34 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag, Remaining}, BQS1} =
BQ:fetch(AckRequired, BQS),
{{Message, IsDelivered, AckTag}, 0 == Remaining,
- State #q { backing_queue_state = BQS1 }}.
+ State#q{backing_queue_state = BQS1}}.
+
+confirm_messages(Guids, State) ->
+ lists:foldl(fun confirm_message_by_guid/2, State, Guids).
+
+confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
+ case dict:find(Guid, GTC) of
+ {ok, {_ , undefined}} -> ok;
+ {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
+ _ -> ok
+ end,
+ State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+
+confirm_message(#basic_message{guid = Guid}, State) ->
+ confirm_message_by_guid(Guid, State).
+
+record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
+ State;
+record_confirm_message(#delivery{msg_seq_no = MsgSeqNo,
+ sender = ChPid,
+ message = #basic_message{guid = Guid}},
+ State = #q{guid_to_channel = GTC}) ->
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}.
+
+ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
+ confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -403,29 +434,36 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
-attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+attempt_delivery(#delivery{txn = none,
+ message = Message,
+ msg_seq_no = MsgSeqNo},
+ State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message, BQS),
+ BQ:publish_delivered(AckRequired, Message,
+ MsgSeqNo =/= undefined, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+attempt_delivery(#delivery{txn = Txn,
+ sender = ChPid,
+ message = Message},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
{true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
-deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
- case attempt_delivery(Txn, ChPid, Message, State) of
- {true, NewState} ->
- {true, NewState};
- {false, NewState} ->
- %% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message, State #q.backing_queue_state),
- {false, NewState#q{backing_queue_state = BQS}}
+deliver_or_enqueue(Delivery, State) ->
+ case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
+ {true, State1} ->
+ {true, State1};
+ {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery,
+ BQS1 = BQ:publish(Message, MsgSeqNo =/= undefined, BQS),
+ {false, State1#q{backing_queue_state = BQS1}}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -523,7 +561,12 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
+ {BQS2, State1} =
+ case Fun(BQS) of
+ {BQS1, {confirm, Guids}} -> {BQS1, confirm_messages(Guids, State)};
+ BQS1 -> {BQS1, State}
+ end,
+ run_message_queue(State1#q{backing_queue_state = BQS2}).
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -657,7 +700,8 @@ handle_call(consumers, _From,
[{ChPid, ConsumerTag, AckRequired} | Acc]
end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
-handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
+handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
+ _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -671,12 +715,16 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
- reply(Delivered, NewState);
-
-handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
+ {Delivered, State1} =
+ attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
+ reply(Delivered, case Delivered of
+ true -> State1;
+ false -> confirm_message(Message, State1)
+ end);
+
+handle_call({deliver, Delivery}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
+ {Delivered, NewState} = deliver_or_enqueue(Delivery, State),
reply(Delivered, NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
@@ -702,14 +750,16 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case BQ:fetch(AckRequired, BQS) of
{empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
- case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- store_ch_record(
- C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
- false -> ok
- end,
+ State2 =
+ case AckRequired of
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ State1;
+ false -> confirm_message(Message, State1)
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1})
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -829,9 +879,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-handle_cast({deliver, Txn, Message, ChPid}, State) ->
+
+handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Fun, State));
+
+handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
+ {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
noreply(NewState);
handle_cast({ack, Txn, AckTags, ChPid},
@@ -840,18 +894,21 @@ handle_cast({ack, Txn, AckTags, ChPid},
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- {C1, BQS1} =
+ {C1, State1} =
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
- _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
+ NewC = C#cr{acktags = ChAckTags1},
+ NewState = ack_by_acktags(AckTags, State),
+ {NewC, NewState};
+ _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+ {C#cr{txn = Txn},
+ State#q{backing_queue_state = BQS1}}
end,
store_ch_record(C1),
- noreply(State#q{backing_queue_state = BQS1})
+ noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -860,8 +917,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> BQS1 = BQ:ack(AckTags, BQS),
- State #q { backing_queue_state = BQS1 }
+ false -> ack_by_acktags(AckTags, State)
end)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2230c507..32f9f15a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -62,12 +62,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 2},
+ {publish, 3},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 3},
+ {publish_delivered, 4},
%% Produce the next message.
{fetch, 2},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 38412982..1ac39b65 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/4]).
+-export([publish/1, message/4, properties/1, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
-export([is_message_persistent/1]).
@@ -50,9 +50,10 @@
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/4 ::
+-spec(delivery/5 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message()) -> rabbit_types:delivery()).
+ rabbit_types:message(), undefined | integer()) ->
+ rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary()) ->
@@ -88,9 +89,9 @@ publish(Delivery = #delivery{
Other
end.
-delivery(Mandatory, Immediate, Txn, Message) ->
+delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
- sender = self(), message = Message}.
+ sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
@@ -157,7 +158,8 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
BodyBin) ->
publish(delivery(Mandatory, Immediate, Txn,
message(ExchangeName, RoutingKeyBin,
- properties(Properties), BodyBin))).
+ properties(Properties), BodyBin),
+ undefined)).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fe36cef9..7d9f8064 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,7 @@
-export([start_link/7, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, flush/1]).
+-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -48,7 +48,9 @@
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, stats_timer}).
+ consumer_mapping, blocking, queue_collector_pid, stats_timer,
+ confirm_enabled, published_count, confirm_multiple, confirm_tref,
+ held_confirms, unconfirmed, qpid_to_msgs}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -70,6 +72,8 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -99,6 +103,8 @@
-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
+-spec(flush_multiple_acks/1 :: (pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), integer()) -> 'ok').
-endif.
@@ -153,6 +159,12 @@ emit_stats(Pid) ->
flush(Pid) ->
gen_server2:call(Pid, flush).
+flush_multiple_acks(Pid) ->
+ gen_server2:cast(Pid, flush_multiple_acks).
+
+confirm(Pid, MsgSeqNo) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNo}).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
@@ -177,7 +189,13 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
consumer_mapping = dict:new(),
blocking = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = StatsTimer},
+ stats_timer = StatsTimer,
+ confirm_enabled = false,
+ published_count = 0,
+ confirm_multiple = false,
+ held_confirms = gb_sets:new(),
+ unconfirmed = gb_sets:new(),
+ qpid_to_msgs = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -258,19 +276,43 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
internal_emit_stats(State),
{noreply,
- State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}.
-
-handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}};
+
+handle_cast(flush_multiple_acks,
+ State = #ch{writer_pid = WriterPid,
+ held_confirms = As,
+ unconfirmed = UC}) ->
+ flush_multiple(WriterPid, As, UC),
+ {noreply, State#ch{held_confirms = gb_sets:new(),
+ confirm_tref = undefined}};
+
+handle_cast({confirm, MsgSeqNo}, State) ->
+ {noreply, send_or_enqueue_ack(MsgSeqNo, State)}.
+
+handle_info({'DOWN', _MRef, process, QPid, _Reason},
+ State = #ch{qpid_to_msgs = QTM}) ->
+ State2 = case dict:find(QPid, QTM) of
+ {ok, Msgs} -> State1 = gb_sets:fold(fun send_or_enqueue_ack/2,
+ State, Msgs),
+ State1 #ch{qpid_to_msgs = dict:erase(QPid, QTM)};
+ error -> State
+ end,
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State)}.
+ {noreply, queue_blocked(QPid, State2)}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State = #ch{writer_pid = WriterPid,
+ held_confirms = As,
+ stats_timer = StatsTimer,
+ unconfirmed = UC}) ->
ok = clear_permission_cache(),
- rabbit_event:if_enabled(StatsTimer, fun () ->
+ flush_multiple(WriterPid, As, UC),
+ rabbit_event:if_enabled(StatsTimer, fun() ->
internal_emit_stats(State)
end),
- {hibernate,
- State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
+ StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
+ {hibernate, State#ch{held_confirms = gb_sets:new(),
+ stats_timer = StatsTimer1,
+ confirm_tref = undefined}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -402,6 +444,47 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
+send_or_enqueue_ack(undefined, State) ->
+ State;
+send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) ->
+ State;
+send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) ->
+ do_if_unconfirmed(
+ MsgSeqNo, State,
+ fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = MSN}),
+ State1
+ end);
+send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
+ do_if_unconfirmed(
+ MsgSeqNo, State,
+ fun(MSN, State1 = #ch{held_confirms = As}) ->
+ start_ack_timer(State1#ch{held_confirms =
+ gb_sets:add(MSN, As)})
+ end).
+
+msg_sent_to_queue(undefined, _QPid, State) ->
+ State;
+msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
+ Msgs1 = case dict:find(QPid, QTM) of
+ {ok, Msgs} -> Msgs;
+ error -> erlang:monitor(process, QPid),
+ gb_sets:new()
+ end,
+ QTM1 = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM),
+ State#ch{qpid_to_msgs = QTM1}.
+
+do_if_unconfirmed(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) ->
+ case gb_sets:is_element(MsgSeqNo, UC) of
+ true -> QTM = dict:map(fun (_, Msgs) ->
+ gb_sets:delete_any(MsgSeqNo, Msgs)
+ end, State#ch.qpid_to_msgs),
+ State1 = Fun(MsgSeqNo, State#ch{qpid_to_msgs = QTM}),
+ State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)};
+ false -> State
+ end.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -424,9 +507,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
- Content, State = #ch{virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ Content, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ confirm_enabled = ConfirmEnabled}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -434,6 +517,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
+ {MsgSeqNo, State1}
+ = case ConfirmEnabled of
+ false -> {undefined, State};
+ true -> Count = State#ch.published_count,
+ {Count,
+ State#ch{published_count = Count + 1,
+ unconfirmed =
+ gb_sets:add(Count, State#ch.unconfirmed)}}
+ end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -442,18 +534,19 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
- case RoutingRes of
- routed -> ok;
- unroutable -> ok = basic_return(Message, WriterPid, no_route);
- not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
- end,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
+ case IsPersistent of
+ true -> MsgSeqNo;
+ false -> undefined
+ end)),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent,
+ MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State),
+ QPid <- DeliveredQPids]], publish, State2),
{noreply, case TxnKey of
- none -> State;
- _ -> add_tx_participants(DeliveredQPids, State)
+ none -> State2;
+ _ -> add_tx_participants(DeliveredQPids, State2)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
@@ -841,6 +934,11 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
+
+handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot switch from confirm to tx mode", []);
+
handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) ->
{reply, #'tx.select_ok'{}, new_tx(State)};
@@ -861,6 +959,25 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
+handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
+ when TxId =/= none ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot switch from tx to confirm mode", []);
+
+handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
+ _, State = #ch{confirm_enabled = false}) ->
+ return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+ NoWait, #'confirm.select_ok'{});
+
+handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
+ _, State = #ch{confirm_enabled = true,
+ confirm_multiple = Multiple}) ->
+ return_ok(State, NoWait, #'confirm.select_ok'{});
+
+handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot change confirm_multiple setting", []);
+
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1083,6 +1200,21 @@ is_message_persistent(Content) ->
IsPersistent
end.
+process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) ->
+ ok = basic_return(Message, State#ch.writer_pid, no_route),
+ send_or_enqueue_ack(MsgSeqNo, State);
+process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) ->
+ ok = basic_return(Message, State#ch.writer_pid, no_consumers),
+ send_or_enqueue_ack(MsgSeqNo, State);
+process_routing_result(routed, [], _, MsgSeqNo, _, State) ->
+ send_or_enqueue_ack(MsgSeqNo, State);
+process_routing_result(routed, _, false, MsgSeqNo, _, State) ->
+ send_or_enqueue_ack(MsgSeqNo, State);
+process_routing_result(routed, QPids, true, MsgSeqNo, _, State) ->
+ lists:foldl(fun (QPid, State0) ->
+ msg_sent_to_queue(MsgSeqNo, QPid, State0)
+ end, State, QPids).
+
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
lock_message(false, _MsgStruct, State) ->
@@ -1104,7 +1236,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
-terminate(_State) ->
+terminate(State) ->
+ stop_ack_timer(State),
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
@@ -1186,3 +1319,47 @@ erase_queue_stats(QPid) ->
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+
+start_ack_timer(State = #ch{confirm_tref = undefined}) ->
+ {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL,
+ ?MODULE, flush_multiple_acks, [self()]),
+ State#ch{confirm_tref = TRef};
+start_ack_timer(State) ->
+ State.
+
+stop_ack_timer(State = #ch{confirm_tref = undefined}) ->
+ State;
+stop_ack_timer(State = #ch{confirm_tref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#ch{confirm_tref = undefined}.
+
+flush_multiple(WriterPid, As, NA) ->
+ case gb_sets:is_empty(As) of
+ true -> ok;
+ false -> [First | Rest] = gb_sets:to_list(As),
+ [rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = A}) ||
+ A <- case Rest of
+ [] -> [First];
+ _ -> flush_multiple(
+ First, Rest, WriterPid,
+ case gb_sets:is_empty(NA) of
+ false -> gb_sets:smallest(NA);
+ true -> gb_sets:largest(As) + 1
+ end)
+ end],
+ ok
+ end.
+
+flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) ->
+ ExpNext = Prev + 1,
+ case {SNA >= Cur, Cur} of
+ {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA);
+ _ -> flush_multiple(Prev, [], WriterPid, SNA),
+ [Cur | Rest]
+ end;
+flush_multiple(Prev, [], WriterPid, _) ->
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = Prev,
+ multiple = true}),
+ [].
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 4e0dad84..664ef653 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -31,8 +31,8 @@
-module(rabbit_invariable_queue).
--export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
- publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
+ publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -99,14 +99,14 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
- len = Len }) ->
+publish(Msg, _, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
ok = persist_message(QName, IsDurable, none, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
-publish_delivered(false, _Msg, State) ->
+publish_delivered(false, _Msg, _, State) ->
{blank_ack, State};
-publish_delivered(true, Msg = #basic_message { guid = Guid },
+publish_delivered(true, Msg = #basic_message { guid = Guid }, _,
State = #iv_state { qname = QName, durable = IsDurable,
len = 0, pending_ack = PA }) ->
ok = persist_message(QName, IsDurable, none, Msg),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bbecbfe2..c2e74a23 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/2,
+ sync/3, client_init/3, client_terminate/2,
client_delete_and_terminate/3, successfully_recovered_state/1]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -82,7 +82,9 @@
cur_file_cache_ets, %% tid of current file cache table
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
- file_size_limit %% how big are our files allowed to get?
+ file_size_limit, %% how big are our files allowed to get?
+ client_ondisk_callback, %% client ref to callback function mapping
+ cref_to_guids %% client ref to synced messages mapping
}).
-record(client_msstate,
@@ -94,7 +96,8 @@
file_handles_ets,
file_summary_ets,
dedup_cache_ets,
- cur_file_cache_ets
+ cur_file_cache_ets,
+ client_ref
}).
-record(file_summary,
@@ -115,10 +118,12 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
dedup_cache_ets :: ets:tid(),
- cur_file_cache_ets :: ets:tid() }).
+ cur_file_cache_ets :: ets:tid(),
+ client_ref :: rabbit_guid:guid()}).
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
+-type(guid_fun() :: fun (([rabbit_guid:guid()]) -> any())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
@@ -134,10 +139,11 @@
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(client_init/2 :: (server(), binary()) -> client_msstate()).
+-spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) ->
+ client_msstate()).
-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
- (client_msstate(), server(), binary()) -> 'ok').
+ (client_msstate(), server(), rabbit_guid:guid()) -> 'ok').
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
@@ -309,9 +315,10 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
[{timeout, infinity}]).
write(Server, Guid, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, Guid}), CState}.
+ {gen_server2:cast(Server, {write, CRef, Guid}), CState}.
read(Server, Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
@@ -353,10 +360,11 @@ gc_done(Server, Reclaimed, Source, Destination) ->
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
-client_init(Server, Ref) ->
+client_init(Server, Ref, MsgOnDiskFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun},
+ infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -365,11 +373,12 @@ client_init(Server, Ref) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }.
+ cur_file_cache_ets = CurFileCacheEts,
+ client_ref = Ref}.
client_terminate(CState, Server) ->
close_all_handles(CState),
- ok = gen_server2:call(Server, client_terminate, infinity).
+ ok = gen_server2:call(Server, {client_terminate, CState}, infinity).
client_delete_and_terminate(CState, Server, Ref) ->
close_all_handles(CState),
@@ -485,6 +494,13 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
end
end.
+clear_client_callback(CRef,
+ State = #msstate { client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
+ State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
+ cref_to_guids = dict:erase(CRef, CTG)}.
+
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -553,7 +569,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
- file_size_limit = FileSizeLimit
+ file_size_limit = FileSizeLimit,
+ client_ondisk_callback = dict:new(),
+ cref_to_guids = dict:new()
},
%% If we didn't recover the msg location index then we need to
@@ -577,10 +595,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- {new_client_state, _Ref} -> 7;
- successfully_recovered_state -> 7;
- {read, _Guid} -> 2;
- _ -> 0
+ {new_client_state, _Ref, _MODC} -> 7;
+ successfully_recovered_state -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -599,33 +617,43 @@ handle_call({contains, Guid}, From, State) ->
State1 = contains_message(Guid, From, State),
noreply(State1);
-handle_call({new_client_state, CRef}, _From,
- State = #msstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs,
- gc_pid = GCPid }) ->
+handle_call({new_client_state, CRef, Callback}, _From,
+ State = #msstate { dir = Dir,
+ index_state = IndexState,
+ index_module = IndexModule,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ client_refs = ClientRefs,
+ client_ondisk_callback = CODC,
+ gc_pid = GCPid }) ->
+ CODC1 = case Callback of
+ undefined -> CODC;
+ _ -> dict:store(CRef, Callback, CODC)
+ end,
reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
+ State #msstate { client_refs = sets:add_element(CRef, ClientRefs),
+ client_ondisk_callback = CODC1 });
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call(client_terminate, _From, State) ->
- reply(ok, State).
+handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From,
+ State) ->
+ reply(ok, clear_client_callback(CRef, State)).
+
+handle_cast({write, CRef, Guid},
+ State = #msstate { current_file_handle = CurHdl,
+ current_file = CurFile,
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
-handle_cast({write, Guid},
- State = #msstate { current_file_handle = CurHdl,
- current_file = CurFile,
- sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
case index_lookup(Guid, State) of
@@ -648,18 +676,28 @@ handle_cast({write, Guid},
[{#file_summary.valid_total_size, ValidTotalSize1},
{#file_summary.file_size, FileSize + TotalSize}]),
NextOffset = CurOffset + TotalSize,
- noreply(
- maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize }));
- #msg_location { ref_count = RefCount } ->
+ CTG1 = case dict:find(CRef, CODC) of
+ {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG);
+ error -> CTG
+ end,
+ noreply(maybe_roll_to_new_file(
+ NextOffset, State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize,
+ cref_to_guids = CTG1 }));
+ #msg_location { ref_count = RefCount, file = File } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
ok = index_update_fields(Guid,
{#msg_location.ref_count, RefCount + 1},
State),
- noreply(State)
+ CTG1 = case {dict:find(CRef, CODC), File =:= CurFile} of
+ {{ok, _} , true} -> rabbit_misc:dict_cons(CRef, Guid,
+ CTG);
+ {{ok, Fun}, false} -> Fun([Guid]), CTG;
+ _ -> CTG
+ end,
+ noreply(State #msstate { cref_to_guids = CTG1 })
end;
handle_cast({remove, Guids}, State) ->
@@ -730,8 +768,9 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast({client_delete, CRef},
State = #msstate { client_refs = ClientRefs }) ->
- noreply(
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
+ State1 = clear_client_callback(CRef, State),
+ noreply(State1 #msstate {
+ client_refs = sets:del_element(CRef, ClientRefs) }).
handle_info(timeout, State) ->
noreply(internal_sync(State));
@@ -783,14 +822,19 @@ reply(Reply, State) ->
{State1, Timeout} = next_state(State),
{reply, Reply, State1, Timeout}.
-next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
- {State, hibernate};
-next_state(State = #msstate { sync_timer_ref = undefined }) ->
- {start_sync_timer(State), 0};
-next_state(State = #msstate { on_sync = [] }) ->
- {stop_sync_timer(State), hibernate};
-next_state(State) ->
- {State, 0}.
+next_state(State = #msstate { sync_timer_ref = undefined,
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
+ case {Syncs, dict:size(CTG)} of
+ {[], 0} -> {State, hibernate};
+ _ -> {start_sync_timer(State), 0}
+ end;
+next_state(State = #msstate { on_sync = Syncs,
+ cref_to_guids = CTG }) ->
+ case {Syncs, dict:size(CTG)} of
+ {[], 0} -> {stop_sync_timer(State), hibernate};
+ _ -> {State, 0}
+ end.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
{ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]),
@@ -802,15 +846,21 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs }) ->
+internal_sync(State = #msstate { current_file_handle = CurHdl,
+ on_sync = Syncs,
+ client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
- case Syncs of
- [] -> State1;
- _ -> ok = file_handle_cache:sync(CurHdl),
- lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- State1 #msstate { on_sync = [] }
- end.
+ CGs = dict:fold(fun (_CRef, [], NS) -> NS;
+ (CRef, Guids, NS) -> [{CRef, Guids} | NS]
+ end, [], CTG),
+ if Syncs =:= [] andalso CGs =:= [] -> ok;
+ true -> file_handle_cache:sync(CurHdl)
+ end,
+ lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
+ [(dict:fetch(CRef, CODC))(Guids) || {CRef, Guids} <- CGs],
+ State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0b98290c..6f196336 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/4, terminate/2, delete_and_terminate/1, publish/4,
+-export([init/5, terminate/2, delete_and_terminate/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -166,7 +166,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries }).
+ max_journal_entries, on_sync, unsynced_guids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -185,18 +185,21 @@
})).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict:dictionary(), [segment()]}).
+-type(on_sync_fun() :: fun (([rabbit_guid:guid()]) -> ok)).
-type(qistate() :: #qistate { dir :: file:filename(),
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer(),
- max_journal_entries :: non_neg_integer()
+ max_journal_entries :: non_neg_integer(),
+ on_sync :: on_sync_fun(),
+ unsynced_guids :: [rabbit_guid:guid()]
}).
-type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
+ {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
A}).
--spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean())) ->
+-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(),
+ fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) ->
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
@@ -222,12 +225,12 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) ->
+init(Name, false, _MsgStoreRecovered, _ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = filelib:is_file(Dir), %% is_file == is file or dir
- {0, [], State};
+ {0, [], State #qistate { on_sync = OnSyncFun }};
-init(Name, true, MsgStoreRecovered, ContainsCheckFun) ->
+init(Name, true, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
@@ -240,7 +243,7 @@ init(Name, true, MsgStoreRecovered, ContainsCheckFun) ->
init_clean(RecoveredCounts, State);
false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
end,
- {Count, Terms, State1}.
+ {Count, Terms, State1 #qistate { on_sync = OnSyncFun }}.
terminate(Terms, State) ->
{SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
@@ -252,9 +255,13 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, IsPersistent,
+ State = #qistate { unsynced_guids = UnsyncedGuids })
+ when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
- {JournalHdl, State1} = get_journal_handle(State),
+ {JournalHdl, State1} = get_journal_handle(
+ State #qistate {
+ unsynced_guids = [Guid | UnsyncedGuids] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -282,7 +289,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% seqids not being in the journal, provided the transaction isn't
%% emptied (handled above anyway).
ok = file_handle_cache:sync(JournalHdl),
- State.
+ notify_sync(State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -371,7 +378,9 @@ blank_state(QueueName) ->
segments = segments_new(),
journal_handle = undefined,
dirty_count = 0,
- max_journal_entries = MaxJournal }.
+ max_journal_entries = MaxJournal,
+ on_sync = fun (_) -> ok end,
+ unsynced_guids = [] }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -576,7 +585,7 @@ flush_journal(State = #qistate { segments = Segments }) ->
{JournalHdl, State1} =
get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
- State1 #qistate { dirty_count = 0 }.
+ notify_sync(State1 #qistate { dirty_count = 0 }).
append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
@@ -668,6 +677,10 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+ OnSyncFun(UG),
+ State #qistate { unsynced_guids = [] }.
+
%%----------------------------------------------------------------------------
%% segment manipulation
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index bd57f737..a1a341a9 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -70,16 +70,18 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
-deliver(QPids, Delivery) ->
- {Success, _} =
- delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
- end),
- {Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, Handled}).
+deliver(QPids, Delivery = #delivery{mandatory = Mandatory,
+ immediate = Immediate}) ->
+ {Success, _} = delegate:invoke(
+ QPids, fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
+ end),
+ case check_delivery(Mandatory, Immediate,
+ lists:foldl(fun fold_deliveries/2,
+ {false, []}, Success)) of
+ {routed, Qs} -> {routed, Qs};
+ O -> O
+ end.
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b36ee0be..e03d1e94 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1469,7 +1469,7 @@ msg_store_remove(Guids) ->
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
+ rabbit_msg_store:client_init(MsgStore, Ref, undefined), L), MsgStore).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1479,7 +1479,7 @@ test_msg_store() ->
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, Guids),
Ref = rabbit_guid:guid(),
- MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined),
%% publish the first half
{ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState),
%% sync on the first half
@@ -1553,7 +1553,7 @@ test_msg_store() ->
%% check we don't contain any of the msgs
false = msg_store_contains(false, Guids),
%% publish the first half again
- MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined),
{ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
@@ -1607,6 +1607,9 @@ init_test_queue() ->
test_queue(), true, false,
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
+ end,
+ fun (_) ->
+ ok %% Sync!
end).
restart_test_queue(Qi) ->
@@ -1642,7 +1645,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
Guid, MSCStateN),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
- end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
+ end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref, undefined)}, SeqIds),
ok = rabbit_msg_store:client_delete_and_terminate(
MSCStateEnd, MsgStore, Ref),
{A, B}.
@@ -1789,7 +1792,8 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>), VQN)
+ end}, <<>>),
+ false, VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1809,7 +1813,8 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ fun nop/1, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1849,7 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {VQ9, _} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1859,7 +1864,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
+ {VQ3, _} = rabbit_variable_queue:ack([AckTag], VQ2),
+ publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -1892,7 +1898,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {VQ9, _} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1921,7 +1927,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -1937,7 +1944,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ fun nop/1, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -1967,10 +1975,13 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true),
+ VQ1 = rabbit_variable_queue:init(QName, true, true,
+ fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
+
+nop(_) -> ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index cbc71bcc..cf61ecbe 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,8 +31,8 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
+-export([init/5, init/3, terminate/1, delete_and_terminate/1,
+ purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
@@ -236,8 +236,11 @@
ram_index_count,
out_counter,
in_counter,
- rates
- }).
+ rates,
+ msgs_on_disk,
+ msg_indices_on_disk,
+ unconfirmed
+ }).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -322,7 +325,10 @@
ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
- rates :: rates() }).
+ rates :: rates(),
+ msgs_on_disk :: gb_set(),
+ msg_indices_on_disk :: gb_set(),
+ unconfirmed :: gb_set()}).
-include("rabbit_backing_queue_spec.hrl").
@@ -369,13 +375,21 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
init(QueueName, IsDurable, Recover) ->
+ Self = self(),
+ init(QueueName, IsDurable, Recover,
+ fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
+
+init(QueueName, IsDurable, Recover,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
QueueName, Recover,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
- end),
+ end,
+ MsgIdxOnDiskFun),
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{PRef, TRef, Terms1} =
@@ -393,12 +407,16 @@ init(QueueName, IsDurable, Recover) ->
end_seq_id = NextSeqId }
end,
Now = now(),
+
PersistentClient =
case IsDurable of
- true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
+ true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef,
+ MsgOnDiskFun);
false -> undefined
end,
- TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
+ TransientClient =
+ rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef, undefined),
+
State = #vqstate {
q1 = queue:new(),
q2 = bpqueue:new(),
@@ -428,7 +446,10 @@ init(QueueName, IsDurable, Recover) ->
ingress = {Now, DeltaCount1},
avg_egress = 0.0,
avg_ingress = 0.0,
- timestamp = Now } },
+ timestamp = Now },
+ msgs_on_disk = gb_sets:new(),
+ msg_indices_on_disk = gb_sets:new(),
+ unconfirmed = gb_sets:new() },
a(maybe_deltas_to_betas(State)).
terminate(State) ->
@@ -497,31 +518,37 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, State) ->
- {_SeqId, State1} = publish(Msg, false, false, State),
+publish(Msg, NeedsConfirming, State) ->
+ {_SeqId, State1} = publish(Msg, false, false, NeedsConfirming, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
+publish_delivered(false, _Msg, _NC, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
-publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
+publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
+ guid = Guid },
+ NeedsConfirming,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
pending_ack = PA,
- durable = IsDurable }) ->
+ durable = IsDurable,
+ unconfirmed = Unconfirmed }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1 })}.
+ Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ {SeqId, a(State1 #vqstate {
+ next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ pending_ack = PA1,
+ unconfirmed = Unconfirmed1 })}.
fetch(AckRequired, State = #vqstate { q4 = Q4,
ram_msg_count = RamMsgCount,
@@ -581,9 +608,10 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
end.
ack(AckTags, State) ->
- a(ack(fun rabbit_msg_store:remove/2,
- fun (_AckEntry, State1) -> State1 end,
- AckTags, State)).
+ {Guids, State1} = ack(fun rabbit_msg_store:remove/2,
+ fun (_AckEntry, State1) -> State1 end,
+ AckTags, State),
+ {Guids, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
State = #vqstate { durable = IsDurable,
@@ -632,20 +660,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
end)}.
requeue(AckTags, State) ->
- a(reduce_memory_use(
+ {_Guids, State1} =
ack(fun rabbit_msg_store:release/2,
fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
+ {_SeqId, State2} = publish(Msg, true, false, false, State1),
State2;
({IsPersistent, Guid}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
+ {_SeqId, State3} = publish(Msg, true, true, false, State2),
State3
end,
- AckTags, State))).
+ AckTags, State),
+ a(reduce_memory_use(State1)).
len(#vqstate { len = Len }) -> Len.
@@ -790,6 +819,9 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
+gb_sets_maybe_insert(false, _Val, Set) -> Set;
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:insert(Val, Set).
+
msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
@@ -951,14 +983,16 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
Pubs = lists:append(lists:reverse(SPubs)),
+ {_Guids, NewState} = ack(Acks, State),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
+ {SeqId, State3} = publish(Msg, false, IsPersistent1, false,
+ State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ end, {PAcks, NewState}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1012,15 +1046,16 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered, MsgOnDisk,
+publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
+ IsDelivered, MsgOnDisk, NeedsConfirming,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- ram_msg_count = RamMsgCount }) ->
+ ram_msg_count = RamMsgCount,
+ unconfirmed = Unconfirmed}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
@@ -1030,11 +1065,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
+ Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1}}.
+ ram_msg_count = RamMsgCount + 1,
+ unconfirmed = Unconfirmed1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, MSCState) ->
@@ -1119,7 +1156,7 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
@@ -1131,13 +1168,16 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
pending_ack = dict:erase(SeqId, PA) })}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- MsgStoreFun(MsgStore, Guids)
- end, ok, GuidsByStore),
+ AckdGuids = lists:concat(
+ orddict:fold(fun (MsgStore, Guids, Gs) ->
+ MsgStoreFun(MsgStore, Guids),
+ [Guids | Gs]
+ end, [], GuidsByStore)),
+ State2 = remove_confirms(gb_sets:from_list(AckdGuids), State1),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
+ {AckdGuids, State2 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1154,6 +1194,46 @@ find_persistent_count(LensByStore) ->
end.
%%----------------------------------------------------------------------------
+%% Internal plumbing for confirms (aka publisher acks)
+%%----------------------------------------------------------------------------
+
+remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
+ msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
+ unconfirmed = gb_sets:difference(UC, GuidSet) }.
+
+msgs_confirmed(GuidSet, State) ->
+ {remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}.
+
+msgs_written_to_disk(QPid, Guids) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ GuidSet = gb_sets:from_list(Guids),
+ msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MOD, GuidSet), UC) })
+ end).
+
+msg_indices_written_to_disk(QPid, Guids) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ GuidSet = gb_sets:from_list(Guids),
+ msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MIOD, GuidSet), UC) })
+ end).
+
+%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------