summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_channel.erl13
-rw-r--r--src/rabbit_direct.erl8
-rw-r--r--src/rabbit_event.erl9
-rw-r--r--src/rabbit_networking.erl8
-rw-r--r--src/rabbit_reader.erl15
8 files changed, 52 insertions, 43 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 045c5d58..fd89fd95 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -20,7 +20,7 @@
-export([start/0, boot/0, stop/0,
stop_and_halt/0, await_startup/0, status/0, is_running/0,
- is_running/1, environment/0, rotate_logs/1, force_event_refresh/0,
+ is_running/1, environment/0, rotate_logs/1, force_event_refresh/1,
start_fhc/0]).
-export([start/2, stop/1]).
@@ -227,7 +227,7 @@
-spec(is_running/1 :: (node()) -> boolean()).
-spec(environment/0 :: () -> [{param(), term()}]).
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
--spec(force_event_refresh/0 :: () -> 'ok').
+-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
@@ -696,11 +696,11 @@ log_rotation_result(ok, {error, SaslLogError}) ->
log_rotation_result(ok, ok) ->
ok.
-force_event_refresh() ->
- rabbit_direct:force_event_refresh(),
- rabbit_networking:force_connection_event_refresh(),
- rabbit_channel:force_event_refresh(),
- rabbit_amqqueue:force_event_refresh().
+force_event_refresh(Ref) ->
+ rabbit_direct:force_event_refresh(Ref),
+ rabbit_networking:force_connection_event_refresh(Ref),
+ rabbit_channel:force_event_refresh(Ref),
+ rabbit_amqqueue:force_event_refresh(Ref).
%%---------------------------------------------------------------------------
%% misc
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b86435d..c0478579 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -24,7 +24,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([force_event_refresh/0, notify_policy_changed/1]).
+-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
@@ -110,7 +110,7 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
--spec(force_event_refresh/0 :: () -> 'ok').
+-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 :: (rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
@@ -502,19 +502,20 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
%% the first place since a node failed). Therefore we keep poking at
%% the list of queues until we were able to talk to a live process or
%% the queue no longer exists.
-force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]).
+force_event_refresh(Ref) ->
+ force_event_refresh([Q#amqqueue.name || Q <- list()], Ref).
-force_event_refresh(QNames) ->
+force_event_refresh(QNames, Ref) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
{_, Bad} = gen_server2:mcall(
- [{Q#amqqueue.pid, force_event_refresh} || Q <- Qs]),
+ [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]),
FailedPids = [Pid || {Pid, _Reason} <- Bad],
Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
lists:member(Pid, FailedPids)],
case Failed of
[] -> ok;
_ -> timer:sleep(?FAILOVER_WAIT_MILLIS),
- force_event_refresh(Failed)
+ force_event_refresh(Failed, Ref)
end.
notify_policy_changed(#amqqueue{pid = QPid}) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 66b57ce8..a1997376 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -824,14 +824,15 @@ emit_stats(State, Extra) ->
not lists:member(K, ExtraKs)],
rabbit_event:notify(queue_stats, Extra ++ Infos).
-emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) ->
+emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, CTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
{queue, QName},
- {arguments, Args}]).
+ {arguments, Args}],
+ Ref).
emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
@@ -967,7 +968,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), Args),
+ not NoAck, qname(State1), Args, none),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1048,19 +1049,19 @@ handle_call(sync_mirrors, _From, State) ->
handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State);
-handle_call(force_event_refresh, _From,
+handle_call({force_event_refresh, Ref}, _From,
State = #q{consumers = Consumers,
exclusive_consumer = Exclusive}) ->
- rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref),
QName = qname(State),
AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of
none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName, Args) ||
+ Ch, CTag, false, AckRequired, QName, Args, Ref) ||
{Ch, CTag, AckRequired, Args} <- AllConsumers];
{Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers,
emit_consumer_created(
- Ch, CTag, true, AckRequired, QName, Args)
+ Ch, CTag, true, AckRequired, QName, Args, Ref)
end,
reply(ok, State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2c091b9b..7907c96c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -24,7 +24,7 @@
-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
--export([force_event_refresh/0]).
+-export([force_event_refresh/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -106,7 +106,7 @@
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(refresh_config_local/0 :: () -> 'ok').
-spec(ready_for_close/1 :: (pid()) -> 'ok').
--spec(force_event_refresh/0 :: () -> 'ok').
+-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-endif.
@@ -179,8 +179,8 @@ refresh_config_local() ->
ready_for_close(Pid) ->
gen_server2:cast(Pid, ready_for_close).
-force_event_refresh() ->
- [gen_server2:cast(C, force_event_refresh) || C <- list()],
+force_event_refresh(Ref) ->
+ [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
ok.
%%---------------------------------------------------------------------------
@@ -335,8 +335,9 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
|| {ConsumerTag, CreditDrained} <- CTagCredit],
noreply(State);
-handle_cast(force_event_refresh, State) ->
- rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
+handle_cast({force_event_refresh, Ref}, State) ->
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
+ Ref),
noreply(State);
handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 267b581d..c372d5f1 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, force_event_refresh/0, list/0, connect/5,
+-export([boot/0, force_event_refresh/1, list/0, connect/5,
start_channel/9, disconnect/2]).
%% Internal
-export([list_local/0]).
@@ -28,7 +28,7 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(force_event_refresh/0 :: () -> 'ok').
+-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(connect/5 :: (('nouser' |
@@ -54,8 +54,8 @@ boot() -> rabbit_sup:start_supervisor_child(
[{local, rabbit_direct_client_sup},
{rabbit_channel_sup, start_link, []}]).
-force_event_refresh() ->
- [Pid ! force_event_refresh || Pid<- list()],
+force_event_refresh(Ref) ->
+ [Pid ! {force_event_refresh, Ref} || Pid <- list()],
ok.
list_local() ->
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index a713d76b..fa28d825 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -22,7 +22,7 @@
-export([init_stats_timer/2, init_disabled_stats_timer/2,
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
--export([notify/2, notify_if/3]).
+-export([notify/2, notify/3, notify_if/3]).
%%----------------------------------------------------------------------------
@@ -41,6 +41,7 @@
-type(event() :: #event { type :: event_type(),
props :: event_props(),
+ reference :: 'none' | reference(),
timestamp :: event_timestamp() }).
-type(level() :: 'none' | 'coarse' | 'fine').
@@ -58,6 +59,7 @@
-spec(stats_level/2 :: (container(), pos()) -> level()).
-spec(if_enabled/3 :: (container(), pos(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
+-spec(notify/3 :: (event_type(), event_props(), reference()) -> 'ok').
-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
-endif.
@@ -140,7 +142,10 @@ if_enabled(C, P, Fun) ->
notify_if(true, Type, Props) -> notify(Type, Props);
notify_if(false, _Type, _Props) -> ok.
-notify(Type, Props) ->
+notify(Type, Props) -> notify(Type, Props, none).
+
+notify(Type, Props, Ref) ->
gen_event:notify(?MODULE, #event{type = Type,
props = Props,
+ reference = Ref,
timestamp = os:timestamp()}).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 91be4dcb..42438790 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -22,7 +22,7 @@
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
- close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
+ close_connection/2, force_connection_event_refresh/1, tcp_host/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/6,
@@ -80,7 +80,7 @@
-spec(connection_info_all/1 ::
(rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
--spec(force_connection_event_refresh/0 :: () -> 'ok').
+-spec(force_connection_event_refresh/1 :: (reference()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]).
@@ -331,8 +331,8 @@ close_connection(Pid, Explanation) ->
false -> throw({error, {not_a_connection_pid, Pid}})
end.
-force_connection_event_refresh() ->
- [rabbit_reader:force_event_refresh(C) || C <- connections()],
+force_connection_event_refresh(Ref) ->
+ [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
ok.
%%--------------------------------------------------------------------
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 47bc99d8..9ffcd203 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -18,7 +18,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1,
+-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -77,7 +77,7 @@
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
--spec(force_event_refresh/1 :: (pid()) -> 'ok').
+-spec(force_event_refresh/2 :: (pid(), reference()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
@@ -134,8 +134,8 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
-force_event_refresh(Pid) ->
- gen_server:cast(Pid, force_event_refresh).
+force_event_refresh(Pid, Ref) ->
+ gen_server:cast(Pid, {force_event_refresh, Ref}).
conserve_resources(Pid, Source, Conserve) ->
Pid ! {conserve_resources, Source, Conserve},
@@ -399,10 +399,11 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
catch Error -> {error, Error}
end),
State;
-handle_other({'$gen_cast', force_event_refresh}, State)
+handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
when ?IS_RUNNING(State) ->
- rabbit_event:notify(connection_created,
- [{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
+ rabbit_event:notify(
+ connection_created,
+ [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
State;
handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.