summaryrefslogtreecommitdiff
path: root/src/rabbit_misc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_misc.erl')
-rw-r--r--src/rabbit_misc.erl56
1 files changed, 36 insertions, 20 deletions
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 58e93a3f..6f353da5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -67,7 +67,7 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
--export([ensure_timer/4, stop_timer/2]).
+-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
@@ -81,7 +81,7 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -94,6 +94,7 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -209,7 +210,8 @@
[string()])
-> {'ok', {atom(), [{string(), string()}], [string()]}} |
'no_command').
--spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(all_module_attributes/1 ::
+ (atom()) -> [{atom(), atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
-> rabbit_types:ok_or_error2(digraph(),
@@ -245,6 +247,8 @@
-> {any(), non_neg_integer()}).
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
+-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()).
+-spec(cancel_timer/1 :: (tref()) -> 'ok').
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
@@ -849,20 +853,20 @@ module_attributes(Module) ->
end.
all_module_attributes(Name) ->
- Modules =
+ Targets =
lists:usort(
lists:append(
- [Modules || {App, _, _} <- application:loaded_applications(),
- {ok, Modules} <- [application:get_key(App, modules)]])),
+ [[{App, Module} || Module <- Modules] ||
+ {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
lists:foldl(
- fun (Module, Acc) ->
+ fun ({App, Module}, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
- Atts -> [{Module, Atts} | Acc]
+ Atts -> [{App, Module, Atts} | Acc]
end
- end, [], Modules).
-
+ end, [], Targets).
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
@@ -870,13 +874,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
[case digraph:vertex(G, Vertex) of
false -> digraph:add_vertex(G, Vertex, Label);
_ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
- end || {Module, Atts} <- Graph,
- {Vertex, Label} <- VertexFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {Vertex, Label} <- VertexFun(GraphElem)],
[case digraph:add_edge(G, From, To) of
{error, E} -> throw({graph_error, {edge, E, From, To}});
_ -> ok
- end || {Module, Atts} <- Graph,
- {From, To} <- EdgeFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {From, To} <- EdgeFun(GraphElem)],
{ok, G}
catch {graph_error, Reason} ->
true = digraph:delete(G),
@@ -1012,7 +1016,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
-check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
@@ -1040,7 +1043,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
ensure_timer(State, Idx, After, Msg) ->
case element(Idx, State) of
- undefined -> TRef = erlang:send_after(After, self(), Msg),
+ undefined -> TRef = send_after(After, self(), Msg),
setelement(Idx, State, TRef);
_ -> State
end.
@@ -1048,12 +1051,25 @@ ensure_timer(State, Idx, After, Msg) ->
stop_timer(State, Idx) ->
case element(Idx, State) of
undefined -> State;
- TRef -> case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> setelement(Idx, State, undefined)
- end
+ TRef -> cancel_timer(TRef),
+ setelement(Idx, State, undefined)
end.
+%% timer:send_after/3 goes through a single timer process but allows
+%% long delays. erlang:send_after/3 does not have a bottleneck but
+%% only allows max 2^32-1 millis.
+-define(MAX_ERLANG_SEND_AFTER, 4294967295).
+send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER ->
+ {ok, Ref} = timer:send_after(Millis, Pid, Msg),
+ {timer, Ref};
+send_after(Millis, Pid, Msg) ->
+ {erlang, erlang:send_after(Millis, Pid, Msg)}.
+
+cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref),
+ ok;
+cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
+ ok.
+
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).