summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-24 13:10:27 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-24 13:10:27 +0000
commit423aca76adbca197fe4bbcb8ee720122bc75ece4 (patch)
tree592efb4cfcc2094a0b31f1fbe85f8bb87e76c4fb
parent7424bac1a36e1d9512a32edcd1b0bd0b93ab31fc (diff)
parent83b78c937c167f09c7d71493b044d33e8fb10b79 (diff)
downloadrabbitmq-server-423aca76adbca197fe4bbcb8ee720122bc75ece4.tar.gz
merge default into bug23749
-rw-r--r--docs/rabbitmqctl.1.xml3
-rw-r--r--include/rabbit.hrl1
-rw-r--r--src/rabbit.erl116
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_backing_queue_qc.erl11
-rw-r--r--src/rabbit_channel.erl74
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl12
-rw-r--r--src/rabbit_mirror_queue_sync.erl57
-rw-r--r--src/rabbit_plugins.erl9
-rw-r--r--src/rabbit_tests.erl72
-rw-r--r--src/rabbit_variable_queue.erl154
-rw-r--r--src/rabbit_writer.erl24
15 files changed, 322 insertions, 238 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index c7069aed..bbd2fe5b 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -465,8 +465,7 @@
synchronise itself. The queue will block while
synchronisation takes place (all publishers to and
consumers from the queue will block). The queue must be
- mirrored, and must not have any pending unacknowledged
- messages for this command to succeed.
+ mirrored for this command to succeed.
</para>
<para>
Note that unsynchronised queues from which messages are
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 7385b4b3..78763045 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -88,7 +88,6 @@
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
--define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
%% EMPTY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 16694105..0e6c970f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -258,16 +258,28 @@
%%----------------------------------------------------------------------------
+%% HiPE compilation happens before we have log handlers - so we have
+%% to io:format/2, it's all we can do.
+
maybe_hipe_compile() ->
{ok, Want} = application:get_env(rabbit, hipe_compile),
Can = code:which(hipe) =/= non_existing,
case {Want, Can} of
- {true, true} -> hipe_compile();
- {true, false} -> io:format("Not HiPE compiling: HiPE not found in "
- "this Erlang installation.~n");
- {false, _} -> ok
+ {true, true} -> hipe_compile(),
+ true;
+ {true, false} -> false;
+ {false, _} -> true
end.
+warn_if_hipe_compilation_failed(true) ->
+ ok;
+warn_if_hipe_compilation_failed(false) ->
+ error_logger:warning_msg(
+ "Not HiPE compiling: HiPE not found in this Erlang installation.~n").
+
+%% HiPE compilation happens before we have log handlers and can take a
+%% long time, so make an exception to our no-stdout policy and display
+%% progress via stdout.
hipe_compile() ->
Count = length(?HIPE_WORTHY),
io:format("~nHiPE compiling: |~s|~n |",
@@ -311,14 +323,15 @@ start() ->
rabbit_mnesia:check_cluster_consistency(),
ok = app_utils:start_applications(
app_startup_order(), fun handle_app_error/2),
- ok = print_plugin_info(rabbit_plugins:active())
+ ok = log_broker_started(rabbit_plugins:active())
end).
boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
- maybe_hipe_compile(),
+ Success = maybe_hipe_compile(),
ok = ensure_working_log_handlers(),
+ warn_if_hipe_compilation_failed(Success),
rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
@@ -332,7 +345,7 @@ boot() ->
false),
ok = app_utils:start_applications(
StartupApps, fun handle_app_error/2),
- ok = print_plugin_info(Plugins)
+ ok = log_broker_started(Plugins)
end).
handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
@@ -427,8 +440,8 @@ start(normal, []) ->
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
+ log_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
- io:format("~nbroker running~n"),
{ok, SupPid};
Error ->
Error
@@ -457,22 +470,16 @@ app_shutdown_order() ->
%%---------------------------------------------------------------------------
%% boot step logic
-run_boot_step({StepName, Attributes}) ->
- Description = case lists:keysearch(description, 1, Attributes) of
- {value, {_, D}} -> D;
- false -> StepName
- end,
+run_boot_step({_StepName, Attributes}) ->
case [MFA || {mfa, MFA} <- Attributes] of
[] ->
- io:format("-- ~s~n", [Description]);
+ ok;
MFAs ->
- io:format("starting ~-60s ...", [Description]),
[try
apply(M,F,A)
catch
_:Reason -> boot_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
- io:format("done~n"),
ok
end.
@@ -689,24 +696,15 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
%% misc
-print_plugin_info([]) ->
- ok;
-print_plugin_info(Plugins) ->
- %% This gets invoked by rabbitmqctl start_app, outside the context
- %% of the rabbit application
+log_broker_started(Plugins) ->
rabbit_misc:with_local_io(
fun() ->
- io:format("~n-- plugins running~n"),
- [print_plugin_info(
- AppName, element(2, application:get_key(AppName, vsn)))
- || AppName <- Plugins],
- ok
+ error_logger:info_msg(
+ "Server startup complete; plugins are: ~p~n", [Plugins]),
+ io:format("~n Broker running with ~p plugins.~n",
+ [length(Plugins)])
end).
-print_plugin_info(Plugin, Vsn) ->
- Len = 76 - length(Vsn),
- io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]).
-
erts_version_check() ->
FoundVer = erlang:system_info(version),
case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
@@ -718,49 +716,41 @@ erts_version_check() ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- ProductLen = string:len(Product),
- io:format("~n"
- "+---+ +---+~n"
- "| | | |~n"
- "| | | |~n"
- "| | | |~n"
- "| +---+ +-------+~n"
- "| |~n"
- "| ~s +---+ |~n"
- "| | | |~n"
- "| ~s +---+ |~n"
- "| |~n"
- "+-------------------+~n"
- "~s~n~s~n~s~n~n",
- [Product, string:right([$v|Version], ProductLen),
- ?PROTOCOL_VERSION,
- ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ io:format("~n## ## ~s ~s. ~s~n## ## ~s~n########## ~n",
+ [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ io:format("###### ## Logs: ~s~n########## ~s~n",
+ [log_location(kernel), log_location(sasl)]).
+
+log_banner() ->
+ {ok, Product} = application:get_key(id),
+ {ok, Version} = application:get_key(vsn),
Settings = [{"node", node()},
- {"app descriptor", app_location()},
{"home dir", home_dir()},
{"config file(s)", config_files()},
{"cookie hash", rabbit_nodes:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
{"database dir", rabbit_mnesia:dir()},
- {"erlang version", erlang:system_info(version)}],
+ {"erlang version", erlang:system_info(otp_release)}],
DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
Format = fun (K, V) ->
- io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
- [K, V])
+ rabbit_misc:format(
+ "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", [K, V])
end,
- lists:foreach(fun ({"config file(s)" = K, []}) ->
- Format(K, "(none)");
- ({"config file(s)" = K, [V0 | Vs]}) ->
- Format(K, V0), [Format("", V) || V <- Vs];
- ({K, V}) ->
- Format(K, V)
- end, Settings),
- io:nl().
-
-app_location() ->
- {ok, Application} = application:get_application(),
- filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+ Banner = iolist_to_binary(
+ rabbit_misc:format(
+ "~s ~s~n~s~n~s~n",
+ [Product, Version, ?COPYRIGHT_MESSAGE,
+ ?INFORMATION_MESSAGE]) ++
+ [case S of
+ {"config file(s)" = K, []} ->
+ Format(K, "(none)");
+ {"config file(s)" = K, [V0 | Vs]} ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ {K, V} ->
+ Format(K, V)
+ end || S <- Settings]),
+ error_logger:info_msg("~s", [Banner]).
home_dir() ->
case init:get_argument(home) of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 788ec558..ca384053 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -175,8 +175,7 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
--spec(sync_mirrors/1 :: (pid()) ->
- 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')).
-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}).
-endif.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 918d1782..c9f73546 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1187,7 +1187,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue(AckTags, ChPid, State));
handle_call(sync_mirrors, _From,
- State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
+ State = #q{backing_queue = rabbit_mirror_queue_master,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
HandleInfo = fun (Status) ->
@@ -1203,13 +1203,9 @@ handle_call(sync_mirrors, _From,
State, #q.stats_timer,
fun() -> emit_stats(State#q{status = Status}) end)
end,
- case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> case rabbit_mirror_queue_master:sync_mirrors(
- HandleInfo, EmitStats, BQS) of
- {ok, BQS1} -> reply(ok, S(BQS1));
- {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
- end;
- _ -> reply({error, pending_acks}, State)
+ case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of
+ {ok, BQS1} -> reply(ok, S(BQS1));
+ {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
end;
handle_call(sync_mirrors, _From, State) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9a3c67f9..4245f7e2 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -75,6 +75,10 @@
%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
+%% Remove all messages in the queue which have been fetched and are
+%% pending acks.
+-callback purge_acks(state()) -> state().
+
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
rabbit_types:message_properties(), boolean(), pid(),
@@ -164,7 +168,7 @@
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
rabbit_types:message_properties(),
- A) -> {('stop' | 'cont'), A}),
+ boolean(), A) -> {('stop' | 'cont'), A}),
A, state()) -> {A, state()}.
%% How long is my queue?
@@ -226,7 +230,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 5b3b8aa8..5feaee46 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -334,7 +334,7 @@ postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) ->
{_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) ->
{stop, Acc};
({_SeqId, {MsgProps, Msg}}, {cont, Acc}) ->
- FoldFun(Msg, MsgProps, Acc)
+ FoldFun(Msg, MsgProps, false, Acc)
end, {cont, Acc0}, gb_trees:to_list(Messages)),
true = Model =:= Res;
@@ -397,10 +397,11 @@ rand_choice(List, Selection, N) ->
N - 1).
makefoldfun(Size) ->
- fun (Msg, _MsgProps, Acc) ->
- case length(Acc) > Size of
- false -> {cont, [Msg | Acc]};
- true -> {stop, Acc}
+ fun (Msg, _MsgProps, Unacked, Acc) ->
+ case {length(Acc) > Size, Unacked} of
+ {false, false} -> {cont, [Msg | Acc]};
+ {false, true} -> {cont, Acc};
+ {true, _} -> {stop, Acc}
end
end.
foldacc() -> [].
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8cd3a580..74cf4bee 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -273,7 +273,7 @@ handle_cast({method, Method, Content, Flow},
end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
- ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
+ ok = send(Reply, NewState),
noreply(NewState);
{noreply, NewState} ->
noreply(NewState);
@@ -295,18 +295,20 @@ handle_cast(ready_for_close, State = #ch{state = closing,
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
{stop, normal, State};
-handle_cast(terminate, State) ->
+handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:flush(WriterPid),
{stop, normal, State};
-handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
- State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid, Msg),
- noreply(consumer_monitor(ConsumerTag, State));
+handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) ->
+ ok = send(Msg, State),
+ noreply(consumer_monitor(CTag, State));
-handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid, Msg),
+handle_cast({command, Msg}, State) ->
+ ok = send(Msg, State),
noreply(State);
+handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) ->
+ noreply(State);
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -420,6 +422,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
+send(_Command, #ch{state = closing}) ->
+ ok;
+send(Command, #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Command).
+
handle_exception(Reason, State = #ch{protocol = Protocol,
channel = Channel,
writer_pid = WriterPid,
@@ -564,12 +571,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
case sets:is_element(QPid, Blocking) of
false -> State;
true -> Blocking1 = sets:del_element(QPid, Blocking),
- ok = case sets:size(Blocking1) of
- 0 -> rabbit_writer:send_command(
- State#ch.writer_pid,
- #'channel.flow_ok'{active = false});
- _ -> ok
- end,
+ case sets:size(Blocking1) of
+ 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
+ _ -> ok
+ end,
State#ch{blocking = Blocking1}
end.
@@ -856,12 +861,9 @@ handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
- {noreply, State2 = #ch{writer_pid = WriterPid}} =
- handle_method(#'basic.recover_async'{requeue = Requeue},
- Content,
- State),
- ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
- {noreply, State2};
+ {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content, State),
+ {reply, #'basic.recover_ok'{}, State1};
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
@@ -1187,17 +1189,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QCons,
- writer_pid = WriterPid}) ->
+ queue_consumers = QCons}) ->
ConsumerTags = case dict:find(QPid, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
ConsumerMapping1 =
gb_sets:fold(fun (CTag, CMap) ->
- Cancel = #'basic.cancel'{consumer_tag = CTag,
- nowait = true},
- ok = rabbit_writer:send_command(WriterPid, Cancel),
+ ok = send(#'basic.cancel'{consumer_tag = CTag,
+ nowait = true},
+ State),
dict:erase(CTag, CMap)
end, ConsumerMapping, ConsumerTags),
State#ch{consumer_mapping = ConsumerMapping1,
@@ -1459,12 +1460,17 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
send_nacks([], State) ->
State;
+send_nacks(_MXs, State = #ch{state = closing,
+ tx = none}) -> %% optimisation
+ State;
send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
+send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
+ State#ch{tx = failed};
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx = failed}).
@@ -1483,9 +1489,10 @@ send_confirms(State) ->
send_confirms([], State) ->
State;
-send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = MsgSeqNo}),
+send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation
+ State;
+send_confirms([MsgSeqNo], State) ->
+ ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
State;
send_confirms(Cs, State) ->
coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
@@ -1493,8 +1500,7 @@ send_confirms(Cs, State) ->
multiple = Multiple}
end, State).
-coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
CutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
@@ -1503,11 +1509,9 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
[] -> ok;
- _ -> ok = rabbit_writer:send_command(
- WriterPid, MkMsgFun(lists:last(Ms), true))
+ _ -> ok = send(MkMsgFun(lists:last(Ms), true), State)
end,
- [ok = rabbit_writer:send_command(
- WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
+ [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss],
State.
ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
@@ -1524,7 +1528,7 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
end.
complete_tx(State = #ch{tx = committing}) ->
- ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
+ ok = send(#'tx.commit_ok'{}, State),
State#ch{tx = new_tx()};
complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b5f72cad..c704804e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/5, publish_delivered/4,
+ purge/1, purge_acks/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -198,6 +198,8 @@ purge(State = #state { gm = GM,
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1 }}.
+purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
+
publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 867aa2ed..27b0326d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -221,10 +221,12 @@ handle_cast({sync_start, Ref, Syncer},
backing_queue = BQ,
backing_queue_state = BQS }) ->
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
- S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined,
- rate_timer_ref = TRefN,
- backing_queue_state = BQSN} end,
- %% [0] We can only sync when there are no pending acks
+ S = fun({MA, TRefN, BQSN}) ->
+ State1#state{depth_delta = undefined,
+ msg_id_ack = dict:from_list(MA),
+ rate_timer_ref = TRefN,
+ backing_queue_state = BQSN}
+ end,
case rabbit_mirror_queue_sync:slave(
DD, Ref, TRef, Syncer, BQ, BQS,
fun (BQN, BQSN) ->
@@ -234,7 +236,7 @@ handle_cast({sync_start, Ref, Syncer},
{TRefN, BQSN1}
end) of
denied -> noreply(State1);
- {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0]
+ {ok, Res} -> noreply(set_delta(0, S(Res)));
{failed, Res} -> noreply(S(Res));
{stop, Reason, Res} -> {stop, Reason, S(Res)}
end;
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index f2ab67cd..b8cfe4a9 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -57,6 +57,9 @@
-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
-type(bq() :: atom()).
-type(bqs() :: any()).
+-type(ack() :: any()).
+-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
+ bqs()}).
-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
@@ -69,8 +72,8 @@
-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
'denied' |
- {'ok' | 'failed', {timer:tref(), bqs()}} |
- {'stop', any(), {timer:tref(), bqs()}}).
+ {'ok' | 'failed', slave_sync_state()} |
+ {'stop', any(), slave_sync_state()}).
-endif.
@@ -91,16 +94,16 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
end.
master_go0(Args, BQ, BQS) ->
- case BQ:fold(fun (Msg, MsgProps, Acc) ->
- master_send(Msg, MsgProps, Args, Acc)
+ case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
+ master_send(Msg, MsgProps, Unacked, Args, Acc)
end, {0, erlang:now()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
{{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
- {I, Last}) ->
+master_send(Msg, MsgProps, Unacked,
+ {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
true -> EmitStats({syncing, I}),
Log("~p messages", [I]),
@@ -119,7 +122,7 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
gen_server2:reply(From, ok),
{stop, cancelled};
- {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked},
{cont, {I + 1, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
@@ -164,11 +167,11 @@ syncer(Ref, Log, MPid, SPids) ->
syncer_loop(Ref, MPid, SPids) ->
MPid ! {next, Ref},
receive
- {msg, Ref, Msg, MsgProps} ->
+ {msg, Ref, Msg, MsgProps, Unacked} ->
SPids1 = wait_for_credit(SPids),
[begin
credit_flow:send(SPid),
- SPid ! {sync_msg, Ref, Msg, MsgProps}
+ SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked}
end || SPid <- SPids1],
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
@@ -204,12 +207,12 @@ slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
MRef = erlang:monitor(process, Syncer),
Syncer ! {sync_ready, Ref, self()},
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
- rabbit_misc:get_parent()}, TRef, BQS1).
+ rabbit_misc:get_parent()}, {[], TRef, BQS1}).
slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
- TRef, BQS) ->
+ State = {MA, TRef, BQS}) ->
receive
{'DOWN', MRef, process, Syncer, _Reason} ->
%% If the master dies half way we are not in the usual
@@ -218,34 +221,40 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
%% sync with a newly promoted master, or even just receive
%% messages from it, we have a hole in the middle. So the
%% only thing to do here is purge.
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
credit_flow:peer_down(Syncer),
- {failed, {TRef, BQS1}};
+ {failed, {[], TRef, BQS1}};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{sync_complete, Ref} ->
erlang:demonitor(MRef, [flush]),
credit_flow:peer_down(Syncer),
- {ok, {TRef, BQS}};
+ {ok, State};
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{'$gen_cast', {set_ram_duration_target, Duration}} ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- slave_sync_loop(Args, TRef, BQS1);
+ slave_sync_loop(Args, {MA, TRef, BQS1});
update_ram_duration ->
{TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
- slave_sync_loop(Args, TRef1, BQS1);
- {sync_msg, Ref, Msg, Props} ->
+ slave_sync_loop(Args, {MA, TRef1, BQS1});
+ {sync_msg, Ref, Msg, Props, Unacked} ->
credit_flow:ack(Syncer),
Props1 = Props#message_properties{needs_confirming = false},
- BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
- slave_sync_loop(Args, TRef, BQS1);
+ {MA1, BQS1} =
+ case Unacked of
+ false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ true -> {AckTag, BQS2} = BQ:publish_delivered(
+ Msg, Props1, none, BQS),
+ {[{Msg#basic_message.id, AckTag} | MA], BQS2}
+ end,
+ slave_sync_loop(Args, {MA1, TRef, BQS1});
{'EXIT', Parent, Reason} ->
- {stop, Reason, {TRef, BQS}};
+ {stop, Reason, State};
%% If the master throws an exception
{'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
BQ:delete_and_terminate(Reason, BQS),
- {stop, Reason, {TRef, undefined}}
+ {stop, Reason, {[], TRef, undefined}}
end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 9f94af7d..d2f36590 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -64,8 +64,8 @@ list(PluginsDir) ->
[plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]),
case Problems of
[] -> ok;
- _ -> io:format("Warning: Problem reading some plugins: ~p~n",
- [Problems])
+ _ -> error_logger:warning_msg(
+ "Problem reading some plugins: ~p~n", [Problems])
end,
Plugins.
@@ -112,8 +112,9 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
case Enabled -- plugin_names(ToUnpackPlugins) of
[] -> ok;
- Missing -> io:format("Warning: the following enabled plugins were "
- "not found: ~p~n", [Missing])
+ Missing -> error_logger:warning_msg(
+ "The following enabled plugins were not found: ~p~n",
+ [Missing])
end,
%% Eliminate the contents of the destination directory
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 95e23d29..e7d2953a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1111,7 +1111,7 @@ test_policy_validation() ->
test_server_status() ->
%% create a few things so there is some useful information to list
- Writer = spawn(fun () -> receive shutdown -> ok end end),
+ Writer = spawn(fun test_writer/0),
{ok, Ch} = rabbit_channel:start_link(
1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1,
user(<<"user">>), <<"/">>, [], self(),
@@ -1202,10 +1202,34 @@ find_listener() ->
N =:= node()],
{H, P}.
+test_amqp_connection_refusal() ->
+ [passed = test_amqp_connection_refusal(V) ||
+ V <- [<<"AMQP",9,9,9,9>>, <<"AMQP",0,1,0,0>>, <<"XXXX",0,0,9,1>>]],
+ passed.
+
+test_amqp_connection_refusal(Header) ->
+ {H, P} = find_listener(),
+ {ok, C} = gen_tcp:connect(H, P, [binary, {active, false}]),
+ ok = gen_tcp:send(C, Header),
+ {ok, <<"AMQP",0,0,9,1>>} = gen_tcp:recv(C, 8, 100),
+ ok = gen_tcp:close(C),
+ passed.
+
+find_listener() ->
+ [#listener{host = H, port = P} | _] =
+ [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
+ N =:= node()],
+ {H, P}.
+
+test_writer() -> test_writer(none).
+
test_writer(Pid) ->
receive
- shutdown -> ok;
- {send_command, Method} -> Pid ! Method, test_writer(Pid)
+ {'$gen_call', From, flush} -> gen_server:reply(From, ok),
+ test_writer(Pid);
+ {send_command, Method} -> Pid ! Method,
+ test_writer(Pid);
+ shutdown -> ok
end.
test_spawn() ->
@@ -2364,22 +2388,26 @@ test_variable_queue() ->
fun test_dropwhile_varying_ram_duration/1,
fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
+ fun test_variable_queue_purge/1,
fun test_variable_queue_requeue/1,
fun test_variable_queue_fold/1]],
passed.
test_variable_queue_fold(VQ0) ->
- {RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
- Count = rabbit_variable_queue:len(VQ1),
- Msgs = RequeuedMsgs ++ FreshMsgs,
- lists:foldl(
- fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end,
- VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
-
-test_variable_queue_fold(Cut, Msgs, VQ0) ->
+ {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
+ Count = rabbit_variable_queue:depth(VQ1),
+ Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
+ lists:foldl(fun (Cut, VQ2) ->
+ test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
+ end, VQ1, [0, 1, 2, Count div 2,
+ Count - 1, Count, Count + 1, Count * 2]).
+
+test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
{Acc, VQ1} = rabbit_variable_queue:fold(
- fun (M, _, A) ->
+ fun (M, _, Pending, A) ->
MInt = msg2int(M),
+ Pending = lists:member(MInt, PendingMsgs), %% assert
case MInt =< Cut of
true -> {cont, [MInt | A]};
false -> {stop, A}
@@ -2440,10 +2468,11 @@ variable_queue_with_holes(VQ0) ->
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
- {(Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}.
+ {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}.
test_variable_queue_requeue(VQ0) ->
- {RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
+ {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
Msgs =
lists:zip(RequeuedMsgs,
lists:duplicate(length(RequeuedMsgs), true)) ++
@@ -2459,6 +2488,21 @@ test_variable_queue_requeue(VQ0) ->
{empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
VQ3.
+test_variable_queue_purge(VQ0) ->
+ LenDepth = fun (VQ) ->
+ {rabbit_variable_queue:len(VQ),
+ rabbit_variable_queue:depth(VQ)}
+ end,
+ VQ1 = variable_queue_publish(false, 10, VQ0),
+ {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1),
+ {4, VQ3} = rabbit_variable_queue:purge(VQ2),
+ {0, 6} = LenDepth(VQ3),
+ {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3),
+ {2, 6} = LenDepth(VQ4),
+ VQ5 = rabbit_variable_queue:purge_acks(VQ4),
+ {2, 2} = LenDepth(VQ5),
+ VQ5.
+
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
Len = 1024,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a7045ea..34a4b52f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
@@ -519,6 +519,8 @@ purge(State = #vqstate { q4 = Q4,
ram_msg_count = 0,
persistent_count = PCount1 })}.
+purge_acks(State) -> a(purge_pending_ack(false, State)).
+
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
@@ -676,25 +678,12 @@ ackfold(MsgFun, Acc, State, AckTags) ->
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
-fold(Fun, Acc, #vqstate { q1 = Q1,
- q2 = Q2,
- delta = #delta { start_seq_id = DeltaSeqId,
- end_seq_id = DeltaSeqIdEnd },
- q3 = Q3,
- q4 = Q4 } = State) ->
- QFun = fun(MsgStatus, {Acc0, State0}) ->
- {Msg, State1} = read_msg(MsgStatus, State0),
- {StopGo, AccNext} =
- Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
- {StopGo, {AccNext, State1}}
- end,
- {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
- {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3),
- {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
- DeltaSeqId, DeltaSeqIdEnd, State2),
- {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2),
- {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1),
- {Acc5, State5}.
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
@@ -1101,14 +1090,16 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
read_msg(#msg_status{msg = undefined,
msg_id = MsgId,
- is_persistent = IsPersistent},
- State = #vqstate{msg_store_clients = MSCState}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, State #vqstate {msg_store_clients = MSCState1}};
+ is_persistent = IsPersistent}, State) ->
+ read_msg(MsgId, IsPersistent, State);
read_msg(#msg_status{msg = Msg}, State) ->
{Msg, State}.
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {Msg, State #vqstate {msg_store_clients = MSCState1}}.
+
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
State#vqstate{ram_msg_count = RamMsgCount + 1}.
@@ -1389,7 +1380,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue and fold
+%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
@@ -1459,48 +1450,81 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
-qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A;
-qfoldl( Fun, {cont, Acc} = A, Q) ->
+%%----------------------------------------------------------------------------
+%% Iterator
+%%----------------------------------------------------------------------------
+
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+msg_iterator(State) -> istate(start, State).
+
+istate(start, State) -> {q4, State#vqstate.q4, State};
+istate(q4, State) -> {q3, State#vqstate.q3, State};
+istate(q3, State) -> {delta, State#vqstate.delta, State};
+istate(delta, State) -> {q2, State#vqstate.q2, State};
+istate(q2, State) -> {q1, State#vqstate.q1, State};
+istate(q1, _State) -> done.
+
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
+ end;
+next(done, IndexState) -> {empty, IndexState};
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqId}, State}, IndexState) ->
+ next(istate(delta, State), IndexState);
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
+ SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
+ SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
+next({delta, Delta, [], State}, IndexState) ->
+ next({delta, Delta, State}, IndexState);
+next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
+ case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ false -> Next = {delta, Delta, Rest, State},
+ {value, beta_msg_status(M), false, Next, IndexState};
+ true -> next({delta, Delta, Rest, State}, IndexState)
+ end;
+next({Key, Q, State}, IndexState) ->
case ?QUEUE:out(Q) of
- {empty, _Q} -> A;
- {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1)
+ {empty, _Q} -> next(istate(Key, State), IndexState);
+ {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
+ {value, MsgStatus, false, Next, IndexState}
end.
-lfoldl(_Fun, {stop, _Acc} = A, _L) -> A;
-lfoldl(_Fun, {cont, _Acc} = A, []) -> A;
-lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T).
-
-delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
- {stop, {Acc, State}};
-delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
- {cont, {Acc, State}};
-delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
- #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- index_state = IndexState,
- msg_store_clients = MSCState } = State) ->
- DeltaSeqId1 = lists:min(
- [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
- {StopCont, {Acc1, MSCState1}} =
- lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered},
- {Acc0, MSCState0}) ->
- case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA)) of
- false -> {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent,
- MsgId),
- {StopCont, AccNext} =
- Fun(Msg, MsgProps, Acc0),
- {StopCont, {AccNext, MSCState1}};
- true -> {cont, {Acc0, MSCState0}}
- end
- end, {cont, {Acc, MSCState}}, List),
- delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
- State #vqstate { index_state = IndexState1,
- msg_store_clients = MSCState1 }).
+inext(It, {Its, IndexState}) ->
+ case next(It, IndexState) of
+ {empty, IndexState1} ->
+ {Its, IndexState1};
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
+ end.
+
+ifold(_Fun, Acc, [], State) ->
+ {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its),
+ {Msg, State1} = read_msg(MsgStatus, State),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State};
+ {cont, Acc1} ->
+ {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
+ ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
+ end.
%%----------------------------------------------------------------------------
%% Phase changes
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index a7ea3d99..059d3839 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -21,7 +21,8 @@
-export([start/5, start_link/5, start/6, start_link/6]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
- send_command_and_notify/4, send_command_and_notify/5]).
+ send_command_and_notify/4, send_command_and_notify/5,
+ flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
%% internal
@@ -69,6 +70,7 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record(), rabbit_types:protocol())
@@ -130,7 +132,7 @@ mainloop1(State) ->
receive
Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(flush(State))
+ ?MODULE:mainloop1(internal_flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) ->
handle_message({send_command, MethodRecord, Content}, State) ->
internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
- State1 = flush(internal_send_command_async(MethodRecord, State)),
+ State1 = internal_flush(
+ internal_send_command_async(MethodRecord, State)),
gen_server:reply(From, ok),
State1;
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
State) ->
- State1 = flush(internal_send_command_async(MethodRecord, Content, State)),
+ State1 = internal_flush(
+ internal_send_command_async(MethodRecord, Content, State)),
+ gen_server:reply(From, ok),
+ State1;
+handle_message({'$gen_call', From, flush}, State) ->
+ State1 = internal_flush(State),
gen_server:reply(From, ok),
State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
@@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
+flush(W) -> call(W, flush).
+
%%---------------------------------------------------------------------------
call(Pid, Msg) ->
@@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content,
maybe_flush(State = #wstate{pending = Pending}) ->
case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
- true -> flush(State);
+ true -> internal_flush(State);
false -> State
end.
-flush(State = #wstate{pending = []}) ->
+internal_flush(State = #wstate{pending = []}) ->
State;
-flush(State = #wstate{sock = Sock, pending = Pending}) ->
+internal_flush(State = #wstate{sock = Sock, pending = Pending}) ->
ok = port_cmd(Sock, lists:reverse(Pending)),
State#wstate{pending = []}.