diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-30 13:46:20 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-30 13:46:20 +0100 |
commit | 2a743d63fc1680315fdb5152da322987cbda3366 (patch) | |
tree | ccfb25a4332dc4c199336fda16ac4a68a50cedf5 | |
parent | 0227c663a6177e7872a68b089b4f15a7cdd491ad (diff) | |
download | rabbitmq-server-2a743d63fc1680315fdb5152da322987cbda3366.tar.gz |
Split out slaves into their own bucket.
-rw-r--r-- | src/rabbit_vm.erl | 131 |
1 files changed, 90 insertions, 41 deletions
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index bcdd77ae..bd964aa0 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -35,10 +35,12 @@ %% Like erlang:memory(), but with awareness of rabbit-y things memory() -> All = interesting_sups(), - {Sums, _Other} = sum_processes(lists:append(All), [memory]), + {Sums, _Other} = sum_processes( + lists:append(All), distinguishers(), [memory]), - [Conns, Qs, MsgIndexProc, MgmtDbProc, Plugins] = - [aggregate_memory(Names, Sums) || Names <- All], + [Qs, QsSlave, Conns, MsgIndexProc, MgmtDbProc, Plugins] = + [aggregate_memory(Names, Sums) + || Names <- distinguished_interesting_sups()], Mnesia = mnesia_memory(), MsgIndexETS = ets_memory(rabbit_msg_store_ets_index), @@ -55,19 +57,20 @@ memory() -> OtherProc = Processes - Conns - Qs - MsgIndexProc - Plugins - MgmtDbProc, - [{total, Total}, - {connection_procs, Conns}, - {queue_procs, Qs}, - {plugins, Plugins}, - {other_proc, lists:max([0, OtherProc])}, %% [1] - {mnesia, Mnesia}, - {mgmt_db, MgmtDbETS + MgmtDbProc}, - {msg_index, MsgIndexETS + MsgIndexProc}, - {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS}, - {binary, Bin}, - {code, Code}, - {atom, Atom}, - {other_system, System - ETS - Atom - Bin - Code}]. + [{total, Total}, + {connection_procs, Conns}, + {queue_procs, Qs}, + {queue_slave_procs, QsSlave}, + {plugins, Plugins}, + {other_proc, lists:max([0, OtherProc])}, %% [1] + {mnesia, Mnesia}, + {mgmt_db, MgmtDbETS + MgmtDbProc}, + {msg_index, MsgIndexETS + MsgIndexProc}, + {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS}, + {binary, Bin}, + {code, Code}, + {atom, Atom}, + {other_system, System - ETS - Atom - Bin - Code}]. %% [1] - erlang:memory(processes) can be less than the sum of its %% parts. Rather than display something nonsensical, just silence any @@ -83,7 +86,7 @@ binary() -> lists:foldl(fun ({Ptr, Sz, _RefCnt}, Acc0) -> sets:add_element({Ptr, Sz}, Acc0) end, Acc, Info) - end, [{binary, sets:new()}]), + end, distinguishers(), [{binary, sets:new()}]), [{K, aggregate_binary(V)} || {K, V} <- Sums ++ [{unknown, Rest}]]. %%---------------------------------------------------------------------------- @@ -103,12 +106,22 @@ ets_memory(Name) -> bytes(Words) -> Words * erlang:system_info(wordsize). interesting_sups() -> + QProcs = [rabbit_amqqueue_sup_sup], + [QProcs | interesting_sups0()]. + +interesting_sups0() -> ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup], - QProcs = [rabbit_amqqueue_sup_sup], MsgIndexProcs = [msg_store_transient, msg_store_persistent], MgmtDbProcs = [rabbit_mgmt_sup_sup], PluginProcs = plugin_sups(), - [ConnProcs, QProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs]. + [ConnProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs]. + +distinguishers() -> [{rabbit_amqqueue_sup_sup, fun queue_type/1}]. + +distinguished_interesting_sups() -> + QProcs = [[{rabbit_amqqueue_sup_sup, master}], + [{rabbit_amqqueue_sup_sup, slave}]], + QProcs ++ interesting_sups0(). plugin_sups() -> lists:append([plugin_sup(App) || @@ -138,13 +151,20 @@ aggregate_memory(Names, Sums) -> lists:sum([extract_memory(Name, Sums) || Name <- Names]). extract_memory(Name, Sums) -> - {value, {_, Accs}} = lists:keysearch(Name, 1, Sums), - {value, {memory, V}} = lists:keysearch(memory, 1, Accs), - V. + case keyfind(Name, Sums) of + {value, Accs} -> keyfetch(memory, Accs); + false -> 0 + end. aggregate_binary([{binary, Set}]) -> sets:fold(fun({_Ptr, Sz}, Acc) -> Acc + Sz end, 0, Set). +queue_type(PDict) -> + case keyfind(process_name, PDict) of + {value, {rabbit_mirror_queue_slave, _}} -> slave; + _ -> master + end. + %%---------------------------------------------------------------------------- %% NB: this code is non-rabbit specific. @@ -156,14 +176,17 @@ aggregate_binary([{binary, Set}]) -> -type(info_item() :: {info_key(), info_value()}). -type(accumulate() :: fun ((info_key(), info_value(), info_value()) -> info_value())). --spec(sum_processes/2 :: ([process()], [info_key()]) -> +-type(distinguisher() :: fun (([{term(), term()}]) -> atom())). +-type(distinguishers() :: [{info_key(), distinguisher()}]). +-spec(sum_processes/3 :: ([process()], distinguishers(), [info_key()]) -> {[{process(), [info_item()]}], [info_item()]}). --spec(sum_processes/3 :: ([process()], accumulate(), [info_item()]) -> +-spec(sum_processes/4 :: ([process()], accumulate(), distinguishers(), + [info_item()]) -> {[{process(), [info_item()]}], [info_item()]}). -endif. -sum_processes(Names, Items) -> - sum_processes(Names, fun (_, X, Y) -> X + Y end, +sum_processes(Names, Distinguishers, Items) -> + sum_processes(Names, fun (_, X, Y) -> X + Y end, Distinguishers, [{Item, 0} || Item <- Items]). %% summarize the process_info of all processes based on their @@ -197,10 +220,8 @@ sum_processes(Names, Items) -> %% these must match whatever is contained in the '$ancestor' process %% dictionary entry. Generally that means for all registered processes %% the name should be used. -sum_processes(Names, Fun, Acc0) -> - Items = [Item || {Item, _Val0} <- Acc0], - Acc0Dict = orddict:from_list(Acc0), - NameAccs0 = orddict:from_list([{Name, Acc0Dict} || Name <- Names]), +sum_processes(Names, Fun, Distinguishers, Specs) -> + Items = [Item || {Item, _Blank0} <- Specs], {NameAccs, OtherAcc} = lists:foldl( fun (Pid, Acc) -> @@ -216,10 +237,15 @@ sum_processes(Names, Fun, Acc0) -> [] -> []; N -> [N] end, - accumulate(find_ancestor(Extra, D, Names), Fun, - orddict:from_list(Vals), Acc) + Name0 = find_ancestor(Extra, D, Names), + Name = case keyfind(Name0, Distinguishers) of + {value, DistFun} -> {Name0, DistFun(D)}; + false -> Name0 + end, + accumulate( + Name, Fun, orddict:from_list(Vals), Acc, Specs) end - end, {NameAccs0, Acc0Dict}, processes()), + end, {orddict:new(), orddict:new()}, processes()), %% these conversions aren't strictly necessary; we do them simply %% for the sake of encapsulating the representation. {[{Name, orddict:to_list(Accs)} || @@ -227,9 +253,9 @@ sum_processes(Names, Fun, Acc0) -> orddict:to_list(OtherAcc)}. find_ancestor(Extra, D, Names) -> - Ancestors = case lists:keysearch('$ancestors', 1, D) of - {value, {_, Ancs}} -> Ancs; - false -> [] + Ancestors = case keyfind('$ancestors', D) of + {value, Ancs} -> Ancs; + false -> [] end, case lists:splitwith(fun (A) -> not lists:member(A, Names) end, Extra ++ Ancestors) of @@ -237,8 +263,31 @@ find_ancestor(Extra, D, Names) -> {_, [Name | _]} -> Name end. -accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}) -> - {NameAccs, orddict:merge(Fun, ValsDict, OtherAcc)}; -accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}) -> - F = fun (NameAcc) -> orddict:merge(Fun, ValsDict, NameAcc) end, - {orddict:update(Name, F, NameAccs), OtherAcc}. +accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}, Specs) -> + {NameAccs, merge(Fun, ValsDict, OtherAcc, Specs)}; +accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}, Specs) -> + F = fun (NameAcc) -> merge(Fun, ValsDict, NameAcc, Specs) end, + {case orddict:is_key(Name, NameAccs) of + true -> orddict:update(Name, F, NameAccs); + false -> orddict:store( Name, F(orddict:new()), NameAccs) + end, OtherAcc}. + +merge(Fun, New, Old, Specs) -> + orddict:fold(fun (K, V, Acc) -> + orddict:store( + K, Fun(K, V, case orddict:find(K, Acc) of + {ok, V0} -> V0; + error -> new_from_spec(K, Specs) + end), Acc) + end, Old, New). + +new_from_spec({K, _}, Specs) -> keyfetch(K, Specs); +new_from_spec(K, Specs) -> keyfetch(K, Specs). + +keyfetch(K, L) -> {value, {_, V}} = lists:keysearch(K, 1, L), + V. + +keyfind(K, L) -> case lists:keysearch(K, 1, L) of + {value, {_, V}} -> {value, V}; + false -> false + end. |