diff options
Diffstat (limited to 'src/rabbit_misc.erl')
-rw-r--r-- | src/rabbit_misc.erl | 68 |
1 files changed, 48 insertions, 20 deletions
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 18c07f86..180993a5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -45,7 +45,7 @@ -export([with_local_io/1, local_info_msg/2]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). --export([pid_to_string/1, string_to_pid/1]). +-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]). -export([version_compare/2, version_compare/3]). -export([version_minor_equivalent/2]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). @@ -67,10 +67,11 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). --export([ensure_timer/4, stop_timer/2]). +-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). -export([moving_average/4]). +-export([now_to_ms/1]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -94,6 +95,7 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -192,6 +194,7 @@ (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(node_to_fake_pid/1 :: (atom()) -> pid()). -spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). -spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) @@ -209,7 +212,8 @@ [string()]) -> {'ok', {atom(), [{string(), string()}], [string()]}} | 'no_command'). --spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(all_module_attributes/1 :: + (atom()) -> [{atom(), atom(), [term()]}]). -spec(build_acyclic_graph/3 :: (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> rabbit_types:ok_or_error2(digraph(), @@ -245,11 +249,16 @@ -> {any(), non_neg_integer()}). -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). +-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()). +-spec(cancel_timer/1 :: (tref()) -> 'ok'). -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). -spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined') -> float()). +-spec(now_to_ms/1 :: ({non_neg_integer(), + non_neg_integer(), + non_neg_integer()}) -> pos_integer()). -endif. %%---------------------------------------------------------------------------- @@ -705,6 +714,10 @@ string_to_pid(Str) -> throw(Err) end. +%% node(node_to_fake_pid(Node)) =:= Node. +node_to_fake_pid(Node) -> + string_to_pid(format("<~s.0.0.0>", [Node])). + version_compare(A, B, lte) -> case version_compare(A, B) of eq -> true; @@ -849,20 +862,20 @@ module_attributes(Module) -> end. all_module_attributes(Name) -> - Modules = + Targets = lists:usort( lists:append( - [Modules || {App, _, _} <- application:loaded_applications(), - {ok, Modules} <- [application:get_key(App, modules)]])), + [[{App, Module} || Module <- Modules] || + {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), lists:foldl( - fun (Module, Acc) -> + fun ({App, Module}, Acc) -> case lists:append([Atts || {N, Atts} <- module_attributes(Module), N =:= Name]) of [] -> Acc; - Atts -> [{Module, Atts} | Acc] + Atts -> [{App, Module, Atts} | Acc] end - end, [], Modules). - + end, [], Targets). build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), @@ -870,13 +883,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> [case digraph:vertex(G, Vertex) of false -> digraph:add_vertex(G, Vertex, Label); _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}}) - end || {Module, Atts} <- Graph, - {Vertex, Label} <- VertexFun(Module, Atts)], + end || GraphElem <- Graph, + {Vertex, Label} <- VertexFun(GraphElem)], [case digraph:add_edge(G, From, To) of {error, E} -> throw({graph_error, {edge, E, From, To}}); _ -> ok - end || {Module, Atts} <- Graph, - {From, To} <- EdgeFun(Module, Atts)], + end || GraphElem <- Graph, + {From, To} <- EdgeFun(GraphElem)], {ok, G} catch {graph_error, Reason} -> true = digraph:delete(G), @@ -1012,7 +1025,9 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. -check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}}; +now_to_ms({Mega, Sec, Micro}) -> + (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000. + check_expiry(N) when N < 0 -> {error, {value_negative, N}}; check_expiry(_N) -> ok. @@ -1040,7 +1055,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> ensure_timer(State, Idx, After, Msg) -> case element(Idx, State) of - undefined -> TRef = erlang:send_after(After, self(), Msg), + undefined -> TRef = send_after(After, self(), Msg), setelement(Idx, State, TRef); _ -> State end. @@ -1048,12 +1063,25 @@ ensure_timer(State, Idx, After, Msg) -> stop_timer(State, Idx) -> case element(Idx, State) of undefined -> State; - TRef -> case erlang:cancel_timer(TRef) of - false -> State; - _ -> setelement(Idx, State, undefined) - end + TRef -> cancel_timer(TRef), + setelement(Idx, State, undefined) end. +%% timer:send_after/3 goes through a single timer process but allows +%% long delays. erlang:send_after/3 does not have a bottleneck but +%% only allows max 2^32-1 millis. +-define(MAX_ERLANG_SEND_AFTER, 4294967295). +send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER -> + {ok, Ref} = timer:send_after(Millis, Pid, Msg), + {timer, Ref}; +send_after(Millis, Pid, Msg) -> + {erlang, erlang:send_after(Millis, Pid, Msg)}. + +cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref), + ok; +cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref), + ok. + store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). |