diff options
-rw-r--r-- | src/app_utils.erl | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 4 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 13 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 2 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 2 | ||||
-rw-r--r-- | src/rabbit_plugins.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 15 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_vm.erl | 2 |
9 files changed, 36 insertions, 13 deletions
diff --git a/src/app_utils.erl b/src/app_utils.erl index 8da436c0..b102ce75 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -93,7 +93,7 @@ app_dependency_order(RootApps, StripUnreachable) -> %% Private API wait_for_application(Application) -> - case lists:keymember(Application, 1, application:which_applications()) of + case lists:keymember(Application, 1, rabbit_misc:which_applications()) of true -> ok; false -> timer:sleep(1000), wait_for_application(Application) diff --git a/src/rabbit.erl b/src/rabbit.erl index 3cfa21ba..450a7f32 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -393,7 +393,7 @@ await_startup() -> status() -> S1 = [{pid, list_to_integer(os:getpid())}, - {running_applications, application:which_applications(infinity)}, + {running_applications, rabbit_misc:which_applications()}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, {memory, rabbit_vm:memory()}], @@ -421,7 +421,7 @@ status() -> is_running() -> is_running(node()). -is_running(Node) -> rabbit_nodes:is_running(Node, rabbit). +is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). environment() -> lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c36fb147..a1e95fd5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -61,7 +61,7 @@ -export([multi_call/2]). -export([os_cmd/1]). -export([gb_sets_difference/2]). --export([version/0]). +-export([version/0, which_applications/0]). -export([sequence_error/1]). -export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]). -export([check_expiry/1]). @@ -232,6 +232,7 @@ -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). -spec(version/0 :: () -> string()). +-spec(which_applications/0 :: () -> [{atom(), string(), string()}]). -spec(sequence_error/1 :: ([({'error', any()} | any())]) -> {'error', any()} | any()). -spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}). @@ -985,6 +986,16 @@ version() -> {ok, VSN} = application:get_key(rabbit, vsn), VSN. +%% application:which_applications(infinity) is dangerous, since it can +%% cause deadlocks on shutdown. So we have to use a timeout variant, +%% but w/o creating spurious timeout errors. +which_applications() -> + try + application:which_applications() + catch + exit:{timeout, _} -> [] + end. + sequence_error([T]) -> T; sequence_error([{error, _} = Error | _]) -> Error; sequence_error([_ | Rest]) -> sequence_error(Rest). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 7d844c72..7fcd1f99 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -465,4 +465,4 @@ alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)]. alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). alive_rabbit_nodes(Nodes) -> - [N || N <- alive_nodes(Nodes), rabbit_nodes:is_process_running(N, rabbit)]. + [N || N <- alive_nodes(Nodes), rabbit:is_running(N)]. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 5640f12a..b85646d2 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -96,7 +96,7 @@ cookie_hash() -> base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). is_running(Node, Application) -> - case rpc:call(Node, application, which_applications, [infinity]) of + case rpc:call(Node, rabbit_misc, which_applications, []) of {badrpc, _} -> false; Apps -> proplists:is_defined(Application, Apps) end. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 58c906eb..6f6515b0 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -47,7 +47,7 @@ setup() -> active() -> {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ], - [App || {App, _, _} <- application:which_applications(), + [App || {App, _, _} <- rabbit_misc:which_applications(), lists:member(App, InstalledPlugins)]. %% @doc Get the list of plugins which are ready to be enabled. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f32fe740..21c54f3e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2393,6 +2393,7 @@ test_variable_queue() -> fun test_variable_queue_ack_limiting/1, fun test_variable_queue_purge/1, fun test_variable_queue_requeue/1, + fun test_variable_queue_requeue_ram_beta/1, fun test_variable_queue_fold/1]], passed. @@ -2491,6 +2492,20 @@ test_variable_queue_requeue(VQ0) -> {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), VQ3. +%% requeue from ram_pending_ack into q3, move to delta and then empty queue +test_variable_queue_requeue_ram_beta(VQ0) -> + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2, + VQ1 = rabbit_tests:variable_queue_publish(false, Count, VQ0), + {VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1), + {Back, Front} = lists:split(Count div 2, AcksR), + {_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2), + VQ4 = rabbit_variable_queue:set_ram_duration_target(0, VQ3), + {_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4), + VQ6 = requeue_one_by_one(Front, VQ5), + {VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6), + {_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7), + VQ8. + test_variable_queue_purge(VQ0) -> LenDepth = fun (VQ) -> {rabbit_variable_queue:len(VQ), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f7c6c729..5b39c2c6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1363,11 +1363,8 @@ publish_alpha(MsgStatus, State) -> {MsgStatus, inc_ram_msg_count(State)}. publish_beta(MsgStatus, State) -> - {#msg_status { msg = Msg} = MsgStatus1, - #vqstate { ram_msg_count = RamMsgCount } = State1} = - maybe_write_to_disk(true, false, MsgStatus, State), - {MsgStatus1, State1 #vqstate { - ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. + {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), + {m(trim_msg_status(MsgStatus1)), State1}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index c28b0cd5..e97824b9 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -99,7 +99,7 @@ bytes(Words) -> Words * erlang:system_info(wordsize). plugin_sups() -> lists:append([plugin_sup(App) || - {App, _, _} <- application:which_applications(), + {App, _, _} <- rabbit_misc:which_applications(), is_plugin(atom_to_list(App))]). plugin_sup(App) -> |