diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-06 10:13:22 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-06 10:13:22 +0100 |
commit | ef6335a97b6a3555a78ee0220e54e58ed270839f (patch) | |
tree | 7c885196abb5540a75b5edce3669d0c7cf9d0b98 | |
parent | 37f21f92299df57edf05c8c2291538cc69e07ec9 (diff) | |
parent | 5e62212375a4292f311f045f3287c6cf7f3dab9c (diff) | |
download | rabbitmq-server-ef6335a97b6a3555a78ee0220e54e58ed270839f.tar.gz |
merge default into bug20284
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 148 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 14 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 225 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 174 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 43 | ||||
-rw-r--r-- | src/rabbit_router.erl | 22 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 35 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 156 |
13 files changed, 632 insertions, 236 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 24aa8d98..5770cb95 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -69,7 +69,8 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message}). +-record(delivery, {mandatory, immediate, txn, sender, message, + origin, msg_seq_no}). -record(amqp_error, {name, explanation, method = none}). -record(event, {type, props, timestamp}). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 38c6f939..9f3a0227 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -37,6 +37,7 @@ -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). +-type(confirm_required() :: boolean()). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -45,12 +46,12 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()). --spec(publish_delivered/3 :: - (ack_required(), rabbit_types:basic_message(), state()) -> - {ack(), state()}). +-spec(publish/3 :: (rabbit_types:basic_message(), + confirm_required(), state()) -> state()). +-spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), + confirm_required(), state()) -> {ack(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). --spec(ack/2 :: ([ack()], state()) -> state()). +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 42bddc5e..e1dc73e1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,6 +34,7 @@ -export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, + maybe_run_queue_via_backing_queue_async/2, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1]). -export([pseudo_queue/2]). @@ -159,6 +160,8 @@ rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue_async/2 :: + (pid(), (fun ((A) -> A))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -370,16 +373,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). -deliver(QPid, #delivery{immediate = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, - infinity); -deliver(QPid, #delivery{mandatory = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), +deliver(QPid, Delivery = #delivery{immediate = true}) -> + gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); +deliver(QPid, Delivery = #delivery{mandatory = true}) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity), true; -deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), +deliver(QPid, Delivery) -> + gen_server2:cast(QPid, {deliver, Delivery}), true. requeue(QPid, MsgIds, ChPid) -> @@ -460,6 +460,9 @@ internal_delete(QueueName) -> maybe_run_queue_via_backing_queue(QPid, Fun) -> gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). +maybe_run_queue_via_backing_queue_async(QPid, Fun) -> + gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). + update_ram_duration(QPid) -> gen_server2:cast(QPid, update_ram_duration). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 61204deb..453f342e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -61,7 +61,8 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + guid_to_channel }). -record(consumer, {tag, ack_required}). @@ -123,7 +124,8 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, - stats_timer = rabbit_event:init_stats_timer()}, hibernate, + stats_timer = rabbit_event:init_stats_timer(), + guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -342,11 +344,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = case AckRequired of - true -> sets:add_element( - AckTag, ChAckTags); - false -> ChAckTags - end, + {State2, ChAckTags1} = + case AckRequired of + true -> {State1, + sets:add_element(AckTag, ChAckTags)}; + false -> {confirm_message(Message, State1), + ChAckTags} + end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, store_ch_record(NewC), @@ -362,10 +366,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State2 = State1#q{ + State3 = State2#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); + deliver_msgs_to_consumers(Funs, FunAcc1, State3); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), @@ -394,7 +398,34 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), {{Message, IsDelivered, AckTag}, 0 == Remaining, - State #q { backing_queue_state = BQS1 }}. + State#q{backing_queue_state = BQS1}}. + +confirm_messages(Guids, State) -> + lists:foldl(fun confirm_message_by_guid/2, State, Guids). + +confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> + case dict:find(Guid, GTC) of + {ok, {_ , undefined}} -> ok; + {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, + State#q{guid_to_channel = dict:erase(Guid, GTC)}. + +confirm_message(#basic_message{guid = Guid}, State) -> + confirm_message_by_guid(Guid, State). + +record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> + State; +record_confirm_message(#delivery{msg_seq_no = MsgSeqNo, + sender = ChPid, + message = #basic_message{guid = Guid}}, + State = #q{guid_to_channel = GTC}) -> + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}. + +ack_by_acktags(AckTags, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), + confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, @@ -403,29 +434,36 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(#delivery{txn = none, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, BQS), + BQ:publish_delivered(AckRequired, Message, + MsgSeqNo =/= undefined, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(#delivery{txn = Txn, + sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, State) of - {true, NewState} -> - {true, NewState}; - {false, NewState} -> - %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), - {false, NewState#q{backing_queue_state = BQS}} +deliver_or_enqueue(Delivery, State) -> + case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of + {true, State1} -> + {true, State1}; + {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + BQS1 = BQ:publish(Message, MsgSeqNo =/= undefined, BQS), + {false, State1#q{backing_queue_state = BQS1}} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -523,7 +561,12 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + {BQS2, State1} = + case Fun(BQS) of + {BQS1, {confirm, Guids}} -> {BQS1, confirm_messages(Guids, State)}; + BQS1 -> {BQS1, State} + end, + run_message_queue(State1#q{backing_queue_state = BQS2}). commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -657,7 +700,8 @@ handle_call(consumers, _From, [{ChPid, ConsumerTag, AckRequired} | Acc] end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); -handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, + _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -671,12 +715,16 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), - reply(Delivered, NewState); - -handle_call({deliver, Txn, Message, ChPid}, _From, State) -> + {Delivered, State1} = + attempt_delivery(Delivery, record_confirm_message(Delivery, State)), + reply(Delivered, case Delivered of + true -> State1; + false -> confirm_message(Message, State1) + end); + +handle_call({deliver, Delivery}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Delivery, State), reply(Delivered, NewState); handle_call({commit, Txn, ChPid}, From, State) -> @@ -702,14 +750,16 @@ handle_call({basic_get, ChPid, NoAck}, _From, case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); - false -> ok - end, + State2 = + case AckRequired of + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + ChAckTags1 = sets:add_element(AckTag, ChAckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + State1; + false -> confirm_message(Message, State1) + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1}) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -829,9 +879,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({deliver, Txn, Message, ChPid}, State) -> + +handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Fun, State)); + +handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), noreply(NewState); handle_cast({ack, Txn, AckTags, ChPid}, @@ -840,18 +894,21 @@ handle_cast({ack, Txn, AckTags, ChPid}, not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, BQS1} = + {C1, State1} = case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; - _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + NewC = C#cr{acktags = ChAckTags1}, + NewState = ack_by_acktags(AckTags, State), + {NewC, NewState}; + _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + {C#cr{txn = Txn}, + State#q{backing_queue_state = BQS1}} end, store_ch_record(C1), - noreply(State#q{backing_queue_state = BQS1}) + noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -860,8 +917,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), - State #q { backing_queue_state = BQS1 } + false -> ack_by_acktags(AckTags, State) end) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507..32f9f15a 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,12 +62,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 3}, + {publish_delivered, 4}, %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 38412982..1ac39b65 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). +-export([publish/1, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). -export([is_message_persistent/1]). @@ -50,9 +50,10 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: +-spec(delivery/5 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) -> rabbit_types:delivery()). + rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) -> @@ -88,9 +89,9 @@ publish(Delivery = #delivery{ Other end. -delivery(Mandatory, Immediate, Txn, Message) -> +delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message}. + sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 @@ -157,7 +158,8 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, BodyBin) -> publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin))). + properties(Properties), BodyBin), + undefined)). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fe36cef9..7d9f8064 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ -export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1]). +-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -48,7 +48,9 @@ start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer}). + consumer_mapping, blocking, queue_collector_pid, stats_timer, + confirm_enabled, published_count, confirm_multiple, confirm_tref, + held_confirms, unconfirmed, qpid_to_msgs}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -70,6 +72,8 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -99,6 +103,8 @@ -spec(info_all/0 :: () -> [[rabbit_types:info()]]). -spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), integer()) -> 'ok'). -endif. @@ -153,6 +159,12 @@ emit_stats(Pid) -> flush(Pid) -> gen_server2:call(Pid, flush). +flush_multiple_acks(Pid) -> + gen_server2:cast(Pid, flush_multiple_acks). + +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo}). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -177,7 +189,13 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = StatsTimer}, + stats_timer = StatsTimer, + confirm_enabled = false, + published_count = 0, + confirm_multiple = false, + held_confirms = gb_sets:new(), + unconfirmed = gb_sets:new(), + qpid_to_msgs = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -258,19 +276,43 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), {noreply, - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}. - -handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}; + +handle_cast(flush_multiple_acks, + State = #ch{writer_pid = WriterPid, + held_confirms = As, + unconfirmed = UC}) -> + flush_multiple(WriterPid, As, UC), + {noreply, State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined}}; + +handle_cast({confirm, MsgSeqNo}, State) -> + {noreply, send_or_enqueue_ack(MsgSeqNo, State)}. + +handle_info({'DOWN', _MRef, process, QPid, _Reason}, + State = #ch{qpid_to_msgs = QTM}) -> + State2 = case dict:find(QPid, QTM) of + {ok, Msgs} -> State1 = gb_sets:fold(fun send_or_enqueue_ack/2, + State, Msgs), + State1 #ch{qpid_to_msgs = dict:erase(QPid, QTM)}; + error -> State + end, erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)}. + {noreply, queue_blocked(QPid, State2)}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{writer_pid = WriterPid, + held_confirms = As, + stats_timer = StatsTimer, + unconfirmed = UC}) -> ok = clear_permission_cache(), - rabbit_event:if_enabled(StatsTimer, fun () -> + flush_multiple(WriterPid, As, UC), + rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), - {hibernate, - State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. + StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), + {hibernate, State#ch{held_confirms = gb_sets:new(), + stats_timer = StatsTimer1, + confirm_tref = undefined}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -402,6 +444,47 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +send_or_enqueue_ack(undefined, State) -> + State; +send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> + State; +send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) -> + do_if_unconfirmed( + MsgSeqNo, State, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + State1 + end); +send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> + do_if_unconfirmed( + MsgSeqNo, State, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_ack_timer(State1#ch{held_confirms = + gb_sets:add(MSN, As)}) + end). + +msg_sent_to_queue(undefined, _QPid, State) -> + State; +msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> + Msgs1 = case dict:find(QPid, QTM) of + {ok, Msgs} -> Msgs; + error -> erlang:monitor(process, QPid), + gb_sets:new() + end, + QTM1 = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM), + State#ch{qpid_to_msgs = QTM1}. + +do_if_unconfirmed(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) -> + case gb_sets:is_element(MsgSeqNo, UC) of + true -> QTM = dict:map(fun (_, Msgs) -> + gb_sets:delete_any(MsgSeqNo, Msgs) + end, State#ch.qpid_to_msgs), + State1 = Fun(MsgSeqNo, State#ch{qpid_to_msgs = QTM}), + State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)}; + false -> State + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -424,9 +507,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + confirm_enabled = ConfirmEnabled}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -434,6 +517,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), + {MsgSeqNo, State1} + = case ConfirmEnabled of + false -> {undefined, State}; + true -> Count = State#ch.published_count, + {Count, + State#ch{published_count = Count + 1, + unconfirmed = + gb_sets:add(Count, State#ch.unconfirmed)}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -442,18 +534,19 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> ok; - unroutable -> ok = basic_return(Message, WriterPid, no_route); - not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) - end, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + case IsPersistent of + true -> MsgSeqNo; + false -> undefined + end)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent, + MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State), + QPid <- DeliveredQPids]], publish, State2), {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -841,6 +934,11 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); + +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from confirm to tx mode", []); + handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; @@ -861,6 +959,25 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; +handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) + when TxId =/= none -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from tx to confirm mode", []); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = false}) -> + return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, + NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = true, + confirm_multiple = Multiple}) -> + return_ok(State, NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot change confirm_multiple setting", []); + handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1083,6 +1200,21 @@ is_message_persistent(Content) -> IsPersistent end. +process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_route), + send_or_enqueue_ack(MsgSeqNo, State); +process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_consumers), + send_or_enqueue_ack(MsgSeqNo, State); +process_routing_result(routed, [], _, MsgSeqNo, _, State) -> + send_or_enqueue_ack(MsgSeqNo, State); +process_routing_result(routed, _, false, MsgSeqNo, _, State) -> + send_or_enqueue_ack(MsgSeqNo, State); +process_routing_result(routed, QPids, true, MsgSeqNo, _, State) -> + lists:foldl(fun (QPid, State0) -> + msg_sent_to_queue(MsgSeqNo, QPid, State0) + end, State, QPids). + lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; lock_message(false, _MsgStruct, State) -> @@ -1104,7 +1236,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(_State) -> +terminate(State) -> + stop_ack_timer(State), pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). @@ -1186,3 +1319,47 @@ erase_queue_stats(QPid) -> erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + +start_ack_timer(State = #ch{confirm_tref = undefined}) -> + {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, + ?MODULE, flush_multiple_acks, [self()]), + State#ch{confirm_tref = TRef}; +start_ack_timer(State) -> + State. + +stop_ack_timer(State = #ch{confirm_tref = undefined}) -> + State; +stop_ack_timer(State = #ch{confirm_tref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#ch{confirm_tref = undefined}. + +flush_multiple(WriterPid, As, NA) -> + case gb_sets:is_empty(As) of + true -> ok; + false -> [First | Rest] = gb_sets:to_list(As), + [rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = A}) || + A <- case Rest of + [] -> [First]; + _ -> flush_multiple( + First, Rest, WriterPid, + case gb_sets:is_empty(NA) of + false -> gb_sets:smallest(NA); + true -> gb_sets:largest(As) + 1 + end) + end], + ok + end. + +flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> + ExpNext = Prev + 1, + case {SNA >= Cur, Cur} of + {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA); + _ -> flush_multiple(Prev, [], WriterPid, SNA), + [Cur | Rest] + end; +flush_multiple(Prev, [], WriterPid, _) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = Prev, + multiple = true}), + []. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4e0dad84..664ef653 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,8 +31,8 @@ -module(rabbit_invariable_queue). --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, - publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, + publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -99,14 +99,14 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> +publish(Msg, _, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. -publish_delivered(false, _Msg, State) -> +publish_delivered(false, _Msg, _, State) -> {blank_ack, State}; -publish_delivered(true, Msg = #basic_message { guid = Guid }, +publish_delivered(true, Msg = #basic_message { guid = Guid }, _, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> ok = persist_message(QName, IsDurable, none, Msg), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bbecbfe2..c2e74a23 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/2, + sync/3, client_init/3, client_terminate/2, client_delete_and_terminate/3, successfully_recovered_state/1]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -82,7 +82,9 @@ cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? - file_size_limit %% how big are our files allowed to get? + file_size_limit, %% how big are our files allowed to get? + client_ondisk_callback, %% client ref to callback function mapping + cref_to_guids %% client ref to synced messages mapping }). -record(client_msstate, @@ -94,7 +96,8 @@ file_handles_ets, file_summary_ets, dedup_cache_ets, - cur_file_cache_ets + cur_file_cache_ets, + client_ref }). -record(file_summary, @@ -115,10 +118,12 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid() }). + cur_file_cache_ets :: ets:tid(), + client_ref :: rabbit_guid:guid()}). -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(guid_fun() :: fun (([rabbit_guid:guid()]) -> any())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', @@ -134,10 +139,11 @@ -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). +-spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) -> + client_msstate()). -spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). + (client_msstate(), server(), rabbit_guid:guid()) -> 'ok'). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), @@ -309,9 +315,10 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> [{timeout, infinity}]). write(Server, Guid, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid}), CState}. + {gen_server2:cast(Server, {write, CRef, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -353,10 +360,11 @@ gc_done(Server, Reclaimed, Source, Destination) -> set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). -client_init(Server, Ref) -> +client_init(Server, Ref, MsgOnDiskFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), + gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, + infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -365,11 +373,12 @@ client_init(Server, Ref) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }. + cur_file_cache_ets = CurFileCacheEts, + client_ref = Ref}. client_terminate(CState, Server) -> close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). + ok = gen_server2:call(Server, {client_terminate, CState}, infinity). client_delete_and_terminate(CState, Server, Ref) -> close_all_handles(CState), @@ -485,6 +494,13 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, end end. +clear_client_callback(CRef, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), + cref_to_guids = dict:erase(CRef, CTG)}. + + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -553,7 +569,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, successfully_recovered = CleanShutdown, - file_size_limit = FileSizeLimit + file_size_limit = FileSizeLimit, + client_ondisk_callback = dict:new(), + cref_to_guids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -577,10 +595,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - {new_client_state, _Ref} -> 7; - successfully_recovered_state -> 7; - {read, _Guid} -> 2; - _ -> 0 + {new_client_state, _Ref, _MODC} -> 7; + successfully_recovered_state -> 7; + {read, _Guid} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -599,33 +617,43 @@ handle_call({contains, Guid}, From, State) -> State1 = contains_message(Guid, From, State), noreply(State1); -handle_call({new_client_state, CRef}, _From, - State = #msstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, - gc_pid = GCPid }) -> +handle_call({new_client_state, CRef, Callback}, _From, + State = #msstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + client_ondisk_callback = CODC, + gc_pid = GCPid }) -> + CODC1 = case Callback of + undefined -> CODC; + _ -> dict:store(CRef, Callback, CODC) + end, reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); + State #msstate { client_refs = sets:add_element(CRef, ClientRefs), + client_ondisk_callback = CODC1 }); handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call(client_terminate, _From, State) -> - reply(ok, State). +handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, + State) -> + reply(ok, clear_client_callback(CRef, State)). + +handle_cast({write, CRef, Guid}, + State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + sum_valid_data = SumValid, + sum_file_size = SumFileSize, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts, + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> -handle_cast({write, Guid}, - State = #msstate { current_file_handle = CurHdl, - current_file = CurFile, - sum_valid_data = SumValid, - sum_file_size = SumFileSize, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of @@ -648,18 +676,28 @@ handle_cast({write, Guid}, [{#file_summary.valid_total_size, ValidTotalSize1}, {#file_summary.file_size, FileSize + TotalSize}]), NextOffset = CurOffset + TotalSize, - noreply( - maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })); - #msg_location { ref_count = RefCount } -> + CTG1 = case dict:find(CRef, CODC) of + {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG); + error -> CTG + end, + noreply(maybe_roll_to_new_file( + NextOffset, State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize, + cref_to_guids = CTG1 })); + #msg_location { ref_count = RefCount, file = File } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC ok = index_update_fields(Guid, {#msg_location.ref_count, RefCount + 1}, State), - noreply(State) + CTG1 = case {dict:find(CRef, CODC), File =:= CurFile} of + {{ok, _} , true} -> rabbit_misc:dict_cons(CRef, Guid, + CTG); + {{ok, Fun}, false} -> Fun([Guid]), CTG; + _ -> CTG + end, + noreply(State #msstate { cref_to_guids = CTG1 }) end; handle_cast({remove, Guids}, State) -> @@ -730,8 +768,9 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) -> - noreply( - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). + State1 = clear_client_callback(CRef, State), + noreply(State1 #msstate { + client_refs = sets:del_element(CRef, ClientRefs) }). handle_info(timeout, State) -> noreply(internal_sync(State)); @@ -783,14 +822,19 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #msstate { sync_timer_ref = undefined }) -> - {start_sync_timer(State), 0}; -next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), hibernate}; -next_state(State) -> - {State, 0}. +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} + end; +next_state(State = #msstate { on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} + end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), @@ -802,15 +846,21 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs }) -> +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), - case Syncs of - [] -> State1; - _ -> ok = file_handle_cache:sync(CurHdl), - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { on_sync = [] } - end. + CGs = dict:fold(fun (_CRef, [], NS) -> NS; + (CRef, Guids, NS) -> [{CRef, Guids} | NS] + end, [], CTG), + if Syncs =:= [] andalso CGs =:= [] -> ok; + true -> file_handle_cache:sync(CurHdl) + end, + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + [(dict:fetch(CRef, CODC))(Guids) || {CRef, Guids} <- CGs], + State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. + read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0b98290c..6f196336 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, +-export([init/5, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -166,7 +166,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries }). + max_journal_entries, on_sync, unsynced_guids }). -record(segment, { num, path, journal_entries, unacked }). @@ -185,18 +185,21 @@ })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict:dictionary(), [segment()]}). +-type(on_sync_fun() :: fun (([rabbit_guid:guid()]) -> ok)). -type(qistate() :: #qistate { dir :: file:filename(), segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer(), - max_journal_entries :: non_neg_integer() + max_journal_entries :: non_neg_integer(), + on_sync :: on_sync_fun(), + unsynced_guids :: [rabbit_guid:guid()] }). -type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), + {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), A}). --spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> +-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(), + fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). @@ -222,12 +225,12 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> +init(Name, false, _MsgStoreRecovered, _ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - {0, [], State}; + {0, [], State #qistate { on_sync = OnSyncFun }}; -init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> +init(Name, true, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), Terms = case read_shutdown_terms(Dir) of {error, _} -> []; @@ -240,7 +243,7 @@ init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> init_clean(RecoveredCounts, State); false -> init_dirty(CleanShutdown, ContainsCheckFun, State) end, - {Count, Terms, State1}. + {Count, Terms, State1 #qistate { on_sync = OnSyncFun }}. terminate(Terms, State) -> {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), @@ -252,9 +255,13 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, IsPersistent, + State = #qistate { unsynced_guids = UnsyncedGuids }) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), - {JournalHdl, State1} = get_journal_handle(State), + {JournalHdl, State1} = get_journal_handle( + State #qistate { + unsynced_guids = [Guid | UnsyncedGuids] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -282,7 +289,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% seqids not being in the journal, provided the transaction isn't %% emptied (handled above anyway). ok = file_handle_cache:sync(JournalHdl), - State. + notify_sync(State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -371,7 +378,9 @@ blank_state(QueueName) -> segments = segments_new(), journal_handle = undefined, dirty_count = 0, - max_journal_entries = MaxJournal }. + max_journal_entries = MaxJournal, + on_sync = fun (_) -> ok end, + unsynced_guids = [] }. clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -576,7 +585,7 @@ flush_journal(State = #qistate { segments = Segments }) -> {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - State1 #qistate { dirty_count = 0 }. + notify_sync(State1 #qistate { dirty_count = 0 }). append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> @@ -668,6 +677,10 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> + OnSyncFun(UG), + State #qistate { unsynced_guids = [] }. + %%---------------------------------------------------------------------------- %% segment manipulation %%---------------------------------------------------------------------------- diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index bd57f737..a1a341a9 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -70,16 +70,18 @@ deliver(QPids, Delivery = #delivery{mandatory = false, QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QPids, Delivery) -> - {Success, _} = - delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) - end), - {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, Handled}). +deliver(QPids, Delivery = #delivery{mandatory = Mandatory, + immediate = Immediate}) -> + {Success, _} = delegate:invoke( + QPids, fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) + end), + case check_delivery(Mandatory, Immediate, + lists:foldl(fun fold_deliveries/2, + {false, []}, Success)) of + {routed, Qs} -> {routed, Qs}; + O -> O + end. %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b36ee0be..e03d1e94 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1469,7 +1469,7 @@ msg_store_remove(Guids) -> foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). + rabbit_msg_store:client_init(MsgStore, Ref, undefined), L), MsgStore). test_msg_store() -> restart_msg_store_empty(), @@ -1479,7 +1479,7 @@ test_msg_store() -> %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, Guids), Ref = rabbit_guid:guid(), - MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined), %% publish the first half {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), %% sync on the first half @@ -1553,7 +1553,7 @@ test_msg_store() -> %% check we don't contain any of the msgs false = msg_store_contains(false, Guids), %% publish the first half again - MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined), {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( @@ -1607,6 +1607,9 @@ init_test_queue() -> test_queue(), true, false, fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end, + fun (_) -> + ok %% Sync! end). restart_test_queue(Qi) -> @@ -1642,7 +1645,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} - end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), + end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref, undefined)}, SeqIds), ok = rabbit_msg_store:client_delete_and_terminate( MSCStateEnd, MsgStore, Ref), {A, B}. @@ -1789,7 +1792,8 @@ variable_queue_publish(IsPersistent, Count, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), VQN) + end}, <<>>), + false, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1809,7 +1813,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + fun nop/1, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1849,7 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {VQ9, _} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1859,7 +1864,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). + {VQ3, _} = rabbit_variable_queue:ack([AckTag], VQ2), + publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1892,7 +1898,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {VQ9, _} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1921,7 +1927,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -1937,7 +1944,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -1967,10 +1975,13 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. + +nop(_) -> ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cbc71bcc..cf61ecbe 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,8 +31,8 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, +-export([init/5, init/3, terminate/1, delete_and_terminate/1, + purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, @@ -236,8 +236,11 @@ ram_index_count, out_counter, in_counter, - rates - }). + rates, + msgs_on_disk, + msg_indices_on_disk, + unconfirmed + }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -322,7 +325,10 @@ ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), - rates :: rates() }). + rates :: rates(), + msgs_on_disk :: gb_set(), + msg_indices_on_disk :: gb_set(), + unconfirmed :: gb_set()}). -include("rabbit_backing_queue_spec.hrl"). @@ -369,13 +375,21 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> + Self = self(), + init(QueueName, IsDurable, Recover, + fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). + +init(QueueName, IsDurable, Recover, + MsgOnDiskFun, MsgIdxOnDiskFun) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end), + end, + MsgIdxOnDiskFun), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -393,12 +407,16 @@ init(QueueName, IsDurable, Recover) -> end_seq_id = NextSeqId } end, Now = now(), + PersistentClient = case IsDurable of - true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); + true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef, + MsgOnDiskFun); false -> undefined end, - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), + TransientClient = + rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef, undefined), + State = #vqstate { q1 = queue:new(), q2 = bpqueue:new(), @@ -428,7 +446,10 @@ init(QueueName, IsDurable, Recover) -> ingress = {Now, DeltaCount1}, avg_egress = 0.0, avg_ingress = 0.0, - timestamp = Now } }, + timestamp = Now }, + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new(), + unconfirmed = gb_sets:new() }, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -497,31 +518,37 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), +publish(Msg, NeedsConfirming, State) -> + {_SeqId, State1} = publish(Msg, false, false, NeedsConfirming, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> +publish_delivered(false, _Msg, _NC, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, +publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, + guid = Guid }, + NeedsConfirming, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, pending_ack = PA, - durable = IsDurable }) -> + durable = IsDurable, + unconfirmed = Unconfirmed }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + {SeqId, a(State1 #vqstate { + next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1, + unconfirmed = Unconfirmed1 })}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -581,9 +608,10 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, end. ack(AckTags, State) -> - a(ack(fun rabbit_msg_store:remove/2, - fun (_AckEntry, State1) -> State1 end, - AckTags, State)). + {Guids, State1} = ack(fun rabbit_msg_store:remove/2, + fun (_AckEntry, State1) -> State1 end, + AckTags, State), + {Guids, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, State = #vqstate { durable = IsDurable, @@ -632,20 +660,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> end)}. requeue(AckTags, State) -> - a(reduce_memory_use( + {_Guids, State1} = ack(fun rabbit_msg_store:release/2, fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + {_SeqId, State2} = publish(Msg, true, false, false, State1), State2; ({IsPersistent, Guid}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, true, true, false, State2), State3 end, - AckTags, State))). + AckTags, State), + a(reduce_memory_use(State1)). len(#vqstate { len = Len }) -> Len. @@ -790,6 +819,9 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. +gb_sets_maybe_insert(false, _Val, Set) -> Set; +gb_sets_maybe_insert(true, Val, Set) -> gb_sets:insert(Val, Set). + msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, @@ -951,14 +983,16 @@ tx_commit_index(State = #vqstate { on_sync = #sync { PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), Pubs = lists:append(lists:reverse(SPubs)), + {_Guids, NewState} = ack(Acks, State), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + {SeqId, State3} = publish(Msg, false, IsPersistent1, false, + State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + end, {PAcks, NewState}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1012,15 +1046,16 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, MsgOnDisk, +publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, + IsDelivered, MsgOnDisk, NeedsConfirming, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - ram_msg_count = RamMsgCount }) -> + ram_msg_count = RamMsgCount, + unconfirmed = Unconfirmed}) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, @@ -1030,11 +1065,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1}}. + ram_msg_count = RamMsgCount + 1, + unconfirmed = Unconfirmed1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> @@ -1119,7 +1156,7 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = @@ -1131,13 +1168,16 @@ ack(MsgStoreFun, Fun, AckTags, State) -> pending_ack = dict:erase(SeqId, PA) })} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - MsgStoreFun(MsgStore, Guids) - end, ok, GuidsByStore), + AckdGuids = lists:concat( + orddict:fold(fun (MsgStore, Guids, Gs) -> + MsgStoreFun(MsgStore, Guids), + [Guids | Gs] + end, [], GuidsByStore)), + State2 = remove_confirms(gb_sets:from_list(AckdGuids), State1), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + {AckdGuids, State2 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1154,6 +1194,46 @@ find_persistent_count(LensByStore) -> end. %%---------------------------------------------------------------------------- +%% Internal plumbing for confirms (aka publisher acks) +%%---------------------------------------------------------------------------- + +remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), + msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), + unconfirmed = gb_sets:difference(UC, GuidSet) }. + +msgs_confirmed(GuidSet, State) -> + {remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}. + +msgs_written_to_disk(QPid, Guids) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + GuidSet = gb_sets:from_list(Guids), + msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), UC) }) + end). + +msg_indices_written_to_disk(QPid, Guids) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + GuidSet = gb_sets:from_list(Guids), + msgs_confirmed(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), UC) }) + end). + +%%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |