diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-01-04 16:31:13 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-01-04 16:31:13 +0000 |
commit | ffbb5c778b325af2001201f85e29195a796ff2be (patch) | |
tree | 512dcdaa5a7816b00b26db6de68d33242242d850 | |
parent | f8b609ef801de8e6463df7392fada663be076338 (diff) | |
parent | 0c603bc18f3276b7e1a72712f63f32aeb6de35e0 (diff) | |
download | rabbitmq-server-ffbb5c778b325af2001201f85e29195a796ff2be.tar.gz |
Merge bug25372 into bug25334
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rw-r--r-- | src/gm.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 48 | ||||
-rw-r--r-- | src/rabbit_auth_backend_internal.erl | 10 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 26 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 15 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 23 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 19 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 18 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 29 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 2 | ||||
-rw-r--r-- | src/rabbit_vm.erl | 21 |
16 files changed, 149 insertions, 99 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 5d9b9e2e..6e02c7a4 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -123,6 +123,9 @@ done rm -rf %{buildroot} %changelog +* Tue Dec 11 2012 simon@rabbitmq.com 3.0.1-1 +- New Upstream Release + * Fri Nov 16 2012 simon@rabbitmq.com 3.0.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 17327133..aed68b96 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (3.0.1-1) unstable; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Tue, 11 Dec 2012 11:29:55 +0000 + rabbitmq-server (3.0.0-1) unstable; urgency=low * New Upstream Release @@ -667,6 +667,9 @@ handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); +handle_info(timeout, State) -> + noreply(flush_broadcast_buffer(State)); + handle_info({'DOWN', MRef, process, _Pid, Reason}, State = #state { self = Self, left = Left, @@ -834,10 +837,13 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, ensure_broadcast_timer(State), hibernate}. + {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), hibernate}. + {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. + +flush_timeout(#state{broadcast_buffer = []}) -> hibernate; +flush_timeout(_) -> 0. ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2ffa2a1a..b5ad1ac0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -283,21 +283,17 @@ terminate_shutdown(Fun, State) -> end. reply(Reply, NewState) -> - assert_invariant(NewState), {NewState1, Timeout} = next_state(NewState), - {reply, Reply, NewState1, Timeout}. + {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. noreply(NewState) -> - assert_invariant(NewState), {NewState1, Timeout} = next_state(NewState), - {noreply, NewState1, Timeout}. + {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + assert_invariant(State), {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = ensure_stats_timer( - ensure_rate_timer( - confirm_messages(MsgIds, State#q{ - backing_queue_state = BQS1}))), + State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}), case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -327,15 +323,11 @@ ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> TRef = erlang:send_after( ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration), State#q{rate_timer_ref = TRef}; -ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; ensure_rate_timer(State) -> State. stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> State; -stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> erlang:cancel_timer(TRef), State#q{rate_timer_ref = undefined}. @@ -728,7 +720,7 @@ drop_expired_messages(State = #q{dlx = DLX, undefined -> BQ:dropwhile(ExpirePred, BQS); _ -> {Next, Msgs, BQS2} = BQ:fetchwhile(ExpirePred, - fun accumulate_msgs/4, + fun accumulate_msgs/3, [], BQS), case Msgs of [] -> ok; @@ -742,7 +734,7 @@ drop_expired_messages(State = #q{dlx = DLX, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). -accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc]. +accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc]. ensure_ttl_timer(undefined, State) -> State; @@ -799,12 +791,9 @@ stop(State) -> stop(undefined, noreply, State). stop(From, Reply, State = #q{unconfirmed = UC}) -> case {dtree:is_empty(UC), Reply} of - {true, noreply} -> - {stop, normal, State}; - {true, _} -> - {stop, normal, Reply, State}; - {false, _} -> - noreply(State#q{delayed_stop = {From, Reply}}) + {true, noreply} -> {stop, normal, State}; + {true, _} -> {stop, normal, Reply, State}; + {false, _} -> noreply(State#q{delayed_stop = {From, Reply}}) end. cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, @@ -1213,8 +1202,9 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end, - BQS, AckTags), + {ok, BQS1} = BQ:ackfold( + fun (M, A, ok) -> DLXFun([{M, A}]) end, + ok, BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); @@ -1327,10 +1317,10 @@ handle_info(drop_expired, State) -> handle_info(emit_stats, State) -> emit_stats(State), - {noreply, State1, Timeout} = noreply(State), - %% Need to reset *after* we've been through noreply/1 so we do not - %% just create another timer always and therefore never hibernate - {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout}; + %% Don't call noreply/1, we don't want to set timers + {State1, Timeout} = next_state(rabbit_event:reset_stats_timer( + State, #q.stats_timer)), + {noreply, State1, Timeout}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -1354,8 +1344,10 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ, DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State#q{rate_timer_ref = just_measured, - backing_queue_state = BQS2}); + %% Don't call noreply/1, we don't want to set timers + {State1, Timeout} = next_state(State#q{rate_timer_ref = undefined, + backing_queue_state = BQS2}), + {noreply, State1, Timeout}; handle_info(sync_timeout, State) -> noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 7b9df81e..919be3f3 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -49,7 +49,7 @@ -spec(hash_password/1 :: (rabbit_types:password()) -> rabbit_types:password_hash()). -spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok'). --spec(list_users/0 :: () -> rabbit_types:infos()). +-spec(list_users/0 :: () -> [rabbit_types:infos()]). -spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(lookup_user/1 :: (rabbit_types:username()) -> rabbit_types:ok(rabbit_types:internal_user()) @@ -58,14 +58,14 @@ regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) -> 'ok'). --spec(list_permissions/0 :: () -> rabbit_types:infos()). +-spec(list_permissions/0 :: () -> [rabbit_types:infos()]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> rabbit_types:infos()). + (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_user_permissions/1 :: - (rabbit_types:username()) -> rabbit_types:infos()). + (rabbit_types:username()) -> [rabbit_types:infos()]). -spec(list_user_vhost_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) - -> rabbit_types:infos()). + -> [rabbit_types:infos()]). -spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 272df5c1..99b5946e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,8 +35,7 @@ fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). --type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | - 'undefined'). +-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). %% Called on startup with a list of durable queue names. The queues @@ -133,14 +132,11 @@ -> {rabbit_types:message_properties() | undefined, state()}. %% Like dropwhile, except messages are fetched in "require -%% acknowledgement" mode and are passed, together with their Delivered -%% flag and ack tag, to the supplied function. The function is also -%% fed an accumulator. The result of fetchwhile is as for dropwhile -%% plus the accumulator. --callback fetchwhile(msg_pred(), - fun ((rabbit_types:basic_message(), boolean(), ack(), A) - -> A), - A, state()) +%% acknowledgement" mode and are passed, together with their ack tag, +%% to the supplied function. The function is also fed an +%% accumulator. The result of fetchwhile is as for dropwhile plus the +%% accumulator. +-callback fetchwhile(msg_pred(), msg_fun(A), A, state()) -> {rabbit_types:message_properties() | undefined, A, state()}. @@ -156,14 +152,14 @@ %% about. Must return 1 msg_id per Ack, in the same order as Acks. -callback ack([ack()], state()) -> {msg_ids(), state()}. -%% Acktags supplied are for messages which should be processed. The -%% provided callback function is called with each message. --callback foreach_ack(msg_fun(), state(), [ack()]) -> state(). - %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. -callback requeue([ack()], state()) -> {msg_ids(), state()}. +%% Fold over messages by ack tag. The supplied function is called with +%% each message, its ack tag, and an accumulator. +-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}. + %% Fold over all the messages in a queue and return the accumulated %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), @@ -233,7 +229,7 @@ behaviour_info(callbacks) -> {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, - {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, + {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a3c82865..617ea25f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -635,7 +635,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_trace_in(Message, TraceState), + rabbit_trace:tap_in(Message, TraceState), Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), {noreply, @@ -1253,7 +1253,7 @@ record_sent(ConsumerTag, AckRequired, true -> incr_stats([{queue_stats, QName, 1}], redeliver, State); false -> ok end, - rabbit_trace:tap_trace_out(Msg, TraceState), + rabbit_trace:tap_out(Msg, TraceState), UAMQ1 = case AckRequired of true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, UAMQ); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e3d967bc..e857f395 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,11 +18,11 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/5, publish_delivered/4, - discard/3, fetch/2, drop/2, ack/2, - requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, + discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, + len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, foreach_ack/3]). + status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -281,10 +281,6 @@ ack(AckTags, State = #state { gm = GM, end, {MsgIds, State #state { backing_queue_state = BQS1 }}. -foreach_ack(MsgFun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }, AckTags) -> - State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }. - requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -292,6 +288,11 @@ requeue(AckTags, State = #state { gm = GM, ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. +ackfold(MsgFun, Acc, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags), + {Acc1, State #state { backing_queue_state = BQS1 }}. + fold(Fun, Acc, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {Result, BQS1} = BQ:fold(Fun, Acc, BQS), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 81bb6769..4efde50e 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,6 +46,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([version_minor_equivalent/2]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([parse_arguments/3]). @@ -191,6 +192,7 @@ -spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). +-spec(version_minor_equivalent/2 :: (string(), string()) -> boolean()). -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). @@ -734,6 +736,16 @@ version_compare(A, B) -> ANum > BNum -> gt end. +%% a.b.c and a.b.d match, but a.b.c and a.d.e don't. If +%% versions do not match that pattern, just compare them. +version_minor_equivalent(A, B) -> + {ok, RE} = re:compile("^(\\d+\\.\\d+)(\\.\\d+)\$"), + Opts = [{capture, all_but_first, list}], + case {re:run(A, RE, Opts), re:run(B, RE, Opts)} of + {{match, [A1|_]}, {match, [B1|_]}} -> A1 =:= B1; + _ -> A =:= B + end. + dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A). dict_cons(Key, Value, Dict) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 942048f9..6a442fec 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -68,7 +68,8 @@ %% Various queries to get the status of the db -spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | - {'running_nodes', [node()]}]). + {'running_nodes', [node()]} | + {'partitions', [{node(), [node()]}]}]). -spec(is_clustered/0 :: () -> boolean()). -spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]). -spec(node_type/0 :: () -> node_type()). @@ -757,9 +758,16 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> [node(), Node, Node])}} end. -check_version_consistency(This, Remote, _) when This =:= Remote -> - ok; check_version_consistency(This, Remote, Name) -> + check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end). + +check_version_consistency(This, Remote, Name, Comp) -> + case Comp(This, Remote) of + true -> ok; + false -> version_error(Name, This, Remote) + end. + +version_error(Name, This, Remote) -> {error, {inconsistent_cluster, rabbit_misc:format("~s version mismatch: local node is ~s, " "remote node ~s", [Name, This, Remote])}}. @@ -767,8 +775,15 @@ check_version_consistency(This, Remote, Name) -> check_otp_consistency(Remote) -> check_version_consistency(erlang:system_info(otp_release), Remote, "OTP"). +%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be +%% removed after 3.1.0 is released. +check_rabbit_consistency("3.0.0") -> + version_error("Rabbit", rabbit_misc:version(), "3.0.0"); + check_rabbit_consistency(Remote) -> - check_version_consistency(rabbit_misc:version(), Remote, "Rabbit"). + check_version_consistency( + rabbit_misc:version(), Remote, "Rabbit", + fun rabbit_misc:version_minor_equivalent/2). %% This is fairly tricky. We want to know if the node is in the state %% that a `reset' would leave it in. We cannot simply check if the diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 8d0e4456..258ac0ce 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -53,7 +53,7 @@ -spec(notify_joined_cluster/0 :: () -> 'ok'). -spec(notify_left_cluster/1 :: (node()) -> 'ok'). --spec(partitions/0 :: () -> {node(), [{atom(), node()}]}). +-spec(partitions/0 :: () -> {node(), [node()]}). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2226f445..09ed3d08 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -38,6 +38,7 @@ all_tests() -> passed = mirrored_supervisor_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), + passed = test_version_equivalance(), passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), @@ -141,6 +142,16 @@ run_cluster_dependent_tests(SecondaryNode) -> passed. +test_version_equivalance() -> + true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0"), + true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.1"), + true = rabbit_misc:version_minor_equivalent("%%VSN%%", "%%VSN%%"), + false = rabbit_misc:version_minor_equivalent("3.0.0", "3.1.0"), + false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0"), + false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0.1"), + false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"), + passed. + test_multi_call() -> Fun = fun() -> receive @@ -2423,7 +2434,7 @@ test_dropfetchwhile(VQ0) -> {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} = rabbit_variable_queue:fetchwhile( fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end, - fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) -> + fun (Msg, AckTag, {MsgAcc, AckAcc}) -> {[Msg | MsgAcc], [AckTag | AckAcc]} end, {[], []}, VQ1), true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)], @@ -2462,7 +2473,7 @@ test_fetchwhile_varying_ram_duration(VQ0) -> fun (VQ1) -> {_, ok, VQ2} = rabbit_variable_queue:fetchwhile( fun (_) -> false end, - fun (_, _, _, A) -> A end, + fun (_, _, A) -> A end, ok, VQ1), VQ2 end, VQ0). @@ -2597,8 +2608,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_variable_queue_fold_msg_on_disk(VQ0) -> VQ1 = variable_queue_publish(true, 1, VQ0), {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), - VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end, - VQ2, AckTags), + {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end, + ok, VQ2, AckTags), VQ3. test_queue_recover() -> diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 3a5b96de..b9a7cc15 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -16,7 +16,7 @@ -module(rabbit_trace). --export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]). +-export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -31,9 +31,9 @@ -type(state() :: rabbit_types:exchange() | 'none'). -spec(init/1 :: (rabbit_types:vhost()) -> state()). --spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()). --spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok'). --spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok'). +-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()). +-spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok'). +-spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok'). -spec(start/1 :: (rabbit_types:vhost()) -> 'ok'). -spec(stop/1 :: (rabbit_types:vhost()) -> 'ok'). @@ -43,23 +43,21 @@ %%---------------------------------------------------------------------------- init(VHost) -> - case tracing(VHost) of + case enabled(VHost) of false -> none; true -> {ok, X} = rabbit_exchange:lookup( rabbit_misc:r(VHost, exchange, ?XNAME)), X end. -tracing(VHost) -> +enabled(VHost) -> {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS), lists:member(VHost, VHosts). -tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, - TraceX) -> +tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) -> maybe_trace(TraceX, Msg, <<"publish">>, XName, []). -tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, - TraceX) -> +tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, maybe_trace(TraceX, Msg, <<"deliver">>, QName, [{<<"redelivered">>, signedint, RedeliveredNum}]). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3e4c7c86..05468a6e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -19,10 +19,10 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, - fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, + fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). + is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -597,10 +597,9 @@ fetchwhile(Pred, Fun, Acc, State) -> {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, IsDelivered, AckTag}, State3} = + {{Msg, _IsDelivered, AckTag}, State3} = internal_fetch(true, MsgStatus1, State2), - Acc1 = Fun(Msg, IsDelivered, AckTag, Acc), - fetchwhile(Pred, Fun, Acc1, State3); + fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} end end. @@ -650,16 +649,6 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -foreach_ack(undefined, State, _AckTags) -> - State; -foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> - a(lists:foldl(fun(SeqId, State1) -> - {MsgStatus, State2} = - read_msg(gb_trees:get(SeqId, PA), false, State1), - MsgFun(MsgStatus#msg_status.msg, SeqId), - State2 - end, State, AckTags)). - requeue(AckTags, #vqstate { delta = Delta, q3 = Q3, q4 = Q4, @@ -681,6 +670,16 @@ requeue(AckTags, #vqstate { delta = Delta, in_counter = InCounter + MsgCount, len = Len + MsgCount }))}. +ackfold(MsgFun, Acc, State, AckTags) -> + {AccN, StateN} = + lists:foldl( + fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) -> + {#msg_status { msg = Msg }, State1} = + read_msg(gb_trees:get(SeqId, PA), false, State0), + {MsgFun(Msg, SeqId, Acc0), State1} + end, {Acc, State}, AckTags), + {AccN, a(StateN)}. + fold(Fun, Acc, #vqstate { q1 = Q1, q2 = Q2, delta = #delta { start_seq_id = DeltaSeqId, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 297fa56f..0bb18f4c 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -123,7 +123,7 @@ with(VHostPath, Thunk) -> infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, VHost) -> VHost; -i(tracing, VHost) -> rabbit_trace:tracing(VHost); +i(tracing, VHost) -> rabbit_trace:enabled(VHost); i(Item, _) -> throw({bad_argument, Item}). info(VHost) -> infos(?INFO_KEYS, VHost). diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 53f3df18..db674f91 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -84,7 +84,15 @@ sup_memory(Sup) -> sup_children(Sup) -> rabbit_misc:with_exit_handler( - rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end). + rabbit_misc:const([]), + fun () -> + %% Just in case we end up talking to something that is + %% not a supervisor by mistake. + case supervisor:which_children(Sup) of + L when is_list(L) -> L; + _ -> [] + end + end). pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of {memory, M} -> M; @@ -119,10 +127,13 @@ plugin_memory() -> is_plugin(atom_to_list(App))]). plugin_memory(App) -> - case catch application_master:get_child( - application_controller:get_master(App)) of - {Pid, _} -> sup_memory(Pid); - _ -> 0 + case application_controller:get_master(App) of + undefined -> 0; + Master -> case application_master:get_child(Master) of + {Pid, _} when is_pid(Pid) -> sup_memory(Pid); + Pid when is_pid(Pid) -> sup_memory(Pid); + _ -> 0 + end end. is_plugin("rabbitmq_" ++ _) -> true; |