summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/app_utils.erl2
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_nodes.erl2
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_tests.erl15
-rw-r--r--src/rabbit_variable_queue.erl7
-rw-r--r--src/rabbit_vm.erl2
9 files changed, 36 insertions, 13 deletions
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 8da436c0..b102ce75 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -93,7 +93,7 @@ app_dependency_order(RootApps, StripUnreachable) ->
%% Private API
wait_for_application(Application) ->
- case lists:keymember(Application, 1, application:which_applications()) of
+ case lists:keymember(Application, 1, rabbit_misc:which_applications()) of
true -> ok;
false -> timer:sleep(1000),
wait_for_application(Application)
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3cfa21ba..450a7f32 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -393,7 +393,7 @@ await_startup() ->
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
- {running_applications, application:which_applications(infinity)},
+ {running_applications, rabbit_misc:which_applications()},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
{memory, rabbit_vm:memory()}],
@@ -421,7 +421,7 @@ status() ->
is_running() -> is_running(node()).
-is_running(Node) -> rabbit_nodes:is_running(Node, rabbit).
+is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
environment() ->
lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c36fb147..a1e95fd5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -61,7 +61,7 @@
-export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
--export([version/0]).
+-export([version/0, which_applications/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
-export([check_expiry/1]).
@@ -232,6 +232,7 @@
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
-spec(version/0 :: () -> string()).
+-spec(which_applications/0 :: () -> [{atom(), string(), string()}]).
-spec(sequence_error/1 :: ([({'error', any()} | any())])
-> {'error', any()} | any()).
-spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}).
@@ -985,6 +986,16 @@ version() ->
{ok, VSN} = application:get_key(rabbit, vsn),
VSN.
+%% application:which_applications(infinity) is dangerous, since it can
+%% cause deadlocks on shutdown. So we have to use a timeout variant,
+%% but w/o creating spurious timeout errors.
+which_applications() ->
+ try
+ application:which_applications()
+ catch
+ exit:{timeout, _} -> []
+ end.
+
sequence_error([T]) -> T;
sequence_error([{error, _} = Error | _]) -> Error;
sequence_error([_ | Rest]) -> sequence_error(Rest).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 7d844c72..7fcd1f99 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -465,4 +465,4 @@ alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_rabbit_nodes(Nodes) ->
- [N || N <- alive_nodes(Nodes), rabbit_nodes:is_process_running(N, rabbit)].
+ [N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 5640f12a..b85646d2 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -96,7 +96,7 @@ cookie_hash() ->
base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
is_running(Node, Application) ->
- case rpc:call(Node, application, which_applications, [infinity]) of
+ case rpc:call(Node, rabbit_misc, which_applications, []) of
{badrpc, _} -> false;
Apps -> proplists:is_defined(Application, Apps)
end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 58c906eb..6f6515b0 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -47,7 +47,7 @@ setup() ->
active() ->
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
- [App || {App, _, _} <- application:which_applications(),
+ [App || {App, _, _} <- rabbit_misc:which_applications(),
lists:member(App, InstalledPlugins)].
%% @doc Get the list of plugins which are ready to be enabled.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f32fe740..21c54f3e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2393,6 +2393,7 @@ test_variable_queue() ->
fun test_variable_queue_ack_limiting/1,
fun test_variable_queue_purge/1,
fun test_variable_queue_requeue/1,
+ fun test_variable_queue_requeue_ram_beta/1,
fun test_variable_queue_fold/1]],
passed.
@@ -2491,6 +2492,20 @@ test_variable_queue_requeue(VQ0) ->
{empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
VQ3.
+%% requeue from ram_pending_ack into q3, move to delta and then empty queue
+test_variable_queue_requeue_ram_beta(VQ0) ->
+ Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2,
+ VQ1 = rabbit_tests:variable_queue_publish(false, Count, VQ0),
+ {VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
+ {Back, Front} = lists:split(Count div 2, AcksR),
+ {_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2),
+ VQ4 = rabbit_variable_queue:set_ram_duration_target(0, VQ3),
+ {_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4),
+ VQ6 = requeue_one_by_one(Front, VQ5),
+ {VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6),
+ {_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7),
+ VQ8.
+
test_variable_queue_purge(VQ0) ->
LenDepth = fun (VQ) ->
{rabbit_variable_queue:len(VQ),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f7c6c729..5b39c2c6 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1363,11 +1363,8 @@ publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
publish_beta(MsgStatus, State) ->
- {#msg_status { msg = Msg} = MsgStatus1,
- #vqstate { ram_msg_count = RamMsgCount } = State1} =
- maybe_write_to_disk(true, false, MsgStatus, State),
- {MsgStatus1, State1 #vqstate {
- ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
+ {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ {m(trim_msg_status(MsgStatus1)), State1}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index c28b0cd5..e97824b9 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -99,7 +99,7 @@ bytes(Words) -> Words * erlang:system_info(wordsize).
plugin_sups() ->
lists:append([plugin_sup(App) ||
- {App, _, _} <- application:which_applications(),
+ {App, _, _} <- rabbit_misc:which_applications(),
is_plugin(atom_to_list(App))]).
plugin_sup(App) ->