summaryrefslogtreecommitdiff
path: root/src/rabbit_tests.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_tests.erl')
-rw-r--r--src/rabbit_tests.erl96
1 files changed, 94 insertions, 2 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index de06c048..53717774 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -61,6 +61,7 @@ all_tests() ->
passed = test_app_management(),
passed = test_log_management_during_startup(),
passed = test_memory_pressure(),
+ passed = test_statistics(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -922,9 +923,12 @@ test_memory_pressure_sync(Ch, Writer) ->
end.
test_memory_pressure_spawn() ->
+ test_spawn(fun test_memory_pressure_receiver/1).
+
+test_spawn(Receiver) ->
Me = self(),
- Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ Writer = spawn(fun () -> Receiver(Me) end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, <<"/">>,
self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
MRef = erlang:monitor(process, Ch),
@@ -1008,6 +1012,94 @@ test_memory_pressure() ->
passed.
+test_statistics_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_statistics_receiver(Pid)
+ end.
+
+test_statistics_event_receiver(Pid) ->
+ receive
+ Foo ->
+ Pid ! Foo,
+ test_statistics_event_receiver(Pid)
+ end.
+
+test_statistics_receive_event(Ch, Retries, Matcher) ->
+ rabbit_channel:emit_stats(Ch),
+ receive #event{type = channel_stats, props = Props} ->
+ case Matcher(Props) of
+ true ->
+ Props;
+ _ ->
+ case Retries of
+ 0 -> throw(failed_to_receive_matching_event);
+ _ -> timer:sleep(10),
+ test_statistics_receive_event(Ch, Retries - 1,
+ Matcher)
+ end
+ end
+ after 1000 -> throw(failed_to_receive_event)
+ end.
+
+test_statistics() ->
+ %% ATM this just tests the queue / exchange stats in channels. That's
+ %% by far the most complex code though.
+
+ %% Set up a channel and queue
+ {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{}),
+ QName = receive #'queue.declare_ok'{queue = Q0} ->
+ Q0
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
+ QPid = Q#amqqueue.pid,
+ X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
+
+ rabbit_tests_event_receiver:start(self()),
+
+ %% Check stats empty
+ Event = test_statistics_receive_event(Ch, 10, fun (_) -> true end),
+ [] = proplists:get_value(queue_stats, Event),
+ [] = proplists:get_value(exchange_stats, Event),
+ [] = proplists:get_value(queue_exchange_stats, Event),
+
+ %% Publish and get a message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{}, <<"">>)),
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName}),
+
+ %% Check the stats reflect that
+ Event2 = test_statistics_receive_event(
+ Ch, 10,
+ fun (E) ->
+ length(proplists:get_value(queue_exchange_stats, E)) > 0
+ end),
+ [{QPid,[{get,1}]}] = proplists:get_value(queue_stats, Event2),
+ [{X,[{publish,1}]}] = proplists:get_value(exchange_stats, Event2),
+ [{{QPid,X},[{publish,1}]}] =
+ proplists:get_value(queue_exchange_stats, Event2),
+
+ %% Check the stats remove stuff on queue deletion
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
+ Event3 = test_statistics_receive_event(
+ Ch, 10,
+ fun (E) ->
+ length(proplists:get_value(queue_exchange_stats, E)) == 0
+ end),
+
+ [] = proplists:get_value(queue_stats, Event3),
+ [{X,[{publish,1}]}] = proplists:get_value(exchange_stats, Event3),
+ [] = proplists:get_value(queue_exchange_stats, Event3),
+
+ rabbit_tests_event_receiver:stop(),
+ passed.
+
test_delegates_async(SecondaryNode) ->
Self = self(),
Sender = fun (Pid) -> Pid ! {invoked, Self} end,