diff options
Diffstat (limited to 'src/rabbit_misc.erl')
-rw-r--r-- | src/rabbit_misc.erl | 56 |
1 files changed, 36 insertions, 20 deletions
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 58e93a3f..6f353da5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -67,7 +67,7 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). --export([ensure_timer/4, stop_timer/2]). +-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). -export([moving_average/4]). @@ -81,7 +81,7 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -94,6 +94,7 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -209,7 +210,8 @@ [string()]) -> {'ok', {atom(), [{string(), string()}], [string()]}} | 'no_command'). --spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(all_module_attributes/1 :: + (atom()) -> [{atom(), atom(), [term()]}]). -spec(build_acyclic_graph/3 :: (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> rabbit_types:ok_or_error2(digraph(), @@ -245,6 +247,8 @@ -> {any(), non_neg_integer()}). -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). +-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()). +-spec(cancel_timer/1 :: (tref()) -> 'ok'). -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). @@ -849,20 +853,20 @@ module_attributes(Module) -> end. all_module_attributes(Name) -> - Modules = + Targets = lists:usort( lists:append( - [Modules || {App, _, _} <- application:loaded_applications(), - {ok, Modules} <- [application:get_key(App, modules)]])), + [[{App, Module} || Module <- Modules] || + {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), lists:foldl( - fun (Module, Acc) -> + fun ({App, Module}, Acc) -> case lists:append([Atts || {N, Atts} <- module_attributes(Module), N =:= Name]) of [] -> Acc; - Atts -> [{Module, Atts} | Acc] + Atts -> [{App, Module, Atts} | Acc] end - end, [], Modules). - + end, [], Targets). build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), @@ -870,13 +874,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> [case digraph:vertex(G, Vertex) of false -> digraph:add_vertex(G, Vertex, Label); _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}}) - end || {Module, Atts} <- Graph, - {Vertex, Label} <- VertexFun(Module, Atts)], + end || GraphElem <- Graph, + {Vertex, Label} <- VertexFun(GraphElem)], [case digraph:add_edge(G, From, To) of {error, E} -> throw({graph_error, {edge, E, From, To}}); _ -> ok - end || {Module, Atts} <- Graph, - {From, To} <- EdgeFun(Module, Atts)], + end || GraphElem <- Graph, + {From, To} <- EdgeFun(GraphElem)], {ok, G} catch {graph_error, Reason} -> true = digraph:delete(G), @@ -1012,7 +1016,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. -check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}}; check_expiry(N) when N < 0 -> {error, {value_negative, N}}; check_expiry(_N) -> ok. @@ -1040,7 +1043,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> ensure_timer(State, Idx, After, Msg) -> case element(Idx, State) of - undefined -> TRef = erlang:send_after(After, self(), Msg), + undefined -> TRef = send_after(After, self(), Msg), setelement(Idx, State, TRef); _ -> State end. @@ -1048,12 +1051,25 @@ ensure_timer(State, Idx, After, Msg) -> stop_timer(State, Idx) -> case element(Idx, State) of undefined -> State; - TRef -> case erlang:cancel_timer(TRef) of - false -> State; - _ -> setelement(Idx, State, undefined) - end + TRef -> cancel_timer(TRef), + setelement(Idx, State, undefined) end. +%% timer:send_after/3 goes through a single timer process but allows +%% long delays. erlang:send_after/3 does not have a bottleneck but +%% only allows max 2^32-1 millis. +-define(MAX_ERLANG_SEND_AFTER, 4294967295). +send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER -> + {ok, Ref} = timer:send_after(Millis, Pid, Msg), + {timer, Ref}; +send_after(Millis, Pid, Msg) -> + {erlang, erlang:send_after(Millis, Pid, Msg)}. + +cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref), + ok; +cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref), + ok. + store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). |