summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-01-04 16:31:13 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-01-04 16:31:13 +0000
commitffbb5c778b325af2001201f85e29195a796ff2be (patch)
tree512dcdaa5a7816b00b26db6de68d33242242d850
parentf8b609ef801de8e6463df7392fada663be076338 (diff)
parent0c603bc18f3276b7e1a72712f63f32aeb6de35e0 (diff)
downloadrabbitmq-server-ffbb5c778b325af2001201f85e29195a796ff2be.tar.gz
Merge bug25372 into bug25334
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/gm.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl48
-rw-r--r--src/rabbit_auth_backend_internal.erl10
-rw-r--r--src/rabbit_backing_queue.erl26
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl15
-rw-r--r--src/rabbit_misc.erl12
-rw-r--r--src/rabbit_mnesia.erl23
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_tests.erl19
-rw-r--r--src/rabbit_trace.erl18
-rw-r--r--src/rabbit_variable_queue.erl29
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--src/rabbit_vm.erl21
16 files changed, 149 insertions, 99 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 5d9b9e2e..6e02c7a4 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -123,6 +123,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Dec 11 2012 simon@rabbitmq.com 3.0.1-1
+- New Upstream Release
+
* Fri Nov 16 2012 simon@rabbitmq.com 3.0.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 17327133..aed68b96 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.0.1-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Tue, 11 Dec 2012 11:29:55 +0000
+
rabbitmq-server (3.0.0-1) unstable; urgency=low
* New Upstream Release
diff --git a/src/gm.erl b/src/gm.erl
index 4a95de0d..2057b1f5 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -667,6 +667,9 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
+handle_info(timeout, State) ->
+ noreply(flush_broadcast_buffer(State));
+
handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
@@ -834,10 +837,13 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, ensure_broadcast_timer(State), hibernate}.
+ {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
reply(Reply, State) ->
- {reply, Reply, ensure_broadcast_timer(State), hibernate}.
+ {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
+
+flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
+flush_timeout(_) -> 0.
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2ffa2a1a..b5ad1ac0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -283,21 +283,17 @@ terminate_shutdown(Fun, State) ->
end.
reply(Reply, NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {reply, Reply, NewState1, Timeout}.
+ {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
noreply(NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {noreply, NewState1, Timeout}.
+ {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = ensure_stats_timer(
- ensure_rate_timer(
- confirm_messages(MsgIds, State#q{
- backing_queue_state = BQS1}))),
+ State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -327,15 +323,11 @@ ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
TRef = erlang:send_after(
?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
State.
stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
State;
-stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
@@ -728,7 +720,7 @@ drop_expired_messages(State = #q{dlx = DLX,
undefined -> BQ:dropwhile(ExpirePred, BQS);
_ -> {Next, Msgs, BQS2} =
BQ:fetchwhile(ExpirePred,
- fun accumulate_msgs/4,
+ fun accumulate_msgs/3,
[], BQS),
case Msgs of
[] -> ok;
@@ -742,7 +734,7 @@ drop_expired_messages(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
-accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc].
ensure_ttl_timer(undefined, State) ->
State;
@@ -799,12 +791,9 @@ stop(State) -> stop(undefined, noreply, State).
stop(From, Reply, State = #q{unconfirmed = UC}) ->
case {dtree:is_empty(UC), Reply} of
- {true, noreply} ->
- {stop, normal, State};
- {true, _} ->
- {stop, normal, Reply, State};
- {false, _} ->
- noreply(State#q{delayed_stop = {From, Reply}})
+ {true, noreply} -> {stop, normal, State};
+ {true, _} -> {stop, normal, Reply, State};
+ {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
end.
cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
@@ -1213,8 +1202,9 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
+ {ok, BQS1} = BQ:ackfold(
+ fun (M, A, ok) -> DLXFun([{M, A}]) end,
+ ok, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1327,10 +1317,10 @@ handle_info(drop_expired, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- {noreply, State1, Timeout} = noreply(State),
- %% Need to reset *after* we've been through noreply/1 so we do not
- %% just create another timer always and therefore never hibernate
- {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout};
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(rabbit_event:reset_stats_timer(
+ State, #q.stats_timer)),
+ {noreply, State1, Timeout};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -1354,8 +1344,10 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(State#q{rate_timer_ref = undefined,
+ backing_queue_state = BQS2}),
+ {noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 7b9df81e..919be3f3 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -49,7 +49,7 @@
-spec(hash_password/1 :: (rabbit_types:password())
-> rabbit_types:password_hash()).
-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok').
--spec(list_users/0 :: () -> rabbit_types:infos()).
+-spec(list_users/0 :: () -> [rabbit_types:infos()]).
-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(lookup_user/1 :: (rabbit_types:username())
-> rabbit_types:ok(rabbit_types:internal_user())
@@ -58,14 +58,14 @@
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
-> 'ok').
--spec(list_permissions/0 :: () -> rabbit_types:infos()).
+-spec(list_permissions/0 :: () -> [rabbit_types:infos()]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> rabbit_types:infos()).
+ (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_permissions/1 ::
- (rabbit_types:username()) -> rabbit_types:infos()).
+ (rabbit_types:username()) -> [rabbit_types:infos()]).
-spec(list_user_vhost_permissions/2 ::
(rabbit_types:username(), rabbit_types:vhost())
- -> rabbit_types:infos()).
+ -> [rabbit_types:infos()]).
-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 272df5c1..99b5946e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,8 +35,7 @@
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -133,14 +132,11 @@
-> {rabbit_types:message_properties() | undefined, state()}.
%% Like dropwhile, except messages are fetched in "require
-%% acknowledgement" mode and are passed, together with their Delivered
-%% flag and ack tag, to the supplied function. The function is also
-%% fed an accumulator. The result of fetchwhile is as for dropwhile
-%% plus the accumulator.
--callback fetchwhile(msg_pred(),
- fun ((rabbit_types:basic_message(), boolean(), ack(), A)
- -> A),
- A, state())
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
-> {rabbit_types:message_properties() | undefined,
A, state()}.
@@ -156,14 +152,14 @@
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
-%% Acktags supplied are for messages which should be processed. The
-%% provided callback function is called with each message.
--callback foreach_ack(msg_fun(), state(), [ack()]) -> state().
-
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
@@ -233,7 +229,7 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
- {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
+ {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a3c82865..617ea25f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -635,7 +635,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, TraceState),
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
{noreply,
@@ -1253,7 +1253,7 @@ record_sent(ConsumerTag, AckRequired,
true -> incr_stats([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e3d967bc..e857f395 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,11 +18,11 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/5, publish_delivered/4,
- discard/3, fetch/2, drop/2, ack/2,
- requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -281,10 +281,6 @@ ack(AckTags, State = #state { gm = GM,
end,
{MsgIds, State #state { backing_queue_state = BQS1 }}.
-foreach_ack(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }.
-
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -292,6 +288,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+ackfold(MsgFun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
+ {Acc1, State #state { backing_queue_state = BQS1 }}.
+
fold(Fun, Acc, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fold(Fun, Acc, BQS),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 81bb6769..4efde50e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,6 +46,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
+-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([parse_arguments/3]).
@@ -191,6 +192,7 @@
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
-> boolean()).
+-spec(version_minor_equivalent/2 :: (string(), string()) -> boolean()).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
@@ -734,6 +736,16 @@ version_compare(A, B) ->
ANum > BNum -> gt
end.
+%% a.b.c and a.b.d match, but a.b.c and a.d.e don't. If
+%% versions do not match that pattern, just compare them.
+version_minor_equivalent(A, B) ->
+ {ok, RE} = re:compile("^(\\d+\\.\\d+)(\\.\\d+)\$"),
+ Opts = [{capture, all_but_first, list}],
+ case {re:run(A, RE, Opts), re:run(B, RE, Opts)} of
+ {{match, [A1|_]}, {match, [B1|_]}} -> A1 =:= B1;
+ _ -> A =:= B
+ end.
+
dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A).
dict_cons(Key, Value, Dict) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 942048f9..6a442fec 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -68,7 +68,8 @@
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
+ {'running_nodes', [node()]} |
+ {'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
@@ -757,9 +758,16 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
[node(), Node, Node])}}
end.
-check_version_consistency(This, Remote, _) when This =:= Remote ->
- ok;
check_version_consistency(This, Remote, Name) ->
+ check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end).
+
+check_version_consistency(This, Remote, Name, Comp) ->
+ case Comp(This, Remote) of
+ true -> ok;
+ false -> version_error(Name, This, Remote)
+ end.
+
+version_error(Name, This, Remote) ->
{error, {inconsistent_cluster,
rabbit_misc:format("~s version mismatch: local node is ~s, "
"remote node ~s", [Name, This, Remote])}}.
@@ -767,8 +775,15 @@ check_version_consistency(This, Remote, Name) ->
check_otp_consistency(Remote) ->
check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
+%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be
+%% removed after 3.1.0 is released.
+check_rabbit_consistency("3.0.0") ->
+ version_error("Rabbit", rabbit_misc:version(), "3.0.0");
+
check_rabbit_consistency(Remote) ->
- check_version_consistency(rabbit_misc:version(), Remote, "Rabbit").
+ check_version_consistency(
+ rabbit_misc:version(), Remote, "Rabbit",
+ fun rabbit_misc:version_minor_equivalent/2).
%% This is fairly tricky. We want to know if the node is in the state
%% that a `reset' would leave it in. We cannot simply check if the
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8d0e4456..258ac0ce 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -53,7 +53,7 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
+-spec(partitions/0 :: () -> {node(), [node()]}).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2226f445..09ed3d08 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -38,6 +38,7 @@ all_tests() ->
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
+ passed = test_version_equivalance(),
passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
@@ -141,6 +142,16 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed.
+test_version_equivalance() ->
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.1"),
+ true = rabbit_misc:version_minor_equivalent("%%VSN%%", "%%VSN%%"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.1.0"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0.1"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"),
+ passed.
+
test_multi_call() ->
Fun = fun() ->
receive
@@ -2423,7 +2434,7 @@ test_dropfetchwhile(VQ0) ->
{#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
rabbit_variable_queue:fetchwhile(
fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
- fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) ->
+ fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
{[Msg | MsgAcc], [AckTag | AckAcc]}
end, {[], []}, VQ1),
true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
@@ -2462,7 +2473,7 @@ test_fetchwhile_varying_ram_duration(VQ0) ->
fun (VQ1) ->
{_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
fun (_) -> false end,
- fun (_, _, _, A) -> A end,
+ fun (_, _, A) -> A end,
ok, VQ1),
VQ2
end, VQ0).
@@ -2597,8 +2608,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end,
- VQ2, AckTags),
+ {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
+ ok, VQ2, AckTags),
VQ3.
test_queue_recover() ->
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 3a5b96de..b9a7cc15 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -31,9 +31,9 @@
-type(state() :: rabbit_types:exchange() | 'none').
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
--spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
--spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
+-spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
@@ -43,23 +43,21 @@
%%----------------------------------------------------------------------------
init(VHost) ->
- case tracing(VHost) of
+ case enabled(VHost) of
false -> none;
true -> {ok, X} = rabbit_exchange:lookup(
rabbit_misc:r(VHost, exchange, ?XNAME)),
X
end.
-tracing(VHost) ->
+enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}},
- TraceX) ->
+tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) ->
maybe_trace(TraceX, Msg, <<"publish">>, XName, []).
-tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
- TraceX) ->
+tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
maybe_trace(TraceX, Msg, <<"deliver">>, QName,
[{<<"redelivered">>, signedint, RedeliveredNum}]).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3e4c7c86..05468a6e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,10 +19,10 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4,
- fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
+ is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -597,10 +597,9 @@ fetchwhile(Pred, Fun, Acc, State) ->
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, IsDelivered, AckTag}, State3} =
+ {{Msg, _IsDelivered, AckTag}, State3} =
internal_fetch(true, MsgStatus1, State2),
- Acc1 = Fun(Msg, IsDelivered, AckTag, Acc),
- fetchwhile(Pred, Fun, Acc1, State3);
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
@@ -650,16 +649,6 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-foreach_ack(undefined, State, _AckTags) ->
- State;
-foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- a(lists:foldl(fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), false, State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)).
-
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
@@ -681,6 +670,16 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(
+ fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) ->
+ {#msg_status { msg = Msg }, State1} =
+ read_msg(gb_trees:get(SeqId, PA), false, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
fold(Fun, Acc, #vqstate { q1 = Q1,
q2 = Q2,
delta = #delta { start_seq_id = DeltaSeqId,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 297fa56f..0bb18f4c 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -123,7 +123,7 @@ with(VHostPath, Thunk) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
-i(tracing, VHost) -> rabbit_trace:tracing(VHost);
+i(tracing, VHost) -> rabbit_trace:enabled(VHost);
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 53f3df18..db674f91 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -84,7 +84,15 @@ sup_memory(Sup) ->
sup_children(Sup) ->
rabbit_misc:with_exit_handler(
- rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end).
+ rabbit_misc:const([]),
+ fun () ->
+ %% Just in case we end up talking to something that is
+ %% not a supervisor by mistake.
+ case supervisor:which_children(Sup) of
+ L when is_list(L) -> L;
+ _ -> []
+ end
+ end).
pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
{memory, M} -> M;
@@ -119,10 +127,13 @@ plugin_memory() ->
is_plugin(atom_to_list(App))]).
plugin_memory(App) ->
- case catch application_master:get_child(
- application_controller:get_master(App)) of
- {Pid, _} -> sup_memory(Pid);
- _ -> 0
+ case application_controller:get_master(App) of
+ undefined -> 0;
+ Master -> case application_master:get_child(Master) of
+ {Pid, _} when is_pid(Pid) -> sup_memory(Pid);
+ Pid when is_pid(Pid) -> sup_memory(Pid);
+ _ -> 0
+ end
end.
is_plugin("rabbitmq_" ++ _) -> true;