diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 14 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 8 | ||||
-rw-r--r-- | src/rabbit_event.erl | 9 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 8 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 15 |
8 files changed, 52 insertions, 43 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 045c5d58..fd89fd95 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -20,7 +20,7 @@ -export([start/0, boot/0, stop/0, stop_and_halt/0, await_startup/0, status/0, is_running/0, - is_running/1, environment/0, rotate_logs/1, force_event_refresh/0, + is_running/1, environment/0, rotate_logs/1, force_event_refresh/1, start_fhc/0]). -export([start/2, stop/1]). @@ -227,7 +227,7 @@ -spec(is_running/1 :: (node()) -> boolean()). -spec(environment/0 :: () -> [{param(), term()}]). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). @@ -696,11 +696,11 @@ log_rotation_result(ok, {error, SaslLogError}) -> log_rotation_result(ok, ok) -> ok. -force_event_refresh() -> - rabbit_direct:force_event_refresh(), - rabbit_networking:force_connection_event_refresh(), - rabbit_channel:force_event_refresh(), - rabbit_amqqueue:force_event_refresh(). +force_event_refresh(Ref) -> + rabbit_direct:force_event_refresh(Ref), + rabbit_networking:force_connection_event_refresh(Ref), + rabbit_channel:force_event_refresh(Ref), + rabbit_amqqueue:force_event_refresh(Ref). %%--------------------------------------------------------------------------- %% misc diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b86435d..c0478579 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -24,7 +24,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0, notify_policy_changed/1]). +-export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). @@ -110,7 +110,7 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), @@ -502,19 +502,20 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). %% the first place since a node failed). Therefore we keep poking at %% the list of queues until we were able to talk to a live process or %% the queue no longer exists. -force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]). +force_event_refresh(Ref) -> + force_event_refresh([Q#amqqueue.name || Q <- list()], Ref). -force_event_refresh(QNames) -> +force_event_refresh(QNames, Ref) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], {_, Bad} = gen_server2:mcall( - [{Q#amqqueue.pid, force_event_refresh} || Q <- Qs]), + [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]), FailedPids = [Pid || {Pid, _Reason} <- Bad], Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, lists:member(Pid, FailedPids)], case Failed of [] -> ok; _ -> timer:sleep(?FAILOVER_WAIT_MILLIS), - force_event_refresh(Failed) + force_event_refresh(Failed, Ref) end. notify_policy_changed(#amqqueue{pid = QPid}) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 66b57ce8..a1997376 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -824,14 +824,15 @@ emit_stats(State, Extra) -> not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). -emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) -> rabbit_event:notify(consumer_created, [{consumer_tag, CTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, {queue, QName}, - {arguments, Args}]). + {arguments, Args}], + Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, @@ -967,7 +968,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), Args), + not NoAck, qname(State1), Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1048,19 +1049,19 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State); -handle_call(force_event_refresh, _From, +handle_call({force_event_refresh, Ref}, _From, State = #q{consumers = Consumers, exclusive_consumer = Exclusive}) -> - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref), QName = qname(State), AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Args) || + Ch, CTag, false, AckRequired, QName, Args, Ref) || {Ch, CTag, AckRequired, Args} <- AllConsumers]; {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Args) + Ch, CTag, true, AckRequired, QName, Args, Ref) end, reply(ok, State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2c091b9b..7907c96c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -24,7 +24,7 @@ -export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). --export([force_event_refresh/0]). +-export([force_event_refresh/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -106,7 +106,7 @@ -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(refresh_config_local/0 :: () -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -endif. @@ -179,8 +179,8 @@ refresh_config_local() -> ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). -force_event_refresh() -> - [gen_server2:cast(C, force_event_refresh) || C <- list()], +force_event_refresh(Ref) -> + [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], ok. %%--------------------------------------------------------------------------- @@ -335,8 +335,9 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> || {ConsumerTag, CreditDrained} <- CTagCredit], noreply(State); -handle_cast(force_event_refresh, State) -> - rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), +handle_cast({force_event_refresh, Ref}, State) -> + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), + Ref), noreply(State); handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 267b581d..c372d5f1 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, force_event_refresh/0, list/0, connect/5, +-export([boot/0, force_event_refresh/1, list/0, connect/5, start_channel/9, disconnect/2]). %% Internal -export([list_local/0]). @@ -28,7 +28,7 @@ -ifdef(use_specs). -spec(boot/0 :: () -> 'ok'). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). -spec(connect/5 :: (('nouser' | @@ -54,8 +54,8 @@ boot() -> rabbit_sup:start_supervisor_child( [{local, rabbit_direct_client_sup}, {rabbit_channel_sup, start_link, []}]). -force_event_refresh() -> - [Pid ! force_event_refresh || Pid<- list()], +force_event_refresh(Ref) -> + [Pid ! {force_event_refresh, Ref} || Pid <- list()], ok. list_local() -> diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index a713d76b..fa28d825 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -22,7 +22,7 @@ -export([init_stats_timer/2, init_disabled_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). --export([notify/2, notify_if/3]). +-export([notify/2, notify/3, notify_if/3]). %%---------------------------------------------------------------------------- @@ -41,6 +41,7 @@ -type(event() :: #event { type :: event_type(), props :: event_props(), + reference :: 'none' | reference(), timestamp :: event_timestamp() }). -type(level() :: 'none' | 'coarse' | 'fine'). @@ -58,6 +59,7 @@ -spec(stats_level/2 :: (container(), pos()) -> level()). -spec(if_enabled/3 :: (container(), pos(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). +-spec(notify/3 :: (event_type(), event_props(), reference()) -> 'ok'). -spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). -endif. @@ -140,7 +142,10 @@ if_enabled(C, P, Fun) -> notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. -notify(Type, Props) -> +notify(Type, Props) -> notify(Type, Props, none). + +notify(Type, Props, Ref) -> gen_event:notify(?MODULE, #event{type = Type, props = Props, + reference = Ref, timestamp = os:timestamp()}). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 91be4dcb..42438790 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -22,7 +22,7 @@ connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, - close_connection/2, force_connection_event_refresh/0, tcp_host/1]). + close_connection/2, force_connection_event_refresh/1, tcp_host/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/6, @@ -80,7 +80,7 @@ -spec(connection_info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). --spec(force_connection_event_refresh/0 :: () -> 'ok'). +-spec(force_connection_event_refresh/1 :: (reference()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]). @@ -331,8 +331,8 @@ close_connection(Pid, Explanation) -> false -> throw({error, {not_a_connection_pid, Pid}}) end. -force_connection_event_refresh() -> - [rabbit_reader:force_event_refresh(C) || C <- connections()], +force_connection_event_refresh(Ref) -> + [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. %%-------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 47bc99d8..9ffcd203 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,7 +18,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -77,7 +77,7 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). --spec(force_event_refresh/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/2 :: (pid(), reference()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -134,8 +134,8 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. -force_event_refresh(Pid) -> - gen_server:cast(Pid, force_event_refresh). +force_event_refresh(Pid, Ref) -> + gen_server:cast(Pid, {force_event_refresh, Ref}). conserve_resources(Pid, Source, Conserve) -> Pid ! {conserve_resources, Source, Conserve}, @@ -399,10 +399,11 @@ handle_other({'$gen_call', From, {info, Items}}, State) -> catch Error -> {error, Error} end), State; -handle_other({'$gen_cast', force_event_refresh}, State) +handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) when ?IS_RUNNING(State) -> - rabbit_event:notify(connection_created, - [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), + rabbit_event:notify( + connection_created, + [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), State; handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. |