summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--codegen.py3
-rw-r--r--docs/examples-to-end.xsl7
-rw-r--r--docs/rabbitmqctl.1.xml8
-rw-r--r--include/rabbit_backing_queue_spec.hrl5
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec8
-rw-r--r--packaging/common/rabbitmq-server.init19
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf23
-rw-r--r--packaging/debs/Debian/debian/changelog12
-rw-r--r--packaging/debs/Debian/debian/dirs1
-rw-r--r--packaging/debs/Debian/debian/postinst1
-rw-r--r--packaging/debs/Debian/debian/postrm.in3
-rw-r--r--scripts/rabbitmq-server.bat3
-rw-r--r--scripts/rabbitmq-service.bat4
-rw-r--r--src/delegate.erl6
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl55
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gen_server2.erl4
-rw-r--r--src/gm.erl4
-rw-r--r--src/rabbit.erl23
-rw-r--r--src/rabbit_access_control.erl3
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl213
-rw-r--r--src/rabbit_amqqueue_sup.erl14
-rw-r--r--src/rabbit_auth_backend_internal.erl18
-rw-r--r--src/rabbit_basic.erl74
-rw-r--r--src/rabbit_binding.erl29
-rw-r--r--src/rabbit_channel.erl342
-rw-r--r--src/rabbit_client_sup.erl3
-rw-r--r--src/rabbit_command_assembler.erl4
-rw-r--r--src/rabbit_control.erl31
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_error_logger.erl10
-rw-r--r--src/rabbit_log.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_misc.erl20
-rw-r--r--src/rabbit_mirror_queue_slave.erl11
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_mnesia.erl12
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_networking.erl27
-rw-r--r--src/rabbit_node_monitor.erl1
-rw-r--r--src/rabbit_prelaunch.erl39
-rw-r--r--src/rabbit_reader.erl41
-rw-r--r--src/rabbit_restartable_sup.erl10
-rw-r--r--src/rabbit_sup.erl15
-rw-r--r--src/rabbit_tests.erl13
-rw-r--r--src/rabbit_upgrade.erl8
-rw-r--r--src/rabbit_writer.erl3
-rw-r--r--src/tcp_acceptor_sup.erl8
-rw-r--r--src/tcp_listener.erl8
-rw-r--r--src/tcp_listener_sup.erl15
-rw-r--r--src/test_sup.erl12
-rw-r--r--src/vm_memory_monitor.erl4
-rw-r--r--src/worker_pool.erl1
-rw-r--r--src/worker_pool_sup.erl4
57 files changed, 735 insertions, 482 deletions
diff --git a/Makefile b/Makefile
index 106f2993..1cda821f 100644
--- a/Makefile
+++ b/Makefile
@@ -121,7 +121,7 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c
$(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
- dialyzer --plt $(BASIC_PLT) --no_native \
+ dialyzer --plt $(BASIC_PLT) --no_native --fullpath \
-Wrace_conditions $(BEAM_TARGETS)
# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
diff --git a/codegen.py b/codegen.py
index 8cd9dab8..7636c196 100644
--- a/codegen.py
+++ b/codegen.py
@@ -371,6 +371,8 @@ def genErl(spec):
classIds.add(m.klass.index)
print prettyType("amqp_class_id()",
["%i" % ci for ci in classIds])
+ print prettyType("amqp_class_name()",
+ ["%s" % c.erlangName() for c in spec.allClasses()])
print "-endif. % use_specs"
print """
@@ -378,6 +380,7 @@ def genErl(spec):
-ifdef(use_specs).
-spec(version/0 :: () -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()).
+-spec(lookup_class_name/1 :: (amqp_class_id()) -> amqp_class_name()).
-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()).
-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()).
-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()).
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl
index a0a74178..4db1d5c4 100644
--- a/docs/examples-to-end.xsl
+++ b/docs/examples-to-end.xsl
@@ -2,7 +2,10 @@
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
version='1.0'>
-<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" />
+<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN"
+ doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd"
+ indent="yes"
+/>
<!-- Don't copy examples through in place -->
<xsl:template match="*[@role='example-prefix']"/>
@@ -27,7 +30,7 @@
<varlistentry>
<term><command><xsl:copy-of select="text()"/></command></term>
<listitem>
- <xsl:copy-of select="following-sibling::para[@role='example']"/>
+ <xsl:copy-of select="following-sibling::para[@role='example' and preceding-sibling::screen[1] = current()]"/>
</listitem>
</varlistentry>
</xsl:for-each>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 4d3065b7..3291c44d 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -556,13 +556,15 @@
<varlistentry>
<term><cmdsynopsis><command>list_users</command></cmdsynopsis></term>
<listitem>
- <para>Lists users</para>
+ <para>
+ Lists users. Each result row will contain the user name
+ followed by a list of the tags set for that user.
+ </para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl list_users</screen>
<para role="example">
This command instructs the RabbitMQ broker to list all
- users. Each result row will contain the user name and
- the administrator status of the user, in that order.
+ users.
</para>
</listitem>
</varlistentry>
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index ee102f5e..20fe4234 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -26,6 +26,7 @@
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
+-type(duration() :: ('undefined' | 'infinity' | number())).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
@@ -55,8 +56,8 @@
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
-spec(set_ram_duration_target/2 ::
- (('undefined' | 'infinity' | number()), state()) -> state()).
--spec(ram_duration/1 :: (state()) -> {number(), state()}).
+ (duration(), state()) -> state()).
+-spec(ram_duration/1 :: (state()) -> {duration(), state()}).
-spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle').
-spec(timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index bdd6c4a1..0c5aa96a 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -50,7 +50,6 @@ make install TARGET_DIR=%{_maindir} \
mkdir -p %{buildroot}%{_localstatedir}/lib/rabbitmq/mnesia
mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq
-mkdir -p %{buildroot}%{_localstatedir}/run/rabbitmq
#Copy all necessary lib files etc.
install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server
@@ -112,7 +111,6 @@ done
%defattr(-,root,root,-)
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq
-%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/run/rabbitmq
%dir %{_sysconfdir}/rabbitmq
%{_initrddir}/rabbitmq-server
%config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server
@@ -122,6 +120,12 @@ done
rm -rf %{buildroot}
%changelog
+* Fri Sep 9 2011 tim@rabbitmq.com 2.6.1-1
+- New Upstream Release
+
+* Fri Aug 26 2011 tim@rabbitmq.com 2.6.0-1
+- New Upstream Release
+
* Mon Jun 27 2011 simon@rabbitmq.com 2.5.1-1
- New Upstream Release
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index e2815f04..15fd5d5b 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -34,12 +34,27 @@ test -x $CONTROL || exit 0
RETVAL=0
set -e
+ensure_pid_dir () {
+ PID_DIR=`dirname ${PID_FILE}`
+ if [ ! -d ${PID_DIR} ] ; then
+ mkdir -p ${PID_DIR}
+ chown -R ${USER}:${USER} ${PID_DIR}
+ chmod 755 ${PID_DIR}
+ fi
+}
+
+remove_pid () {
+ rm -f ${PID_FILE}
+ rmdir `dirname ${PID_FILE}` || :
+}
+
start_rabbitmq () {
status_rabbitmq quiet
if [ $RETVAL = 0 ] ; then
echo RabbitMQ is currently running
else
RETVAL=0
+ ensure_pid_dir
set +e
setsid sh -c "RABBITMQ_PID_FILE=$PID_FILE $DAEMON > \
${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err" &
@@ -54,7 +69,7 @@ start_rabbitmq () {
fi
;;
*)
- rm -f $PID_FILE
+ remove_pid
echo FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}
RETVAL=1
;;
@@ -70,7 +85,7 @@ stop_rabbitmq () {
RETVAL=$?
set -e
if [ $RETVAL = 0 ] ; then
- rm -f $PID_FILE
+ remove_pid
if [ -n "$LOCK_FILE" ] ; then
rm -f $LOCK_FILE
fi
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index 51e16517..e6776eff 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -43,7 +43,7 @@ OCF_RESKEY_server_default="/usr/sbin/rabbitmq-server"
OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl"
OCF_RESKEY_nodename_default="rabbit@localhost"
OCF_RESKEY_log_base_default="/var/log/rabbitmq"
-OCF_RESKEY_pid_file_default="/var/lib/rabbitmq/pid"
+OCF_RESKEY_pid_file_default="/var/run/rabbitmq/pid"
: ${OCF_RESKEY_server=${OCF_RESKEY_server_default}}
: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}}
: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}}
@@ -179,6 +179,21 @@ RABBITMQ_PID_FILE=$OCF_RESKEY_pid_file
[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME"
[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME
+ensure_pid_dir () {
+ PID_DIR=`dirname ${RABBITMQ_PID_FILE}`
+ if [ ! -d ${PID_DIR} ] ; then
+ mkdir -p ${PID_DIR}
+ chown -R rabbitmq:rabbitmq ${PID_DIR}
+ chmod 755 ${PID_DIR}
+ fi
+ return $OCF_SUCCESS
+}
+
+remove_pid () {
+ rm -f ${RABBITMQ_PID_FILE}
+ rmdir `dirname ${RABBITMQ_PID_FILE}` || :
+}
+
export_vars() {
[ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS
[ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT
@@ -186,7 +201,7 @@ export_vars() {
[ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE
[ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE
[ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS
- [ ! -z $RABBITMQ_PID_FILE ] && export RABBITMQ_PID_FILE
+ [ ! -z $RABBITMQ_PID_FILE ] && ensure_pid_dir && export RABBITMQ_PID_FILE
}
rabbit_validate_partial() {
@@ -268,7 +283,7 @@ rabbit_start() {
rabbit_wait $RABBITMQ_PID_FILE
rc=$?
if [ "$rc" != $OCF_SUCCESS ]; then
- rm -f $RABBITMQ_PID_FILE
+ remove_pid
ocf_log info "rabbitmq-server start failed: $rc"
exit $OCF_ERR_GENERIC
fi
@@ -299,7 +314,7 @@ rabbit_stop() {
rabbit_status
rc=$?
if [ "$rc" = $OCF_NOT_RUNNING ]; then
- rm -f $RABBITMQ_PID_FILE
+ remove_pid
stop_wait=0
break
elif [ "$rc" != $OCF_SUCCESS ]; then
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 9063a6ed..8f526544 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,15 @@
+rabbitmq-server (2.6.1-1) natty; urgency=low
+
+ * New Upstream Release
+
+ -- Tim <tim@rabbitmq.com> Fri, 09 Sep 2011 14:38:45 +0100
+
+rabbitmq-server (2.6.0-1) natty; urgency=low
+
+ * New Upstream Release
+
+ -- Tim <tim@rabbitmq.com> Fri, 26 Aug 2011 16:29:40 +0100
+
rabbitmq-server (2.5.1-1) lucid; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/dirs b/packaging/debs/Debian/debian/dirs
index 5cf167d5..625b7d41 100644
--- a/packaging/debs/Debian/debian/dirs
+++ b/packaging/debs/Debian/debian/dirs
@@ -4,7 +4,6 @@ usr/sbin
usr/share/man
var/lib/rabbitmq/mnesia
var/log/rabbitmq
-var/run/rabbitmq
etc/logrotate.d
etc/rabbitmq
diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst
index ca531f14..b11340ef 100644
--- a/packaging/debs/Debian/debian/postinst
+++ b/packaging/debs/Debian/debian/postinst
@@ -32,7 +32,6 @@ fi
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq
chown -R rabbitmq:rabbitmq /var/log/rabbitmq
-chown -R rabbitmq:rabbitmq /var/run/rabbitmq
case "$1" in
configure)
diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in
index c4aeeebe..baf081fc 100644
--- a/packaging/debs/Debian/debian/postrm.in
+++ b/packaging/debs/Debian/debian/postrm.in
@@ -32,9 +32,6 @@ case "$1" in
if [ -d /var/log/rabbitmq ]; then
rm -r /var/log/rabbitmq
fi
- if [ -d /var/run/rabbitmq ]; then
- rm -r /var/run/rabbitmq
- fi
if [ -d /etc/rabbitmq ]; then
rm -r /etc/rabbitmq
fi
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 5e2097db..84d24c45 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -135,15 +135,14 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-boot "!RABBITMQ_BOOT_FILE!" ^
!RABBITMQ_CONFIG_ARG! ^
-sname !RABBITMQ_NODENAME! ^
--s rabbit ^
+W w ^
+A30 ^
+P 1048576 ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
--kernel error_logger {file,\""!LOGS:\=/!"\"} ^
!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
+-kernel error_logger {file,\""!LOGS:\=/!"\"} ^
-sasl sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index b2aa4f58..60697d0b 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -201,14 +201,14 @@ set ERLANG_SERVICE_ARGUMENTS= ^
!RABBITMQ_EBIN_PATH! ^
-boot "!RABBITMQ_BOOT_FILE!" ^
!RABBITMQ_CONFIG_ARG! ^
--s rabbit ^
+W w ^
+A30 ^
++P 1048576 ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
--kernel error_logger {file,\""!LOGS:\=/!"\"} ^
!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
+-kernel error_logger {file,\""!LOGS:\=/!"\"} ^
-sasl sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
diff --git a/src/delegate.erl b/src/delegate.erl
index 17046201..edb4eba4 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -28,13 +28,13 @@
-ifdef(use_specs).
-spec(start_link/1 ::
- (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
--spec(invoke_no_result/2 ::
- (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+ (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 ::
( pid(), fun ((pid()) -> A)) -> A;
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
+-spec(invoke_no_result/2 ::
+ (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
-endif.
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index fc693c7d..4c131a6c 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -28,7 +28,7 @@
-ifdef(use_specs).
--spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (integer()) -> rabbit_types:ok_pid_or_error()).
-spec(count/1 :: ([node()]) -> integer()).
-endif.
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3c2111dc..e14dfe22 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -159,7 +159,8 @@
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
--define(CLIENT_ETS_TABLE, ?MODULE).
+-define(CLIENT_ETS_TABLE, file_handle_cache_client).
+-define(ELDERS_ETS_TABLE, file_handle_cache_elders).
%%----------------------------------------------------------------------------
@@ -228,7 +229,7 @@
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
- (string(), [any()],
+ (file:filename(), [any()],
[{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
@@ -243,17 +244,17 @@
-spec(flush/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
val_or_error(non_neg_integer())).
--spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
+-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
--spec(info_keys/0 :: () -> [atom()]).
--spec(info/0 :: () -> [{atom(), any()}]).
--spec(info/1 :: ([atom()]) -> [{atom(), any()}]).
--spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(info/0 :: () -> rabbit_types:infos()).
+-spec(info/1 :: ([atom()]) -> rabbit_types:infos()).
+-spec(ulimit/0 :: () -> 'unknown' | non_neg_integer()).
-endif.
@@ -794,7 +795,6 @@ init([]) ->
Watermark;
_ ->
case ulimit() of
- infinity -> infinity;
unknown -> ?FILE_HANDLES_LIMIT_OTHER;
Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
end
@@ -803,7 +803,8 @@ init([]) ->
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
[Limit, ObtainLimit]),
Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
- {ok, #fhc_state { elders = dict:new(),
+ Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
+ {ok, #fhc_state { elders = Elders,
limit = Limit,
open_count = 0,
open_pending = pending_new(),
@@ -819,28 +820,27 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
elders = Elders,
clients = Clients })
when EldestUnusedSince =/= undefined ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ true = ets:insert(Elders, {Pid, EldestUnusedSince}),
Item = #pending { kind = open,
pid = Pid,
requested = Requested,
from = From },
ok = track_client(Pid, Clients),
- State1 = State #fhc_state { elders = Elders1 },
- case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
+ case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
true -> case ets:lookup(Clients, Pid) of
[#cstate { opened = 0 }] ->
true = ets:update_element(
Clients, Pid, {#cstate.blocked, true}),
{noreply,
- reduce(State1 #fhc_state {
+ reduce(State #fhc_state {
open_pending = pending_in(Item, Pending) })};
[#cstate { opened = Opened }] ->
true = ets:update_element(
Clients, Pid,
{#cstate.pending_closes, Opened}),
- {reply, close, State1}
+ {reply, close, State}
end;
- false -> {noreply, run_pending_item(Item, State1)}
+ false -> {noreply, run_pending_item(Item, State)}
end;
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
@@ -888,21 +888,20 @@ handle_cast({register_callback, Pid, MFA},
handle_cast({update, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders })
when EldestUnusedSince =/= undefined ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ true = ets:insert(Elders, {Pid, EldestUnusedSince}),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, State #fhc_state { elders = Elders1 }};
+ {noreply, State};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
- Elders1 = case EldestUnusedSince of
- undefined -> dict:erase(Pid, Elders);
- _ -> dict:store(Pid, EldestUnusedSince, Elders)
- end,
+ true = case EldestUnusedSince of
+ undefined -> ets:delete(Elders, Pid);
+ _ -> ets:insert(Elders, {Pid, EldestUnusedSince})
+ end,
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
{noreply, adjust_alarm(State, process_pending(
- update_counts(open, Pid, -1,
- State #fhc_state { elders = Elders1 })))};
+ update_counts(open, Pid, -1, State)))};
handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
@@ -923,6 +922,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
[#cstate { opened = Opened, obtained = Obtained }] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
+ true = ets:delete(Elders, Pid),
FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
{noreply, adjust_alarm(
State,
@@ -931,11 +931,12 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
open_count = OpenCount - Opened,
open_pending = filter_pending(FilterFun, OpenPending),
obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending),
- elders = dict:erase(Pid, Elders) }))}.
+ obtain_pending = filter_pending(FilterFun, ObtainPending) }))}.
-terminate(_Reason, State = #fhc_state { clients = Clients }) ->
+terminate(_Reason, State = #fhc_state { clients = Clients,
+ elders = Elders }) ->
ets:delete(Clients),
+ ets:delete(Elders),
State.
code_change(_OldVsn, State, _Extra) ->
@@ -1091,7 +1092,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
timer_ref = TRef }) ->
Now = now(),
{CStates, Sum, ClientCount} =
- dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
+ ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
[#cstate { pending_closes = PendingCloses,
opened = Opened,
blocked = Blocked } = CState] =
diff --git a/src/gatherer.erl b/src/gatherer.erl
index aa43e9a9..fe976b50 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -27,7 +27,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(stop/1 :: (pid()) -> 'ok').
-spec(fork/1 :: (pid()) -> 'ok').
-spec(finish/1 :: (pid()) -> 'ok').
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 35258139..ab6c4e64 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -616,11 +616,11 @@ in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
process_msg({system, From, Req},
GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
+ %% gen_server puts Hib on the end as the 7th arg, but that version
+ %% of the fun seems not to be documented so leaving out for now.
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
process_msg({'EXIT', Parent, Reason} = Msg,
GS2State = #gs2_state { parent = Parent }) ->
- %% gen_server puts Hib on the end as the 7th arg, but that version
- %% of the fun seems not to be documented so leaving out for now.
terminate(Reason, Msg, GS2State);
process_msg(Msg, GS2State = #gs2_state { debug = [] }) ->
handle_msg(Msg, GS2State);
diff --git a/src/gm.erl b/src/gm.erl
index 8b4d2776..8c838a70 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -422,9 +422,9 @@
-type(group_name() :: any()).
--spec(create_tables/0 :: () -> 'ok').
+-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
-spec(start_link/3 :: (group_name(), atom(), any()) ->
- {'ok', pid()} | {'error', any()}).
+ rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 8cae7fde..3e311747 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,7 +18,8 @@
-behaviour(application).
--export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0,
+-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0,
+ is_running/0 , is_running/1, environment/0,
rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -196,6 +197,8 @@
{os, {atom(), atom()}} |
{erlang_version, string()} |
{memory, any()}]).
+-spec(is_running/0 :: () -> boolean()).
+-spec(is_running/1 :: (node()) -> boolean()).
-spec(environment/0 :: () -> [{atom() | term()}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
@@ -203,6 +206,14 @@
-spec(boot_delegate/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
+-spec(start/2 :: ('normal',[]) ->
+ {'error',
+ {'erlang_version_too_old',
+ {'found',[any()]},
+ {'required',[any(),...]}}} |
+ {'ok',pid()}).
+-spec(stop/1 :: (_) -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -233,11 +244,19 @@ stop_and_halt() ->
status() ->
[{pid, list_to_integer(os:getpid())},
- {running_applications, application:which_applications()},
+ {running_applications, application:which_applications(infinity)},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
{memory, erlang:memory()}].
+is_running() -> is_running(node()).
+
+is_running(Node) ->
+ case rpc:call(Node, application, which_applications, [infinity]) of
+ {badrpc, _} -> false;
+ Apps -> proplists:is_defined(rabbit, Apps)
+ end.
+
environment() ->
lists:keysort(
1, [P || P = {K, _} <- application:get_all_env(rabbit),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index c0ae18c0..ca28d686 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -32,6 +32,9 @@
-spec(check_user_pass_login/2 ::
(rabbit_types:username(), rabbit_types:password())
-> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
+-spec(check_user_login/2 ::
+ (rabbit_types:username(), [{atom(), any()}])
+ -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 88ff26cc..b3e92b69 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -49,7 +49,7 @@
-type(name() :: rabbit_types:r('queue')).
-type(qlen() :: rabbit_types:ok(non_neg_integer())).
--type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A)).
+-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())).
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
@@ -64,6 +64,9 @@
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
-> {'new' | 'existing', rabbit_types:amqqueue()} |
rabbit_types:channel_exit()).
+-spec(internal_declare/2 ::
+ (rabbit_types:amqqueue(), boolean())
+ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found')).
@@ -132,9 +135,6 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
--spec(internal_declare/2 ::
- (rabbit_types:amqqueue(), boolean())
- -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -147,6 +147,7 @@
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
+-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -320,7 +321,7 @@ check_declare_arguments(QueueName, Args) ->
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
- "invalid arg '~s' for ~s: ~w",
+ "invalid arg '~s' for ~s: ~255p",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
[{<<"x-expires">>, fun check_integer_argument/2},
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e5038efe..c28cd5bf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -29,12 +29,12 @@
-export([start_link/1, info_keys/0]).
+-export([init_with_backing_queue_state/7]).
+
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
--export([init_with_backing_queue_state/7]).
-
%% Queue's state
-record(q, {q,
exclusive_consumer,
@@ -56,14 +56,29 @@
-record(consumer, {tag, ack_required}).
%% These are held in our process dictionary
--record(cr, {consumer_count,
- ch_pid,
- limiter,
+-record(cr, {ch_pid,
monitor_ref,
acktags,
+ consumer_count,
+ limiter,
is_limit_active,
unsent_message_count}).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 ::
+ (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(init_with_backing_queue_state/7 ::
+ (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
+ [rabbit_types:delivery()], dict()) -> #q{}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
-define(STATISTICS_KEYS,
[pid,
exclusive_consumer_pid,
@@ -321,10 +336,10 @@ ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
undefined -> MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
- ch_pid = ChPid,
+ C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
acktags = sets:new(),
+ consumer_count = 0,
is_limit_active = false,
limiter = rabbit_limiter:make_token(),
unsent_message_count = 0},
@@ -333,18 +348,18 @@ ch_record(ChPid) ->
C = #cr{} -> C
end.
-store_ch_record(C = #cr{ch_pid = ChPid}) ->
- put({ch, ChPid}, C).
-
-maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
- acktags = ChAckTags,
- unsent_message_count = UnsentMessageCount}) ->
+update_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ unsent_message_count = UnsentMessageCount}) ->
case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {0, 0, 0} -> ok = erase_ch_record(C),
- false;
- _ -> store_ch_record(C),
- true
- end.
+ {0, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
+ end,
+ C.
+
+store_ch_record(C = #cr{ch_pid = ChPid}) ->
+ put({ch, ChPid}, C),
+ ok.
erase_ch_record(#cr{ch_pid = ChPid,
limiter = Limiter,
@@ -354,6 +369,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
+update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
+ ok = rabbit_limiter:register(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 1});
+update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
+ ok = rabbit_limiter:unregister(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 0,
+ limiter = rabbit_limiter:make_token()});
+update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
+ update_ch_record(C#cr{consumer_count = Count + Delta}).
+
all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
@@ -391,9 +416,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
true -> sets:add_element(AckTag, ChAckTags);
false -> ChAckTags
end,
- NewC = C#cr{unsent_message_count = Count + 1,
- acktags = ChAckTags1},
- true = maybe_store_ch_record(NewC),
+ NewC = update_ch_record(
+ C#cr{unsent_message_count = Count + 1,
+ acktags = ChAckTags1}),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -411,7 +436,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
- true = maybe_store_ch_record(C#cr{is_limit_active = true}),
+ update_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
@@ -545,11 +570,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
case Delivered of
true -> State2;
- false -> BQS1 =
- BQ:publish(Message,
- (message_properties(State)) #message_properties{
- needs_confirming = needs_confirming(Confirm)},
- ChPid, BQS),
+ false -> Props = (message_properties(State)) #message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ BQS1 = BQ:publish(Message, Props, ChPid, BQS),
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
@@ -569,8 +592,8 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS,
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
remove_consumer(ChPid, ConsumerTag, Queue) ->
- queue:filter(fun ({CP, #consumer{tag = CT}}) ->
- (CP /= ChPid) or (CT /= ConsumerTag)
+ queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
+ (CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
remove_consumers(ChPid, Queue) ->
@@ -593,8 +616,7 @@ possibly_unblock(State, ChPid, Update) ->
not_found ->
State;
C ->
- NewC = Update(C),
- maybe_store_ch_record(NewC),
+ NewC = update_ch_record(Update(C)),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
@@ -633,11 +655,6 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
end
end.
-cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
- none;
-cancel_holder(_ChPid, _ConsumerTag, Holder) ->
- Holder.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -663,8 +680,15 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
-subtract_acks(A, B) when is_list(B) ->
- lists:foldl(fun sets:del_element/2, A, B).
+subtract_acks(ChPid, AckTags, State, Fun) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ State;
+ C = #cr{acktags = ChAckTags} ->
+ update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2,
+ ChAckTags, AckTags)}),
+ Fun(State)
+ end.
discard_delivery(#delivery{sender = ChPid,
message = Message},
@@ -931,10 +955,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State3 =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- true = maybe_store_ch_record(
- C#cr{acktags =
- sets:add_element(AckTag,
- ChAckTags)}),
+ ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
false -> State2
end,
@@ -950,16 +972,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ C = ch_record(ChPid),
+ C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- true = maybe_store_ch_record(
- C#cr{consumer_count = ConsumerCount +1,
- limiter = Limiter}),
- ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(Limiter, self());
- _ -> ok
- end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> ExistingHolder
end,
@@ -967,7 +983,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
State2 =
- case is_ch_blocked(C) of
+ case is_ch_blocked(C1) of
true -> State1#q{
blocked_consumers =
add_consumer(ChPid, Consumer,
@@ -985,34 +1001,27 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State = #q{exclusive_consumer = Holder}) ->
+ ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
not_found ->
- ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount,
- limiter = Limiter} ->
- C1 = C#cr{consumer_count = ConsumerCount -1},
- maybe_store_ch_record(
- case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(Limiter, self()),
- C1#cr{limiter = rabbit_limiter:make_token()};
- _ -> C1
- end),
+ C ->
+ update_consumer_count(C, -1),
emit_consumer_deleted(ChPid, ConsumerTag),
- ok = maybe_send_reply(ChPid, OkMsg),
- NewState =
- State#q{exclusive_consumer = cancel_holder(ChPid,
- ConsumerTag,
- Holder),
- active_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.active_consumers),
- blocked_consumers = remove_consumer(
+ State1 = State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, ConsumerTag} -> none;
+ _ -> Holder
+ end,
+ active_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.active_consumers),
+ blocked_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.blocked_consumers)},
- case should_auto_delete(NewState) of
- false -> reply(ok, ensure_expiry_timer(NewState));
- true -> {stop, normal, ok, NewState}
+ case should_auto_delete(State1) of
+ false -> reply(ok, ensure_expiry_timer(State1));
+ true -> {stop, normal, ok, State1}
end
end;
@@ -1042,14 +1051,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
- end.
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1) -> requeue_and_run(AckTags, State1) end)).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -1058,33 +1062,26 @@ handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
noreply(deliver_or_enqueue(Delivery, State));
-handle_cast({ack, AckTags, ChPid},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- maybe_store_ch_record(C#cr{acktags = subtract_acks(
- ChAckTags, AckTags)}),
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- noreply(State#q{backing_queue_state = BQS1})
- end;
-
-handle_cast({reject, AckTags, Requeue, ChPid},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(case Requeue of
- true -> requeue_and_run(AckTags, State);
- false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State#q{backing_queue_state = BQS1}
- end)
- end;
+handle_cast({ack, AckTags, ChPid}, State) ->
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end));
+
+handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case Requeue of
+ true -> requeue_and_run(AckTags, State1);
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end
+ end));
handle_cast(delete_immediately, State) ->
{stop, normal, State};
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 2c28adce..7b3ebcf2 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -26,6 +26,20 @@
-define(SERVER, ?MODULE).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_child/2 ::
+ (node(), [any()]) -> rabbit_types:ok(pid() | undefined) |
+ rabbit_types:ok({pid(), any()}) |
+ rabbit_types:error(any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link() ->
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 6a018bd1..156d98dc 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -110,17 +110,13 @@ internal_check_user_login(Username, Fun) ->
Refused
end.
-check_vhost_access(#user{username = Username}, VHost) ->
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHost}}) of
- [] -> false;
- [_R] -> true
- end
- end).
+check_vhost_access(#user{username = Username}, VHostPath) ->
+ case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] -> false;
+ [_R] -> true
+ end.
check_resource_access(#user{username = Username},
#resource{virtual_host = VHostPath, name = Name},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9cc406e7..b266d366 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/3, message/4, properties/1, delivery/4]).
--export([publish/4, publish/6]).
+-export([publish/4, publish/6, publish/1,
+ message/3, message/4, properties/1, delivery/4]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -35,6 +35,12 @@
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
-type(body_input() :: (binary() | [binary()])).
+-spec(publish/4 ::
+ (exchange_input(), rabbit_router:routing_key(), properties_input(),
+ body_input()) -> publish_result()).
+-spec(publish/6 ::
+ (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+ properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
-spec(delivery/4 ::
@@ -49,12 +55,6 @@
rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
--spec(publish/4 ::
- (exchange_input(), rabbit_router:routing_key(), properties_input(),
- body_input()) -> publish_result()).
--spec(publish/6 ::
- (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
- properties_input(), body_input()) -> publish_result()).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -64,13 +64,34 @@
%%----------------------------------------------------------------------------
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(Exchange, RoutingKeyBin, Properties, Body) ->
+ publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(X, delivery(Mandatory, Immediate,
+ message(XName, RKey, properties(Props), Body),
+ undefined));
+publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(delivery(Mandatory, Immediate,
+ message(XName, RKey, properties(Props), Body),
+ undefined)).
+
publish(Delivery = #delivery{
- message = #basic_message{exchange_name = ExchangeName}}) ->
- case rabbit_exchange:lookup(ExchangeName) of
+ message = #basic_message{exchange_name = XName}}) ->
+ case rabbit_exchange:lookup(XName) of
{ok, X} -> publish(X, Delivery);
- Other -> Other
+ Err -> Err
end.
+publish(X, Delivery) ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
+ {ok, RoutingRes, DeliveredQPids}.
+
delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
@@ -113,11 +134,10 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
headers = Headers0}})
end.
-message(ExchangeName, RoutingKey,
- #content{properties = Props} = DecodedContent) ->
+message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
try
{ok, #basic_message{
- exchange_name = ExchangeName,
+ exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
id = rabbit_guid:guid(),
is_persistent = is_message_persistent(DecodedContent),
@@ -127,10 +147,10 @@ message(ExchangeName, RoutingKey,
{error, _Reason} = Error -> Error
end.
-message(ExchangeName, RoutingKey, RawProperties, Body) ->
+message(XName, RoutingKey, RawProperties, Body) ->
Properties = properties(RawProperties),
Content = build_content(Properties, Body),
- {ok, Msg} = message(ExchangeName, RoutingKey, Content),
+ {ok, Msg} = message(XName, RoutingKey, Content),
Msg.
properties(P = #'P_basic'{}) ->
@@ -152,28 +172,6 @@ indexof([], _Element, _N) -> 0;
indexof([Element | _Rest], Element, N) -> N;
indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
-
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
- publish(X, delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined));
-publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body);
- Err -> Err
- end.
-
-publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
- {ok, RoutingRes, DeliveredQPids}.
-
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
case Mode of
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 205d5bba..43c26941 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -108,21 +108,34 @@ recover(XNames, QNames) ->
SelectSet = fun (#resource{kind = exchange}) -> XNameSet;
(#resource{kind = queue}) -> QNameSet
end,
- [recover_semi_durable_route(R, SelectSet(Dst)) ||
+ {ok, Gatherer} = gatherer:start_link(),
+ [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) ||
R = #route{binding = #binding{destination = Dst}} <-
rabbit_misc:dirty_read_all(rabbit_semi_durable_route)],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
ok.
-recover_semi_durable_route(R = #route{binding = B}, ToRecover) ->
+recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) ->
#binding{source = Src, destination = Dst} = B,
- {ok, X} = rabbit_exchange:lookup(Src),
+ case sets:is_element(Dst, ToRecover) of
+ true -> {ok, X} = rabbit_exchange:lookup(Src),
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () ->
+ recover_semi_durable_route_txn(R, X),
+ gatherer:finish(Gatherer)
+ end);
+ false -> ok
+ end.
+
+recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Rs = mnesia:match_object(rabbit_semi_durable_route, R, read),
- case Rs =/= [] andalso sets:is_element(Dst, ToRecover) of
- false -> no_recover;
- true -> ok = sync_transient_route(R, fun mnesia:write/3),
- rabbit_exchange:serial(X)
+ case mnesia:match_object(rabbit_semi_durable_route, R, read) of
+ [] -> no_recover;
+ _ -> ok = sync_transient_route(R, fun mnesia:write/3),
+ rabbit_exchange:serial(X)
end
end,
fun (no_recover, _) -> ok;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dfe84644..d2f55277 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,8 +35,8 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
- user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ user, virtual_host, most_recently_declared_queue, queue_monitors,
+ consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -189,9 +189,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_monitors = dict:new(),
consumer_mapping = dict:new(),
- blocking = dict:new(),
- consumer_monitors = dict:new(),
+ blocking = sets:new(),
+ queue_consumers = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -275,7 +276,7 @@ handle_cast(terminate, State) ->
handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- noreply(monitor_consumer(ConsumerTag, State));
+ noreply(consumer_monitor(ConsumerTag, State));
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
@@ -299,13 +300,13 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
+ noreply(State3#ch{next_tag = DeliveryTag + 1});
handle_cast(force_event_refresh, State) ->
@@ -323,15 +324,13 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
noreply([ensure_stats_timer],
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
-handle_info({'DOWN', MRef, process, QPid, Reason},
- State = #ch{consumer_monitors = ConsumerMonitors}) ->
- noreply(
- case dict:find(MRef, ConsumerMonitors) of
- error ->
- handle_publishing_queue_down(QPid, Reason, State);
- {ok, ConsumerTag} ->
- handle_consuming_queue_down(MRef, ConsumerTag, State)
- end);
+handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
+ State1 = handle_publishing_queue_down(QPid, Reason, State),
+ State2 = queue_blocked(QPid, State1),
+ State3 = handle_consuming_queue_down(QPid, State2),
+ erase_queue_stats(QPid),
+ noreply(State3#ch{queue_monitors =
+ dict:erase(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -516,17 +515,16 @@ check_name(_Kind, NameBin) ->
NameBin.
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case dict:find(QPid, Blocking) of
- error -> State;
- {ok, MRef} -> true = erlang:demonitor(MRef),
- Blocking1 = dict:erase(QPid, Blocking),
- ok = case dict:size(Blocking1) of
- 0 -> rabbit_writer:send_command(
- State#ch.writer_pid,
- #'channel.flow_ok'{active = false});
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ case sets:is_element(QPid, Blocking) of
+ false -> State;
+ true -> Blocking1 = sets:del_element(QPid, Blocking),
+ ok = case sets:size(Blocking1) of
+ 0 -> rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ _ -> ok
+ end,
+ demonitor_queue(QPid, State#ch{blocking = Blocking1})
end.
record_confirm(undefined, _, State) ->
@@ -545,38 +543,41 @@ confirm(MsgSeqNos, QPid, State) ->
{MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
record_confirms(MXs, State1).
-process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}) ->
- {MXs, UMQ1, UQM1} =
- lists:foldl(
- fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
- none -> Acc
- end
- end, {[], UMQ, UQM}, MsgSeqNos),
- {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
- UQM1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> gb_trees:delete(QPid, UQM);
- false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
- end;
- none ->
- UQM
- end,
+process_confirms(MsgSeqNos, QPid, Nack, State) ->
+ lists:foldl(
+ fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
+ Acc, Nack);
+ none -> Acc
+ end
+ end, {[], State}, MsgSeqNos).
+
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
+ {MXs, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}},
+ Nack) ->
+ State1 = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> UQM1 = gb_trees:delete(QPid, UQM),
+ demonitor_queue(
+ QPid, State#ch{unconfirmed_qm = UQM1});
+ false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
+ State#ch{unconfirmed_qm = UQM1}
+ end;
+ none ->
+ State
+ end,
Qs1 = gb_sets:del_element(QPid, Qs),
%% If QPid somehow died initiating a nack, clear the message from
%% internal data-structures. Also, cleanup empty entries.
case (Nack orelse gb_sets:is_empty(Qs1)) of
- true ->
- {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
- false ->
- {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
+ true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
+ {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
+ false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
+ {MXs, State1#ch{unconfirmed_mq = UMQ1}}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -693,11 +694,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
+ State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -707,7 +708,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State3#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -746,12 +747,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
{ok, Q} ->
State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- {Q, undefined},
+ dict:store(ActualConsumerTag, Q,
ConsumerMapping)},
{noreply,
case NoWait of
- true -> monitor_consumer(ActualConsumerTag, State1);
+ true -> consumer_monitor(ActualConsumerTag, State1);
false -> State1
end};
{{error, exclusive_consume_unavailable}, _Q} ->
@@ -768,22 +768,26 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
_, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors}) ->
+ queue_consumers = QCons}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, {Q, MRef}} ->
- ConsumerMonitors1 =
- case MRef of
- undefined -> ConsumerMonitors;
- _ -> true = erlang:demonitor(MRef),
- dict:erase(MRef, ConsumerMonitors)
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
+ QCons1 =
+ case dict:find(QPid, QCons) of
+ error -> QCons;
+ {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
+ case gb_sets:is_empty(CTags1) of
+ true -> dict:erase(QPid, QCons);
+ false -> dict:store(QPid, CTags1, QCons)
+ end
end,
- NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
- ConsumerMapping),
- consumer_monitors = ConsumerMonitors1},
+ NewState = demonitor_queue(
+ Q, State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1}),
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -1108,10 +1112,12 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
- QPid <- QPids],
+ QPids -> State2 = lists:foldl(fun monitor_queue/2,
+ State1#ch{blocking =
+ sets:from_list(QPids)},
+ QPids),
ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State1#ch{blocking = dict:from_list(Queues)}}
+ {noreply, State2}
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1120,23 +1126,51 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- capabilities = Capabilities}) ->
+consumer_monitor(ConsumerTag,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QCons,
+ capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
- {#amqqueue{pid = QPid} = Q, undefined} =
- dict:fetch(ConsumerTag, ConsumerMapping),
- MRef = erlang:monitor(process, QPid),
- State#ch{consumer_mapping =
- dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
- consumer_monitors =
- dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
+ QCons1 = dict:update(QPid,
+ fun (CTags) ->
+ gb_sets:insert(ConsumerTag, CTags)
+ end,
+ gb_sets:singleton(ConsumerTag),
+ QCons),
+ monitor_queue(QPid, State#ch{queue_consumers = QCons1});
_ ->
State
end.
+monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
+ case (not dict:is_key(QPid, QMons) andalso
+ queue_monitor_needed(QPid, State)) of
+ true -> MRef = erlang:monitor(process, QPid),
+ State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
+ false -> State
+ end.
+
+demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
+ case (dict:is_key(QPid, QMons) andalso
+ not queue_monitor_needed(QPid, State)) of
+ true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
+ State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ false -> State
+ end.
+
+queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
+ queue_consumers = QCons,
+ blocking = Blocking,
+ unconfirmed_qm = UQM}) ->
+ StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ ConsumerMonitored = dict:is_key(QPid, QCons),
+ QueueBlocked = sets:is_element(QPid, Blocking),
+ ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
+ StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1157,21 +1191,25 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
{true, fun send_nacks/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- erase_queue_stats(QPid),
- State3 = SendFun(MXs, State2),
- queue_blocked(QPid, State3).
-
-handle_consuming_queue_down(MRef, ConsumerTag,
- State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- writer_pid = WriterPid}) ->
- ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
- ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
- Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
- nowait = true},
- ok = rabbit_writer:send_command(WriterPid, Cancel),
+ SendFun(MXs, State2).
+
+handle_consuming_queue_down(QPid,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QCons,
+ writer_pid = WriterPid}) ->
+ ConsumerTags = case dict:find(QPid, QCons) of
+ error -> gb_sets:new();
+ {ok, CTags} -> CTags
+ end,
+ ConsumerMapping1 =
+ gb_sets:fold(fun (CTag, CMap) ->
+ Cancel = #'basic.cancel'{consumer_tag = CTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel),
+ dict:erase(CTag, CMap)
+ end, ConsumerMapping, ConsumerTags),
State#ch{consumer_mapping = ConsumerMapping1,
- consumer_monitors = ConsumerMonitors1}.
+ queue_consumers = dict:erase(QPid, QCons)}.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
@@ -1271,9 +1309,8 @@ ack(Acked, State) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
[{QPid, length(MsgIds)} | L]
end, [], Acked),
- maybe_incr_stats(QIncs, ack, State),
ok = notify_limiter(State#ch.limiter, Acked),
- State.
+ maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
uncommitted_ack_q = queue:new()}.
@@ -1307,8 +1344,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
consumer_queues(Consumers) ->
lists:usort([QPid ||
- {_Key, {#amqqueue{pid = QPid}, _MRef}}
- <- dict:to_list(Consumers)]).
+ {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
@@ -1334,38 +1370,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
XName, MsgSeqNo, Message, State),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1),
- State1.
+ QPid <- DeliveredQPids]], publish, State1).
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
+ record_confirm(MsgSeqNo, XName,
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State));
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{XName, 1}], return_not_delivered, State),
- record_confirm(MsgSeqNo, XName, State);
+ record_confirm(MsgSeqNo, XName,
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State));
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ #ch{unconfirmed_mq = UMQ} = State,
UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
SingletonSet = gb_sets:singleton(MsgSeqNo),
- UQM1 = lists:foldl(
- fun (QPid, UQM2) ->
- maybe_monitor(QPid),
- case gb_trees:lookup(QPid, UQM2) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- gb_trees:update(QPid, MsgSeqNos1, UQM2);
- none ->
- gb_trees:insert(QPid, SingletonSet, UQM2)
- end
- end, UQM, QPids),
- State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
+ lists:foldl(
+ fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
+ case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
+ State0#ch{unconfirmed_qm = UQM1};
+ none ->
+ UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
+ monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ end
+ end, State#ch{unconfirmed_mq = UMQ1}, QPids).
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1385,11 +1420,13 @@ send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx_status = failed}).
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- C1 = lists:append(C),
- MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
- MsgSeqNo
- end || {MsgSeqNo, ExchangeName} <- C1 ],
- send_confirms(MsgSeqNos, State #ch{confirmed = []});
+ {MsgSeqNos, State1} =
+ lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
+ {[MsgSeqNo | MSNs],
+ maybe_incr_stats([{ExchangeName, 1}], confirm,
+ State0)}
+ end, {[], State}, lists:append(C)),
+ send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1469,30 +1506,26 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, _) ->
- ok.
+maybe_incr_redeliver_stats(_, _, State) ->
+ State.
-maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
+maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
- fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
- _ -> ok
+ fine -> lists:foldl(fun ({QX, Inc}, State0) ->
+ incr_stats(QX, Inc, Measure, State0)
+ end, State, QXIncs);
+ _ -> State
end.
-incr_stats({QPid, _} = QX, Inc, Measure) ->
- maybe_monitor(QPid),
- update_measures(queue_exchange_stats, QX, Inc, Measure);
-incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
- maybe_monitor(QPid),
- update_measures(queue_stats, QPid, Inc, Measure);
-incr_stats(X, Inc, Measure) ->
- update_measures(exchange_stats, X, Inc, Measure).
-
-maybe_monitor(QPid) ->
- case get({monitoring, QPid}) of
- undefined -> erlang:monitor(process, QPid),
- put({monitoring, QPid}, true);
- _ -> ok
- end.
+incr_stats({QPid, _} = QX, Inc, Measure, State) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure),
+ monitor_queue(QPid, State);
+incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure),
+ monitor_queue(QPid, State);
+incr_stats(X, Inc, Measure, State) ->
+ update_measures(exchange_stats, X, Inc, Measure),
+ State.
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
@@ -1528,7 +1561,6 @@ emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
end.
erase_queue_stats(QPid) ->
- erase({monitoring, QPid}),
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index 15e92542..dfb400e3 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -28,8 +28,7 @@
-ifdef(use_specs).
--spec(start_link/1 :: (mfa()) ->
- rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()).
-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
rabbit_types:ok_pid_or_error()).
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index 07036ce8..a0953eab 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -22,8 +22,12 @@
%%----------------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+
-ifdef(use_specs).
+-export_type([frame/0]).
+
-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY |
?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY |
?FRAME_TRACE | ?FRAME_HEARTBEAT).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index b9e550c9..1163ae9d 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -17,7 +17,7 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, diagnostics/1, log_action/3]).
+-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
@@ -50,7 +50,6 @@
-> 'ok').
-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
--spec(log_action/3 :: (node(), string(), [term()]) -> ok).
-endif.
@@ -73,7 +72,6 @@ start() ->
Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
- rpc_call(Node, rabbit_control, log_action, [node(), Command0, Args]),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
@@ -362,7 +360,7 @@ wait_for_application(Node, PidFile, Inform) ->
wait_for_application(Node, Pid) ->
case process_up(Pid) of
- true -> case node_up(Node) of
+ true -> case rabbit:is_running(Node) of
true -> ok;
false -> timer:sleep(1000),
wait_for_application(Node, Pid)
@@ -378,12 +376,6 @@ wait_and_read_pid_file(PidFile) ->
{error, _} = E -> exit({error, {could_not_read_pid, E}})
end.
-node_up(Node) ->
- case rpc_call(Node, application, which_applications, [infinity]) of
- {badrpc, _} -> false;
- Apps -> proplists:is_defined(rabbit, Apps)
- end.
-
% Test using some OS clunkiness since we shouldn't trust
% rpc:call(os, getpid, []) at this point
process_up(Pid) ->
@@ -521,22 +513,3 @@ quit(Status) ->
{unix, _} -> halt(Status);
{win32, _} -> init:stop(Status)
end.
-
-log_action(Node, Command, Args) ->
- rabbit_misc:with_local_io(
- fun () ->
- error_logger:info_msg("~p executing~n rabbitmqctl ~s ~s~n",
- [Node, Command,
- format_args(mask_args(Command, Args))])
- end).
-
-%% Mask passwords and other sensitive info before logging.
-mask_args("add_user", [Name, _Password | Args]) ->
- [Name, "****" | Args];
-mask_args("change_password", [Name, _Password | Args]) ->
- [Name, "****" | Args];
-mask_args(_, Args) ->
- Args.
-
-format_args(Args) ->
- string:join([io_lib:format("~p", [A]) || A <- Args], " ").
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 68afaf5d..6f9a4650 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -72,7 +72,7 @@ list() ->
%%----------------------------------------------------------------------------
connect(Username, VHost, Protocol, Pid, Infos) ->
- case lists:keymember(rabbit, 1, application:which_applications()) of
+ case rabbit:is_running() of
true ->
case rabbit_access_control:check_user_login(Username, []) of
{ok, User} ->
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 93aad9e3..6e29ace7 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -27,6 +27,16 @@
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2,
handle_info/2]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(boot/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
boot() ->
{ok, DefaultVHost} = application:get_env(default_vhost),
ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 8207d6bc..558e0957 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -42,6 +42,8 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
+-spec(message/4 :: (_,_,_,_) -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index ad5fd28f..5fc6341f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -59,6 +59,10 @@
known_senders :: set()
}).
+-type(ack() :: non_neg_integer()).
+-type(state() :: master_state()).
+-include("rabbit_backing_queue_spec.hrl").
+
-spec(promote_backing_queue_state/6 ::
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index cf8e9484..baebc52b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -22,6 +22,26 @@
-include("rabbit.hrl").
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(remove_from_queue/2 ::
+ (rabbit_amqqueue:name(), [pid()])
+ -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
+-spec(on_node_up/0 :: () -> 'ok').
+-spec(drop_mirror/2 ::
+ (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+-spec(add_mirror/2 ::
+ (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+-spec(add_mirror/3 ::
+ (rabbit_types:vhost(), binary(), atom())
+ -> rabbit_types:ok_or_error(any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
%% If the dead pids include the queue pid (i.e. the master has died)
%% then only remove that if we are about to be promoted. Otherwise we
%% can have the situation where a slave updates the mnesia record for
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3c453981..43962491 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -45,8 +45,19 @@
-behaviour(gm).
-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
-include("gm_specs.hrl").
+-ifdef(use_specs).
+%% Shut dialyzer up
+-spec(promote_me/2 :: (_, _) -> no_return()).
+-endif.
+
+%%----------------------------------------------------------------------------
+
+
-define(CREATION_EVENT_KEYS,
[pid,
name,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ae28722a..0b39a209 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -749,7 +749,7 @@ recursive_delete(Files) ->
end, ok, Files).
recursive_delete1(Path) ->
- case filelib:is_dir(Path) of
+ case filelib:is_dir(Path) and not(is_symlink(Path)) of
false -> case file:delete(Path) of
ok -> ok;
{error, enoent} -> ok; %% Path doesn't exist anyway
@@ -777,6 +777,12 @@ recursive_delete1(Path) ->
end
end.
+is_symlink(Name) ->
+ case file:read_link(Name) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
recursive_copy(Src, Dest) ->
case filelib:is_dir(Src) of
false -> case file:copy(Src, Dest) of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c63c67f4..665b15c5 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -70,6 +70,8 @@
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(table_names/0 :: () -> [atom()]).
+
-endif.
%%----------------------------------------------------------------------------
@@ -122,8 +124,10 @@ cluster(ClusterNodes, Force) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- case not Force andalso is_only_disc_node(node(), false) andalso
- not should_be_disc_node(ClusterNodes) of
+ case not Force andalso is_clustered() andalso
+ is_only_disc_node(node(), false) andalso
+ not should_be_disc_node(ClusterNodes)
+ of
true -> log_both("last running disc node leaving cluster");
_ -> ok
end,
@@ -715,7 +719,9 @@ wait_for_tables(TableNames) ->
reset(Force) ->
ensure_mnesia_not_running(),
- case not Force andalso is_only_disc_node(node(), false) of
+ case not Force andalso is_clustered() andalso
+ is_only_disc_node(node(), false)
+ of
true -> log_both("no other disc nodes running");
false -> ok
end,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 17d5f64b..cc12eb5d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -146,6 +146,8 @@
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
+-spec(close_all_indicated/1 ::
+ (client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index b2abcba6..31f476fc 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -78,6 +78,33 @@
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
-> [{inet:ip_address(), ip_port(), family(), atom()}]).
+-spec(ensure_ssl/0 :: () -> rabbit_types:infos()).
+-spec(ssl_transform_fun/1 ::
+ (rabbit_types:infos())
+ -> fun ((rabbit_net:socket())
+ -> rabbit_types:ok_or_error(#ssl_socket{}))).
+
+-spec(boot/0 :: () -> 'ok').
+-spec(start_client/1 ::
+ (port() | #ssl_socket{ssl::{'sslsocket',_,_}}) ->
+ atom() | pid() | port() | {atom(),atom()}).
+-spec(start_ssl_client/2 ::
+ (_,port() | #ssl_socket{ssl::{'sslsocket',_,_}}) ->
+ atom() | pid() | port() | {atom(),atom()}).
+-spec(tcp_listener_started/3 ::
+ (_,
+ string() |
+ {byte(),byte(),byte(),byte()} |
+ {char(),char(),char(),char(),char(),char(),char(),char()},
+ _) ->
+ 'ok').
+-spec(tcp_listener_stopped/3 ::
+ (_,
+ string() |
+ {byte(),byte(),byte(),byte()} |
+ {char(),char(),char(),char(),char(),char(),char(),char()},
+ _) ->
+ 'ok').
-endif.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index cb4f826d..8aa24ab5 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -31,6 +31,7 @@
-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(rabbit_running_on/1 :: (node()) -> 'ok').
-spec(notify_cluster/0 :: () -> 'ok').
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 92829e49..9fe073d9 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -29,6 +29,9 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
+%% Shut dialyzer up
+-spec(terminate/1 :: (string()) -> no_return()).
+-spec(terminate/2 :: (string(), [any()]) -> no_return()).
-endif.
@@ -136,38 +139,10 @@ determine_version(App) ->
{App, Vsn}.
delete_recursively(Fn) ->
- case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
- true ->
- case file:list_dir(Fn) of
- {ok, Files} ->
- case lists:foldl(fun ( Fn1, ok) -> delete_recursively(
- Fn ++ "/" ++ Fn1);
- (_Fn1, Err) -> Err
- end, ok, Files) of
- ok -> case file:del_dir(Fn) of
- ok -> ok;
- {error, E} -> {error,
- {cannot_delete, Fn, E}}
- end;
- Err -> Err
- end;
- {error, E} ->
- {error, {cannot_list_files, Fn, E}}
- end;
- false ->
- case filelib:is_file(Fn) of
- true -> case file:delete(Fn) of
- ok -> ok;
- {error, E} -> {error, {cannot_delete, Fn, E}}
- end;
- false -> ok
- end
- end.
-
-is_symlink(Name) ->
- case file:read_link(Name) of
- {ok, _} -> true;
- _ -> false
+ case rabbit_misc:recursive_delete([Fn]) of
+ ok -> ok;
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
+ Error -> Error
end.
unpack_ez_plugins(SrcDir, DestDir) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7eec2a2e..b4871cef 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -85,6 +85,15 @@
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
+-spec(mainloop/2 :: (_,#v1{}) -> any()).
+-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
+-spec(system_continue/3 :: (_,_,#v1{}) -> any()).
+-spec(system_terminate/4 :: (_,_,_,_) -> none()).
+
+-spec(process_channel_frame/5 ::
+ (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(),
+ tuple()) -> tuple()).
+
-endif.
%%--------------------------------------------------------------------------
@@ -493,20 +502,7 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame, self(),
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- channel_cleanup(ChPid),
- State;
- {method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking
- andalso
- Protocol:method_has_content(MethodName)) of
- true -> State#v1{connection_state = blocked};
- false -> State
- end;
- _ ->
- State
- end;
+ post_process_frame(AnalyzedFrame, ChPid, State);
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -518,6 +514,23 @@ handle_frame(Type, Channel, Payload,
end
end.
+post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
+ channel_cleanup(ChPid),
+ State;
+post_process_frame({method, MethodName, _}, _ChPid,
+ State = #v1{connection = #connection{
+ protocol = Protocol}}) ->
+ case Protocol:method_has_content(MethodName) of
+ true -> erlang:bump_reductions(2000),
+ case State#v1.connection_state of
+ blocking -> State#v1{connection_state = blocked};
+ _ -> State
+ end;
+ false -> State
+ end;
+post_process_frame(_Frame, _ChPid, State) ->
+ State.
+
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index 0491244b..cda3ccbe 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -24,6 +24,16 @@
-include("rabbit.hrl").
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Name, {_M, _F, _A} = Fun) ->
supervisor:start_link({local, Name}, ?MODULE, [Fun]).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 508b127e..802ea5e2 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -27,6 +27,21 @@
-define(SERVER, ?MODULE).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_child/1 :: (atom()) -> 'ok').
+-spec(start_child/3 :: (atom(), atom(), [any()]) -> 'ok').
+-spec(start_restartable_child/1 :: (atom()) -> 'ok').
+-spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok').
+-spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index cd5d9be0..7e84251f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -757,13 +757,23 @@ test_topic_expect_match(X, List) ->
end, List).
test_app_management() ->
- %% starting, stopping, status
+ control_action(wait, [rabbit_mnesia:dir() ++ ".pid"]),
+ %% Starting, stopping and diagnostics. Note that we don't try
+ %% 'report' when the rabbit app is stopped and that we enable
+ %% tracing for the duration of this function.
+ ok = control_action(trace_on, []),
ok = control_action(stop_app, []),
ok = control_action(stop_app, []),
ok = control_action(status, []),
+ ok = control_action(cluster_status, []),
+ ok = control_action(environment, []),
ok = control_action(start_app, []),
ok = control_action(start_app, []),
ok = control_action(status, []),
+ ok = control_action(report, []),
+ ok = control_action(cluster_status, []),
+ ok = control_action(environment, []),
+ ok = control_action(trace_off, []),
passed.
test_log_management() ->
@@ -1146,6 +1156,7 @@ test_user_management() ->
ok = control_action(add_user, ["foo", "bar"]),
{error, {user_already_exists, _}} =
control_action(add_user, ["foo", "bar"]),
+ ok = control_action(clear_password, ["foo"]),
ok = control_action(change_password, ["foo", "baz"]),
TestTags = fun (Tags) ->
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 9739f6b7..e7a302f8 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -228,13 +228,7 @@ secondary_upgrade(AllNodes) ->
ok.
nodes_running(Nodes) ->
- [N || N <- Nodes, node_running(N)].
-
-node_running(Node) ->
- case rpc:call(Node, application, which_applications, []) of
- {badrpc, _} -> false;
- Apps -> lists:keysearch(rabbit, 1, Apps) =/= false
- end.
+ [N || N <- Nodes, rabbit:is_running(N)].
%% -------------------------------------------------------------------
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index ac3434d2..091b50e4 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -67,6 +67,9 @@
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
+-spec(mainloop/2 :: (_,_) -> 'done').
+-spec(mainloop1/2 :: (_,_) -> any()).
+
-endif.
%%---------------------------------------------------------------------------
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index bf0eacd1..4c835598 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -22,6 +22,14 @@
-export([init/1]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+-spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Name, Callback) ->
supervisor:start_link({local,Name}, ?MODULE, Callback).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index cd646969..ad2a0d02 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -25,6 +25,14 @@
-record(state, {sock, on_startup, on_shutdown, label}).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+-spec(start_link/8 ::
+ (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(),
+ atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+-endif.
+
%%--------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts,
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 58c2f30c..5bff5c27 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -22,6 +22,21 @@
-export([init/1]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/7 ::
+ (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
+ mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/8 ::
+ (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
+ mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, Label) ->
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 84c4121c..5feb146f 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -21,6 +21,18 @@
-export([test_supervisor_delayed_restart/0,
init/1, start_child/0]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(test_supervisor_delayed_restart/0 :: () -> 'passed').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
test_supervisor_delayed_restart() ->
passed = with_sup(simple_one_for_one_terminate,
fun (SupPid) ->
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index fb2fa267..a54bf996 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -57,15 +57,15 @@
-ifdef(use_specs).
--spec(start_link/1 :: (float()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()).
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
--spec(get_memory_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
-spec(get_vm_memory_high_watermark/0 :: () -> float()).
-spec(set_vm_memory_high_watermark/1 :: (float()) -> 'ok').
+-spec(get_memory_limit/0 :: () -> non_neg_integer()).
-endif.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index e4f260cc..456ff39f 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -41,6 +41,7 @@
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/1 ::
(fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(idle/1 :: (any()) -> 'ok').
-endif.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index 28c1adc6..d37c3a0f 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -26,8 +26,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
--spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok_pid_or_error()).
-endif.