summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-22 14:05:41 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-22 14:05:41 +0100
commit0382262fb47ae6b38a7dccdead2f4a737c0a26fa (patch)
tree4e86fea43d0114e3cd375e8cf3c1b381e63b3729
parent23054839976ab35667c18094cf553b776d8aa1de (diff)
downloadrabbitmq-server-0382262fb47ae6b38a7dccdead2f4a737c0a26fa.tar.gz
Unit test, and fix a bug found by the test.
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_tests.erl96
-rw-r--r--src/rabbit_tests_event_receiver.erl66
3 files changed, 162 insertions, 7 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index aff63b61..ac52a7f1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1246,8 +1246,5 @@ store(Type, QX, Measure, Val, Stats) ->
erase_queue_stats(QPid, #ch{stats_table = Table}) ->
erase({monitoring, QPid}),
- [ets:delete(Table, K) ||
- {K, _V} <- ets:match(Table, {{queue_stats, QPid, '_'}, '_'})],
- [ets:delete(Table, K) ||
- {K, _V} <-
- ets:match(Table, {{queue_exchange_stats, {QPid, '_'}, '_'}, '_'})].
+ ets:match_delete(Table, {{queue_stats, QPid, '_'}, '_'}),
+ ets:match_delete(Table, {{queue_exchange_stats, {QPid, '_'}, '_'}, '_'}).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index de06c048..53717774 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -61,6 +61,7 @@ all_tests() ->
passed = test_app_management(),
passed = test_log_management_during_startup(),
passed = test_memory_pressure(),
+ passed = test_statistics(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -922,9 +923,12 @@ test_memory_pressure_sync(Ch, Writer) ->
end.
test_memory_pressure_spawn() ->
+ test_spawn(fun test_memory_pressure_receiver/1).
+
+test_spawn(Receiver) ->
Me = self(),
- Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ Writer = spawn(fun () -> Receiver(Me) end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, <<"/">>,
self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
MRef = erlang:monitor(process, Ch),
@@ -1008,6 +1012,94 @@ test_memory_pressure() ->
passed.
+test_statistics_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_statistics_receiver(Pid)
+ end.
+
+test_statistics_event_receiver(Pid) ->
+ receive
+ Foo ->
+ Pid ! Foo,
+ test_statistics_event_receiver(Pid)
+ end.
+
+test_statistics_receive_event(Ch, Retries, Matcher) ->
+ rabbit_channel:emit_stats(Ch),
+ receive #event{type = channel_stats, props = Props} ->
+ case Matcher(Props) of
+ true ->
+ Props;
+ _ ->
+ case Retries of
+ 0 -> throw(failed_to_receive_matching_event);
+ _ -> timer:sleep(10),
+ test_statistics_receive_event(Ch, Retries - 1,
+ Matcher)
+ end
+ end
+ after 1000 -> throw(failed_to_receive_event)
+ end.
+
+test_statistics() ->
+ %% ATM this just tests the queue / exchange stats in channels. That's
+ %% by far the most complex code though.
+
+ %% Set up a channel and queue
+ {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{}),
+ QName = receive #'queue.declare_ok'{queue = Q0} ->
+ Q0
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
+ QPid = Q#amqqueue.pid,
+ X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
+
+ rabbit_tests_event_receiver:start(self()),
+
+ %% Check stats empty
+ Event = test_statistics_receive_event(Ch, 10, fun (_) -> true end),
+ [] = proplists:get_value(queue_stats, Event),
+ [] = proplists:get_value(exchange_stats, Event),
+ [] = proplists:get_value(queue_exchange_stats, Event),
+
+ %% Publish and get a message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{}, <<"">>)),
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName}),
+
+ %% Check the stats reflect that
+ Event2 = test_statistics_receive_event(
+ Ch, 10,
+ fun (E) ->
+ length(proplists:get_value(queue_exchange_stats, E)) > 0
+ end),
+ [{QPid,[{get,1}]}] = proplists:get_value(queue_stats, Event2),
+ [{X,[{publish,1}]}] = proplists:get_value(exchange_stats, Event2),
+ [{{QPid,X},[{publish,1}]}] =
+ proplists:get_value(queue_exchange_stats, Event2),
+
+ %% Check the stats remove stuff on queue deletion
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
+ Event3 = test_statistics_receive_event(
+ Ch, 10,
+ fun (E) ->
+ length(proplists:get_value(queue_exchange_stats, E)) == 0
+ end),
+
+ [] = proplists:get_value(queue_stats, Event3),
+ [{X,[{publish,1}]}] = proplists:get_value(exchange_stats, Event3),
+ [] = proplists:get_value(queue_exchange_stats, Event3),
+
+ rabbit_tests_event_receiver:stop(),
+ passed.
+
test_delegates_async(SecondaryNode) ->
Self = self(),
Sender = fun (Pid) -> Pid ! {invoked, Self} end,
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
new file mode 100644
index 00000000..a92e3da7
--- /dev/null
+++ b/src/rabbit_tests_event_receiver.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_tests_event_receiver).
+
+-export([start/1, stop/0]).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+start(Pid) ->
+ gen_event:add_handler(rabbit_event, ?MODULE, [Pid]).
+
+stop() ->
+ gen_event:delete_handler(rabbit_event, ?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init([Pid]) ->
+ {ok, Pid}.
+
+handle_call(_Request, Pid) ->
+ {ok, not_understood, Pid}.
+
+handle_event(Event, Pid) ->
+ Pid ! Event,
+ {ok, Pid}.
+
+handle_info(_Info, Pid) ->
+ {ok, Pid}.
+
+terminate(_Arg, _Pid) ->
+ ok.
+
+code_change(_OldVsn, Pid, _Extra) ->
+ {ok, Pid}.
+
+%%----------------------------------------------------------------------------