summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-06-30 15:24:52 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-06-30 15:24:52 +0100
commitc28c5ad26e326c689cddc6baa0b4f5a1b8de3cf8 (patch)
tree8039811d20881d9e1b0d1601afe87936bd90b863
parent17f707949072f7ca07c7430c9bb3cde3de9a8a9c (diff)
parentf54136e6df161c8890c483f0811414ebcd29a465 (diff)
downloadrabbitmq-server-c28c5ad26e326c689cddc6baa0b4f5a1b8de3cf8.tar.gz
Merge bug26171
-rw-r--r--docs/rabbitmq-plugins.1.xml87
-rw-r--r--docs/rabbitmq.config.example16
-rw-r--r--docs/rabbitmqctl.1.xml4
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl27
-rw-r--r--packaging/common/README2
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server3
-rwxr-xr-xscripts/rabbitmq-server.bat1
-rwxr-xr-xscripts/rabbitmq-service.bat1
-rw-r--r--src/app_utils.erl7
-rw-r--r--src/rabbit.erl129
-rw-r--r--src/rabbit_amqqueue.erl91
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_channel.erl43
-rw-r--r--src/rabbit_event.erl21
-rw-r--r--src/rabbit_exchange.erl89
-rw-r--r--src/rabbit_exchange_decorator.erl24
-rw-r--r--src/rabbit_mirror_queue_misc.erl32
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
-rw-r--r--src/rabbit_misc.erl68
-rw-r--r--src/rabbit_networking.erl11
-rw-r--r--src/rabbit_node_monitor.erl1
-rw-r--r--src/rabbit_plugins.erl132
-rw-r--r--src/rabbit_plugins_main.erl257
-rw-r--r--src/rabbit_policies.erl4
-rw-r--r--src/rabbit_policy.erl24
-rw-r--r--src/rabbit_queue_decorator.erl23
-rw-r--r--src/rabbit_reader.erl64
-rw-r--r--src/rabbit_runtime_parameters.erl4
-rw-r--r--src/rabbit_table.erl3
-rw-r--r--src/rabbit_trace.erl36
-rw-r--r--src/rabbit_upgrade_functions.erl18
-rw-r--r--src/rabbit_version.erl6
35 files changed, 909 insertions, 350 deletions
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml
index 8ecb4fc8..f7be2d29 100644
--- a/docs/rabbitmq-plugins.1.xml
+++ b/docs/rabbitmq-plugins.1.xml
@@ -40,6 +40,7 @@
<refsynopsisdiv>
<cmdsynopsis>
<command>rabbitmq-plugins</command>
+ <arg choice="opt">-n <replaceable>node</replaceable></arg>
<arg choice="req"><replaceable>command</replaceable></arg>
<arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
</cmdsynopsis>
@@ -62,6 +63,16 @@
enabled. Implicitly enabled plugins are automatically disabled again
when they are no longer required.
</para>
+
+ <para>
+ The <command>enable</command>, <command>disable</command> and
+ <command>set</command> commands will update the plugins file and
+ then attempt to connect to the broker and ensure it is running
+ all enabled plugins. By default if it is not possible to connect
+ to the running broker (for example if it is stopped) then a
+ warning is displayed. Specify <command>--online</command> or
+ <command>--offline</command> to change this behaviour.
+ </para>
</refsect1>
<refsect1>
@@ -97,12 +108,14 @@
</variablelist>
<para>
Lists all plugins, their versions, dependencies and
- descriptions. Each plugin is prefixed with a status
- indicator - [ ] to indicate that the plugin is not
- enabled, [E] to indicate that it is explicitly enabled,
- [e] to indicate that it is implicitly enabled, and [!] to
- indicate that it is enabled but missing and thus not
- operational.
+ descriptions. Each plugin is prefixed with two status
+ indicator characters inside [ ]. The first indicator can
+ be " " to indicate that the plugin is not enabled, "E" to
+ indicate that it is explicitly enabled, "e" to indicate
+ that it is implicitly enabled, or "!" to indicate that it
+ is enabled but missing and thus not operational. The
+ second indicator can be " " to show that the plugin is not
+ running, or "*" to show that it is.
</para>
<para>
If the optional pattern is given, only plugins whose
@@ -130,17 +143,24 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>enable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>plugin</term>
<listitem><para>One or more plugins to enable.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Enables the specified plugins and all their
- dependencies.
+ Enables the specified plugins and all their dependencies.
</para>
<para role="example-prefix">For example:</para>
@@ -154,17 +174,24 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>disable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>plugin</term>
<listitem><para>One or more plugins to disable.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Disables the specified plugins and all plugins that
- depend on them.
+ Disables the specified plugins and all their dependencies.
</para>
<para role="example-prefix">For example:</para>
@@ -175,6 +202,42 @@
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>set</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>plugin</term>
+ <listitem><para>Zero or more plugins to enable.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Enables the specified plugins and all their
+ dependencies. Unlike <command>rabbitmq-plugins
+ enable</command> this command ignores and overwrites any
+ existing enabled plugins. <command>rabbitmq-plugins
+ set</command> with no plugin arguments is a legal command
+ meaning "disable all plugins".
+ </para>
+
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-plugins set rabbitmq_management</screen>
+ <para role="example">
+ This command enables the <command>management</command>
+ plugin and its dependencies and disables everything else.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</refsect1>
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 26de71b7..e8b56660 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -27,6 +27,11 @@
%%
%% {ssl_listeners, [5671]},
+ %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection
+ %% and SSL handshake), in milliseconds.
+ %%
+ %% {handshake_timeout, 10000},
+
%% Log levels (currently just used for connection logging).
%% One of 'info', 'warning', 'error' or 'none', in decreasing order
%% of verbosity. Defaults to 'info'.
@@ -103,6 +108,10 @@
%%
%% {ssl_cert_login_from, common_name},
+ %% SSL handshake timeout, in milliseconds.
+ %%
+ %% {ssl_handshake_timeout, 5000},
+
%%
%% Default User / VHost
%% ====================
@@ -213,7 +222,12 @@
%% Explicitly enable/disable hipe compilation.
%%
- %% {hipe_compile, true}
+ %% {hipe_compile, true},
+
+ %% Timeout used when waiting for Mnesia tables in a cluster to
+ %% become available.
+ %%
+ %% {mnesia_table_loading_timeout, 30000}
]},
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 01b024a2..6cfd3e00 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1475,6 +1475,10 @@
<term>send_pend</term>
<listitem><para>Send queue size.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>connected_at</term>
+ <listitem><para>Date and time this connection was established, as timestamp.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>connectioninfoitem</command>s are
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 7360208a..f26e0f77 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -39,12 +39,15 @@
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
+ {mnesia_table_loading_timeout, 30000},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
{trace_vhosts, []},
{log_levels, [{connection, info}]},
{ssl_cert_login_from, distinguished_name},
+ {ssl_handshake_timeout, 5000},
+ {handshake_timeout, 10000},
{reverse_dns_lookups, false},
{cluster_partition_handling, ignore},
{tcp_listen_options, [binary,
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5ac3197e..7a40f9eb 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -39,13 +39,25 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches, policy, decorators}).
--record(exchange_serial, {name, next}).
+%% fields described as 'transient' here are cleared when writing to
+%% rabbit_durable_<thing>
+-record(exchange, {
+ name, type, durable, auto_delete, internal, arguments, %% immutable
+ scratches, %% durable, explicitly updated via update_scratch/3
+ policy, %% durable, implicitly updated when policy changes
+ decorators}). %% transient, recalculated in store/1 (i.e. recovery)
+
+-record(amqqueue, {
+ name, durable, auto_delete, exclusive_owner = none, %% immutable
+ arguments, %% immutable
+ pid, %% durable (just so we know home node)
+ slave_pids, sync_slave_pids, %% transient
+ down_slave_nodes, %% durable
+ policy, %% durable, implicit update as above
+ gm_pids, %% transient
+ decorators}). %% transient, recalculated as above
--record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, sync_slave_pids, policy,
- gm_pids, decorators}).
+-record(exchange_serial, {name, next}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -105,9 +117,6 @@
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
-%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/packaging/common/README b/packaging/common/README
index 0a29ee27..35a1523a 100644
--- a/packaging/common/README
+++ b/packaging/common/README
@@ -17,4 +17,4 @@ run as the superuser.
An example configuration file is provided in the same directory as
this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The
RabbitMQ server must be restarted after changing the configuration
-file or enabling or disabling plugins.
+file.
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index bd7d0b6a..36910eff 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -21,6 +21,7 @@
##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
@@ -35,4 +36,5 @@ exec ${ERL_DIR}erl \
-s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
+ -nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index a535ebad..61e39e38 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index bd397441..2dbda427 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -39,7 +39,7 @@ DEFAULT_NODE_PORT=5672
[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
-
+[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS}
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
@@ -131,6 +131,7 @@ exec ${ERL_DIR}erl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
+ ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \
${RABBITMQ_LISTEN_ARG} \
-sasl errlog_type error \
-sasl sasl_error_logger false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 043204fa..e2312406 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -147,6 +147,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 895561d4..fb2703f2 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -235,6 +235,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
!RABBITMQ_DIST_ARG! ^
!STARVAR!
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 0479ce66..e321888d 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -17,7 +17,7 @@
-export([load_applications/1, start_applications/1, start_applications/2,
stop_applications/1, stop_applications/2, app_dependency_order/2,
- wait_for_applications/1]).
+ wait_for_applications/1, app_dependencies/1]).
-ifdef(use_specs).
@@ -30,6 +30,7 @@
-spec stop_applications([atom()], error_handler()) -> 'ok'.
-spec wait_for_applications([atom()]) -> 'ok'.
-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+-spec app_dependencies(atom()) -> [atom()].
-endif.
@@ -74,8 +75,8 @@ wait_for_applications(Apps) ->
app_dependency_order(RootApps, StripUnreachable) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ fun ({App, _Deps}) -> [{App, App}] end,
+ fun ({App, Deps}) -> [{Dep, App} || Dep <- Deps] end,
[{App, app_dependencies(App)} ||
{App, _Desc, _Vsn} <- application:loaded_applications()]),
try
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 29e38c1f..4b7a9a1f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -22,9 +22,8 @@
stop_and_halt/0, await_startup/0, status/0, is_running/0,
is_running/1, environment/0, rotate_logs/1, force_event_refresh/1,
start_fhc/0]).
-
-export([start/2, stop/1]).
-
+-export([start_apps/1, stop_apps/1]).
-export([log_location/1]). %% for testing
%%---------------------------------------------------------------------------
@@ -202,6 +201,7 @@
%% practice 2 processes seems just as fast as any other number > 1,
%% and keeps the progress bar realistic-ish.
-define(HIPE_PROCESSES, 2).
+-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
%%----------------------------------------------------------------------------
@@ -211,6 +211,7 @@
%% this really should be an abstract type
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-type(param() :: atom()).
+-type(app_name() :: atom()).
-spec(start/0 :: () -> 'ok').
-spec(boot/0 :: () -> 'ok').
@@ -242,6 +243,8 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
+-spec(start_apps/1 :: ([app_name()]) -> 'ok').
+-spec(stop_apps/1 :: ([app_name()]) -> 'ok').
-endif.
@@ -312,9 +315,7 @@ start() ->
ok = ensure_working_log_handlers(),
rabbit_node_monitor:prepare_cluster_status_files(),
rabbit_mnesia:check_cluster_consistency(),
- ok = app_utils:start_applications(
- app_startup_order(), fun handle_app_error/2),
- ok = log_broker_started(rabbit_plugins:active())
+ broker_start()
end).
boot() ->
@@ -329,21 +330,14 @@ boot() ->
%% the upgrade, since if we are a secondary node the
%% primary node will have forgotten us
rabbit_mnesia:check_cluster_consistency(),
- Plugins = rabbit_plugins:setup(),
- ToBeLoaded = Plugins ++ ?APPS,
- ok = app_utils:load_applications(ToBeLoaded),
- StartupApps = app_utils:app_dependency_order(ToBeLoaded,
- false),
- ok = app_utils:start_applications(
- StartupApps, fun handle_app_error/2),
- ok = log_broker_started(Plugins)
+ broker_start()
end).
-handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- throw({could_not_start, App, Reason});
-
-handle_app_error(App, Reason) ->
- throw({could_not_start, App, Reason}).
+broker_start() ->
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ start_apps(ToBeLoaded),
+ ok = log_broker_started(rabbit_plugins:active()).
start_it(StartFun) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
@@ -374,7 +368,7 @@ stop() ->
_ -> await_startup()
end,
rabbit_log:info("Stopping RabbitMQ~n"),
- ok = app_utils:stop_applications(app_shutdown_order()).
+ stop_apps(app_shutdown_order()).
stop_and_halt() ->
try
@@ -385,6 +379,36 @@ stop_and_halt() ->
end,
ok.
+start_apps(Apps) ->
+ app_utils:load_applications(Apps),
+ OrderedApps = app_utils:app_dependency_order(Apps, false),
+ case lists:member(rabbit, Apps) of
+ false -> run_boot_steps(Apps); %% plugin activation
+ true -> ok %% will run during start of rabbit app
+ end,
+ ok = app_utils:start_applications(OrderedApps,
+ handle_app_error(could_not_start)).
+
+stop_apps(Apps) ->
+ ok = app_utils:stop_applications(
+ Apps, handle_app_error(error_during_shutdown)),
+ case lists:member(rabbit, Apps) of
+ false -> run_cleanup_steps(Apps); %% plugin deactivation
+ true -> ok %% it's all going anyway
+ end,
+ ok.
+
+handle_app_error(Term) ->
+ fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) ->
+ throw({Term, App, ExitReason});
+ (App, Reason) ->
+ throw({Term, App, Reason})
+ end.
+
+run_cleanup_steps(Apps) ->
+ [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)],
+ ok.
+
await_startup() ->
app_utils:wait_for_applications(app_startup_order()).
@@ -468,7 +492,8 @@ start(normal, []) ->
true = register(rabbit, self()),
print_banner(),
log_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
+ warn_if_kernel_config_dubious(),
+ run_boot_steps(),
{ok, SupPid};
Error ->
Error
@@ -496,29 +521,40 @@ app_shutdown_order() ->
%%---------------------------------------------------------------------------
%% boot step logic
-run_boot_step({_StepName, Attributes}) ->
- case [MFA || {mfa, MFA} <- Attributes] of
+run_boot_steps() ->
+ run_boot_steps([App || {App, _, _} <- application:loaded_applications()]).
+
+run_boot_steps(Apps) ->
+ [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)],
+ ok.
+
+find_steps(Apps) ->
+ All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)),
+ [Step || {App, _, _} = Step <- All, lists:member(App, Apps)].
+
+run_step(StepName, Attributes, AttributeName) ->
+ case [MFA || {Key, MFA} <- Attributes,
+ Key =:= AttributeName] of
[] ->
ok;
MFAs ->
[try
apply(M,F,A)
of
- ok -> ok;
- {error, Reason} -> boot_error(Reason, not_available)
+ ok -> ok;
+ {error, Reason} -> boot_error({boot_step, StepName, Reason},
+ not_available)
catch
- _:Reason -> boot_error(Reason, erlang:get_stacktrace())
+ _:Reason -> boot_error({boot_step, StepName, Reason},
+ erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
ok
end.
-boot_steps() ->
- sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
-
-vertices(_Module, Steps) ->
- [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
+vertices({AppName, _Module, Steps}) ->
+ [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps].
-edges(_Module, Steps) ->
+edges({_AppName, _Module, Steps}) ->
[case Key of
requires -> {StepName, OtherStep};
enables -> {OtherStep, StepName}
@@ -527,7 +563,7 @@ edges(_Module, Steps) ->
Key =:= requires orelse Key =:= enables].
sort_boot_steps(UnsortedSteps) ->
- case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1,
UnsortedSteps) of
{ok, G} ->
%% Use topological sort to find a consistent ordering (if
@@ -541,8 +577,8 @@ sort_boot_steps(UnsortedSteps) ->
digraph:delete(G),
%% Check that all mentioned {M,F,A} triples are exported.
case [{StepName, {M,F,A}} ||
- {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
+ {_App, StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
@@ -782,6 +818,31 @@ log_banner() ->
end || S <- Settings]),
error_logger:info_msg("~s", [Banner]).
+warn_if_kernel_config_dubious() ->
+ case erlang:system_info(kernel_poll) of
+ true -> ok;
+ false -> error_logger:warning_msg(
+ "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
+ "and CPU utilization may worsen.~n")
+ end,
+ AsyncThreads = erlang:system_info(thread_pool_size),
+ case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of
+ true -> error_logger:warning_msg(
+ "Erlang VM is running with ~b I/O threads, "
+ "file I/O performance may worsen~n", [AsyncThreads]);
+ false -> ok
+ end,
+ IDCOpts = case application:get_env(kernel, inet_default_connect_options) of
+ undefined -> [];
+ {ok, Val} -> Val
+ end,
+ case proplists:get_value(nodelay, IDCOpts, false) of
+ false -> error_logger:warning_msg(
+ "Nagle's algorithm is enabled for sockets, "
+ "network I/O latency will be higher~n");
+ true -> ok
+ end.
+
home_dir() ->
case init:get_argument(home) of
{ok, [[Home]]} -> Home;
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1aba7ecb..4e23dbd2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,7 +18,7 @@
-export([recover/0, stop/0, start/1, declare/5, declare/6,
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
--export([pseudo_queue/2]).
+-export([pseudo_queue/2, immutable/1]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -29,8 +29,8 @@
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
--export([on_node_down/1]).
--export([update/2, store_queue/1, policy_changed/2]).
+-export([on_node_up/1, on_node_down/1]).
+-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
cancel_sync_mirrors/1]).
@@ -174,9 +174,12 @@
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
+-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(update_decorators/1 :: (name()) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
@@ -254,15 +257,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
%% effect) this might not be possible to satisfy.
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
- Q = rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []}),
+ Q = rabbit_queue_decorator:set(
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ down_slave_nodes = [],
+ gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
@@ -308,12 +313,24 @@ store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue,
Q#amqqueue{slave_pids = [],
sync_slave_pids = [],
- gm_pids = []}, write),
- ok = mnesia:write(rabbit_queue, Q, write),
- ok;
+ gm_pids = [],
+ decorators = undefined}, write),
+ store_queue_ram(Q);
store_queue(Q = #amqqueue{durable = false}) ->
- ok = mnesia:write(rabbit_queue, Q, write),
- ok.
+ store_queue_ram(Q).
+
+store_queue_ram(Q) ->
+ ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write).
+
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q] -> store_queue_ram(Q),
+ ok;
+ [] -> ok
+ end
+ end).
policy_changed(Q1 = #amqqueue{decorators = Decorators1},
Q2 = #amqqueue{decorators = Decorators2}) ->
@@ -650,15 +667,23 @@ forget_all_durable(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
- [rabbit_binding:process_deletions(
- internal_delete1(Name)) ||
- #amqqueue{name = Name, pid = Pid} = Q <- Qs,
- node(Pid) =:= Node,
- rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined],
+ [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node],
ok
end),
ok.
+forget_node_for_queue(#amqqueue{name = Name,
+ down_slave_nodes = []}) ->
+ %% No slaves to recover from, queue is gone
+ rabbit_binding:process_deletions(internal_delete1(Name));
+
+forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) ->
+ %% Promote a slave while down - it'll happily recover as a master
+ Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H),
+ down_slave_nodes = T},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write).
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
@@ -674,6 +699,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+on_node_up(Node) ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_queue,
+ #amqqueue{_ = '_'}, write),
+ [case lists:member(Node, DSNs) of
+ true -> DSNs1 = DSNs -- [Node],
+ store_queue(
+ Q#amqqueue{down_slave_nodes = DSNs1});
+ false -> ok
+ end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs],
+ ok
+ end).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
@@ -709,6 +748,14 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ down_slave_nodes = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none}.
+
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
[];
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b785303..4082c53d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -84,6 +84,7 @@
memory,
slave_pids,
synchronised_slave_pids,
+ down_slave_nodes,
backing_queue_status,
state
]).
@@ -385,12 +386,12 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
- TRef = erlang:send_after(After, self(), {drop_expired, Version}),
+ TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
+ case rabbit_misc:cancel_timer(TRef) of
false -> State;
_ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
@@ -810,6 +811,14 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(down_slave_nodes, #q{q = #amqqueue{name = Name,
+ durable = Durable}}) ->
+ {ok, Q = #amqqueue{down_slave_nodes = Nodes}} =
+ rabbit_amqqueue:lookup(Name),
+ case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> Nodes
+ end;
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1165,7 +1174,7 @@ handle_cast({force_event_refresh, Ref},
emit_consumer_created(
Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
end,
- noreply(State);
+ noreply(rabbit_event:init_stats_timer(State, #q.stats_timer));
handle_cast(notify_decorators, State) ->
notify_decorators(State),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74f9cacf..738c4570 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -341,7 +341,7 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({force_event_refresh, Ref}, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
Ref),
- noreply(State);
+ noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer));
handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -433,17 +433,22 @@ send(_Command, #ch{state = closing}) ->
send(Command, #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Command).
-handle_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid}) ->
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ virtual_host = VHost,
+ user = User}) ->
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
{Channel, CloseMethod} ->
- rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n",
- [ConnPid, Channel, Reason]),
+ rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s',"
+ " user: '~s'), channel ~p:~n~p~n",
+ [ConnPid, ConnName, VHost, User#user.username,
+ Channel, Reason]),
ok = rabbit_writer:send_command(WriterPid, CloseMethod),
{noreply, State1};
{0, _} ->
@@ -668,8 +673,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx = Tx,
+ channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
- trace_state = TraceState}) ->
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName}) ->
check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
@@ -690,7 +698,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, ConnName, ChannelNum,
+ Username, TraceState),
Delivery = rabbit_basic:delivery(
Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
@@ -992,7 +1001,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName,
fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, Args, Owner),
- rabbit_amqqueue:stat(Q)
+ maybe_stat(NoWait, Q)
end) of
{ok, MessageCount, ConsumerCount} ->
return_queue_declare_ok(QueueName, NoWait, MessageCount,
@@ -1048,7 +1057,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -1204,6 +1213,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
E
end.
+maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q);
+maybe_stat(true, _Q) -> {ok, 0, 0}.
+
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
@@ -1365,7 +1377,10 @@ record_sent(ConsumerTag, AckRequired,
Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName,
+ channel = ChannelNum}) ->
?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
{none, true} -> get;
{none, false} -> get_no_ack;
@@ -1376,7 +1391,7 @@ record_sent(ConsumerTag, AckRequired,
true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index b867223b..a33103fd 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -23,6 +23,7 @@
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify/3, notify_if/3]).
+-export([sync_notify/2, sync_notify/3]).
%%----------------------------------------------------------------------------
@@ -61,6 +62,9 @@
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok').
-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
+-spec(sync_notify/2 :: (event_type(), event_props()) -> 'ok').
+-spec(sync_notify/3 :: (event_type(), event_props(),
+ reference() | 'none') -> 'ok').
-endif.
@@ -145,7 +149,16 @@ notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) -> notify(Type, Props, none).
notify(Type, Props, Ref) ->
- gen_event:notify(?MODULE, #event{type = Type,
- props = Props,
- reference = Ref,
- timestamp = os:timestamp()}).
+ gen_event:notify(?MODULE, event_cons(Type, Props, Ref)).
+
+sync_notify(Type, Props) -> sync_notify(Type, Props, none).
+
+sync_notify(Type, Props, Ref) ->
+ gen_event:sync_notify(?MODULE, event_cons(Type, Props, Ref)).
+
+event_cons(Type, Props, Ref) ->
+ #event{type = Type,
+ props = Props,
+ reference = Ref,
+ timestamp = os:timestamp()}.
+
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4d4a2a58..a1772f0a 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -20,7 +20,8 @@
-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
- lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
+ lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
+ update_scratch/3, update_decorators/1, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
@@ -61,6 +62,7 @@
-spec(lookup_or_die/1 ::
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
+-spec(list/0 :: () -> [rabbit_types:exchange()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
-spec(lookup_scratch/2 :: (name(), atom()) ->
rabbit_types:ok(term()) |
@@ -70,6 +72,8 @@
(name(),
fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
-> not_found | rabbit_types:exchange()).
+-spec(update_decorators/1 :: (name()) -> 'ok').
+-spec(immutable/1 :: (rabbit_types:exchange()) -> rabbit_types:exchange()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -106,24 +110,15 @@ recover() ->
mnesia:read({rabbit_exchange, XName}) =:= []
end,
fun (X, Tx) ->
- case Tx of
- true -> store(X);
- false -> ok
- end,
- callback(X, create, map_create_tx(Tx), [X])
+ X1 = case Tx of
+ true -> store_ram(X);
+ false -> rabbit_exchange_decorator:set(X)
+ end,
+ callback(X1, create, map_create_tx(Tx), [X1])
end,
rabbit_durable_exchange),
- report_missing_decorators(Xs),
[XName || #exchange{name = XName} <- Xs].
-report_missing_decorators(Xs) ->
- Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) ||
- #exchange{decorators = D} <- Xs])),
- case [M || M <- Mods, code:which(M) =:= non_existing] of
- [] -> ok;
- M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M])
- end.
-
callback(X = #exchange{type = XType,
decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
@@ -158,12 +153,13 @@ serial(#exchange{name = XName} = X) ->
end.
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = rabbit_policy:set(#exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args}),
+ X = rabbit_exchange_decorator:set(
+ rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args})),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -171,13 +167,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- store(X),
- ok = case Durable of
- true -> mnesia:write(rabbit_durable_exchange,
- X, write);
- false -> ok
- end,
- {new, X};
+ {new, store(X)};
[ExistingX] ->
{existing, ExistingX}
end
@@ -195,7 +185,19 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
-store(X) -> ok = mnesia:write(rabbit_exchange, X, write).
+
+store(X = #exchange{durable = true}) ->
+ mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined},
+ write),
+ store_ram(X);
+store(X = #exchange{durable = false}) ->
+ store_ram(X).
+
+store_ram(X) ->
+ X1 = rabbit_exchange_decorator:set(X),
+ ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1),
+ write),
+ X1.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -243,6 +245,8 @@ lookup_or_die(Name) ->
{error, not_found} -> rabbit_misc:not_found(Name)
end.
+list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}).
+
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
list(VHostPath) ->
@@ -287,20 +291,27 @@ update_scratch(Name, App, Fun) ->
ok
end).
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X] -> store_ram(X),
+ ok;
+ [] -> ok
+ end
+ end).
+
update(Name, Fun) ->
case mnesia:wread({rabbit_exchange, Name}) of
- [X = #exchange{durable = Durable}] ->
- X1 = Fun(X),
- ok = mnesia:write(rabbit_exchange, X1, write),
- case Durable of
- true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
- _ -> ok
- end,
- X1;
- [] ->
- not_found
+ [X] -> X1 = Fun(X),
+ store(X1);
+ [] -> not_found
end.
+immutable(X) -> X#exchange{scratches = none,
+ policy = none,
+ decorators = none}.
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 2f056b1b..900f9c32 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([select/2, set/1]).
+-export([select/2, set/1, register/2, unregister/1]).
%% This is like an exchange type except that:
%%
@@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
cons_if_eq(Select, Select, Item, List) -> [Item | List];
cons_if_eq(_Select, _Other, _Item, List) -> List.
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(exchange_decorator, TypeName, ModuleName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(exchange_decorator, TypeName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+maybe_recover(X = #exchange{name = Name,
+ decorators = Decs}) ->
+ #exchange{decorators = Decs1} = set(X),
+ Old = lists:sort(select(all, Decs)),
+ New = lists:sort(select(all, Decs1)),
+ case New of
+ Old -> ok;
+ _ -> %% TODO create a tx here for non-federation decorators
+ [M:create(none, X) || M <- New -- Old],
+ rabbit_exchange:update_decorators(Name)
+ end.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 7aec1ac8..9e8c4a18 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -78,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- gm_pids = GMPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids,
+ gm_pids = GMPids,
+ down_slave_nodes = DSNs}] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
@@ -89,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
+ DSNs1 = [node(Pid) ||
+ Pid <- SPids,
+ not lists:member(Pid, AlivePids)] ++ DSNs,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
@@ -97,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1),
%% If we add and remove nodes at the same time we
%% might tell the old master we need to sync and
@@ -109,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
_ ->
%% Master has changed, and we're not it.
%% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1)
end,
{ok, QPid1, DeadPids}
@@ -239,12 +245,16 @@ log(Level, QName, Fmt, Args) ->
rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
-store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
- sync_slave_pids = SSPids}) ->
+store_updated_slaves(Q = #amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids,
+ down_slave_nodes = DSNs}) ->
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
%% do we still need this filtering?
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
- Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
+ DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
+ down_slave_nodes = DSNs1},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 11d6a79c..cc06ae44 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -653,8 +653,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
-backing_queue_timeout(State = #state { backing_queue = BQ }) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
+backing_queue_timeout(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State#state{backing_queue_state = BQ:timeout(BQS)}.
ensure_sync_timer(State) ->
rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 18c07f86..180993a5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -45,7 +45,7 @@
-export([with_local_io/1, local_info_msg/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
--export([pid_to_string/1, string_to_pid/1]).
+-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
@@ -67,10 +67,11 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
--export([ensure_timer/4, stop_timer/2]).
+-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
+-export([now_to_ms/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -94,6 +95,7 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -192,6 +194,7 @@
(rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(node_to_fake_pid/1 :: (atom()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
@@ -209,7 +212,8 @@
[string()])
-> {'ok', {atom(), [{string(), string()}], [string()]}} |
'no_command').
--spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(all_module_attributes/1 ::
+ (atom()) -> [{atom(), atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
-> rabbit_types:ok_or_error2(digraph(),
@@ -245,11 +249,16 @@
-> {any(), non_neg_integer()}).
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
+-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()).
+-spec(cancel_timer/1 :: (tref()) -> 'ok').
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined')
-> float()).
+-spec(now_to_ms/1 :: ({non_neg_integer(),
+ non_neg_integer(),
+ non_neg_integer()}) -> pos_integer()).
-endif.
%%----------------------------------------------------------------------------
@@ -705,6 +714,10 @@ string_to_pid(Str) ->
throw(Err)
end.
+%% node(node_to_fake_pid(Node)) =:= Node.
+node_to_fake_pid(Node) ->
+ string_to_pid(format("<~s.0.0.0>", [Node])).
+
version_compare(A, B, lte) ->
case version_compare(A, B) of
eq -> true;
@@ -849,20 +862,20 @@ module_attributes(Module) ->
end.
all_module_attributes(Name) ->
- Modules =
+ Targets =
lists:usort(
lists:append(
- [Modules || {App, _, _} <- application:loaded_applications(),
- {ok, Modules} <- [application:get_key(App, modules)]])),
+ [[{App, Module} || Module <- Modules] ||
+ {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
lists:foldl(
- fun (Module, Acc) ->
+ fun ({App, Module}, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
- Atts -> [{Module, Atts} | Acc]
+ Atts -> [{App, Module, Atts} | Acc]
end
- end, [], Modules).
-
+ end, [], Targets).
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
@@ -870,13 +883,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
[case digraph:vertex(G, Vertex) of
false -> digraph:add_vertex(G, Vertex, Label);
_ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
- end || {Module, Atts} <- Graph,
- {Vertex, Label} <- VertexFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {Vertex, Label} <- VertexFun(GraphElem)],
[case digraph:add_edge(G, From, To) of
{error, E} -> throw({graph_error, {edge, E, From, To}});
_ -> ok
- end || {Module, Atts} <- Graph,
- {From, To} <- EdgeFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {From, To} <- EdgeFun(GraphElem)],
{ok, G}
catch {graph_error, Reason} ->
true = digraph:delete(G),
@@ -1012,7 +1025,9 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
-check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
+now_to_ms({Mega, Sec, Micro}) ->
+ (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000.
+
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
@@ -1040,7 +1055,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
ensure_timer(State, Idx, After, Msg) ->
case element(Idx, State) of
- undefined -> TRef = erlang:send_after(After, self(), Msg),
+ undefined -> TRef = send_after(After, self(), Msg),
setelement(Idx, State, TRef);
_ -> State
end.
@@ -1048,12 +1063,25 @@ ensure_timer(State, Idx, After, Msg) ->
stop_timer(State, Idx) ->
case element(Idx, State) of
undefined -> State;
- TRef -> case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> setelement(Idx, State, undefined)
- end
+ TRef -> cancel_timer(TRef),
+ setelement(Idx, State, undefined)
end.
+%% timer:send_after/3 goes through a single timer process but allows
+%% long delays. erlang:send_after/3 does not have a bottleneck but
+%% only allows max 2^32-1 millis.
+-define(MAX_ERLANG_SEND_AFTER, 4294967295).
+send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER ->
+ {ok, Ref} = timer:send_after(Millis, Pid, Msg),
+ {timer, Ref};
+send_after(Millis, Pid, Msg) ->
+ {erlang, erlang:send_after(Millis, Pid, Msg)}.
+
+cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref),
+ ok;
+cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
+ ok.
+
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 0791bbe2..c8d76719 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -37,8 +37,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(SSL_TIMEOUT, 5). %% seconds
-
-define(FIRST_TEST_BIND_PORT, 10000).
%%----------------------------------------------------------------------------
@@ -187,9 +185,14 @@ ensure_ssl() ->
end
end.
+ssl_timeout() ->
+ {ok, Val} = application:get_env(rabbit, ssl_handshake_timeout),
+ Val.
+
ssl_transform_fun(SslOpts) ->
fun (Sock) ->
- case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
+ Timeout = ssl_timeout(),
+ case catch ssl:ssl_accept(Sock, SslOpts, Timeout) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, timeout} ->
@@ -204,7 +207,7 @@ ssl_transform_fun(SslOpts) ->
%% form, according to the TLS spec). So we give
%% the ssl_connection a little bit of time to send
%% such alerts.
- timer:sleep(?SSL_TIMEOUT * 1000),
+ timer:sleep(Timeout),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 22b0c280..72acc905 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -416,6 +416,7 @@ ensure_ping_timer(State) ->
State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes).
handle_live_rabbit(Node) ->
+ ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index c0fb05e2..9acaa1d4 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -18,6 +18,7 @@
-include("rabbit.hrl").
-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
+-export([ensure/1]).
%%----------------------------------------------------------------------------
@@ -31,22 +32,54 @@
-spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]).
-spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) ->
[plugin_name()]).
-
+-spec(ensure/1 :: (string()) -> {'ok', [atom()], [atom()]} | {error, any()}).
-endif.
%%----------------------------------------------------------------------------
+ensure(FileJustChanged) ->
+ {ok, OurFile} = application:get_env(rabbit, enabled_plugins_file),
+ case OurFile of
+ FileJustChanged ->
+ {ok, Dir} = application:get_env(rabbit, plugins_dir),
+ Enabled = read_enabled(OurFile),
+ Wanted = dependencies(false, Enabled, list(Dir)),
+ prepare_plugins(Enabled),
+ Current = active(),
+ Start = Wanted -- Current,
+ Stop = Current -- Wanted,
+ rabbit:start_apps(Start),
+ %% We need sync_notify here since mgmt will attempt to look at all
+ %% the modules for the disabled plugins - if they are unloaded
+ %% that won't work.
+ ok = rabbit_event:sync_notify(plugins_changed, [{enabled, Start},
+ {disabled, Stop}]),
+ rabbit:stop_apps(Stop),
+ clean_plugins(Stop),
+ {ok, Start, Stop};
+ _ ->
+ {error, {enabled_plugins_mismatch, FileJustChanged, OurFile}}
+ end.
+
%% @doc Prepares the file system and installs all enabled plugins.
setup() ->
- {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(ExpandDir) of
+ ok -> ok;
+ {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
+ [ExpandDir, E1]}})
+ end,
+
{ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file),
- prepare_plugins(EnabledFile, PluginDir, ExpandDir).
+ Enabled = read_enabled(EnabledFile),
+ prepare_plugins(Enabled).
%% @doc Lists the plugins which are currently running.
active() ->
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
- InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
+ InstalledPlugins = plugin_names(list(ExpandDir)),
[App || {App, _, _} <- rabbit_misc:which_applications(),
lists:member(App, InstalledPlugins)].
@@ -67,7 +100,7 @@ list(PluginsDir) ->
_ -> error_logger:warning_msg(
"Problem reading some plugins: ~p~n", [Problems])
end,
- Plugins.
+ ensure_dependencies(Plugins).
%% @doc Read the list of enabled plugins from the supplied term file.
read_enabled(PluginsFile) ->
@@ -86,15 +119,10 @@ read_enabled(PluginsFile) ->
%% the resulting list, otherwise they're skipped.
dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- lists:ukeysort(
- 1, [{Name, Deps} ||
- #plugin{name = Name,
- dependencies = Deps} <- AllPlugins] ++
- [{Dep, []} ||
- #plugin{dependencies = Deps} <- AllPlugins,
- Dep <- Deps])),
+ fun ({App, _Deps}) -> [{App, App}] end,
+ fun ({App, Deps}) -> [{App, Dep} || Dep <- Deps] end,
+ [{Name, Deps} || #plugin{name = Name,
+ dependencies = Deps} <- AllPlugins]),
Dests = case Reverse of
false -> digraph_utils:reachable(Sources, G);
true -> digraph_utils:reaching(Sources, G)
@@ -102,27 +130,44 @@ dependencies(Reverse, Sources, AllPlugins) ->
true = digraph:delete(G),
Dests.
+%% Make sure we don't list OTP apps in here, and also that we detect
+%% missing dependencies.
+ensure_dependencies(Plugins) ->
+ Names = plugin_names(Plugins),
+ NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins,
+ Dep <- Deps,
+ not lists:member(Dep, Names)],
+ {OTP, Missing} = lists:partition(fun is_loadable/1, lists:usort(NotThere)),
+ case Missing of
+ [] -> ok;
+ _ -> Blame = [Name || #plugin{name = Name,
+ dependencies = Deps} <- Plugins,
+ lists:any(fun (Dep) ->
+ lists:member(Dep, Missing)
+ end, Deps)],
+ throw({error, {missing_dependencies, Missing, Blame}})
+ end,
+ [P#plugin{dependencies = Deps -- OTP}
+ || P = #plugin{dependencies = Deps} <- Plugins].
+
+is_loadable(App) ->
+ case application:load(App) of
+ {error, {already_loaded, _}} -> true;
+ ok -> application:unload(App),
+ true;
+ _ -> false
+ end.
+
%%----------------------------------------------------------------------------
-prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
+prepare_plugins(Enabled) ->
+ {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
AllPlugins = list(PluginsDistDir),
- Enabled = read_enabled(EnabledFile),
ToUnpack = dependencies(false, Enabled, AllPlugins),
ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
- case Enabled -- plugin_names(ToUnpackPlugins) of
- [] -> ok;
- Missing -> error_logger:warning_msg(
- "The following enabled plugins were not found: ~p~n",
- [Missing])
- end,
-
- %% Eliminate the contents of the destination directory
- case delete_recursively(ExpandDir) of
- ok -> ok;
- {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
- [ExpandDir, E1]}})
- end,
case filelib:ensure_dir(ExpandDir ++ "/") of
ok -> ok;
{error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
@@ -134,6 +179,20 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
[prepare_dir_plugin(PluginAppDescPath) ||
PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
+clean_plugins(Plugins) ->
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins].
+
+clean_plugin(Plugin, ExpandDir) ->
+ {ok, Mods} = application:get_key(Plugin, modules),
+ application:unload(Plugin),
+ [begin
+ code:soft_purge(Mod),
+ code:delete(Mod),
+ false = code:is_loaded(Mod)
+ end || Mod <- Mods],
+ delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])).
+
prepare_dir_plugin(PluginAppDescPath) ->
code:add_path(filename:dirname(PluginAppDescPath)),
list_to_atom(filename:basename(PluginAppDescPath, ".app")).
@@ -172,8 +231,7 @@ plugin_info(Base, {app, App0}) ->
mkplugin(Name, Props, Type, Location) ->
Version = proplists:get_value(vsn, Props, "0"),
Description = proplists:get_value(description, Props, ""),
- Dependencies =
- filter_applications(proplists:get_value(applications, Props, [])),
+ Dependencies = proplists:get_value(applications, Props, []),
#plugin{name = Name, version = Version, description = Description,
dependencies = Dependencies, location = Location, type = Type}.
@@ -206,18 +264,6 @@ parse_binary(Bin) ->
Err -> {error, {invalid_app, Err}}
end.
-filter_applications(Applications) ->
- [Application || Application <- Applications,
- not is_available_app(Application)].
-
-is_available_app(Application) ->
- case application:load(Application) of
- {error, {already_loaded, _}} -> true;
- ok -> application:unload(Application),
- true;
- _ -> false
- end.
-
plugin_names(Plugins) ->
[Name || #plugin{name = Name} <- Plugins].
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 89e16f14..278fcf98 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -18,23 +18,34 @@
-include("rabbit.hrl").
-export([start/0, stop/0]).
+-export([action/6]).
+-define(NODE_OPT, "-n").
-define(VERBOSE_OPT, "-v").
-define(MINIMAL_OPT, "-m").
-define(ENABLED_OPT, "-E").
-define(ENABLED_ALL_OPT, "-e").
+-define(OFFLINE_OPT, "--offline").
+-define(ONLINE_OPT, "--online").
+-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
+-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
+-define(ONLINE_DEF, {?ONLINE_OPT, flag}).
--define(GLOBAL_DEFS, []).
+-define(RPC_TIMEOUT, infinity).
+
+-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]).
-define(COMMANDS,
[{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
- enable,
- disable]).
+ {enable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {disable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {set, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {sync, []}]).
%%----------------------------------------------------------------------------
@@ -51,11 +62,10 @@
start() ->
{ok, [[PluginsFile|_]|_]} =
init:get_argument(enabled_plugins_file),
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
@@ -67,7 +77,8 @@ start() ->
[string:join([atom_to_list(Command) | Args], " ")])
end,
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ Node = proplists:get_value(?NODE_OPT, Opts),
+ case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
@@ -76,12 +87,23 @@ start() ->
{'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
PrintInvalidCommandError(),
usage();
+ {error, {missing_dependencies, Missing, Blame}} ->
+ print_error("dependent plugins ~p not found; used by ~p.",
+ [Missing, Blame]),
+ rabbit_misc:quit(2);
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
{error_string, Reason} ->
print_error("~s", [Reason]),
rabbit_misc:quit(2);
+ {badrpc, {'EXIT', Reason}} ->
+ print_error("~p", [Reason]),
+ rabbit_misc:quit(2);
+ {badrpc, Reason} ->
+ print_error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_badrpc_diagnostics([Node]),
+ rabbit_misc:quit(2);
Other ->
print_error("~p", [Other]),
rabbit_misc:quit(2)
@@ -92,50 +114,80 @@ stop() ->
%%----------------------------------------------------------------------------
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
- format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
+action(list, Node, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, Node, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, Node, [Pat], Opts, PluginsFile, PluginsDir) ->
+ format_plugins(Node, Pat, Opts, PluginsFile, PluginsDir);
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) ->
case ToEnable0 of
[] -> throw({error_string, "Not enough arguments for 'enable'"});
_ -> ok
end,
AllPlugins = rabbit_plugins:list(PluginsDir),
Enabled = rabbit_plugins:read_enabled(PluginsFile),
- ImplicitlyEnabled = rabbit_plugins:dependencies(false,
- Enabled, AllPlugins),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
Missing = ToEnable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string, fmt_missing(Missing)})
+ end,
NewEnabled = lists:usort(Enabled ++ ToEnable),
NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
- MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing,
- case {Missing, MissingDeps} of
- {[], []} -> ok;
- {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)});
- {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)});
- {_, _} -> throw({error_string,
- fmt_missing("plugins", Missing) ++
- fmt_missing("dependencies", MissingDeps)})
- end,
write_enabled_plugins(PluginsFile, NewEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
- NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
- end;
+ NewImplicitlyEnabled -- ImplicitlyEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+
+action(set, Node, ToSet0, Opts, PluginsFile, PluginsDir) ->
+ ToSet = [list_to_atom(Name) || Name <- ToSet0],
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ Missing = ToSet -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string, fmt_missing(Missing)})
+ end,
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ ToSet, AllPlugins),
+ write_enabled_plugins(PluginsFile, ToSet),
+ case NewImplicitlyEnabled of
+ [] -> io:format("All plugins are now disabled.~n");
+ _ -> print_list("The following plugins are now enabled:",
+ NewImplicitlyEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) ->
case ToDisable0 of
[] -> throw({error_string, "Not enough arguments for 'disable'"});
_ -> ok
end,
- ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
- Enabled = rabbit_plugins:read_enabled(PluginsFile),
AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
Missing = ToDisable -- plugin_names(AllPlugins),
case Missing of
[] -> ok;
@@ -144,30 +196,35 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
end,
ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins),
NewEnabled = Enabled -- ToDisableDeps,
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
case length(Enabled) =:= length(NewEnabled) of
true -> io:format("Plugin configuration unchanged.~n");
- false -> ImplicitlyEnabled =
- rabbit_plugins:dependencies(false, Enabled, AllPlugins),
- NewImplicitlyEnabled =
- rabbit_plugins:dependencies(false,
- NewEnabled, AllPlugins),
- print_list("The following plugins have been disabled:",
+ false -> print_list("The following plugins have been disabled:",
ImplicitlyEnabled -- NewImplicitlyEnabled),
- write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
- end.
+ write_enabled_plugins(PluginsFile, NewEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+
+action(sync, Node, [], _Opts, PluginsFile, _PluginsDir) ->
+ sync(Node, true, PluginsFile).
%%----------------------------------------------------------------------------
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
+
+print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
+
+print_badrpc_diagnostics(Nodes) ->
+ fmt_stderr(rabbit_nodes:diagnostics(Nodes), []).
usage() ->
io:format("~s", [rabbit_plugins_usage:usage()]),
rabbit_misc:quit(1).
%% Pretty print a list of plugins.
-format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
+format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) ->
Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
Format = case {Verbose, Minimal} of
@@ -182,41 +239,52 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
AvailablePlugins = rabbit_plugins:list(PluginsDir),
EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile),
- EnabledImplicitly =
- rabbit_plugins:dependencies(false, EnabledExplicitly,
- AvailablePlugins) -- EnabledExplicitly,
- Missing = [#plugin{name = Name, dependencies = []} ||
- Name <- ((EnabledExplicitly ++ EnabledImplicitly) --
- plugin_names(AvailablePlugins))],
+ AllEnabled = rabbit_plugins:dependencies(false, EnabledExplicitly,
+ AvailablePlugins),
+ EnabledImplicitly = AllEnabled -- EnabledExplicitly,
+ {StatusMsg, Running} =
+ case rpc:call(Node, rabbit_plugins, active, [], ?RPC_TIMEOUT) of
+ {badrpc, _} -> {"[failed to contact ~s - status not shown]", []};
+ Active -> {"* = running on ~s", Active}
+ end,
{ok, RE} = re:compile(Pattern),
Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing,
+ Plugin = #plugin{name = Name} <- AvailablePlugins,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- OnlyEnabledAll -> (lists:member(Name,
- EnabledExplicitly) or
- lists:member(Name, EnabledImplicitly));
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or
+ lists:member(Name,EnabledImplicitly);
true -> true
end],
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
#plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly,
- plugin_names(Missing), Format, MaxWidth) || P <- Plugins1],
+ case Format of
+ minimal -> ok;
+ _ -> io:format(" Configured: E = explicitly enabled; "
+ "e = implicitly enabled~n"
+ " | Status: ~s~n"
+ " |/~n", [rabbit_misc:format(StatusMsg, [Node])])
+ end,
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running,
+ Format, MaxWidth) || P <- Plugins1],
ok.
format_plugin(#plugin{name = Name, version = Version,
description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Missing,
- Format, MaxWidth) ->
- Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly),
- lists:member(Name, Missing)} of
- {true, false, false} -> "[E]";
- {false, true, false} -> "[e]";
- {_, _, true} -> "[!]";
- _ -> "[ ]"
- end,
+ EnabledExplicitly, EnabledImplicitly, Running, Format,
+ MaxWidth) ->
+ EnabledGlyph = case {lists:member(Name, EnabledExplicitly),
+ lists:member(Name, EnabledImplicitly)} of
+ {true, false} -> "E";
+ {false, true} -> "e";
+ _ -> " "
+ end,
+ RunningGlyph = case lists:member(Name, Running) of
+ true -> "*";
+ false -> " "
+ end,
+ Glyph = rabbit_misc:format("[~s~s]", [EnabledGlyph, RunningGlyph]),
Opt = fun (_F, A, A) -> ok;
( F, A, _) -> io:format(F, [A])
end,
@@ -227,9 +295,9 @@ format_plugin(#plugin{name = Name, version = Version,
Opt("~s", Version, undefined),
io:format("~n");
verbose -> io:format("~s ~w~n", [Glyph, Name]),
- Opt(" Version: \t~s~n", Version, undefined),
- Opt(" Dependencies:\t~p~n", Deps, []),
- Opt(" Description: \t~s~n", Description, undefined),
+ Opt(" Version: \t~s~n", Version, undefined),
+ Opt(" Dependencies:\t~p~n", Deps, []),
+ Opt(" Description: \t~s~n", Description, undefined),
io:format("~n")
end.
@@ -240,8 +308,8 @@ fmt_list(Header, Plugins) ->
lists:flatten(
[Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
-fmt_missing(Desc, Missing) ->
- fmt_list("The following " ++ Desc ++ " could not be found:", Missing).
+fmt_missing(Missing) ->
+ fmt_list("The following plugins could not be found:", Missing).
usort_plugins(Plugins) ->
lists:usort(fun plugins_cmp/2, Plugins).
@@ -262,6 +330,51 @@ write_enabled_plugins(PluginsFile, Plugins) ->
PluginsFile, Reason}})
end.
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n").
+action_change(Opts, Node, Old, New, PluginsFile) ->
+ action_change0(proplists:get_bool(?OFFLINE_OPT, Opts),
+ proplists:get_bool(?ONLINE_OPT, Opts),
+ Node, Old, New, PluginsFile).
+
+action_change0(true, _Online, _Node, Same, Same, _PluginsFile) ->
+ %% Definitely nothing to do
+ ok;
+action_change0(true, _Online, _Node, _Old, _New, _PluginsFile) ->
+ io:format("Offline change; changes will take effect at broker restart.~n");
+action_change0(false, Online, Node, _Old, _New, PluginsFile) ->
+ sync(Node, Online, PluginsFile).
+
+sync(Node, ForceOnline, PluginsFile) ->
+ rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]).
+
+rpc_call(Node, Online, Mod, Fun, Args) ->
+ io:format("~nApplying plugin configuration to ~s...", [Node]),
+ case rpc:call(Node, Mod, Fun, Args) of
+ {ok, [], []} ->
+ io:format(" nothing to do.~n", []);
+ {ok, Start, []} ->
+ io:format(" started ~b plugin~s.~n", [length(Start), plur(Start)]);
+ {ok, [], Stop} ->
+ io:format(" stopped ~b plugin~s.~n", [length(Stop), plur(Stop)]);
+ {ok, Start, Stop} ->
+ io:format(" stopped ~b plugin~s and started ~b plugin~s.~n",
+ [length(Stop), plur(Stop), length(Start), plur(Start)]);
+ {badrpc, nodedown} = Error ->
+ io:format(" failed.~n", []),
+ case Online of
+ true -> Error;
+ false -> io:format(
+ " * Could not contact node ~s.~n"
+ " Changes will take effect at broker restart.~n"
+ " * Options: --online - fail if broker cannot be "
+ "contacted.~n"
+ " --offline - do not try to contact "
+ "broker.~n",
+ [Node])
+ end;
+ Error ->
+ io:format(" failed.~n", []),
+ Error
+ end.
+
+plur([_]) -> "";
+plur(_) -> "s".
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index fe2b766f..3558cf98 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -61,13 +61,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) ->
{error, "~p is not a valid dead letter routing key", [Value]};
validate_policy0(<<"message-ttl">>, Value)
- when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"message-ttl">>, Value) ->
{error, "~p is not a valid message TTL", [Value]};
validate_policy0(<<"expires">>, Value)
- when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 1 ->
ok;
validate_policy0(<<"expires">>, Value) ->
{error, "~p is not a valid queue expiry", [Value]};
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 0a69fb32..f5d03360 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,17 +46,11 @@ name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set(
- Q#amqqueue{policy = set0(Name)});
-set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
- X#exchange{policy = set0(Name)}).
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
-set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)};
-set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set(
- X#exchange{policy = match(Name, Ps)}).
-
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
@@ -104,12 +98,18 @@ recover0() ->
Policies = list(),
[rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:write(rabbit_durable_exchange, set(X, Policies), write)
- end) || X <- Xs],
+ mnesia:write(
+ rabbit_durable_exchange,
+ rabbit_exchange_decorator:set(
+ X#exchange{policy = match(Name, Policies)}), write)
+ end) || X = #exchange{name = Name} <- Xs],
[rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:write(rabbit_durable_queue, set(Q, Policies), write)
- end) || Q <- Qs],
+ mnesia:write(
+ rabbit_durable_queue,
+ rabbit_queue_decorator:set(
+ Q#amqqueue{policy = match(Name, Policies)}), write)
+ end) || Q = #amqqueue{name = Name} <- Qs],
ok.
invalid_file() ->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 6205e2dc..adfe0c7f 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -2,7 +2,7 @@
-include("rabbit.hrl").
--export([select/1, set/1]).
+-export([select/1, set/1, register/2, unregister/1]).
%%----------------------------------------------------------------------------
@@ -41,3 +41,24 @@ select(Modules) ->
set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(queue_decorator, TypeName, ModuleName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(queue_decorator, TypeName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+maybe_recover(Q = #amqqueue{name = Name,
+ decorators = Decs}) ->
+ #amqqueue{decorators = Decs1} = set(Q),
+ Old = lists:sort(select(Decs)),
+ New = lists:sort(select(Decs1)),
+ case New of
+ Old -> ok;
+ _ -> [M:startup(Q) || M <- New -- Old],
+ rabbit_amqqueue:update_decorators(Name)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ddaf205e..9db607f9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,7 +27,6 @@
-export([conserve_resources/3, server_properties/1]).
--define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
@@ -43,7 +42,7 @@
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, connected_at}).
-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}).
@@ -55,7 +54,7 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, channel_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties, connected_at]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -189,10 +188,10 @@ server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
socket_error(Reason) when is_atom(Reason) ->
- log(error, "error on AMQP connection ~p: ~s~n",
+ log(error, "Error on AMQP connection ~p: ~s~n",
[self(), rabbit_misc:format_inet_error(Reason)]);
socket_error(Reason) ->
- log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]).
+ log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
exit(normal)
end,
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
+ {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(list_to_binary(Name)),
@@ -231,13 +231,14 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
peer_port = PeerPort,
protocol = none,
user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
+ timeout_sec = (HandshakeTimeout / 1000),
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
capabilities = [],
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ connected_at = rabbit_misc:now_to_ms(os:timestamp())},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -410,7 +411,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
rabbit_event:notify(
connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
- State;
+ rabbit_event:init_stats_timer(State, #v1.stats_timer);
handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
State;
@@ -548,21 +549,27 @@ wait_for_channel_termination(0, TimerRef, State) ->
end;
_ -> State
end;
-wait_for_channel_termination(N, TimerRef, State) ->
+wait_for_channel_termination(N, TimerRef,
+ State = #v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
{Channel, State1} = channel_cleanup(ChPid, State),
case {Channel, termination_kind(Reason)} of
- {undefined, _} -> exit({abnormal_dependent_exit,
- ChPid, Reason});
- {_, controlled} -> wait_for_channel_termination(
- N-1, TimerRef, State1);
- {_, uncontrolled} -> log(error,
- "AMQP connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
- wait_for_channel_termination(
- N-1, TimerRef, State1)
+ {undefined, _} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef, State1);
+ {_, uncontrolled} ->
+ log(error, "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:"
+ "error while terminating:~n~p~n",
+ [self(), ConnName, VHost, User#user.username,
+ CS, Channel, Reason]),
+ wait_for_channel_termination(N-1, TimerRef, State1)
end;
cancel_wait ->
exit(channel_termination_timeout)
@@ -581,16 +588,24 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+log_hard_error(#v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}, Channel, Reason) ->
+ log(error,
+ "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:~n~p~n",
+ [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]).
+
handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), closed, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
State;
handle_exception(State = #v1{connection = #connection{protocol = Protocol},
connection_state = CS},
Channel, Reason)
when ?IS_RUNNING(State) orelse CS =:= closing ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), CS, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
State1 = close_connection(terminate_channels(State)),
@@ -1129,6 +1144,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
+ic(connected_at, #connection{connected_at = T}) -> T;
ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index cf125913..f78549ff 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -82,6 +82,8 @@ set(VHost, Component, Name, Term, User) ->
set_global(Name, Term) ->
mnesia_update(Name, Term),
+ event_notify(parameter_set, none, global, [{name, Name},
+ {value, Term}]),
ok.
format_error(L) ->
@@ -164,6 +166,8 @@ mnesia_clear(VHost, Component, Name) ->
event_notify(_Event, _VHost, <<"policy">>, _Props) ->
ok;
+event_notify(Event, none, Component, Props) ->
+ rabbit_event:notify(Event, [{component, Component} | Props]);
event_notify(Event, VHost, Component, Props) ->
rabbit_event:notify(Event, [{vhost, VHost},
{component, Component} | Props]).
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index da75932d..47c77cd0 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -70,7 +70,8 @@ wait_for_replicated() ->
not lists:member({local_content, true}, TabDef)]).
wait(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
+ {ok, Timeout} = application:get_env(rabbit, mnesia_table_loading_timeout),
+ case mnesia:wait_for_tables(TableNames, Timeout) of
ok ->
ok;
{timeout, BadTabs} ->
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index aafd81df..dbc2856d 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/5, tap_out/5, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -32,8 +32,12 @@
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
--spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+-spec(tap_in/5 :: (rabbit_types:basic_message(), binary(),
+ rabbit_channel:channel_number(),
+ rabbit_types:username(), state()) -> 'ok').
+-spec(tap_out/5 :: (rabbit_amqqueue:qmsg(), binary(),
+ rabbit_channel:channel_number(),
+ rabbit_types:username(), state()) -> 'ok').
-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
@@ -54,15 +58,27 @@ enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_in(_Msg, none) -> ok;
-tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) ->
- trace(TraceX, Msg, <<"publish">>, XName, []).
-
-tap_out(_Msg, none) -> ok;
-tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) ->
+tap_in(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
+tap_in(Msg = #basic_message{exchange_name = #resource{name = XName,
+ virtual_host = VHost}},
+ ConnName, ChannelNum, Username, TraceX) ->
+ trace(TraceX, Msg, <<"publish">>, XName,
+ [{<<"vhost">>, longstr, VHost},
+ {<<"connection">>, longstr, ConnName},
+ {<<"channel">>, signedint, ChannelNum},
+ {<<"user">>, longstr, Username}]).
+
+tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
+tap_out({#resource{name = QName, virtual_host = VHost},
+ _QPid, _QMsgId, Redelivered, Msg},
+ ConnName, ChannelNum, Username, TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
trace(TraceX, Msg, <<"deliver">>, QName,
- [{<<"redelivered">>, signedint, RedeliveredNum}]).
+ [{<<"redelivered">>, signedint, RedeliveredNum},
+ {<<"vhost">>, longstr, VHost},
+ {<<"connection">>, longstr, ConnName},
+ {<<"channel">>, signedint, ChannelNum},
+ {<<"user">>, longstr, Username}]).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b6d37852..1104f373 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -48,6 +48,7 @@
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
+-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
%% -------------------------------------------------------------------
@@ -77,6 +78,8 @@
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
-spec(internal_system_x/0 :: () -> 'ok').
+-spec(cluster_name/0 :: () -> 'ok').
+-spec(down_slave_nodes/0 :: () -> 'ok').
-endif.
@@ -382,6 +385,21 @@ cluster_name_tx() ->
[mnesia:delete(T, K, write) || K <- Ks],
ok.
+down_slave_nodes() ->
+ ok = down_slave_nodes(rabbit_queue),
+ ok = down_slave_nodes(rabbit_durable_queue).
+
+down_slave_nodes(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index d943b599..3a041508 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -114,8 +114,8 @@ upgrades_required(Scope) ->
with_upgrade_graph(Fun, Scope) ->
case rabbit_misc:build_acyclic_graph(
- fun (Module, Steps) -> vertices(Module, Steps, Scope) end,
- fun (Module, Steps) -> edges(Module, Steps, Scope) end,
+ fun ({_App, Module, Steps}) -> vertices(Module, Steps, Scope) end,
+ fun ({_App, Module, Steps}) -> edges(Module, Steps, Scope) end,
rabbit_misc:all_module_attributes(rabbit_upgrade)) of
{ok, G} -> try
Fun(G)
@@ -161,7 +161,7 @@ heads(G) ->
categorise_by_scope(Version) when is_list(Version) ->
Categorised =
- [{Scope, Name} || {_Module, Attributes} <-
+ [{Scope, Name} || {_App, _Module, Attributes} <-
rabbit_misc:all_module_attributes(rabbit_upgrade),
{Name, Scope, _Requires} <- Attributes,
lists:member(Name, Version)],