 src/gen_server2.erl             |   6 +-
 src/rabbit.erl                  |  38 ++++----
 src/rabbit_amqqueue.erl         |  39 ++++-----
 src/rabbit_amqqueue_process.erl |  19 +++--
 src/rabbit_networking.erl       |   1 +
 src/rabbit_persister.erl        | 122 ++++++----------
 6 files changed, 99 insertions(+), 126 deletions(-)
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index c33582e3..94a23fb9 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -607,9 +607,9 @@ process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, Msg) -> case Msg of {system, From, Req} -> - sys:handle_system_msg - (Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, TimeoutState, Queue]); + sys:handle_system_msg( + Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, TimeoutState, Queue]); %% gen_server puts Hib on the end as the 7th arg, but that %% version of the function seems not to be documented so %% leaving out for now. diff --git a/src/rabbit.erl b/src/rabbit.erl index 26f244bc..40f38b3b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -47,7 +47,8 @@ [{description, "codec correctness check"}, {mfa, {rabbit_binary_generator, check_empty_content_body_frame_size, - []}}]}). + []}}, + {enables, external_infrastructure}]}). -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, @@ -65,21 +66,21 @@ [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type_registry]}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_log]}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}, @@ -113,35 +114,36 @@ {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, - [{description, "core initialized"}]}). 
+ [{description, "core initialized"}, + {requires, kernel_ready}]}). -rabbit_boot_step({empty_db_check, [{description, "empty DB check"}, {mfa, {?MODULE, maybe_insert_default_data, []}}, - {requires, core_initialized}]}). + {requires, core_initialized}, + {enables, routing_ready}]}). -rabbit_boot_step({exchange_recovery, [{description, "exchange recovery"}, {mfa, {rabbit_exchange, recover, []}}, - {requires, empty_db_check}]}). + {requires, empty_db_check}, + {enables, routing_ready}]}). -rabbit_boot_step({queue_sup_queue_recovery, [{description, "queue supervisor and queue recovery"}, {mfa, {rabbit_amqqueue, start, []}}, - {requires, empty_db_check}]}). - --rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, - [rabbit_persister]}}, - {requires, queue_sup_queue_recovery}]}). + {requires, empty_db_check}, + {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, - [{description, "message delivery logic ready"}]}). + [{description, "message delivery logic ready"}, + {requires, core_initialized}]}). -rabbit_boot_step({log_relay, [{description, "error log relay"}, {mfa, {rabbit_error_logger, boot, []}}, - {requires, routing_ready}]}). + {requires, routing_ready}, + {enables, networking}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index efa62e1b..23a4932a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). @@ -88,7 +88,6 @@ {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). -spec(deliver/2 :: (pid(), delivery()) -> boolean()). 
--spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). @@ -118,6 +117,9 @@ start() -> DurableQueues = find_durable_queues(), + ok = rabbit_sup:start_child( + rabbit_persister, + [[QName || #amqqueue{name = QName} <- DurableQueues]]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, @@ -137,27 +139,13 @@ find_durable_queues() -> end). recover_durable_queues(DurableQueues) -> - lists:foldl( - fun (RecoveredQ, Acc) -> - Q = start_queue_process(RecoveredQ), - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, - read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end - end) of - true -> [Q | Acc]; - false -> exit(Q#amqqueue.pid, shutdown), - Acc - end - end, [], DurableQueues). + Qs = [start_queue_process(Q) || Q <- DurableQueues], + %% Issue inits to *all* the queues so that they all init at the same time + [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs], + [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], + rabbit_misc:execute_mnesia_transaction( + fun () -> [ok = store_queue(Q) || Q <- Qs] end), + Qs. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -165,6 +153,8 @@ declare(QueueName, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args, pid = none}), + ok = gen_server2:cast(Q#amqqueue.pid, {init, false}), + ok = gen_server2:call(Q#amqqueue.pid, sync, infinity), internal_declare(Q, true). 
internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> @@ -280,9 +270,6 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), true. -redeliver(QPid, Messages) -> - delegate:gs2_cast(QPid, {redeliver, Messages}). - requeue(QPid, MsgIds, ChPid) -> delegate:gs2_cast(QPid, {requeue, MsgIds, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 449e79ea..5e325794 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -92,7 +92,7 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). info_keys() -> ?INFO_KEYS. - + %%---------------------------------------------------------------------------- init(Q) -> @@ -102,11 +102,13 @@ init(Q) -> exclusive_consumer = none, has_had_consumers = false, next_msg_id = 1, - message_buffer = queue:new(), + message_buffer = undefined, active_consumers = queue:new(), blocked_consumers = queue:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +terminate(_Reason, #q{message_buffer = undefined}) -> + ok; terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), @@ -541,6 +543,9 @@ i(Item, _) -> %--------------------------------------------------------------------------- +handle_call(sync, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -748,6 +753,13 @@ handle_call({claim_queue, ReaderPid}, _From, reply(locked, State) end. +handle_cast({init, Recover}, State = #q{message_buffer = undefined}) -> + Messages = case Recover of + true -> rabbit_persister:queue_content(qname(State)); + false -> [] + end, + noreply(State#q{message_buffer = queue:from_list(Messages)}); + handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. 
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), @@ -775,9 +787,6 @@ handle_cast({rollback, Txn, ChPid}, State) -> record_current_channel_tx(ChPid, none), noreply(State); -handle_cast({redeliver, Messages}, State) -> - noreply(deliver_or_enqueue_n(Messages, State)); - handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 7978573d..c3d0b7b7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -51,6 +51,7 @@ binary, {packet, raw}, % no packaging {reuseaddr, true}, % allow rebind without waiting + {backlog, 128}, % use the maximum listen(2) backlog value %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. %% {delay_send, true}, {exit_on_close, false} diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index a9e0cab9..a8e41baf 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -33,14 +33,14 @@ -behaviour(gen_server). --export([start_link/0]). +-export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([transaction/1, extend_transaction/2, dirty_work/1, commit_transaction/1, rollback_transaction/1, - force_snapshot/0]). + force_snapshot/0, queue_content/1]). -include("rabbit.hrl"). @@ -52,8 +52,7 @@ -define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). -record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, - snapshot}). + pending_logs, pending_replies, snapshot}). %% two tables for efficient persistency %% one maps a key to a message @@ -72,20 +71,22 @@ {deliver, pmsg()} | {ack, pmsg()}). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: ([queue_name()]) -> + {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). -spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). 
-spec(dirty_work/1 :: ([work_item()]) -> 'ok'). -spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). +-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]). -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(DurableQueues) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []). transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), @@ -111,15 +112,18 @@ rollback_transaction(TxnKey) -> force_snapshot() -> gen_server:call(?SERVER, force_snapshot, infinity). +queue_content(QName) -> + gen_server:call(?SERVER, {queue_content, QName}, infinity). + %%-------------------------------------------------------------------- -init(_Args) -> +init([DurableQueues]) -> process_flag(trap_exit, true), FileName = base_filename(), ok = filelib:ensure_dir(FileName), Snapshot = #psnapshot{transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, []), + queues = ets:new(queues, [ordered_set]), next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, @@ -135,7 +139,8 @@ init(_Args) -> [Recovered, Bad]), LH end, - {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot), + {Res, NewSnapshot} = + internal_load_snapshot(LogHandle, DurableQueues, Snapshot), case Res of ok -> ok = take_snapshot(LogHandle, NewSnapshot); @@ -143,12 +148,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], + 
pending_replies = [], + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -158,6 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); +handle_call({queue_content, QName}, _From, + State = #pstate{snapshot = #psnapshot{messages = Messages, + queues = Queues}}) -> + MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), D} || + {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -339,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues)), + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + sets:add_element(PKey, S) + end, sets:new(), Queues), + prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), InnerSnapshot = {{txns, Ts}, {messages, ets:tab2list(Messages)}, {queues, ets:tab2list(Queues)}, @@ -351,20 +363,21 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. -prune_table(Tab, Keys) -> +prune_table(Tab, Pred) -> true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Keys, ets:first(Tab)), + ok = prune_table(Tab, Pred, ets:first(Tab)), true = ets:safe_fixtable(Tab, false). 
-prune_table(_Tab, _Keys, '$end_of_table') -> ok; -prune_table(Tab, Keys, Key) -> - case sets:is_element(Key, Keys) of +prune_table(_Tab, _Pred, '$end_of_table') -> ok; +prune_table(Tab, Pred, Key) -> + case Pred(Key) of true -> ok; false -> ets:delete(Tab, Key) end, - prune_table(Tab, Keys, ets:next(Tab, Key)). + prune_table(Tab, Pred, ets:next(Tab, Key)). internal_load_snapshot(LogHandle, + DurableQueues, Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), @@ -378,11 +391,18 @@ internal_load_snapshot(LogHandle, Snapshot#psnapshot{ transactions = Ts, next_seq_id = NextSeqId}), - Snapshot2 = requeue_messages(Snapshot1), + %% Remove all entries for queues that no longer exist. + %% Note that the 'messages' table is pruned when the next + %% snapshot is taken. + DurableQueuesSet = sets:from_list(DurableQueues), + prune_table(Snapshot1#psnapshot.queues, + fun ({QName, _PKey}) -> + sets:is_element(QName, DurableQueuesSet) + end), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so %% any uncompleted transactions will have been aborted. - {ok, Snapshot2#psnapshot{transactions = dict:new()}}; + {ok, Snapshot1#psnapshot{transactions = dict:new()}}; {error, Reason} -> {{error, Reason}, Snapshot} end. @@ -394,52 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> check_version(_Other) -> {error, unrecognised_persister_log_format}. 
-requeue_messages(Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - Work = ets:foldl( - fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> - rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc) - end, dict:new(), Queues), - %% unstable parallel map, because order doesn't matter - L = lists:append( - rabbit_misc:upmap( - %% we do as much work as possible in spawned worker - %% processes, but we need to make sure the ets:inserts are - %% performed in self() - fun ({QName, Requeues}) -> - requeue(QName, Requeues, Messages) - end, dict:to_list(Work))), - NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L], - NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L], - ets:delete_all_objects(Messages), - ets:delete_all_objects(Queues), - true = ets:insert(Messages, NewMessages), - true = ets:insert(Queues, NewQueues), - %% contains the mutated messages and queues tables - Snapshot. - -requeue(QName, Requeues, Messages) -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = QPid}} -> - RequeueMessages = - [{SeqId, QName, PKey, Message, Delivered} || - {SeqId, PKey, Delivered} <- Requeues, - {_, Message} <- ets:lookup(Messages, PKey)], - rabbit_amqqueue:redeliver( - QPid, - %% Messages published by the same process receive - %% persistence keys that are monotonically - %% increasing. Since message ordering is defined on a - %% per-channel basis, and channels are bound to specific - %% processes, sorting the list does provide the correct - %% ordering properties. - [{Message, Delivered} || {_, _, _, Message, Delivered} <- - lists:sort(RequeueMessages)]), - RequeueMessages; - {error, not_found} -> - [] - end. - replay([], LogHandle, K, Snapshot) -> case disk_log:chunk(LogHandle, K) of {K1, Items} -> |