summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-03 21:20:14 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-03 21:20:14 +0100
commit39f46d22ed36b864a38cafe9b8f18ce5bec313d8 (patch)
tree126ce89772bbf98b0ca03ed73b0e602333eab1ac
parent120be66887e4d116dfc6a5ae1a4f68bb310c269b (diff)
downloadrabbitmq-server-bug22616.tar.gz
pluggable queue backendsbug22616
- behaviour and specs for pluggable queue backends - reworking of the queue code to support pluggability, including hookups to memory monitoring and file handle management - interface to existing persister via the new API All these changes were cherry-picked from the bug21673 branch.
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit_backing_queue_spec.hrl63
-rw-r--r--src/rabbit_amqqueue.erl20
-rw-r--r--src/rabbit_amqqueue_process.erl582
-rw-r--r--src/rabbit_backing_queue.erl133
-rw-r--r--src/rabbit_invariable_queue.erl264
6 files changed, 772 insertions, 291 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index ad8e3549..bdf407eb 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -18,6 +18,7 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
+ {backing_queue_module, rabbit_invariable_queue},
{persister_max_wrap_entries, 500},
{persister_hibernate_after, 10000},
{default_user, <<"guest">>},
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
new file mode 100644
index 00000000..1b536dfa
--- /dev/null
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-type(fetch_result() ::
+ %% Message, IsDelivered, AckTag, Remaining_Len
+ ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})).
+-type(is_durable() :: boolean()).
+-type(attempt_recovery() :: boolean()).
+-type(purged_msg_count() :: non_neg_integer()).
+-type(ack_required() :: boolean()).
+
+-spec(start/1 :: ([queue_name()]) -> 'ok').
+-spec(init/3 :: (queue_name(), is_durable(), attempt_recovery()) -> state()).
+-spec(terminate/1 :: (state()) -> state()).
+-spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
+-spec(publish/2 :: (basic_message(), state()) -> state()).
+-spec(publish_delivered/3 ::
+ (ack_required(), basic_message(), state()) -> {ack(), state()}).
+-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
+-spec(ack/2 :: ([ack()], state()) -> state()).
+-spec(tx_publish/3 :: (txn(), basic_message(), state()) -> state()).
+-spec(tx_ack/3 :: (txn(), [ack()], state()) -> state()).
+-spec(tx_rollback/2 :: (txn(), state()) -> {[ack()], state()}).
+-spec(tx_commit/3 :: (txn(), fun (() -> any()), state()) -> {[ack()], state()}).
+-spec(requeue/2 :: ([ack()], state()) -> state()).
+-spec(len/1 :: (state()) -> non_neg_integer()).
+-spec(is_empty/1 :: (state()) -> boolean()).
+-spec(set_ram_duration_target/2 ::
+ (('undefined' | 'infinity' | number()), state()) -> state()).
+-spec(ram_duration/1 :: (state()) -> {number(), state()}).
+-spec(needs_sync/1 :: (state()) -> boolean()).
+-spec(sync/1 :: (state()) -> state()).
+-spec(handle_pre_hibernate/1 :: (state()) -> state()).
+-spec(status/1 :: (state()) -> [{atom(), any()}]).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b86cdd04..2d75b15b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,6 +33,8 @@
-export([start/0, declare/4, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
+ maybe_run_queue_via_backing_queue/2,
+ update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
@@ -109,6 +111,9 @@
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
+-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok').
+-spec(update_ram_duration/1 :: (pid()) -> 'ok').
+-spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -119,9 +124,8 @@
start() ->
DurableQueues = find_durable_queues(),
- ok = rabbit_sup:start_child(
- rabbit_persister,
- [[QName || #amqqueue{name = QName} <- DurableQueues]]),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
@@ -352,6 +356,16 @@ internal_delete(QueueName) ->
ok
end.
+maybe_run_queue_via_backing_queue(QPid, Fun) ->
+ gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun},
+ infinity).
+
+update_ram_duration(QPid) ->
+ gen_server2:pcast(QPid, 8, update_ram_duration).
+
+set_ram_duration_target(QPid, Duration) ->
+ gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}).
+
set_maximum_since_use(QPid, Age) ->
gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d745a69c..06712e9c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -35,11 +35,14 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(SYNC_INTERVAL, 5). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-export([start_link/1, info_keys/0]).
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2, handle_pre_hibernate/1]).
-import(queue).
-import(erlang).
@@ -50,21 +53,22 @@
owner,
exclusive_consumer,
has_had_consumers,
- next_msg_id,
- message_buffer,
+ backing_queue,
+ backing_queue_state,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ sync_timer_ref,
+ rate_timer_ref
+ }).
-record(consumer, {tag, ack_required}).
--record(tx, {is_persistent, pending_messages, pending_acks}).
-
%% These are held in our process dictionary
-record(cr, {consumer_count,
ch_pid,
limiter_pid,
monitor_ref,
- unacked_messages,
+ acktags,
is_limit_active,
txn,
unsent_message_count}).
@@ -82,7 +86,9 @@
messages_unacknowledged,
messages,
consumers,
- memory]).
+ memory,
+ backing_queue_status
+ ]).
%%----------------------------------------------------------------------------
@@ -94,34 +100,108 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ process_flag(trap_exit, true),
+ {ok, BQ} = application:get_env(backing_queue_module),
+
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- next_msg_id = 1,
- message_buffer = undefined,
+ backing_queue = BQ,
+ backing_queue_state = undefined,
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, hibernate,
+ blocked_consumers = queue:new(),
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(_Reason, #q{message_buffer = undefined}) ->
- ok;
-terminate(_Reason, State) ->
+terminate(shutdown, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
+terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
+terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
- QName = qname(State),
- lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
- all_tx()),
- ok = purge_message_buffer(QName, State#q.message_buffer),
- ok = rabbit_amqqueue:internal_delete(QName).
+ State1 = terminate_shutdown(fun (BQS) -> BQ:delete_and_terminate(BQS) end,
+ State),
+ ok = rabbit_amqqueue:internal_delete(qname(State1)).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+terminate_shutdown(Fun, State) ->
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ stop_sync_timer(stop_rate_timer(State)),
+ case BQS of
+ undefined -> State;
+ _ -> ok = rabbit_memory_monitor:deregister(self()),
+ BQS1 = lists:foldl(
+ fun (#cr{txn = none}, BQSN) ->
+ BQSN;
+ (#cr{txn = Txn}, BQSN) ->
+ {_AckTags, BQSN1} =
+ BQ:tx_rollback(Txn, BQSN),
+ BQSN1
+ end, BQS, all_ch_record()),
+ State1#q{backing_queue_state = Fun(BQS1)}
+ end.
-noreply(NewState) -> {noreply, NewState, hibernate}.
+reply(Reply, NewState) ->
+ assert_invariant(NewState),
+ {NewState1, Timeout} = next_state(NewState),
+ {reply, Reply, NewState1, Timeout}.
+
+noreply(NewState) ->
+ assert_invariant(NewState),
+ {NewState1, Timeout} = next_state(NewState),
+ {noreply, NewState1, Timeout}.
+
+next_state(State) ->
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ ensure_rate_timer(State),
+ case BQ:needs_sync(BQS)of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
+ end.
+
+ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL,
+ rabbit_amqqueue, maybe_run_queue_via_backing_queue,
+ [self(), fun (BQS) -> BQ:sync(BQS) end]),
+ State#q{sync_timer_ref = TRef};
+ensure_sync_timer(State) ->
+ State.
+
+stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
+ State;
+stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{sync_timer_ref = undefined}.
+
+ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
+ {ok, TRef} = timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State#q{rate_timer_ref = TRef};
+ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
+ State#q{rate_timer_ref = undefined};
+ensure_rate_timer(State) ->
+ State.
+
+stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
+ State;
+stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
+ State#q{rate_timer_ref = undefined};
+stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{rate_timer_ref = undefined}.
+
+assert_invariant(#q{active_consumers = AC,
+ backing_queue = BQ, backing_queue_state = BQS}) ->
+ true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -137,7 +217,7 @@ ch_record(ChPid) ->
C = #cr{consumer_count = 0,
ch_pid = ChPid,
monitor_ref = MonitorRef,
- unacked_messages = dict:new(),
+ acktags = sets:new(),
is_limit_active = false,
txn = none,
unsent_message_count = 0},
@@ -168,29 +248,33 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_immediately(Message, IsDelivered,
- State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
+deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
+ State = #q{q = #amqqueue{name = QName},
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
- unacked_messages = UAM} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
+ acktags = ChAckTags} = ch_record(ChPid),
+ IsMsgReady = PredFun(FunAcc, State),
+ case (IsMsgReady andalso
+ rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
true ->
+ {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
+ DeliverFun(AckRequired, FunAcc, State),
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, IsDelivered, Message}),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> sets:add_element(
+ AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM},
+ acktags = ChAckTags1},
store_ch_record(NewC),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
@@ -204,88 +288,85 @@ deliver_immediately(Message, IsDelivered,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- {offered, AckRequired,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers,
- next_msg_id = NextId + 1}};
- false ->
+ State2 = State1#q{
+ active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers},
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
+ %% if IsMsgReady then we've hit the limiter
+ false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
BlockedConsumers),
- deliver_immediately(
- Message, IsDelivered,
+ deliver_msgs_to_consumers(
+ Funs, FunAcc,
State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
- end;
- {empty, _} ->
- {not_offered, State}
- end.
-
-run_message_queue(State = #q{message_buffer = MessageBuffer}) ->
- run_message_queue(MessageBuffer, State).
-
-run_message_queue(MessageBuffer, State) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, IsDelivered}}, BufferTail} ->
- case deliver_immediately(Message, IsDelivered, State) of
- {offered, true, NewState} ->
- persist_delivery(qname(State), Message, IsDelivered),
- run_message_queue(BufferTail, NewState);
- {offered, false, NewState} ->
- persist_auto_ack(qname(State), Message),
- run_message_queue(BufferTail, NewState);
- {not_offered, NewState} ->
- NewState#q{message_buffer = MessageBuffer}
+ blocked_consumers = NewBlockedConsumers});
+ false ->
+ %% no message was ready, so we don't need to block anyone
+ {FunAcc, State}
end;
{empty, _} ->
- State#q{message_buffer = MessageBuffer}
+ {FunAcc, State}
end.
-attempt_delivery(none, _ChPid, Message, State) ->
- case deliver_immediately(Message, false, State) of
- {offered, false, State1} ->
- {true, State1};
- {offered, true, State1} ->
- persist_message(none, qname(State), Message),
- persist_delivery(qname(State), Message, false),
- {true, State1};
- {not_offered, State1} ->
- {false, State1}
- end;
-attempt_delivery(Txn, ChPid, Message, State) ->
- persist_message(Txn, qname(State), Message),
- record_pending_message(Txn, ChPid, Message),
- {true, State}.
+deliver_from_queue_pred(IsEmpty, _State) ->
+ not IsEmpty.
+
+deliver_from_queue_deliver(AckRequired, false,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} =
+ BQ:fetch(AckRequired, BQS),
+ {{Message, IsDelivered, AckTag}, 0 == Remaining,
+ State #q { backing_queue_state = BQS1 }}.
+
+run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Funs = {fun deliver_from_queue_pred/2,
+ fun deliver_from_queue_deliver/3},
+ IsEmpty = BQ:is_empty(BQS),
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
+ State1.
+
+attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+ PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
+ DeliverFun =
+ fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
+ {AckTag, BQS1} =
+ BQ:publish_delivered(AckRequired, Message, BQS),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS1}}
+ end,
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ record_current_channel_tx(ChPid, Txn),
+ {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
-deliver_or_enqueue(Txn, ChPid, Message, State) ->
+deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
- persist_message(Txn, qname(State), Message),
- NewMB = queue:in({Message, false}, NewState#q.message_buffer),
- {false, NewState#q{message_buffer = NewMB}}
+ %% Txn is none and no unblocked channels with consumers
+ BQS = BQ:publish(Message, State #q.backing_queue_state),
+ {false, NewState#q{backing_queue_state = BQS}}
end.
-deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
- run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)),
- State).
+requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+ maybe_run_queue_via_backing_queue(
+ fun (BQS) -> BQ:requeue(AckTags, BQS) end, State).
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
remove_consumer(ChPid, ConsumerTag, Queue) ->
- %% TODO: replace this with queue:filter/2 once we move to R12
- queue:from_list(lists:filter(
- fun ({CP, #consumer{tag = CT}}) ->
- (CP /= ChPid) or (CT /= ConsumerTag)
- end, queue:to_list(Queue))).
+ queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= ConsumerTag)
+ end, Queue).
remove_consumers(ChPid, Queue) ->
- %% TODO: replace this with queue:filter/2 once we move to R12
- queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(Queue))).
+ queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
move_consumers(ChPid, From, To) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
@@ -320,7 +401,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
not_found ->
{ok, State};
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
- unacked_messages = UAM} ->
+ acktags = ChAckTags} ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
State1 = State#q{
@@ -334,15 +415,12 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
ChPid, State#q.blocked_consumers)},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> case Txn of
- none -> ok;
- _ -> ok = rollback_work(Txn, qname(State1)),
- erase_tx(Txn)
- end,
- {ok, deliver_or_enqueue_n(
- [{Message, true} ||
- {_MsgId, Message} <- dict:to_list(UAM)],
- State1)}
+ false -> State2 = case Txn of
+ none -> State1;
+ _ -> rollback_transaction(Txn, ChPid,
+ State1)
+ end,
+ {ok, requeue_and_run(sets:to_list(ChAckTags), State2)}
end
end.
@@ -373,122 +451,30 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{is_persistent = false}) ->
- ok;
-persist_message(Txn, QName, Message) ->
- M = Message#basic_message{
- %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore
- content = rabbit_binary_parser:clear_decoded_content(
- Message#basic_message.content)},
- persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.guid}}]).
-
-persist_delivery(_QName, _Message,
- true) ->
- ok;
-persist_delivery(_QName, #basic_message{is_persistent = false},
- _IsDelivered) ->
- ok;
-persist_delivery(QName, #basic_message{guid = Guid},
- _IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]).
-
-persist_acks(Txn, QName, Messages) ->
- persist_work(Txn, QName,
- [{ack, {QName, Guid}} || #basic_message{
- guid = Guid, is_persistent = true} <- Messages]).
-
-persist_auto_ack(_QName, #basic_message{is_persistent = false}) ->
- ok;
-persist_auto_ack(QName, #basic_message{guid = Guid}) ->
- %% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, Guid}}]).
-
-persist_work(_Txn,_QName, []) ->
- ok;
-persist_work(none, _QName, WorkList) ->
- rabbit_persister:dirty_work(WorkList);
-persist_work(Txn, QName, WorkList) ->
- mark_tx_persistent(Txn),
- rabbit_persister:extend_transaction({Txn, QName}, WorkList).
-
-commit_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:commit_transaction/1,
- Txn, QName).
-
-rollback_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:rollback_transaction/1,
- Txn, QName).
-
-%% optimisation: don't do unnecessary work
-%% it would be nice if this was handled by the persister
-do_if_persistent(F, Txn, QName) ->
- case is_tx_persistent(Txn) of
- false -> ok;
- true -> ok = F({Txn, QName})
- end.
-
-lookup_tx(Txn) ->
- case get({txn, Txn}) of
- undefined -> #tx{is_persistent = false,
- pending_messages = [],
- pending_acks = []};
- V -> V
- end.
-
-store_tx(Txn, Tx) ->
- put({txn, Txn}, Tx).
-
-erase_tx(Txn) ->
- erase({txn, Txn}).
-
-all_tx() ->
- [Txn || {{txn, Txn}, _} <- get()].
-
-mark_tx_persistent(Txn) ->
- Tx = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{is_persistent = true}).
-
-is_tx_persistent(Txn) ->
- #tx{is_persistent = Res} = lookup_tx(Txn),
- Res.
-
-record_pending_message(Txn, ChPid, Message) ->
- Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
- record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
+maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
+ run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
+
+commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {AckTags, BQS1} =
+ BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS),
+ %% ChPid must be known here because of the participant management
+ %% by the channel.
+ C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
+ State#q{backing_queue_state = BQS1}.
+
+rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ %% Iff we removed acktags from the channel record on ack+txn then
+ %% we would add them back in here (would also require ChPid)
+ record_current_channel_tx(ChPid, none),
+ State#q{backing_queue_state = BQS1}.
-record_pending_acks(Txn, ChPid, MsgIds) ->
- Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
- record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}).
-
-process_pending(Txn, ChPid, State) ->
- #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} =
- lookup_tx(Txn),
- C = #cr{unacked_messages = UAM} = lookup_ch(ChPid),
- {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM),
- store_ch_record(C#cr{unacked_messages = Remaining}),
- deliver_or_enqueue_n(lists:reverse(PendingMessages), State).
-
-collect_messages(MsgIds, UAM) ->
- lists:mapfoldl(
- fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end,
- UAM, MsgIds).
-
-purge_message_buffer(QName, MessageBuffer) ->
- Messages =
- [[Message || {Message, _IsDelivered} <-
- queue:to_list(MessageBuffer)] |
- lists:map(
- fun (#cr{unacked_messages = UAM}) ->
- [Message || {_MsgId, Message} <- dict:to_list(UAM)]
- end,
- all_ch_record())],
- %% the simplest, though certainly not the most obvious or
- %% efficient, way to purge messages from the persister is to
- %% artifically ack them.
- persist_acks(none, QName, lists:append(Messages)).
+subtract_acks(A, B) when is_list(B) ->
+ lists:foldl(fun sets:del_element/2, A, B).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -510,11 +496,10 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
ConsumerTag;
-i(messages_ready, #q{message_buffer = MessageBuffer}) ->
- queue:len(MessageBuffer);
+i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([dict:size(UAM) ||
- #cr{unacked_messages = UAM} <- all_ch_record()]);
+ lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
@@ -523,6 +508,8 @@ i(consumers, State) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
+i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:status(BQS);
i(Item, _) ->
throw({bad_argument, Item}).
@@ -572,13 +559,8 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
reply(Delivered, NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
- ok = commit_work(Txn, qname(State)),
- %% optimisation: we reply straight away so the sender can continue
- gen_server2:reply(From, ok),
- NewState = process_pending(Txn, ChPid, State),
- erase_tx(Txn),
- record_current_channel_tx(ChPid, none),
- noreply(NewState);
+ NewState = commit_transaction(Txn, From, ChPid, State),
+ noreply(run_message_queue(NewState));
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -593,26 +575,19 @@ handle_call({notify_down, ChPid}, _From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
- next_msg_id = NextId,
- message_buffer = MessageBuffer}) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, IsDelivered}}, BufferTail} ->
- AckRequired = not(NoAck),
+ backing_queue_state = BQS, backing_queue = BQ}) ->
+ AckRequired = not NoAck,
+ case BQ:fetch(AckRequired, BQS) of
+ {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1});
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
case AckRequired of
- true ->
- persist_delivery(QName, Message, IsDelivered),
- C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = dict:store(NextId, Message, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM});
- false ->
- persist_auto_ack(QName, Message)
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ store_ch_record(
+ C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
+ false -> ok
end,
- Msg = {QName, self(), NextId, IsDelivered, Message},
- reply({ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail,
- next_msg_id = NextId + 1});
- {empty, _} ->
- reply(empty, State)
+ Msg = {QName, self(), AckTag, IsDelivered, Message},
+ reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1})
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -630,7 +605,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ok ->
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
- ack_required = not(NoAck)},
+ ack_required = not NoAck},
store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
case ConsumerCount of
@@ -692,14 +667,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- message_buffer = MessageBuffer,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
- Length = queue:len(MessageBuffer),
- reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
+ reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q{message_buffer = MessageBuffer}) ->
- IsEmpty = queue:is_empty(MessageBuffer),
+ State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) ->
@@ -707,13 +682,13 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- {stop, normal, {ok, queue:len(MessageBuffer)}, State}
+ {stop, normal, {ok, BQ:len(BQS)}, State}
end;
-handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
- ok = purge_message_buffer(qname(State), MessageBuffer),
- reply({ok, queue:len(MessageBuffer)},
- State#q{message_buffer = queue:new()});
+handle_call(purge, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {Count, BQS1} = BQ:purge(BQS),
+ reply({ok, Count}, State#q{backing_queue_state = BQS1});
handle_call({claim_queue, ReaderPid}, _From,
State = #q{owner = Owner, exclusive_consumer = Holder}) ->
@@ -736,55 +711,54 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(ok, State);
_ ->
reply(locked, State)
- end.
+ end;
+
+handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-handle_cast({init, Recover}, State = #q{message_buffer = undefined}) ->
+handle_cast({init, Recover},
+ State = #q{q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined}) ->
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
- Messages = case Recover of
- true -> rabbit_persister:queue_content(qname(State));
- false -> []
- end,
- noreply(State#q{message_buffer = queue:from_list(Messages)});
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)});
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
noreply(NewState);
-handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
+handle_cast({ack, Txn, AckTags, ChPid},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
- C = #cr{unacked_messages = UAM} ->
- {Acked, Remaining} = collect_messages(MsgIds, UAM),
- persist_acks(Txn, qname(State), Acked),
- case Txn of
- none ->
- store_ch_record(C#cr{unacked_messages = Remaining});
- _ ->
- record_pending_acks(Txn, ChPid, MsgIds)
- end,
- noreply(State)
+ C = #cr{acktags = ChAckTags} ->
+ {C1, BQS1} =
+ case Txn of
+ none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
+ _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
+ end,
+ store_ch_record(C1),
+ noreply(State #q { backing_queue_state = BQS1 })
end;
handle_cast({rollback, Txn, ChPid}, State) ->
- ok = rollback_work(Txn, qname(State)),
- erase_tx(Txn),
- record_current_channel_tx(ChPid, none),
- noreply(State);
+ noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, MsgIds, ChPid}, State) ->
+handle_cast({requeue, AckTags, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
noreply(State);
- C = #cr{unacked_messages = UAM} ->
- {Messages, NewUAM} = collect_messages(MsgIds, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM}),
- noreply(deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State))
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(requeue_and_run(AckTags, State))
end;
handle_cast({unblock, ChPid}, State) ->
@@ -819,6 +793,20 @@ handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
noreply(State);
+handle_cast(update_ram_duration, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State#q{rate_timer_ref = just_measured,
+ backing_queue_state = BQS2});
+
+handle_cast({set_ram_duration_target, Duration},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ noreply(State#q{backing_queue_state = BQS1});
+
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
@@ -842,6 +830,24 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
+handle_info(timeout, State = #q{backing_queue = BQ}) ->
+ noreply(maybe_run_queue_via_backing_queue(
+ fun (BQS) -> BQ:sync(BQS) end, State));
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
+
+%% Runs just before the queue process hibernates and always returns
+%% {hibernate, State}.
+handle_pre_hibernate(#q{backing_queue_state = undefined} = State) ->
+    %% No backing queue state is present; the state is returned as is.
+    {hibernate, State};
+handle_pre_hibernate(#q{backing_queue = BQ,
+                        backing_queue_state = BQS0} = State) ->
+    Prepared = BQ:handle_pre_hibernate(BQS0),
+    %% no activity for a while == 0 egress and ingress rates, hence the
+    %% 'infinity' duration reported to the memory monitor
+    Target = rabbit_memory_monitor:report_ram_duration(self(), infinity),
+    Retargeted = BQ:set_ram_duration_target(Target, Prepared),
+    State1 = State#q{backing_queue_state = Retargeted},
+    {hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
new file mode 100644
index 00000000..2dba00ad
--- /dev/null
+++ b/src/rabbit_backing_queue.erl
@@ -0,0 +1,133 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_backing_queue).
+
+-export([behaviour_info/1]).
+
+%% Lists, as {Name, Arity} pairs, the callbacks a backing queue module
+%% must export. Any other behaviour_info query returns 'undefined'.
+behaviour_info(callbacks) ->
+ [
+ %% Called on startup with a list of durable queue names. The
+ %% queues aren't being started at this point, but this call
+ %% allows the backing queue to perform any checking necessary for
+ %% the consistency of those queues, or initialise any other
+ %% shared resources.
+ {start, 1},
+
+ %% Initialise the backing queue and its state.
+ {init, 3},
+
+ %% Called on queue shutdown when queue isn't being deleted.
+ {terminate, 1},
+
+ %% Called when the queue is terminating and needs to delete all
+ %% its content.
+ {delete_and_terminate, 1},
+
+ %% Remove all messages in the queue, but not messages which have
+ %% been fetched and are pending acks.
+ {purge, 1},
+
+ %% Publish a message.
+ {publish, 2},
+
+ %% Called for messages which have already been passed straight
+ %% out to a client. The queue will be empty for these calls
+ %% (i.e. saves the round trip through the backing queue).
+ {publish_delivered, 3},
+
+ %% Produce the next message.
+ {fetch, 2},
+
+ %% Acktags supplied are for messages which can now be forgotten
+ %% about.
+ {ack, 2},
+
+ %% A publish, but in the context of a transaction.
+ {tx_publish, 3},
+
+ %% Acks, but in the context of a transaction.
+ {tx_ack, 3},
+
+ %% Undo anything which has been done in the context of the
+ %% specified transaction.
+ {tx_rollback, 2},
+
+ %% Commit a transaction. The Fun passed in must be called once
+ %% the messages have really been committed. This CPS permits the
+ %% possibility of commit coalescing.
+ {tx_commit, 3},
+
+ %% Reinsert messages into the queue which have already been
+ %% delivered and were pending acknowledgement.
+ {requeue, 2},
+
+ %% How long is my queue?
+ {len, 1},
+
+ %% Is my queue empty?
+ {is_empty, 1},
+
+ %% For the next two functions, the assumption is that you're
+ %% monitoring something like the ingress and egress rates of the
+ %% queue. The RAM duration is thus the length of time represented
+ %% by the messages held in RAM given the current rates. If you
+ %% want to ignore all of this stuff, then do so, and return 0 in
+ %% ram_duration/1.
+
+ %% The target is to have no more messages in RAM than indicated
+ %% by the duration and the current queue rates.
+ {set_ram_duration_target, 2},
+
+ %% Optionally recalculate the duration internally (likely just
+ %% an update of your internal rates), and report how many seconds
+ %% the messages in RAM represent given the current rates of the
+ %% queue.
+ {ram_duration, 1},
+
+ %% Should 'sync' be called as soon as the queue process can
+ %% manage (either on an empty mailbox, or when a timer fires)?
+ {needs_sync, 1},
+
+ %% Called (eventually) after needs_sync returns 'true'. Note this
+ %% may be called more than once for each 'true' returned from
+ %% needs_sync.
+ {sync, 1},
+
+ %% Called immediately before the queue hibernates.
+ {handle_pre_hibernate, 1},
+
+ %% Exists for debugging purposes, to be able to expose state via
+ %% rabbitmqctl list_queues backing_queue_status
+ {status, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
new file mode 100644
index 00000000..b4fd9156
--- /dev/null
+++ b/src/rabbit_invariable_queue.erl
@@ -0,0 +1,264 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_invariable_queue).
+
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
+ publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
+ tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
+ set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1,
+ handle_pre_hibernate/1, status/1]).
+
+-export([start/1]).
+
+-behaviour(rabbit_backing_queue).
+
+-include("rabbit.hrl").
+
+-record(iv_state, { queue, qname, len, pending_ack }).
+-record(tx, { pending_messages, pending_acks, is_persistent }).
+
+-ifdef(use_specs).
+
+-type(ack() :: guid() | 'blank_ack').
+-type(state() :: #iv_state { queue :: queue(),
+ qname :: queue_name(),
+ len :: non_neg_integer(),
+ pending_ack :: dict()
+ }).
+-include("rabbit_backing_queue_spec.hrl").
+
+-endif.
+
+%% Starts rabbit_persister as a child of rabbit_sup, passing it the
+%% list of durable queues. Crashes with badmatch unless start_child
+%% returns ok.
+start(DurableQueues) ->
+ ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).
+
+%% Builds the initial backing queue state for QName. Content is read
+%% back from the persister only when the queue is durable and recovery
+%% was requested; otherwise the queue starts empty. No acks are pending
+%% initially.
+init(QName, IsDurable, Recover) ->
+    Contents = case IsDurable andalso Recover of
+                   true  -> rabbit_persister:queue_content(QName);
+                   false -> []
+               end,
+    Queue = queue:from_list(Contents),
+    #iv_state { queue       = Queue,
+                qname       = QName,
+                len         = queue:len(Queue),
+                pending_ack = dict:new() }.
+
+%% Returns the state with an empty queue, zero length and no pending
+%% acks. The persister is not called here.
+terminate(State) ->
+ State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
+
+%% Acks every message still pending acknowledgement (persist_acks only
+%% forwards the persistent ones to the persister), purges the queued
+%% messages, then resets the state via terminate/1.
+delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
+ ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
+ {_PLen, State1} = purge(State),
+ terminate(State1).
+
+%% Drops every queued message. Returns {Len, State1}, where Len is the
+%% queue length before the purge. Non-persistent messages are simply
+%% discarded; for each persistent one a delivery is recorded (if it had
+%% not been delivered yet) followed by an ack, so the persister forgets
+%% it too.
+purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
+ %% We do not purge messages pending acks.
+ %% The dict built by the fold is a scratch Guid -> Msg lookup handed
+ %% to persist_acks; the state's own pending_ack is left untouched.
+ {AckTags, PA} =
+ rabbit_misc:queue_fold(
+ fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) ->
+ Acc;
+ ({Msg = #basic_message { guid = Guid }, IsDelivered},
+ {AckTagsN, PAN}) ->
+ ok = persist_delivery(QName, Msg, IsDelivered),
+ {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
+ end, {[], dict:new()}, Q),
+ ok = persist_acks(none, QName, AckTags, PA),
+ {Len, State #iv_state { len = 0, queue = queue:new() }}.
+
+%% Appends Msg to the tail of the queue, flagged as not yet delivered,
+%% after handing it to persist_message (a no-op for non-persistent
+%% messages). Returns the updated state.
+publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
+ ok = persist_message(none, QName, Msg),
+ State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
+
+%% Records a message that never enters the queue. Returns
+%% {AckTag, State1}.
+%% When the first argument is false nothing is stored and the tag is
+%% blank_ack. When it is true the message is persisted, marked as
+%% delivered and remembered in pending_ack under its guid, which is
+%% also the returned tag. The second clause only matches a state whose
+%% len is 0; any other call fails with function_clause.
+publish_delivered(false, _Msg, State) ->
+ {blank_ack, State};
+publish_delivered(true, Msg = #basic_message { guid = Guid },
+ State = #iv_state { qname = QName, len = 0,
+ pending_ack = PA }) ->
+ ok = persist_message(none, QName, Msg),
+ ok = persist_delivery(QName, Msg, false),
+ {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
+
+%% Takes the message at the head of the queue. Returns {empty, State}
+%% when len is 0, otherwise {{Msg, IsDelivered, AckTag, Len1}, State1}
+%% where Len1 is the number of messages left in the queue.
+fetch(_AckRequired, State = #iv_state { len = 0 }) ->
+ {empty, State};
+fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
+ pending_ack = PA }) ->
+ {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
+ queue:out(Q),
+ Len1 = Len - 1,
+ ok = persist_delivery(QName, Msg, IsDelivered),
+ %% PA1 contains the fetched message so that persist_acks can look it
+ %% up; it only becomes the new pending_ack when an ack is required.
+ PA1 = dict:store(Guid, Msg, PA),
+ {AckTag, PA2} = case AckRequired of
+ true -> {Guid, PA1};
+ false -> ok = persist_acks(none, QName, [Guid], PA1),
+ {blank_ack, PA}
+ end,
+ {{Msg, IsDelivered, AckTag, Len1},
+ State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
+
+%% Acknowledges AckTags outside of any transaction: acks are passed to
+%% persist_acks while the messages can still be looked up in
+%% pending_ack, and only then are the tags removed from it.
+ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+ ok = persist_acks(none, QName, AckTags, PA),
+ PA1 = remove_acks(AckTags, PA),
+ State #iv_state { pending_ack = PA1 }.
+
+%% Publishes Msg within transaction Txn. The message is prepended to the
+%% transaction's pending_messages (so that list is newest-first) and is
+%% not added to the queue here. The state is returned unchanged.
+tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
+ Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
+ %% Store the updated record before persisting: persist_message may
+ %% set is_persistent on the stored record (via persist_work), and
+ %% writing back this older copy of Tx afterwards would lose that flag.
+ store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ ok = persist_message(Txn, QName, Msg),
+ State.
+
+%% Acknowledges AckTags within transaction Txn. The list of tags is
+%% prepended, as one element, to the transaction's pending_acks (which
+%% is therefore a list of lists). pending_ack in the state is not
+%% modified here; the state is returned unchanged.
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+ Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
+ %% Store the updated record before persisting: persist_acks may set
+ %% is_persistent on the stored record (via persist_work), and writing
+ %% back this older copy of Tx afterwards would lose that flag.
+ store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
+ ok = persist_acks(Txn, QName, AckTags, PA),
+ State.
+
+%% Abandons transaction Txn. If the transaction wrote anything to the
+%% persister, the persister transaction is rolled back. The
+%% transaction record is then erased. Returns {AckTags, State}, where
+%% AckTags are all the tags acked within the transaction; the state is
+%% unchanged.
+tx_rollback(Txn, State = #iv_state { qname = QName }) ->
+    #tx { pending_acks = AckTagLists } = lookup_tx(Txn),
+    ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1,
+                          Txn, QName),
+    erase_tx(Txn),
+    %% pending_acks holds one list of tags per tx_ack call, so a
+    %% single-level append is all that is needed; unlike a deep flatten
+    %% it never descends into the tags themselves.
+    {lists:append(AckTagLists), State}.
+
+%% Commits transaction Txn. If the transaction wrote anything to the
+%% persister, the persister transaction is committed first. The
+%% transaction record is then erased and Fun is invoked. Finally the
+%% acked messages are dropped from pending_ack and the published
+%% messages are appended to the queue. Returns {AckTags, State1}, where
+%% AckTags are all the tags acked within the transaction.
+tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
+                                        queue = Q, len = Len }) ->
+    #tx { pending_acks = AckTagLists, pending_messages = PubsRev } =
+        lookup_tx(Txn),
+    ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
+                          Txn, QName),
+    erase_tx(Txn),
+    Fun(),
+    %% pending_acks holds one list of tags per tx_ack call, so a
+    %% single-level append is all that is needed; unlike a deep flatten
+    %% it never descends into the tags themselves.
+    AckTags = lists:append(AckTagLists),
+    PA1 = remove_acks(AckTags, PA),
+    %% PubsRev is newest-first; folding from the right enqueues the
+    %% messages in the order they were published.
+    {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) ->
+                                     {queue:in({Msg, false}, QN), LenN + 1}
+                             end, {Q, Len}, PubsRev),
+    {AckTags, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
+
+%% Puts the messages identified by AckTags back at the tail of the
+%% queue, flagged as delivered, and removes them from pending_ack.
+%% Crashes with badmatch if a tag is not present in pending_ack.
+requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
+ len = Len }) ->
+ %% We don't need to touch the persister here - the persister will
+ %% already have these messages published and delivered as
+ %% necessary. The complication is that the persister's seq_id will
+ %% now be wrong, given the position of these messages in our queue
+ %% here. However, the persister's seq_id is only used for sorting
+ %% on startup, and requeue is silent as to where the requeued
+ %% messages should appear, thus the persister is permitted to sort
+ %% based on seq_id, even though it'll likely give a different
+ %% order to the last known state of our queue, prior to shutdown.
+ {Q1, Len1} = lists:foldl(
+ fun (Guid, {QN, LenN}) ->
+ {ok, Msg = #basic_message {}} = dict:find(Guid, PA),
+ {queue:in({Msg, true}, QN), LenN + 1}
+ end, {Q, Len}, AckTags),
+ PA1 = remove_acks(AckTags, PA),
+ State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
+
+%% Returns the queue length tracked in the state's len field.
+len(#iv_state { len = Len }) -> Len.
+
+%% True when len/1 reports zero queued messages.
+is_empty(State) -> 0 == len(State).
+
+%% The duration target is ignored by this backend; the state is
+%% returned unchanged.
+set_ram_duration_target(_DurationTarget, State) -> State.
+
+%% Always reports a RAM duration of 0 and leaves the state unchanged.
+ram_duration(State) -> {0, State}.
+
+%% This backend never asks for sync/1 to be called.
+needs_sync(_State) -> false.
+
+%% No-op: returns the state unchanged.
+sync(State) -> State.
+
+%% No-op: returns the state unchanged.
+handle_pre_hibernate(State) -> State.
+
+%% This backend exposes no status information; always returns [].
+status(_State) -> [].
+
+%%----------------------------------------------------------------------------
+
+%% Returns the pending-ack dict PA with every tag in AckTags erased.
+%% Tags that are not present are ignored.
+remove_acks(AckTags, PA) ->
+    lists:foldl(fun (Tag, Remaining) -> dict:erase(Tag, Remaining) end,
+                PA, AckTags).
+
+%%----------------------------------------------------------------------------
+
+%% Fetches the record kept for Txn in the process dictionary under
+%% {txn, Txn}. When none is stored, a fresh record with no pending
+%% messages, no pending acks and is_persistent = false is returned
+%% (nothing is stored).
+lookup_tx(Txn) ->
+    tx_or_default(get({txn, Txn})).
+
+%% Maps a process dictionary miss onto an empty #tx record.
+tx_or_default(undefined) ->
+    #tx { pending_messages = [],
+          pending_acks     = [],
+          is_persistent    = false };
+tx_or_default(Tx) ->
+    Tx.
+
+%% Saves Tx in the process dictionary under {txn, Txn}. Returns whatever
+%% put/2 returns, i.e. the previously stored value or 'undefined'.
+store_tx(Txn, Tx) ->
+ put({txn, Txn}, Tx).
+
+%% Removes the record stored for Txn from the process dictionary.
+erase_tx(Txn) ->
+ erase({txn, Txn}).
+
+%% Sets is_persistent on the record stored for Txn (creating the record
+%% if none is stored yet).
+mark_tx_persistent(Txn) ->
+    Tx = lookup_tx(Txn),
+    store_tx(Txn, Tx #tx { is_persistent = true }).
+
+%% Returns the is_persistent field of the record for Txn; this is false
+%% when no record is stored.
+is_tx_persistent(Txn) ->
+    Tx = lookup_tx(Txn),
+    Tx #tx.is_persistent.
+
+%% Calls F({Txn, QName}) only when the transaction is marked
+%% persistent. Returns ok; crashes with badmatch if F returns anything
+%% other than ok.
+do_if_persistent(F, Txn, QName) ->
+    Result = case is_tx_persistent(Txn) of
+                 true  -> F({Txn, QName});
+                 false -> ok
+             end,
+    ok = Result.
+
+%%----------------------------------------------------------------------------
+
+%% Hands a publish record for Msg to persist_work. Non-persistent
+%% messages are skipped and yield ok.
+persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
+    ok;
+persist_message(Txn, QName, Msg) ->
+    %% don't persist any recoverable decoded properties,
+    %% rebuild from properties_bin on restore
+    Stripped = rabbit_binary_parser:clear_decoded_content(
+                 Msg #basic_message.content),
+    Persistable = Msg #basic_message { content = Stripped },
+    Work = {publish, Persistable, {QName, Persistable #basic_message.guid}},
+    persist_work(Txn, QName, [Work]).
+
+%% Hands a deliver record for the message to persist_work, outside of
+%% any transaction. Nothing is done (ok is returned) for non-persistent
+%% messages or for messages already flagged as delivered.
+persist_delivery(_QName, #basic_message { is_persistent = false },
+ _IsDelivered) ->
+ ok;
+persist_delivery(_QName, _Message, true) ->
+ ok;
+persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]).
+
+%% Hands ack records to persist_work for those tags in AckTags whose
+%% message, looked up in PA, is persistent. Every tag must be present
+%% in PA; a missing one crashes with badmatch.
+persist_acks(Txn, QName, AckTags, PA) ->
+    Work = [{ack, {QName, Guid}} || Guid <- AckTags,
+                                    is_persistent_in(Guid, PA)],
+    persist_work(Txn, QName, Work).
+
+%% Looks Guid up in PA and returns the message's is_persistent flag.
+is_persistent_in(Guid, PA) ->
+    {ok, Msg} = dict:find(Guid, PA),
+    Msg #basic_message.is_persistent.
+
+%% Passes WorkList to the persister. An empty list is a no-op returning
+%% ok. Outside a transaction (Txn = none) the work goes through
+%% rabbit_persister:dirty_work/1. Inside a transaction the transaction
+%% is first marked persistent, then the work is added to the persister
+%% transaction keyed by {Txn, QName}.
+persist_work(_Txn,_QName, []) ->
+ ok;
+persist_work(none, _QName, WorkList) ->
+ rabbit_persister:dirty_work(WorkList);
+persist_work(Txn, QName, WorkList) ->
+ mark_tx_persistent(Txn),
+ rabbit_persister:extend_transaction({Txn, QName}, WorkList).