diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-24 13:10:27 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-24 13:10:27 +0000 |
commit | 423aca76adbca197fe4bbcb8ee720122bc75ece4 (patch) | |
tree | 592efb4cfcc2094a0b31f1fbe85f8bb87e76c4fb | |
parent | 7424bac1a36e1d9512a32edcd1b0bd0b93ab31fc (diff) | |
parent | 83b78c937c167f09c7d71493b044d33e8fb10b79 (diff) | |
download | rabbitmq-server-423aca76adbca197fe4bbcb8ee720122bc75ece4.tar.gz |
merge default into bug23749
-rw-r--r-- | docs/rabbitmqctl.1.xml | 3 | ||||
-rw-r--r-- | include/rabbit.hrl | 1 | ||||
-rw-r--r-- | src/rabbit.erl | 116 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 11 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 74 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 57 | ||||
-rw-r--r-- | src/rabbit_plugins.erl | 9 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 72 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 154 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 24 |
15 files changed, 322 insertions, 238 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index c7069aed..bbd2fe5b 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -465,8 +465,7 @@ synchronise itself. The queue will block while synchronisation takes place (all publishers to and consumers from the queue will block). The queue must be - mirrored, and must not have any pending unacknowledged - messages for this command to succeed. + mirrored for this command to succeed. </para> <para> Note that unsynchronised queues from which messages are diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 7385b4b3..78763045 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -88,7 +88,6 @@ -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). --define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8"). -define(ERTS_MINIMUM, "5.6.3"). %% EMPTY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1 diff --git a/src/rabbit.erl b/src/rabbit.erl index 16694105..0e6c970f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -258,16 +258,28 @@ %%---------------------------------------------------------------------------- +%% HiPE compilation happens before we have log handlers - so we have +%% to io:format/2, it's all we can do. + maybe_hipe_compile() -> {ok, Want} = application:get_env(rabbit, hipe_compile), Can = code:which(hipe) =/= non_existing, case {Want, Can} of - {true, true} -> hipe_compile(); - {true, false} -> io:format("Not HiPE compiling: HiPE not found in " - "this Erlang installation.~n"); - {false, _} -> ok + {true, true} -> hipe_compile(), + true; + {true, false} -> false; + {false, _} -> true end. +warn_if_hipe_compilation_failed(true) -> + ok; +warn_if_hipe_compilation_failed(false) -> + error_logger:warning_msg( + "Not HiPE compiling: HiPE not found in this Erlang installation.~n"). + +%% HiPE compilation happens before we have log handlers and can take a +%% long time, so make an exception to our no-stdout policy and display +%% progress via stdout. hipe_compile() -> Count = length(?HIPE_WORTHY), io:format("~nHiPE compiling: |~s|~n |", @@ -311,14 +323,15 @@ start() -> rabbit_mnesia:check_cluster_consistency(), ok = app_utils:start_applications( app_startup_order(), fun handle_app_error/2), - ok = print_plugin_info(rabbit_plugins:active()) + ok = log_broker_started(rabbit_plugins:active()) end). boot() -> start_it(fun() -> ok = ensure_application_loaded(), - maybe_hipe_compile(), + Success = maybe_hipe_compile(), ok = ensure_working_log_handlers(), + warn_if_hipe_compilation_failed(Success), rabbit_node_monitor:prepare_cluster_status_files(), ok = rabbit_upgrade:maybe_upgrade_mnesia(), %% It's important that the consistency check happens after @@ -332,7 +345,7 @@ boot() -> false), ok = app_utils:start_applications( StartupApps, fun handle_app_error/2), - ok = print_plugin_info(Plugins) + ok = log_broker_started(Plugins) end). handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> @@ -427,8 +440,8 @@ start(normal, []) -> {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), print_banner(), + log_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], - io:format("~nbroker running~n"), {ok, SupPid}; Error -> Error @@ -457,22 +470,16 @@ app_shutdown_order() -> %%--------------------------------------------------------------------------- %% boot step logic -run_boot_step({StepName, Attributes}) -> - Description = case lists:keysearch(description, 1, Attributes) of - {value, {_, D}} -> D; - false -> StepName - end, +run_boot_step({_StepName, Attributes}) -> case [MFA || {mfa, MFA} <- Attributes] of [] -> - io:format("-- ~s~n", [Description]); + ok; MFAs -> - io:format("starting ~-60s ...", [Description]), [try apply(M,F,A) catch _:Reason -> boot_error(Reason, erlang:get_stacktrace()) end || {M,F,A} <- MFAs], - io:format("done~n"), ok end. @@ -689,24 +696,15 @@ force_event_refresh() -> %%--------------------------------------------------------------------------- %% misc -print_plugin_info([]) -> - ok; -print_plugin_info(Plugins) -> - %% This gets invoked by rabbitmqctl start_app, outside the context - %% of the rabbit application +log_broker_started(Plugins) -> rabbit_misc:with_local_io( fun() -> - io:format("~n-- plugins running~n"), - [print_plugin_info( - AppName, element(2, application:get_key(AppName, vsn))) - || AppName <- Plugins], - ok + error_logger:info_msg( + "Server startup complete; plugins are: ~p~n", [Plugins]), + io:format("~n Broker running with ~p plugins.~n", + [length(Plugins)]) end). -print_plugin_info(Plugin, Vsn) -> - Len = 76 - length(Vsn), - io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]). - erts_version_check() -> FoundVer = erlang:system_info(version), case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of @@ -718,49 +716,41 @@ erts_version_check() -> print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), - ProductLen = string:len(Product), - io:format("~n" - "+---+ +---+~n" - "| | | |~n" - "| | | |~n" - "| | | |~n" - "| +---+ +-------+~n" - "| |~n" - "| ~s +---+ |~n" - "| | | |~n" - "| ~s +---+ |~n" - "| |~n" - "+-------------------+~n" - "~s~n~s~n~s~n~n", - [Product, string:right([$v|Version], ProductLen), - ?PROTOCOL_VERSION, - ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), + io:format("~n## ## ~s ~s. ~s~n## ## ~s~n########## ~n", + [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), + io:format("###### ## Logs: ~s~n########## ~s~n", + [log_location(kernel), log_location(sasl)]). + +log_banner() -> + {ok, Product} = application:get_key(id), + {ok, Version} = application:get_key(vsn), Settings = [{"node", node()}, - {"app descriptor", app_location()}, {"home dir", home_dir()}, {"config file(s)", config_files()}, {"cookie hash", rabbit_nodes:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, {"database dir", rabbit_mnesia:dir()}, - {"erlang version", erlang:system_info(version)}], + {"erlang version", erlang:system_info(otp_release)}], DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), Format = fun (K, V) -> - io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", - [K, V]) + rabbit_misc:format( + "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", [K, V]) end, - lists:foreach(fun ({"config file(s)" = K, []}) -> - Format(K, "(none)"); - ({"config file(s)" = K, [V0 | Vs]}) -> - Format(K, V0), [Format("", V) || V <- Vs]; - ({K, V}) -> - Format(K, V) - end, Settings), - io:nl(). - -app_location() -> - {ok, Application} = application:get_application(), - filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")). + Banner = iolist_to_binary( + rabbit_misc:format( + "~s ~s~n~s~n~s~n", + [Product, Version, ?COPYRIGHT_MESSAGE, + ?INFORMATION_MESSAGE]) ++ + [case S of + {"config file(s)" = K, []} -> + Format(K, "(none)"); + {"config file(s)" = K, [V0 | Vs]} -> + Format(K, V0), [Format("", V) || V <- Vs]; + {K, V} -> + Format(K, V) + end || S <- Settings]), + error_logger:info_msg("~s", [Banner]). home_dir() -> case init:get_argument(home) of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 788ec558..ca384053 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -175,8 +175,7 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). --spec(sync_mirrors/1 :: (pid()) -> - 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')). -spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}). -endif. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 918d1782..c9f73546 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1187,7 +1187,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue(AckTags, ChPid, State)); handle_call(sync_mirrors, _From, - State = #q{backing_queue = rabbit_mirror_queue_master = BQ, + State = #q{backing_queue = rabbit_mirror_queue_master, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, HandleInfo = fun (Status) -> @@ -1203,13 +1203,9 @@ handle_call(sync_mirrors, _From, State, #q.stats_timer, fun() -> emit_stats(State#q{status = Status}) end) end, - case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> case rabbit_mirror_queue_master:sync_mirrors( - HandleInfo, EmitStats, BQS) of - {ok, BQS1} -> reply(ok, S(BQS1)); - {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} - end; - _ -> reply({error, pending_acks}, State) + case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of + {ok, BQS1} -> reply(ok, S(BQS1)); + {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} end; handle_call(sync_mirrors, _From, State) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 9a3c67f9..4245f7e2 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -75,6 +75,10 @@ %% except those that have been fetched already and are pending acks. -callback purge(state()) -> {purged_msg_count(), state()}. +%% Remove all messages in the queue which have been fetched and are +%% pending acks. +-callback purge_acks(state()) -> state(). + %% Publish a message. -callback publish(rabbit_types:basic_message(), rabbit_types:message_properties(), boolean(), pid(), @@ -164,7 +168,7 @@ %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), rabbit_types:message_properties(), - A) -> {('stop' | 'cont'), A}), + boolean(), A) -> {('stop' | 'cont'), A}), A, state()) -> {A, state()}. %% How long is my queue? @@ -226,7 +230,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, + {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 5b3b8aa8..5feaee46 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -334,7 +334,7 @@ postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) -> {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) -> {stop, Acc}; ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) -> - FoldFun(Msg, MsgProps, Acc) + FoldFun(Msg, MsgProps, false, Acc) end, {cont, Acc0}, gb_trees:to_list(Messages)), true = Model =:= Res; @@ -397,10 +397,11 @@ rand_choice(List, Selection, N) -> N - 1). makefoldfun(Size) -> - fun (Msg, _MsgProps, Acc) -> - case length(Acc) > Size of - false -> {cont, [Msg | Acc]}; - true -> {stop, Acc} + fun (Msg, _MsgProps, Unacked, Acc) -> + case {length(Acc) > Size, Unacked} of + {false, false} -> {cont, [Msg | Acc]}; + {false, true} -> {cont, Acc}; + {true, _} -> {stop, Acc} end end. foldacc() -> []. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8cd3a580..74cf4bee 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -273,7 +273,7 @@ handle_cast({method, Method, Content, Flow}, end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> - ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), + ok = send(Reply, NewState), noreply(NewState); {noreply, NewState} -> noreply(NewState); @@ -295,18 +295,20 @@ handle_cast(ready_for_close, State = #ch{state = closing, ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State) -> +handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; -handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, - State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, Msg), - noreply(consumer_monitor(ConsumerTag, State)); +handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) -> + ok = send(Msg, State), + noreply(consumer_monitor(CTag, State)); -handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, Msg), +handle_cast({command, Msg}, State) -> + ok = send(Msg, State), noreply(State); +handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) -> + noreply(State); handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -420,6 +422,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. +send(_Command, #ch{state = closing}) -> + ok; +send(Command, #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command(WriterPid, Command). + handle_exception(Reason, State = #ch{protocol = Protocol, channel = Channel, writer_pid = WriterPid, @@ -564,12 +571,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> case sets:is_element(QPid, Blocking) of false -> State; true -> Blocking1 = sets:del_element(QPid, Blocking), - ok = case sets:size(Blocking1) of - 0 -> rabbit_writer:send_command( - State#ch.writer_pid, - #'channel.flow_ok'{active = false}); - _ -> ok - end, + case sets:size(Blocking1) of + 0 -> ok = send(#'channel.flow_ok'{active = false}, State); + _ -> ok + end, State#ch{blocking = Blocking1} end. @@ -856,12 +861,9 @@ handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> rabbit_misc:protocol_error(not_implemented, "requeue=false", []); handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> - {noreply, State2 = #ch{writer_pid = WriterPid}} = - handle_method(#'basic.recover_async'{requeue = Requeue}, - Content, - State), - ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), - {noreply, State2}; + {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue}, + Content, State), + {reply, #'basic.recover_ok'{}, State1}; handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, @@ -1187,17 +1189,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QCons, - writer_pid = WriterPid}) -> + queue_consumers = QCons}) -> ConsumerTags = case dict:find(QPid, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> - Cancel = #'basic.cancel'{consumer_tag = CTag, - nowait = true}, - ok = rabbit_writer:send_command(WriterPid, Cancel), + ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, + State), dict:erase(CTag, CMap) end, ConsumerMapping, ConsumerTags), State#ch{consumer_mapping = ConsumerMapping1, @@ -1459,12 +1460,17 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> send_nacks([], State) -> State; +send_nacks(_MXs, State = #ch{state = closing, + tx = none}) -> %% optimisation + State; send_nacks(MXs, State = #ch{tx = none}) -> coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} end, State); +send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation + State#ch{tx = failed}; send_nacks(_, State) -> maybe_complete_tx(State#ch{tx = failed}). @@ -1483,9 +1489,10 @@ send_confirms(State) -> send_confirms([], State) -> State; -send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = MsgSeqNo}), +send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation + State; +send_confirms([MsgSeqNo], State) -> + ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), State; send_confirms(Cs, State) -> coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> @@ -1493,8 +1500,7 @@ send_confirms(Cs, State) -> multiple = Multiple} end, State). -coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> +coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), CutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; @@ -1503,11 +1509,9 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of [] -> ok; - _ -> ok = rabbit_writer:send_command( - WriterPid, MkMsgFun(lists:last(Ms), true)) + _ -> ok = send(MkMsgFun(lists:last(Ms), true), State) end, - [ok = rabbit_writer:send_command( - WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], + [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss], State. ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L]; @@ -1524,7 +1528,7 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) -> end. complete_tx(State = #ch{tx = committing}) -> - ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), + ok = send(#'tx.commit_ok'{}, State), State#ch{tx = new_tx()}; complete_tx(State = #ch{tx = failed}) -> {noreply, State1} = handle_exception( diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b5f72cad..c704804e 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/5, publish_delivered/4, + purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, @@ -198,6 +198,8 @@ purge(State = #state { gm = GM, {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1 }}. +purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}). + publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, State = #state { gm = GM, seen_status = SS, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 867aa2ed..27b0326d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -221,10 +221,12 @@ handle_cast({sync_start, Ref, Syncer}, backing_queue = BQ, backing_queue_state = BQS }) -> State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), - S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined, - rate_timer_ref = TRefN, - backing_queue_state = BQSN} end, - %% [0] We can only sync when there are no pending acks + S = fun({MA, TRefN, BQSN}) -> + State1#state{depth_delta = undefined, + msg_id_ack = dict:from_list(MA), + rate_timer_ref = TRefN, + backing_queue_state = BQSN} + end, case rabbit_mirror_queue_sync:slave( DD, Ref, TRef, Syncer, BQ, BQS, fun (BQN, BQSN) -> @@ -234,7 +236,7 @@ handle_cast({sync_start, Ref, Syncer}, {TRefN, BQSN1} end) of denied -> noreply(State1); - {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] + {ok, Res} -> noreply(set_delta(0, S(Res))); {failed, Res} -> noreply(S(Res)); {stop, Reason, Res} -> {stop, Reason, S(Res)} end; diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index f2ab67cd..b8cfe4a9 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -57,6 +57,9 @@ -type(log_fun() :: fun ((string(), [any()]) -> 'ok')). -type(bq() :: atom()). -type(bqs() :: any()). +-type(ack() :: any()). +-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(), + bqs()}). -spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). -spec(master_go/7 :: (pid(), reference(), log_fun(), @@ -69,8 +72,8 @@ -spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(), bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) -> 'denied' | - {'ok' | 'failed', {timer:tref(), bqs()}} | - {'stop', any(), {timer:tref(), bqs()}}). + {'ok' | 'failed', slave_sync_state()} | + {'stop', any(), slave_sync_state()}). -endif. @@ -91,16 +94,16 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> end. master_go0(Args, BQ, BQS) -> - case BQ:fold(fun (Msg, MsgProps, Acc) -> - master_send(Msg, MsgProps, Args, Acc) + case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) -> + master_send(Msg, MsgProps, Unacked, Args, Acc) end, {0, erlang:now()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, - {I, Last}) -> +master_send(Msg, MsgProps, Unacked, + {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of true -> EmitStats({syncing, I}), Log("~p messages", [I]), @@ -119,7 +122,7 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), gen_server2:reply(From, ok), {stop, cancelled}; - {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, + {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked}, {cont, {I + 1, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} @@ -164,11 +167,11 @@ syncer(Ref, Log, MPid, SPids) -> syncer_loop(Ref, MPid, SPids) -> MPid ! {next, Ref}, receive - {msg, Ref, Msg, MsgProps} -> + {msg, Ref, Msg, MsgProps, Unacked} -> SPids1 = wait_for_credit(SPids), [begin credit_flow:send(SPid), - SPid ! {sync_msg, Ref, Msg, MsgProps} + SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked} end || SPid <- SPids1], syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> @@ -204,12 +207,12 @@ slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) -> slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> MRef = erlang:monitor(process, Syncer), Syncer ! {sync_ready, Ref, self()}, - {_MsgCount, BQS1} = BQ:purge(BQS), + {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)), slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration, - rabbit_misc:get_parent()}, TRef, BQS1). + rabbit_misc:get_parent()}, {[], TRef, BQS1}). slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, - TRef, BQS) -> + State = {MA, TRef, BQS}) -> receive {'DOWN', MRef, process, Syncer, _Reason} -> %% If the master dies half way we are not in the usual @@ -218,34 +221,40 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, %% sync with a newly promoted master, or even just receive %% messages from it, we have a hole in the middle. So the %% only thing to do here is purge. - {_MsgCount, BQS1} = BQ:purge(BQS), + {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)), credit_flow:peer_down(Syncer), - {failed, {TRef, BQS1}}; + {failed, {[], TRef, BQS1}}; {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), - slave_sync_loop(Args, TRef, BQS); + slave_sync_loop(Args, State); {sync_complete, Ref} -> erlang:demonitor(MRef, [flush]), credit_flow:peer_down(Syncer), - {ok, {TRef, BQS}}; + {ok, State}; {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age), - slave_sync_loop(Args, TRef, BQS); + slave_sync_loop(Args, State); {'$gen_cast', {set_ram_duration_target, Duration}} -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - slave_sync_loop(Args, TRef, BQS1); + slave_sync_loop(Args, {MA, TRef, BQS1}); update_ram_duration -> {TRef1, BQS1} = UpdateRamDuration(BQ, BQS), - slave_sync_loop(Args, TRef1, BQS1); - {sync_msg, Ref, Msg, Props} -> + slave_sync_loop(Args, {MA, TRef1, BQS1}); + {sync_msg, Ref, Msg, Props, Unacked} -> credit_flow:ack(Syncer), Props1 = Props#message_properties{needs_confirming = false}, - BQS1 = BQ:publish(Msg, Props1, true, none, BQS), - slave_sync_loop(Args, TRef, BQS1); + {MA1, BQS1} = + case Unacked of + false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)}; + true -> {AckTag, BQS2} = BQ:publish_delivered( + Msg, Props1, none, BQS), + {[{Msg#basic_message.id, AckTag} | MA], BQS2} + end, + slave_sync_loop(Args, {MA1, TRef, BQS1}); {'EXIT', Parent, Reason} -> - {stop, Reason, {TRef, BQS}}; + {stop, Reason, State}; %% If the master throws an exception {'$gen_cast', {gm, {delete_and_terminate, Reason}}} -> BQ:delete_and_terminate(Reason, BQS), - {stop, Reason, {TRef, undefined}} + {stop, Reason, {[], TRef, undefined}} end. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 9f94af7d..d2f36590 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -64,8 +64,8 @@ list(PluginsDir) -> [plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]), case Problems of [] -> ok; - _ -> io:format("Warning: Problem reading some plugins: ~p~n", - [Problems]) + _ -> error_logger:warning_msg( + "Problem reading some plugins: ~p~n", [Problems]) end, Plugins. @@ -112,8 +112,9 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> case Enabled -- plugin_names(ToUnpackPlugins) of [] -> ok; - Missing -> io:format("Warning: the following enabled plugins were " - "not found: ~p~n", [Missing]) + Missing -> error_logger:warning_msg( + "The following enabled plugins were not found: ~p~n", + [Missing]) end, %% Eliminate the contents of the destination directory diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 95e23d29..e7d2953a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1111,7 +1111,7 @@ test_policy_validation() -> test_server_status() -> %% create a few things so there is some useful information to list - Writer = spawn(fun () -> receive shutdown -> ok end end), + Writer = spawn(fun test_writer/0), {ok, Ch} = rabbit_channel:start_link( 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, user(<<"user">>), <<"/">>, [], self(), @@ -1202,10 +1202,34 @@ find_listener() -> N =:= node()], {H, P}. +test_amqp_connection_refusal() -> + [passed = test_amqp_connection_refusal(V) || + V <- [<<"AMQP",9,9,9,9>>, <<"AMQP",0,1,0,0>>, <<"XXXX",0,0,9,1>>]], + passed. + +test_amqp_connection_refusal(Header) -> + {H, P} = find_listener(), + {ok, C} = gen_tcp:connect(H, P, [binary, {active, false}]), + ok = gen_tcp:send(C, Header), + {ok, <<"AMQP",0,0,9,1>>} = gen_tcp:recv(C, 8, 100), + ok = gen_tcp:close(C), + passed. + +find_listener() -> + [#listener{host = H, port = P} | _] = + [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), + N =:= node()], + {H, P}. + +test_writer() -> test_writer(none). + test_writer(Pid) -> receive - shutdown -> ok; - {send_command, Method} -> Pid ! Method, test_writer(Pid) + {'$gen_call', From, flush} -> gen_server:reply(From, ok), + test_writer(Pid); + {send_command, Method} -> Pid ! Method, + test_writer(Pid); + shutdown -> ok end. test_spawn() -> @@ -2364,22 +2388,26 @@ test_variable_queue() -> fun test_dropwhile_varying_ram_duration/1, fun test_fetchwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, + fun test_variable_queue_purge/1, fun test_variable_queue_requeue/1, fun test_variable_queue_fold/1]], passed. test_variable_queue_fold(VQ0) -> - {RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), - Count = rabbit_variable_queue:len(VQ1), - Msgs = RequeuedMsgs ++ FreshMsgs, - lists:foldl( - fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end, - VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). - -test_variable_queue_fold(Cut, Msgs, VQ0) -> + {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Count = rabbit_variable_queue:depth(VQ1), + Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), + lists:foldl(fun (Cut, VQ2) -> + test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2) + end, VQ1, [0, 1, 2, Count div 2, + Count - 1, Count, Count + 1, Count * 2]). + +test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) -> {Acc, VQ1} = rabbit_variable_queue:fold( - fun (M, _, A) -> + fun (M, _, Pending, A) -> MInt = msg2int(M), + Pending = lists:member(MInt, PendingMsgs), %% assert case MInt =< Cut of true -> {cont, [MInt | A]}; false -> {stop, A} @@ -2440,10 +2468,11 @@ variable_queue_with_holes(VQ0) -> Depth = rabbit_variable_queue:depth(VQ8), Len = Depth - length(Subset3), Len = rabbit_variable_queue:len(VQ8), - {(Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}. + {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}. test_variable_queue_requeue(VQ0) -> - {RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), + {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), Msgs = lists:zip(RequeuedMsgs, lists:duplicate(length(RequeuedMsgs), true)) ++ @@ -2459,6 +2488,21 @@ test_variable_queue_requeue(VQ0) -> {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), VQ3. +test_variable_queue_purge(VQ0) -> + LenDepth = fun (VQ) -> + {rabbit_variable_queue:len(VQ), + rabbit_variable_queue:depth(VQ)} + end, + VQ1 = variable_queue_publish(false, 10, VQ0), + {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1), + {4, VQ3} = rabbit_variable_queue:purge(VQ2), + {0, 6} = LenDepth(VQ3), + {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3), + {2, 6} = LenDepth(VQ4), + VQ5 = rabbit_variable_queue:purge_acks(VQ4), + {2, 2} = LenDepth(VQ5), + VQ5. + test_variable_queue_ack_limiting(VQ0) -> %% start by sending in a bunch of messages Len = 1024, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8a7045ea..34a4b52f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, purge/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, @@ -519,6 +519,8 @@ purge(State = #vqstate { q4 = Q4, ram_msg_count = 0, persistent_count = PCount1 })}. +purge_acks(State) -> a(purge_pending_ack(false, State)). + publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, @@ -676,25 +678,12 @@ ackfold(MsgFun, Acc, State, AckTags) -> end, {Acc, State}, AckTags), {AccN, a(StateN)}. -fold(Fun, Acc, #vqstate { q1 = Q1, - q2 = Q2, - delta = #delta { start_seq_id = DeltaSeqId, - end_seq_id = DeltaSeqIdEnd }, - q3 = Q3, - q4 = Q4 } = State) -> - QFun = fun(MsgStatus, {Acc0, State0}) -> - {Msg, State1} = read_msg(MsgStatus, State0), - {StopGo, AccNext} = - Fun(Msg, MsgStatus#msg_status.msg_props, Acc0), - {StopGo, {AccNext, State1}} - end, - {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4), - {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3), - {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2}, - DeltaSeqId, DeltaSeqIdEnd, State2), - {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2), - {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1), - {Acc5, State5}. +fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> + {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, + [msg_iterator(State), + disk_ack_iterator(State), + ram_ack_iterator(State)]), + ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. @@ -1101,14 +1090,16 @@ queue_out(State = #vqstate { q4 = Q4 }) -> read_msg(#msg_status{msg = undefined, msg_id = MsgId, - is_persistent = IsPersistent}, - State = #vqstate{msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1}}; + is_persistent = IsPersistent}, State) -> + read_msg(MsgId, IsPersistent, State); read_msg(#msg_status{msg = Msg}, State) -> {Msg, State}. +read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, State #vqstate {msg_store_clients = MSCState1}}. + inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. @@ -1389,7 +1380,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue and fold +%% Internal plumbing for requeue %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> @@ -1459,48 +1450,81 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. -qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A; -qfoldl( Fun, {cont, Acc} = A, Q) -> +%%---------------------------------------------------------------------------- +%% Iterator +%%---------------------------------------------------------------------------- + +ram_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. + +disk_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. + +msg_iterator(State) -> istate(start, State). + +istate(start, State) -> {q4, State#vqstate.q4, State}; +istate(q4, State) -> {q3, State#vqstate.q3, State}; +istate(q3, State) -> {delta, State#vqstate.delta, State}; +istate(delta, State) -> {q2, State#vqstate.q2, State}; +istate(q2, State) -> {q1, State#vqstate.q1, State}; +istate(q1, _State) -> done. + +next({ack, It}, IndexState) -> + case gb_trees:next(It) of + none -> {empty, IndexState}; + {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, + {value, MsgStatus, true, Next, IndexState} + end; +next(done, IndexState) -> {empty, IndexState}; +next({delta, #delta{start_seq_id = SeqId, + end_seq_id = SeqId}, State}, IndexState) -> + next(istate(delta, State), IndexState); +next({delta, #delta{start_seq_id = SeqId, + end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> + SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), + SeqId1 = lists:min([SeqIdB, SeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), + next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); +next({delta, Delta, [], State}, IndexState) -> + next({delta, Delta, State}, IndexState); +next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> + case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse + gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of + false -> Next = {delta, Delta, Rest, State}, + {value, beta_msg_status(M), false, Next, IndexState}; + true -> next({delta, Delta, Rest, State}, IndexState) + end; +next({Key, Q, State}, IndexState) -> case ?QUEUE:out(Q) of - {empty, _Q} -> A; - {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1) + {empty, _Q} -> next(istate(Key, State), IndexState); + {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, + {value, MsgStatus, false, Next, IndexState} end. -lfoldl(_Fun, {stop, _Acc} = A, _L) -> A; -lfoldl(_Fun, {cont, _Acc} = A, []) -> A; -lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T). - -delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> - {stop, {Acc, State}}; -delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> - {cont, {Acc, State}}; -delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, - #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - index_state = IndexState, - msg_store_clients = MSCState } = State) -> - DeltaSeqId1 = lists:min( - [rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - {StopCont, {Acc1, MSCState1}} = - lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered}, - {Acc0, MSCState0}) -> - case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA)) of - false -> {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, - MsgId), - {StopCont, AccNext} = - Fun(Msg, MsgProps, Acc0), - {StopCont, {AccNext, MSCState1}}; - true -> {cont, {Acc0, MSCState0}} - end - end, {cont, {Acc, MSCState}}, List), - delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, - State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }). +inext(It, {Its, IndexState}) -> + case next(It, IndexState) of + {empty, IndexState1} -> + {Its, IndexState1}; + {value, MsgStatus1, Unacked, It1, IndexState1} -> + {[{MsgStatus1, Unacked, It1} | Its], IndexState1} + end. + +ifold(_Fun, Acc, [], State) -> + {Acc, State}; +ifold(Fun, Acc, Its, State) -> + [{MsgStatus, Unacked, It} | Rest] = + lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, + {#msg_status{seq_id = SeqId2}, _, _}) -> + SeqId1 =< SeqId2 + end, Its), + {Msg, State1} = read_msg(MsgStatus, State), + case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of + {stop, Acc1} -> + {Acc1, State}; + {cont, Acc1} -> + {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}), + ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1}) + end. %%---------------------------------------------------------------------------- %% Phase changes diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index a7ea3d99..059d3839 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -21,7 +21,8 @@ -export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, - send_command_and_notify/4, send_command_and_notify/5]). + send_command_and_notify/4, send_command_and_notify/5, + flush/1]). -export([internal_send_command/4, internal_send_command/6]). %% internal @@ -69,6 +70,7 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:protocol()) @@ -130,7 +132,7 @@ mainloop1(State) -> receive Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(flush(State)) + ?MODULE:mainloop1(internal_flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) -> handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, Content, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, Content, State)), + gen_server:reply(From, ok), + State1; +handle_message({'$gen_call', From, flush}, State) -> + State1 = internal_flush(State), gen_server:reply(From, ok), State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> @@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. +flush(W) -> call(W, flush). + %%--------------------------------------------------------------------------- call(Pid, Msg) -> @@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content, maybe_flush(State = #wstate{pending = Pending}) -> case iolist_size(Pending) >= ?FLUSH_THRESHOLD of - true -> flush(State); + true -> internal_flush(State); false -> State end. -flush(State = #wstate{pending = []}) -> +internal_flush(State = #wstate{pending = []}) -> State; -flush(State = #wstate{sock = Sock, pending = Pending}) -> +internal_flush(State = #wstate{sock = Sock, pending = Pending}) -> ok = port_cmd(Sock, lists:reverse(Pending)), State#wstate{pending = []}. |