diff options
authorSimon MacMullen <>2014-09-30 13:46:20 +0100
committerSimon MacMullen <>2014-09-30 13:46:20 +0100
commit2a743d63fc1680315fdb5152da322987cbda3366 (patch)
parent0227c663a6177e7872a68b089b4f15a7cdd491ad (diff)
Split out slaves into their own bucket.
1 files changed, 90 insertions, 41 deletions
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index bcdd77ae..bd964aa0 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -35,10 +35,12 @@
%% Like erlang:memory(), but with awareness of rabbit-y things
memory() ->
All = interesting_sups(),
- {Sums, _Other} = sum_processes(lists:append(All), [memory]),
+ {Sums, _Other} = sum_processes(
+ lists:append(All), distinguishers(), [memory]),
- [Conns, Qs, MsgIndexProc, MgmtDbProc, Plugins] =
- [aggregate_memory(Names, Sums) || Names <- All],
+ [Qs, QsSlave, Conns, MsgIndexProc, MgmtDbProc, Plugins] =
+ [aggregate_memory(Names, Sums)
+ || Names <- distinguished_interesting_sups()],
Mnesia = mnesia_memory(),
MsgIndexETS = ets_memory(rabbit_msg_store_ets_index),
@@ -55,19 +57,20 @@ memory() ->
OtherProc = Processes - Conns - Qs - MsgIndexProc - Plugins - MgmtDbProc,
- [{total, Total},
- {connection_procs, Conns},
- {queue_procs, Qs},
- {plugins, Plugins},
- {other_proc, lists:max([0, OtherProc])}, %% [1]
- {mnesia, Mnesia},
- {mgmt_db, MgmtDbETS + MgmtDbProc},
- {msg_index, MsgIndexETS + MsgIndexProc},
- {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
- {binary, Bin},
- {code, Code},
- {atom, Atom},
- {other_system, System - ETS - Atom - Bin - Code}].
+ [{total, Total},
+ {connection_procs, Conns},
+ {queue_procs, Qs},
+ {queue_slave_procs, QsSlave},
+ {plugins, Plugins},
+ {other_proc, lists:max([0, OtherProc])}, %% [1]
+ {mnesia, Mnesia},
+ {mgmt_db, MgmtDbETS + MgmtDbProc},
+ {msg_index, MsgIndexETS + MsgIndexProc},
+ {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
+ {binary, Bin},
+ {code, Code},
+ {atom, Atom},
+ {other_system, System - ETS - Atom - Bin - Code}].
%% [1] - erlang:memory(processes) can be less than the sum of its
%% parts. Rather than display something nonsensical, just silence any
@@ -83,7 +86,7 @@ binary() ->
lists:foldl(fun ({Ptr, Sz, _RefCnt}, Acc0) ->
sets:add_element({Ptr, Sz}, Acc0)
end, Acc, Info)
- end, [{binary, sets:new()}]),
+ end, distinguishers(), [{binary, sets:new()}]),
[{K, aggregate_binary(V)} || {K, V} <- Sums ++ [{unknown, Rest}]].
@@ -103,12 +106,22 @@ ets_memory(Name) ->
bytes(Words) -> Words * erlang:system_info(wordsize).
interesting_sups() ->
+ QProcs = [rabbit_amqqueue_sup_sup],
+ [QProcs | interesting_sups0()].
+interesting_sups0() ->
ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup],
- QProcs = [rabbit_amqqueue_sup_sup],
MsgIndexProcs = [msg_store_transient, msg_store_persistent],
MgmtDbProcs = [rabbit_mgmt_sup_sup],
PluginProcs = plugin_sups(),
- [ConnProcs, QProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs].
+ [ConnProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs].
+distinguishers() -> [{rabbit_amqqueue_sup_sup, fun queue_type/1}].
+distinguished_interesting_sups() ->
+ QProcs = [[{rabbit_amqqueue_sup_sup, master}],
+ [{rabbit_amqqueue_sup_sup, slave}]],
+ QProcs ++ interesting_sups0().
plugin_sups() ->
lists:append([plugin_sup(App) ||
@@ -138,13 +151,20 @@ aggregate_memory(Names, Sums) ->
lists:sum([extract_memory(Name, Sums) || Name <- Names]).
extract_memory(Name, Sums) ->
- {value, {_, Accs}} = lists:keysearch(Name, 1, Sums),
- {value, {memory, V}} = lists:keysearch(memory, 1, Accs),
- V.
+ case keyfind(Name, Sums) of
+ {value, Accs} -> keyfetch(memory, Accs);
+ false -> 0
+ end.
aggregate_binary([{binary, Set}]) ->
sets:fold(fun({_Ptr, Sz}, Acc) -> Acc + Sz end, 0, Set).
+queue_type(PDict) ->
+ case keyfind(process_name, PDict) of
+ {value, {rabbit_mirror_queue_slave, _}} -> slave;
+ _ -> master
+ end.
%% NB: this code is non-rabbit specific.
@@ -156,14 +176,17 @@ aggregate_binary([{binary, Set}]) ->
-type(info_item() :: {info_key(), info_value()}).
-type(accumulate() :: fun ((info_key(), info_value(), info_value()) ->
--spec(sum_processes/2 :: ([process()], [info_key()]) ->
+-type(distinguisher() :: fun (([{term(), term()}]) -> atom())).
+-type(distinguishers() :: [{info_key(), distinguisher()}]).
+-spec(sum_processes/3 :: ([process()], distinguishers(), [info_key()]) ->
{[{process(), [info_item()]}], [info_item()]}).
--spec(sum_processes/3 :: ([process()], accumulate(), [info_item()]) ->
+-spec(sum_processes/4 :: ([process()], accumulate(), distinguishers(),
+ [info_item()]) ->
{[{process(), [info_item()]}], [info_item()]}).
-sum_processes(Names, Items) ->
- sum_processes(Names, fun (_, X, Y) -> X + Y end,
+sum_processes(Names, Distinguishers, Items) ->
+ sum_processes(Names, fun (_, X, Y) -> X + Y end, Distinguishers,
[{Item, 0} || Item <- Items]).
%% summarize the process_info of all processes based on their
@@ -197,10 +220,8 @@ sum_processes(Names, Items) ->
%% these must match whatever is contained in the '$ancestor' process
%% dictionary entry. Generally that means for all registered processes
%% the name should be used.
-sum_processes(Names, Fun, Acc0) ->
- Items = [Item || {Item, _Val0} <- Acc0],
- Acc0Dict = orddict:from_list(Acc0),
- NameAccs0 = orddict:from_list([{Name, Acc0Dict} || Name <- Names]),
+sum_processes(Names, Fun, Distinguishers, Specs) ->
+ Items = [Item || {Item, _Blank0} <- Specs],
{NameAccs, OtherAcc} =
fun (Pid, Acc) ->
@@ -216,10 +237,15 @@ sum_processes(Names, Fun, Acc0) ->
[] -> [];
N -> [N]
- accumulate(find_ancestor(Extra, D, Names), Fun,
- orddict:from_list(Vals), Acc)
+ Name0 = find_ancestor(Extra, D, Names),
+ Name = case keyfind(Name0, Distinguishers) of
+ {value, DistFun} -> {Name0, DistFun(D)};
+ false -> Name0
+ end,
+ accumulate(
+ Name, Fun, orddict:from_list(Vals), Acc, Specs)
- end, {NameAccs0, Acc0Dict}, processes()),
+ end, {orddict:new(), orddict:new()}, processes()),
%% these conversions aren't strictly necessary; we do them simply
%% for the sake of encapsulating the representation.
{[{Name, orddict:to_list(Accs)} ||
@@ -227,9 +253,9 @@ sum_processes(Names, Fun, Acc0) ->
find_ancestor(Extra, D, Names) ->
- Ancestors = case lists:keysearch('$ancestors', 1, D) of
- {value, {_, Ancs}} -> Ancs;
- false -> []
+ Ancestors = case keyfind('$ancestors', D) of
+ {value, Ancs} -> Ancs;
+ false -> []
case lists:splitwith(fun (A) -> not lists:member(A, Names) end,
Extra ++ Ancestors) of
@@ -237,8 +263,31 @@ find_ancestor(Extra, D, Names) ->
{_, [Name | _]} -> Name
-accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}) ->
- {NameAccs, orddict:merge(Fun, ValsDict, OtherAcc)};
-accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}) ->
- F = fun (NameAcc) -> orddict:merge(Fun, ValsDict, NameAcc) end,
- {orddict:update(Name, F, NameAccs), OtherAcc}.
+accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}, Specs) ->
+ {NameAccs, merge(Fun, ValsDict, OtherAcc, Specs)};
+accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}, Specs) ->
+ F = fun (NameAcc) -> merge(Fun, ValsDict, NameAcc, Specs) end,
+ {case orddict:is_key(Name, NameAccs) of
+ true -> orddict:update(Name, F, NameAccs);
+ false -> orddict:store( Name, F(orddict:new()), NameAccs)
+ end, OtherAcc}.
+merge(Fun, New, Old, Specs) ->
+ orddict:fold(fun (K, V, Acc) ->
+ orddict:store(
+ K, Fun(K, V, case orddict:find(K, Acc) of
+ {ok, V0} -> V0;
+ error -> new_from_spec(K, Specs)
+ end), Acc)
+ end, Old, New).
+new_from_spec({K, _}, Specs) -> keyfetch(K, Specs);
+new_from_spec(K, Specs) -> keyfetch(K, Specs).
+keyfetch(K, L) -> {value, {_, V}} = lists:keysearch(K, 1, L),
+ V.
+keyfind(K, L) -> case lists:keysearch(K, 1, L) of
+ {value, {_, V}} -> {value, V};
+ false -> false
+ end.