summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-09 12:30:29 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-09 12:30:29 +0100
commitf0e94a8cefeefd582e732cfe4ff928578eef73d4 (patch)
tree4f145083efc44cd2dfe525fd0563b180f73898cd
parentfa440d982ce2a4ecfdcbd5cf984a594ba9f87836 (diff)
parentbf0d78abd2f6757f054dcb06048f6a7eabade0f4 (diff)
downloadrabbitmq-server-bug23045.tar.gz
merged default into bug23045bug23045
-rw-r--r--.hgignore2
-rw-r--r--Makefile26
-rw-r--r--codegen.py20
-rw-r--r--docs/rabbitmqctl.1.xml27
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl10
-rw-r--r--src/rabbit.erl11
-rw-r--r--src/rabbit_access_control.erl40
-rw-r--r--src/rabbit_amqqueue.erl33
-rw-r--r--src/rabbit_amqqueue_process.erl76
-rw-r--r--src/rabbit_basic.erl22
-rw-r--r--src/rabbit_binary_generator.erl61
-rw-r--r--src/rabbit_binary_parser.erl13
-rw-r--r--src/rabbit_channel.erl279
-rw-r--r--src/rabbit_control.erl141
-rw-r--r--src/rabbit_event.erl138
-rw-r--r--src/rabbit_exchange.erl53
-rw-r--r--src/rabbit_framing_channel.erl78
-rw-r--r--src/rabbit_heartbeat.erl34
-rw-r--r--src/rabbit_limiter.erl5
-rw-r--r--src/rabbit_misc.erl57
-rw-r--r--src/rabbit_queue_collector.erl12
-rw-r--r--src/rabbit_reader.erl421
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl307
-rw-r--r--src/rabbit_tests_event_receiver.erl66
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_writer.erl100
-rw-r--r--src/vm_memory_monitor.erl2
29 files changed, 1305 insertions, 742 deletions
diff --git a/.hgignore b/.hgignore
index 7b796b66..03b60914 100644
--- a/.hgignore
+++ b/.hgignore
@@ -11,7 +11,7 @@ syntax: regexp
^dist/
^include/rabbit_framing\.hrl$
^include/rabbit_framing_spec\.hrl$
-^src/rabbit_framing\.erl$
+^src/rabbit_framing_amqp.*\.erl$
^src/.*\_usage.erl$
^rabbit\.plt$
^basic.plt$
diff --git a/Makefile b/Makefile
index 5694292d..1f15921b 100644
--- a/Makefile
+++ b/Makefile
@@ -12,7 +12,7 @@ EBIN_DIR=ebin
INCLUDE_DIR=include
DOCS_DIR=docs
INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
-SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
+SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
WEB_URL=http://www.rabbitmq.com/
@@ -56,7 +56,8 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json
+AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json
+AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
@@ -99,11 +100,14 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
+$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8)
+ $(PYTHON) codegen.py --ignore-conflicts header $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) $@
-$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
+$(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_9_1) $@
+
+$(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_8)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
@@ -128,7 +132,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
- rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
+ rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing_amqp_*.erl codegen.pyc
rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL)
rm -f $(RABBIT_PLT)
rm -f $(DEPS_FILE)
@@ -175,6 +179,14 @@ stop-rabbit-on-node: all
force-snapshot: all
echo "rabbit_persister:force_snapshot()." | $(ERL_CALL)
+set-memory-alarm: all
+ echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \
+ $(ERL_CALL)
+
+clear-memory-alarm: all
+ echo "alarm_handler:clear_alarm(vm_memory_high_watermark)." | \
+ $(ERL_CALL)
+
stop-node:
-$(ERL_CALL) -q
diff --git a/codegen.py b/codegen.py
index 420fc870..14229753 100644
--- a/codegen.py
+++ b/codegen.py
@@ -315,11 +315,16 @@ def genErl(spec):
methods = spec.allMethods()
printFileHeader()
- print """-module(rabbit_framing).
--include("rabbit_framing.hrl").
-
+ module = "rabbit_framing_amqp_%d_%d" % (spec.major, spec.minor)
+ if spec.revision != 0:
+ module = "%s_%d" % (module, spec.revision)
+ if module == "rabbit_framing_amqp_8_0":
+ module = "rabbit_framing_amqp_0_8"
+ print "-module(%s)." % module
+ print """-include("rabbit_framing.hrl").
+
+-export([version/0]).
-export([lookup_method_name/1]).
-
-export([method_id/1]).
-export([method_has_content/1]).
-export([is_method_synchronous/1]).
@@ -395,6 +400,7 @@ def genErl(spec):
print """
%% Method signatures
-ifdef(use_specs).
+-spec(version/0 :: () -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()).
-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()).
-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()).
@@ -414,6 +420,10 @@ bitvalue(true) -> 1;
bitvalue(false) -> 0;
bitvalue(undefined) -> 0.
"""
+ version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision)
+ if version == '{8, 0, 0}': version = '{0, 8, 0}'
+ print "version() -> %s." % (version)
+
for m in methods: genLookupMethodName(m)
print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})."
@@ -472,8 +482,6 @@ def genHrl(spec):
methods = spec.allMethods()
printFileHeader()
- print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
- print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
for (c,v,cls) in spec.constants:
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index e53a97c2..33552e17 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -88,9 +88,6 @@
</listitem>
</varlistentry>
</variablelist>
- <para>
- Flags must precede all other parameters to <command>rabbitmqctl</command>.
- </para>
</refsect1>
<refsect1>
@@ -271,7 +268,7 @@
<variablelist>
<varlistentry id="cluster">
- <term><cmdsynopsis><command>cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -336,7 +333,7 @@
</listitem>
</varlistentry>
<varlistentry id="force_cluster">
- <term><cmdsynopsis><command>force_cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -547,7 +544,7 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>configure</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -555,11 +552,21 @@
<listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem>
</varlistentry>
<varlistentry>
- <term>username</term>
+ <term>scope</term>
+ <listitem><para>Scope of the permissions: either
+ <command>client</command> (the default) or
+ <command>all</command>. This determines whether
+ permissions are checked for server-generated resource
+ names (<command>all</command>) or only for
+ client-specified resource names
+ (<command>client</command>).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>user</term>
<listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem>
</varlistentry>
<varlistentry>
- <term>configure</term>
+ <term>conf</term>
<listitem><para>A regular expression matching resource names for which the user is granted configure permissions.</para></listitem>
</varlistentry>
<varlistentry>
@@ -888,6 +895,10 @@
<listitem><para>Number of channels using the connection.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>protocol</term>
+ <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>user</term>
<listitem><para>Username associated with the connection.</para></listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 2cd28abb..48e19ff8 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -27,4 +27,5 @@
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
- {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}.
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {collect_statistics, none}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 3fd52568..b9abd788 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -30,13 +30,14 @@
%%
-record(user, {username, password}).
--record(permission, {configure, write, read}).
+-record(permission, {scope, configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
-record(vhost, {virtual_host, dummy}).
--record(connection, {user, timeout_sec, frame_max, vhost, client_properties}).
+-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
+ client_properties}).
-record(content,
{class_id,
@@ -44,6 +45,7 @@
properties_bin, %% either 'none', or an encoded properties binary
%% Note: at most one of properties and properties_bin can be
%% 'none' at once.
+ protocol, %% The protocol under which properties_bin was encoded
payload_fragments_rev %% list of binaries, in reverse order (!)
}).
@@ -70,16 +72,20 @@
-record(delivery, {mandatory, immediate, txn, sender, message}).
-record(amqp_error, {name, explanation, method = none}).
+-record(event, {type, props, timestamp}).
+
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
-define(MAX_WAIT, 16#ffffffff).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(STATS_INTERVAL, 5000).
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 18045b94..41c628a0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -89,6 +89,13 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
+-rabbit_boot_step({rabbit_event,
+ [{description, "statistics event manager"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_event]}},
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
+
-rabbit_boot_step({kernel_ready,
[{description, "kernel ready"},
{requires, external_infrastructure}]}).
@@ -426,9 +433,9 @@ print_banner() ->
"| ~s +---+ |~n"
"| |~n"
"+-------------------+~n"
- "AMQP ~p-~p~n~s~n~s~n~n",
+ "~s~n~s~n~s~n~n",
[Product, string:right([$v|Version], ProductLen),
- ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
+ ?PROTOCOL_VERSION,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
{"app descriptor", app_location()},
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 55d738ff..8d00f591 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -38,7 +38,7 @@
-export([add_user/2, delete_user/1, change_password/2, list_users/0,
lookup_user/1]).
-export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
--export([set_permissions/5, clear_permissions/2,
+-export([set_permissions/5, set_permissions/6, clear_permissions/2,
list_vhost_permissions/1, list_user_permissions/1]).
%%----------------------------------------------------------------------------
@@ -51,11 +51,14 @@
-type(username() :: binary()).
-type(password() :: binary()).
-type(regexp() :: binary()).
+-type(scope() :: binary()).
-spec(check_login/2 ::
(binary(), binary()) -> rabbit_types:user() |
rabbit_types:channel_exit()).
--spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user()).
+-spec(user_pass_login/2 ::
+ (username(), password())
+ -> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
@@ -70,12 +73,14 @@
(username()) -> rabbit_types:ok(rabbit_types:user())
| rabbit_types:error('not_found')).
-spec(add_vhost/1 ::
- (rabbit_types:vhost()) -> 'ok' | rabbit_types:connection_exit()).
+ (rabbit_types:vhost()) -> 'ok').
-spec(delete_vhost/1 ::
- (rabbit_types:vhost()) -> 'ok' | rabbit_types:connection_exit()).
+ (rabbit_types:vhost()) -> 'ok').
-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
regexp(), regexp()) -> 'ok').
+-spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
-spec(list_vhost_permissions/1 ::
(rabbit_types:vhost())
@@ -155,6 +160,7 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
+permission_index(scope) -> #permission.scope;
permission_index(configure) -> #permission.configure;
permission_index(write) -> #permission.write;
permission_index(read) -> #permission.read.
@@ -167,7 +173,7 @@ check_resource_access(Username,
Permission);
check_resource_access(_Username,
#resource{name = <<"amq.gen",_/binary>>},
- _Permission) ->
+ #permission{scope = client}) ->
ok;
check_resource_access(Username,
R = #resource{virtual_host = VHostPath, name = Name},
@@ -298,7 +304,7 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
- lists:foreach(fun ({Username, _, _, _}) ->
+ lists:foreach(fun ({Username, _, _, _, _}) ->
ok = clear_permissions(Username, VHostPath)
end,
list_vhost_permissions(VHostPath)),
@@ -316,7 +322,16 @@ validate_regexp(RegexpBin) ->
end.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm,
+ WritePerm, ReadPerm).
+
+set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
+ Scope = case ScopeBin of
+ <<"client">> -> client;
+ <<"all">> -> all;
+ _ -> throw({error, {invalid_scope, ScopeBin}})
+ end,
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
@@ -326,12 +341,14 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
username = Username,
virtual_host = VHostPath},
permission = #permission{
+ scope = Scope,
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}},
write)
end)).
+
clear_permissions(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
@@ -343,22 +360,23 @@ clear_permissions(Username, VHostPath) ->
end)).
list_vhost_permissions(VHostPath) ->
- [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
list_permissions(rabbit_misc:with_vhost(
VHostPath, match_user_vhost('_', VHostPath)))].
list_user_permissions(Username) ->
- [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
list_permissions(rabbit_misc:with_user(
Username, match_user_vhost(Username, '_')))].
list_permissions(QueryThunk) ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
#user_permission{user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
permission = #permission{
+ scope = Scope,
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}} <-
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6eace7e9..2453280e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,8 +39,9 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/4]).
+ stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([emit_stats/1]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
@@ -78,7 +79,7 @@
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
-> {'new' | 'existing', rabbit_types:amqqueue()} |
- rabbit_types:channel_error()).
+ rabbit_types:channel_exit()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found')).
@@ -113,6 +114,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
+-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -130,6 +132,7 @@
-spec(ack/4 ::
(pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid())
-> 'ok').
+-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()).
-spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
@@ -147,8 +150,7 @@
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
- -> rabbit_types:amqqueue() | 'not_found' |
- rabbit_types:connection_exit()).
+ -> rabbit_types:amqqueue() | 'not_found').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit()).
@@ -158,7 +160,7 @@
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(maybe_expire/1 :: (pid()) -> 'ok').
--spec(on_node_down/1 :: (node()) -> 'ok' | rabbit_types:connection_exit()).
+-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()).
-endif.
@@ -303,9 +305,8 @@ check_declare_arguments(QueueName, Args) ->
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
- "Invalid arguments in declaration of queue ~s: "
- "~w (on argument: ~w)",
- [rabbit_misc:rs(QueueName), Error, Key])
+ "invalid arg '~s' for ~s: ~w",
+ [Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]],
ok.
@@ -353,6 +354,9 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
+emit_stats(#amqqueue{pid = QPid}) ->
+ delegate_pcast(QPid, 7, emit_stats).
+
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -376,9 +380,11 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+reject(QPid, MsgIds, Requeue, ChPid) ->
+ delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}).
+
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
- fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
@@ -388,9 +394,6 @@ rollback_all(QPids, Txn, ChPid) ->
notify_down_all(QPids, ChPid) ->
safe_delegate_call_ok(
- %% we don't care if the queue process has terminated in the
- %% meantime
- fun (_) -> ok end,
fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
@@ -447,7 +450,7 @@ internal_delete(QueueName) ->
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun},
+ gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun},
infinity).
update_ram_duration(QPid) ->
@@ -485,11 +488,11 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-safe_delegate_call_ok(H, F, Pids) ->
+safe_delegate_call_ok(F, Pids) ->
{_, Bad} = delegate:invoke(Pids,
fun (Pid) ->
rabbit_misc:with_exit_handler(
- fun () -> H(Pid) end,
+ fun () -> ok end,
fun () -> F(Pid) end)
end),
case Bad of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 67f0fcf5..d52660c5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -59,7 +59,8 @@
expires,
sync_timer_ref,
rate_timer_ref,
- expiry_timer_ref
+ expiry_timer_ref,
+ stats_timer
}).
-record(consumer, {tag, ack_required}).
@@ -74,13 +75,8 @@
txn,
unsent_message_count}).
--define(INFO_KEYS,
- [name,
- durable,
- auto_delete,
- arguments,
- pid,
- owner_pid,
+-define(STATISTICS_KEYS,
+ [pid,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
@@ -91,6 +87,17 @@
backing_queue_status
]).
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ name,
+ durable,
+ auto_delete,
+ arguments,
+ owner_pid
+ ]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
%%----------------------------------------------------------------------------
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
@@ -114,7 +121,8 @@ init(Q) ->
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = undefined,
- expiry_timer_ref = undefined}, hibernate,
+ expiry_timer_ref = undefined,
+ stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -155,6 +163,10 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
+ rabbit_event:notify(
+ queue_created,
+ [{Item, i(Item, State)} ||
+ Item <- ?CREATION_EVENT_KEYS]),
noreply(init_expires(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -173,6 +185,7 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
+ rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -189,9 +202,10 @@ noreply(NewState) ->
next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
+ State2 = ensure_stats_timer(State1),
case BQ:needs_idle_timeout(BQS)of
- true -> {ensure_sync_timer(State1), 0};
- false -> {stop_sync_timer(State1), hibernate}
+ true -> {ensure_sync_timer(State2), 0};
+ false -> {stop_sync_timer(State2), hibernate}
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
@@ -249,6 +263,18 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
State
end.
+ensure_stats_timer(State = #q{stats_timer = StatsTimer,
+ q = Q}) ->
+ State#q{stats_timer = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ fun() -> emit_stats(State) end,
+ fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
+
+stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
+ State#q{stats_timer = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> emit_stats(State) end)}.
+
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -560,6 +586,10 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+emit_stats(State) ->
+ rabbit_event:notify(queue_stats,
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
@@ -783,6 +813,21 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State#q{backing_queue_state = BQS1})
end;
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ noreply(State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(case Requeue of
+ true -> requeue_and_run(AckTags, State);
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State #q { backing_queue_state = BQS1 }
+ end)
+ end;
+
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
@@ -841,7 +886,11 @@ handle_cast(maybe_expire, State) ->
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
- end.
+ end;
+
+handle_cast(emit_stats, State) ->
+ emit_stats(State),
+ noreply(State).
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -878,4 +927,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}.
+ {hibernate, stop_stats_timer(
+ stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 848c1e91..c1445a0c 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -49,12 +49,10 @@
| rabbit_types:error('not_found'))).
-spec(publish/1 ::
- (rabbit_types:delivery()) -> publish_result() |
- rabbit_types:connection_exit()).
+ (rabbit_types:delivery()) -> publish_result()).
-spec(delivery/4 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message())
- -> rabbit_types:delivery()).
+ rabbit_types:message()) -> rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary())
@@ -64,12 +62,12 @@
-spec(publish/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary())
- -> publish_result() | rabbit_types:connection_exit()).
+ -> publish_result()).
-spec(publish/7 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
properties_input(), binary())
- -> publish_result() | rabbit_types:connection_exit()).
+ -> publish_result()).
-spec(build_content/2 ::
(rabbit_framing:amqp_property_record(), binary())
-> rabbit_types:content()).
@@ -99,18 +97,24 @@ delivery(Mandatory, Immediate, Txn, Message) ->
sender = self(), message = Message}.
build_content(Properties, BodyBin) ->
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
#content{class_id = ClassId,
properties = Properties,
properties_bin = none,
+ protocol = none,
payload_fragments_rev = [BodyBin]}.
from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
payload_fragments_rev = FragmentsRev} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ rabbit_binary_parser:ensure_content_decoded(Content,
+ rabbit_framing_amqp_0_9_1),
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 0e6ebe57..f0ec6180 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -41,12 +41,12 @@
% See definition of check_empty_content_body_frame_size/0, an assertion called at startup.
-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
--export([build_simple_method_frame/2,
- build_simple_content_frames/3,
+-export([build_simple_method_frame/3,
+ build_simple_content_frames/4,
build_heartbeat_frame/0]).
-export([generate_table/1, encode_properties/2]).
-export([check_empty_content_body_frame_size/0]).
--export([ensure_content_encoded/1, clear_encoded_content/1]).
+-export([ensure_content_encoded/2, clear_encoded_content/1]).
-import(lists).
@@ -56,20 +56,22 @@
-type(frame() :: [binary()]).
--spec(build_simple_method_frame/2 ::
- (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record())
+-spec(build_simple_method_frame/3 ::
+ (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(),
+ rabbit_types:protocol())
-> frame()).
--spec(build_simple_content_frames/3 ::
+-spec(build_simple_content_frames/4 ::
(rabbit_channel:channel_number(), rabbit_types:content(),
- non_neg_integer())
+ non_neg_integer(), rabbit_types:protocol())
-> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
-spec(encode_properties/2 ::
([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
--spec(ensure_content_encoded/1 ::
- (rabbit_types:content()) -> rabbit_types:encoded_content()).
+-spec(ensure_content_encoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol()) ->
+ rabbit_types:encoded_content()).
-spec(clear_encoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:unencoded_content()).
@@ -77,30 +79,24 @@
%%----------------------------------------------------------------------------
-build_simple_method_frame(ChannelInt, MethodRecord) ->
- MethodFields = rabbit_framing:encode_method_fields(MethodRecord),
+build_simple_method_frame(ChannelInt, MethodRecord, Protocol) ->
+ MethodFields = Protocol:encode_method_fields(MethodRecord),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- {ClassId, MethodId} = rabbit_framing:method_id(MethodName),
+ {ClassId, MethodId} = Protocol:method_id(MethodName),
create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]).
-build_simple_content_frames(ChannelInt,
- #content{class_id = ClassId,
- properties = ContentProperties,
- properties_bin = ContentPropertiesBin,
- payload_fragments_rev = PayloadFragmentsRev},
- FrameMax) ->
- {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
+build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) ->
+ #content{class_id = ClassId,
+ properties_bin = ContentPropertiesBin,
+ payload_fragments_rev = PayloadFragmentsRev} =
+ ensure_content_encoded(Content, Protocol),
+ {BodySize, ContentFrames} =
+ build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
HeaderFrame = create_frame(2, ChannelInt,
[<<ClassId:16, 0:16, BodySize:64>>,
- maybe_encode_properties(ContentProperties, ContentPropertiesBin)]),
+ ContentPropertiesBin]),
[HeaderFrame | ContentFrames].
-maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
- when is_binary(ContentPropertiesBin) ->
- ContentPropertiesBin;
-maybe_encode_properties(ContentProperties, none) ->
- rabbit_framing:encode_properties(ContentProperties).
-
build_content_frames(FragsRev, FrameMax, ChannelInt) ->
BodyPayloadMax = if FrameMax == 0 ->
iolist_size(FragsRev);
@@ -283,13 +279,16 @@ check_empty_content_body_frame_size() ->
ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
end.
-ensure_content_encoded(Content = #content{properties_bin = PropsBin})
+ensure_content_encoded(Content = #content{properties_bin = PropsBin,
+ protocol = Protocol}, Protocol)
when PropsBin =/= 'none' ->
Content;
-ensure_content_encoded(Content = #content{properties = Props}) ->
- Content #content{properties_bin = rabbit_framing:encode_properties(Props)}.
+ensure_content_encoded(Content = #content{properties = Props}, Protocol) ->
+ Content#content{properties_bin = Protocol:encode_properties(Props),
+ protocol = Protocol}.
-clear_encoded_content(Content = #content{properties_bin = none}) ->
+clear_encoded_content(Content = #content{properties_bin = none,
+ protocol = none}) ->
Content;
clear_encoded_content(Content = #content{properties = none}) ->
%% Only clear when we can rebuild the properties_bin later in
@@ -297,4 +296,4 @@ clear_encoded_content(Content = #content{properties = none}) ->
%% one of properties and properties_bin can be 'none'
Content;
clear_encoded_content(Content = #content{}) ->
- Content#content{properties_bin = none}.
+ Content#content{properties_bin = none, protocol = none}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 69e34440..1d0a62af 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-export([parse_table/1, parse_properties/2]).
--export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([ensure_content_decoded/2, clear_decoded_content/1]).
-import(lists).
@@ -45,8 +45,9 @@
-spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()).
-spec(parse_properties/2 ::
([rabbit_framing:amqp_property_type()], binary()) -> [any()]).
--spec(ensure_content_decoded/1 ::
- (rabbit_types:content()) -> rabbit_types:decoded_content()).
+-spec(ensure_content_decoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol())
+ -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
@@ -162,12 +163,12 @@ parse_property(bit, Rest) ->
parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) ->
{parse_table(Table), Rest}.
-ensure_content_decoded(Content = #content{properties = Props})
+ensure_content_decoded(Content = #content{properties = Props}, _Protocol)
when Props =/= 'none' ->
Content;
-ensure_content_decoded(Content = #content{properties_bin = PropBin})
+ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol)
when is_binary(PropBin) ->
- Content#content{properties = rabbit_framing:decode_properties(
+ Content#content{properties = Protocol:decode_properties(
Content#content.class_id, PropBin)}.
clear_decoded_content(Content = #content{properties = none}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dafc3075..6f244166 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,10 +36,9 @@
-behaviour(gen_server2).
-export([start_link/6, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-
--export([flow_timeout/2]).
+-export([emit_stats/1, flush/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -48,37 +47,38 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, flow}).
-
--record(flow, {server, client, pending}).
+ consumer_mapping, blocking, queue_collector_pid, stats_timer}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
--define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
--define(INFO_KEYS,
+-define(STATISTICS_KEYS,
[pid,
- connection,
- number,
- user,
- vhost,
transactional,
consumer_count,
messages_unacknowledged,
acks_uncommitted,
prefetch_count]).
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ connection,
+ number,
+ user,
+ vhost]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-export_type([channel_number/0]).
--type(ref() :: any()).
-type(channel_number() :: non_neg_integer()).
-spec(start_link/6 ::
(channel_number(), pid(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> pid()).
+ rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -87,25 +87,22 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- {ok, Pid} = gen_server2:start_link(
- ?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost, CollectorPid], []),
- Pid.
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid,
+ Username, VHost, CollectorPid], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -122,15 +119,9 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
-conserve_memory(Pid, Conserve) ->
- gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
-
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-flow_timeout(Pid, Ref) ->
- gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
-
list() ->
pg_local:get_members(rabbit_channels).
@@ -151,31 +142,39 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+emit_stats(Pid) ->
+ gen_server2:pcast(Pid, 7, emit_stats).
+
+flush(Pid) ->
+ gen_server2:call(Pid, flush).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
- {ok, #ch{state = starting,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- limiter_pid = undefined,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new(),
- blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- flow = #flow{server = true, client = true,
- pending = none}},
- hibernate,
+ State = #ch{state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new(),
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
+ stats_timer = rabbit_event:init_stats_timer()},
+ rabbit_event:notify(
+ channel_created,
+ [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(info, _From, State) ->
@@ -187,6 +186,9 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
handle_call(_Request, _From, State) ->
noreply(State).
@@ -225,26 +227,16 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
+ {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
+ maybe_incr_stats([{QPid, 1}],
+ case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
- noreply(State);
-handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
- ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
- noreply(State#ch{state = running});
-handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
- flow_control(not Conserve, State);
-handle_cast({conserve_memory, _Conserve}, State) ->
- noreply(State);
-
-handle_cast({flow_timeout, Ref},
- State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
- {stop, normal, terminating(
- rabbit_misc:amqp_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}",
- [not Flow], none), State)};
-handle_cast({flow_timeout, _Ref}, State) ->
+handle_cast(emit_stats, State) ->
+ internal_emit_stats(State),
{noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
@@ -254,11 +246,12 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {hibernate, State}.
+ {hibernate, stop_stats_timer(State)}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -276,9 +269,23 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+reply(Reply, NewState) ->
+ {reply, Reply, ensure_stats_timer(NewState), hibernate}.
+
+noreply(NewState) ->
+ {noreply, ensure_stats_timer(NewState), hibernate}.
-noreply(NewState) -> {noreply, NewState, hibernate}.
+ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
+ ChPid = self(),
+ State#ch{stats_timer = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end,
+ fun() -> emit_stats(ChPid) end)}.
+
+stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
+ State#ch{stats_timer = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end)}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -385,10 +392,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
- true -> {noreply, State};
- false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
- end;
+ {reply, #'channel.open_ok'{}, State#ch{state = running}};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -405,10 +409,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -421,7 +421,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ DecodedContent = rabbit_binary_parser:ensure_content_decoded(
+ Content, rabbit_framing_amqp_0_9_1),
IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -437,6 +438,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
unroutable -> ok = basic_return(Message, WriterPid, no_route);
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State),
{noreply, case TxnKey of
none -> State;
_ -> add_tx_participants(DeliveredQPids, State)
@@ -447,7 +451,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
_, State = #ch{transaction_id = TxnKey,
unacked_message_q = UAMQ}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(TxnKey, Acked),
+ QIncs = ack(TxnKey, Acked),
+ Participants = [QPid || {QPid, _} <- QIncs],
+ maybe_incr_stats(QIncs, ack, State),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
State#ch{unacked_message_q = Remaining};
@@ -470,11 +476,16 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, _QPid, _MsgId, Redelivered,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content}}} ->
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
+ maybe_incr_stats([{QPid, 1}],
+ case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -638,6 +649,17 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
{noreply, State2};
+handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
+ requeue = Requeue},
+ _, State = #ch{ unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}};
+
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
@@ -735,7 +757,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% the connection shuts down.
ok = case Owner of
none -> ok;
- _ -> rabbit_queue_collector:register(CollectorPid, Q)
+ _ -> rabbit_queue_collector:register(
+ CollectorPid, Q)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
@@ -853,48 +876,12 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Active, client = Flow,
- pending = {_Ref, TRef}} = F})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Flow, client = Flow,
- pending = {_Ref, TRef}}})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, issue_flow(Flow, State)};
-handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
- rabbit_misc:protocol_error(
- command_invalid, "unsolicited channel.flow_ok", []);
-handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
-
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
%%----------------------------------------------------------------------------
-flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
- when Flow =:= not Active ->
- ok = clear_permission_cache(),
- noreply(issue_flow(Active, State));
-flow_control(Active, State = #ch{flow = F}) ->
- noreply(State#ch{flow = F#flow{server = Active}}).
-
-issue_flow(Active, State) ->
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = Active}),
- Ref = make_ref(),
- {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
- [self(), Ref]),
- State#ch{flow = #flow{server = Active, client = not Active,
- pending = {Ref, TRef}}}.
-
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -938,7 +925,7 @@ basic_return(#basic_message{exchange_name = ExchangeName,
content = Content},
WriterPid, Reason) ->
{_Close, ReplyCode, ReplyText} =
- rabbit_framing:lookup_amqp_exception(Reason),
+ rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
@@ -978,7 +965,7 @@ ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
- [QPid | L]
+ [{QPid, length(MsgIds)} | L]
end, [], UAQ).
make_tx_id() -> rabbit_guid:guid().
@@ -1031,7 +1018,7 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
- LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+ {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid.
@@ -1105,6 +1092,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:notify(channel_closed, [{pid, self()}]),
rabbit_writer:shutdown(WriterPid),
rabbit_limiter:shutdown(LimiterPid).
@@ -1127,3 +1115,60 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
i(Item, _) ->
throw({bad_argument, Item}).
+
+maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
+ case rabbit_event:stats_level(StatsTimer) of
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
+ end.
+
+incr_stats({QPid, _} = QX, Inc, Measure) ->
+ maybe_monitor(QPid),
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ maybe_monitor(QPid),
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
+
+maybe_monitor(QPid) ->
+ case get({monitoring, QPid}) of
+ undefined -> erlang:monitor(process, QPid),
+ put({monitoring, QPid}, true);
+ _ -> ok
+ end.
+
+update_measures(Type, QX, Inc, Measure) ->
+ Measures = case get({Type, QX}) of
+ undefined -> [];
+ D -> D
+ end,
+ Cur = case orddict:find(Measure, Measures) of
+ error -> 0;
+ {ok, C} -> C
+ end,
+ put({Type, QX},
+ orddict:store(Measure, Cur + Inc, Measures)).
+
+internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
+ CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
+ case rabbit_event:stats_level(StatsTimer) of
+ coarse ->
+ rabbit_event:notify(channel_stats, CoarseStats);
+ fine ->
+ FineStats =
+ [{channel_queue_stats,
+ [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
+ end.
+
+erase_queue_stats(QPid) ->
+ erase({monitoring, QPid}),
+ erase({queue_stats, QPid}),
+ [erase({queue_exchange_stats, QX}) ||
+ {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6e6ad06c..f0b623c2 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -32,20 +32,25 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/4]).
-
--record(params, {quiet, node, command, args}).
+-export([start/0, stop/0, action/5]).
-define(RPC_TIMEOUT, infinity).
+-define(QUIET_OPT, "-q").
+-define(NODE_OPT, "-n").
+-define(VHOST_OPT, "-p").
+-define(SCOPE_OPT, "-s").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
--spec(action/4 :: (atom(), node(), [string()],
- fun ((string(), [any()]) -> 'ok')) -> 'ok').
+-spec(action/5 ::
+ (atom(), node(), [string()], [{string(), any()}],
+ fun ((string(), [any()]) -> 'ok'))
+ -> 'ok').
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -55,18 +60,33 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
- #params{quiet = Quiet, node = Node, command = Command, args = Args} =
- parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:makenode(NodeStr)}),
+ case FullCommand of
+ [] -> usage();
+ _ -> ok
+ end,
+ {[Command0 | Args], Opts} =
+ rabbit_misc:get_options(
+ [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
+ {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}],
+ FullCommand),
+ Opts1 = lists:map(fun({K, V}) ->
+ case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ _ -> {K, V}
+ end
+ end, Opts),
+ Command = list_to_atom(Command0),
+ Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
+ Node = proplists:get_value(?NODE_OPT, Opts1),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
- end
+ end
end,
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
- case catch action(Command, Node, Args, Inform) of
+ case catch action(Command, Node, Args, Opts, Inform) of
ok ->
case Quiet of
true -> ok;
@@ -118,15 +138,6 @@ print_badrpc_diagnostics(Node) ->
fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
ok.
-parse_args(["-n", NodeS | Args], Params) ->
- parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)});
-parse_args(["-q" | Args], Params) ->
- parse_args(Args, Params#params{quiet = true});
-parse_args([Command | Args], Params) ->
- Params#params{command = list_to_atom(Command), args = Args};
-parse_args([], _) ->
- usage().
-
stop() ->
ok.
@@ -134,39 +145,39 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
halt(1).
-action(stop, Node, [], Inform) ->
+action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
call(Node, {rabbit, stop_and_halt, []});
-action(stop_app, Node, [], Inform) ->
+action(stop_app, Node, [], _Opts, Inform) ->
Inform("Stopping node ~p", [Node]),
call(Node, {rabbit, stop, []});
-action(start_app, Node, [], Inform) ->
+action(start_app, Node, [], _Opts, Inform) ->
Inform("Starting node ~p", [Node]),
call(Node, {rabbit, start, []});
-action(reset, Node, [], Inform) ->
+action(reset, Node, [], _Opts, Inform) ->
Inform("Resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, reset, []});
-action(force_reset, Node, [], Inform) ->
+action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, Inform) ->
+action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Clustering node ~p with ~p",
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-action(force_cluster, Node, ClusterNodeSs, Inform) ->
+action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
-action(status, Node, [], Inform) ->
+action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
{badrpc, _} = Res -> Res;
@@ -174,129 +185,117 @@ action(status, Node, [], Inform) ->
ok
end;
-action(rotate_logs, Node, [], Inform) ->
+action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Reopening logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, [""]});
-action(rotate_logs, Node, Args = [Suffix], Inform) ->
+action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
Inform("Rotating logs to files with suffix ~p", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
-action(close_connection, Node, [PidStr, Explanation], Inform) ->
+action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
Inform("Closing connection ~s", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
[rabbit_misc:string_to_pid(PidStr), Explanation]);
-action(add_user, Node, Args = [Username, _Password], Inform) ->
+action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
Inform("Creating user ~p", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
-action(delete_user, Node, Args = [_Username], Inform) ->
+action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
Inform("Deleting user ~p", Args),
call(Node, {rabbit_access_control, delete_user, Args});
-action(change_password, Node, Args = [Username, _Newpassword], Inform) ->
+action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
call(Node, {rabbit_access_control, change_password, Args});
-action(list_users, Node, [], Inform) ->
+action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
display_list(call(Node, {rabbit_access_control, list_users, []}));
-action(add_vhost, Node, Args = [_VHostPath], Inform) ->
+action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Creating vhost ~p", Args),
call(Node, {rabbit_access_control, add_vhost, Args});
-action(delete_vhost, Node, Args = [_VHostPath], Inform) ->
+action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost ~p", Args),
call(Node, {rabbit_access_control, delete_vhost, Args});
-action(list_vhosts, Node, [], Inform) ->
+action(list_vhosts, Node, [], _Opts, Inform) ->
Inform("Listing vhosts", []),
display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
-action(list_user_permissions, Node, Args = [_Username], Inform) ->
+action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
Inform("Listing permissions for user ~p", Args),
display_list(call(Node, {rabbit_access_control, list_user_permissions,
Args}));
-action(list_queues, Node, Args, Inform) ->
+action(list_queues, Node, Args, Opts, Inform) ->
Inform("Listing queues", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, messages]),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_exchanges, Node, Args, Inform) ->
+action(list_exchanges, Node, Args, Opts, Inform) ->
Inform("Listing exchanges", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, type]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, type]),
display_info_list(rpc_call(Node, rabbit_exchange, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_bindings, Node, Args, Inform) ->
+action(list_bindings, Node, _Args, Opts, Inform) ->
Inform("Listing bindings", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
InfoKeys);
-action(list_connections, Node, Args, Inform) ->
+action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
-action(list_channels, Node, Args, Inform) ->
+action(list_channels, Node, Args, _Opts, Inform) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count,
messages_unacknowledged]),
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
ArgAtoms);
-action(list_consumers, Node, Args, Inform) ->
+action(list_consumers, Node, _Args, Opts, Inform) ->
Inform("Listing consumers", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
InfoKeys);
-action(Command, Node, Args, Inform) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- action(Command, Node, VHost, RemainingArgs, Inform).
-
-action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) ->
+action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Scope = proplists:get_value(?SCOPE_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, set_permissions,
- [Username, VHost, CPerm, WPerm, RPerm]});
+ [Scope, Username, VHost, CPerm, WPerm, RPerm]});
-action(clear_permissions, Node, VHost, [Username], Inform) ->
+action(clear_permissions, Node, [Username], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
-action(list_permissions, Node, VHost, [], Inform) ->
+action(list_permissions, Node, [], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
[VHost]})).
-parse_vhost_flag(Args) when is_list(Args) ->
- case Args of
- ["-p", VHost | RemainingArgs] ->
- {VHost, RemainingArgs};
- RemainingArgs ->
- {"/", RemainingArgs}
- end.
-
-parse_vhost_flag_bin(Args) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- {list_to_binary(VHost), RemainingArgs}.
-
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
Default;
@@ -357,6 +356,8 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
+escape(Atom) when is_atom(Atom) ->
+ escape(atom_to_list(Atom));
escape(Bin) when is_binary(Bin) ->
escape(binary_to_list(Bin));
escape(L) when is_list(L) ->
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
new file mode 100644
index 00000000..113ffcb4
--- /dev/null
+++ b/src/rabbit_event.erl
@@ -0,0 +1,138 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_event).
+
+-include("rabbit.hrl").
+
+-export([start_link/0]).
+-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
+-export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
+-export([stats_level/1]).
+-export([notify/2]).
+
+%%----------------------------------------------------------------------------
+
+-record(state, {level, timer}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([event_type/0, event_props/0, event_timestamp/0, event/0]).
+
+-type(event_type() :: atom()).
+-type(event_props() :: term()).
+-type(event_timestamp() ::
+ {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
+
+-type(event() :: #event {
+ type :: event_type(),
+ props :: event_props(),
+ timestamp :: event_timestamp()
+ }).
+
+-type(level() :: 'none' | 'coarse' | 'fine').
+
+-opaque(state() :: #state {
+ level :: level(),
+ timer :: atom()
+ }).
+
+-type(timer_fun() :: fun (() -> 'ok')).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())).
+-spec(init_stats_timer/0 :: () -> state()).
+-spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
+-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()).
+-spec(reset_stats_timer_after/1 :: (state()) -> state()).
+-spec(stats_level/1 :: (state()) -> level()).
+-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_event:start_link({local, ?MODULE}).
+
+init_stats_timer() ->
+ {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
+ #state{level = StatsLevel, timer = undefined}.
+
+ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
+ State;
+ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
+ NowFun(),
+ {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
+ erlang, apply, [TimerFun, []]),
+ State#state{timer = TRef};
+ensure_stats_timer(State, _NowFun, _TimerFun) ->
+ State.
+
+stop_stats_timer(State = #state{level = none}, _NowFun) ->
+ State;
+stop_stats_timer(State = #state{timer = undefined}, _NowFun) ->
+ State;
+stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
+ {ok, cancel} = timer:cancel(TRef),
+ NowFun(),
+ State#state{timer = undefined}.
+
+ensure_stats_timer_after(State = #state{level = none}, _TimerFun) ->
+ State;
+ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ erlang, apply, [TimerFun, []]),
+ State#state{timer = TRef};
+ensure_stats_timer_after(State, _TimerFun) ->
+ State.
+
+reset_stats_timer_after(State) ->
+ State#state{timer = undefined}.
+
+stats_level(#state{level = Level}) ->
+ Level.
+
+notify(Type, Props) ->
+ try
+ %% TODO: switch to os:timestamp() when we drop support for
+ %% Erlang/OTP < R13B01
+ gen_event:notify(rabbit_event, #event{type = Type,
+ props = Props,
+ timestamp = now()})
+ catch error:badarg ->
+ %% badarg means rabbit_event is no longer registered. We never
+ %% unregister it so the great likelihood is that we're shutting
+ %% down the broker but some events were backed up. Ignore it.
+ ok
+ end.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index b4b2ae68..af4eb1bd 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -67,10 +67,10 @@
fun((rabbit_types:exchange(), queue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
--spec(recover/0 :: () -> 'ok' | rabbit_types:connection_exit()).
+-spec(recover/0 :: () -> 'ok').
-spec(declare/5 ::
(name(), type(), boolean(), boolean(), rabbit_framing:amqp_table())
- -> rabbit_types:exchange() | rabbit_types:connection_exit()).
+ -> rabbit_types:exchange()).
-spec(check_type/1 ::
(binary()) -> atom() | rabbit_types:connection_exit()).
-spec(assert_equivalence/5 ::
@@ -96,32 +96,26 @@
-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
-> [[rabbit_types:info()]]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> {rabbit_router:routing_result(), [pid()]} |
- rabbit_types:connection_exit()).
+ -> {rabbit_router:routing_result(), [pid()]}).
-spec(add_binding/5 ::
(name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun())
- -> bind_res() | rabbit_types:connection_exit()).
+ rabbit_framing:amqp_table(), inner_fun()) -> bind_res()).
-spec(delete_binding/5 ::
(name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
rabbit_framing:amqp_table(), inner_fun())
- -> bind_res() | rabbit_types:error('binding_not_found') |
- rabbit_types:connection_exit()).
+ -> bind_res() | rabbit_types:error('binding_not_found')).
-spec(list_bindings/1 ::
(rabbit_types:vhost())
-> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
rabbit_framing:amqp_table()}]).
-spec(delete_queue_bindings/1 ::
- (rabbit_amqqueue:name())
- -> fun (() -> none()) | rabbit_types:connection_exit()).
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete_transient_queue_bindings/1 ::
- (rabbit_amqqueue:name())
- -> fun (() -> none()) | rabbit_types:connection_exit()).
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
- rabbit_types:error('in_use') |
- rabbit_types:connection_exit()).
+ rabbit_types:error('in_use')).
-spec(list_queue_bindings/1 ::
(rabbit_amqqueue:name())
-> [{name(), rabbit_router:routing_key(),
@@ -198,6 +192,9 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end) of
{new, X} -> TypeModule:create(X),
+ rabbit_event:notify(
+ exchange_created,
+ [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]),
X;
{existing, X} -> X;
Err -> Err
@@ -205,12 +202,8 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- case rabbit_exchange_type_registry:lookup_module(T) of
- {ok, Module} -> Module;
- {error, not_found} -> rabbit_misc:protocol_error(
- command_invalid,
- "invalid exchange type '~s'", [T])
- end.
+ {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ Module.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -219,8 +212,12 @@ check_type(TypeBin) ->
rabbit_misc:protocol_error(
command_invalid, "unknown exchange type '~s'", [TypeBin]);
T ->
- _Module = type_to_module(T),
- T
+ case rabbit_exchange_type_registry:lookup_module(T) of
+ {error, not_found} -> rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid exchange type '~s'", [T]);
+ {ok, _Module} -> T
+ end
end.
assert_equivalence(X = #exchange{ durable = Durable,
@@ -386,7 +383,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
[X] = mnesia:read({rabbit_exchange, ExchangeName}),
{maybe_auto_delete(X), Bindings}.
-
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
@@ -435,6 +431,12 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
X#exchange.durable andalso
Q#amqqueue.durable,
fun mnesia:write/3),
+ rabbit_event:notify(
+ binding_created,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName},
+ {routing_key, RoutingKey},
+ {arguments, Arguments}]),
{new, X, B};
[_R] ->
{existing, X, B}
@@ -467,6 +469,10 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
X#exchange.durable andalso
Q#amqqueue.durable,
fun mnesia:delete_object/3),
+ rabbit_event:notify(
+ binding_deleted,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName}]),
{maybe_auto_delete(X), B};
{error, _} = E ->
E
@@ -585,6 +591,7 @@ unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Bindings = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}),
+ rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
{deleted, Exchange, Bindings}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index bc1a2a08..00b74ad0 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,21 +32,22 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/2, process/2, shutdown/1]).
+-export([start_link/3, process/2, shutdown/1]).
%% internal
--export([mainloop/1]).
+-export([mainloop/2]).
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs) ->
- spawn_link(
- fun () ->
- %% we trap exits so that a normal termination of the
- %% channel or reader process terminates us too.
- process_flag(trap_exit, true),
- mainloop(apply(StartFun, StartArgs))
- end).
+start_link(StartFun, StartArgs, Protocol) ->
+ {ok, spawn_link(
+ fun () ->
+ %% we trap exits so that a normal termination of
+ %% the channel or reader process terminates us too.
+ process_flag(trap_exit, true),
+ {ok, ChannelPid} = apply(StartFun, StartArgs),
+ mainloop(ChannelPid, Protocol)
+ end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -72,37 +73,42 @@ read_frame(ChannelPid) ->
Msg -> exit({unexpected_message, Msg})
end.
-mainloop(ChannelPid) ->
- {method, MethodName, FieldsBin} = read_frame(ChannelPid),
- Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
- case rabbit_framing:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
- rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid, ClassId));
- false -> rabbit_channel:do(ChannelPid, Method)
- end,
- ?MODULE:mainloop(ChannelPid).
+mainloop(ChannelPid, Protocol) ->
+ case read_frame(ChannelPid) of
+ {method, MethodName, FieldsBin} ->
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ rabbit_channel:do(ChannelPid, Method,
+ collect_content(ChannelPid,
+ ClassId,
+ Protocol));
+ false -> rabbit_channel:do(ChannelPid, Method)
+ end,
+ ?MODULE:mainloop(ChannelPid, Protocol);
+ _ ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead",
+ [])
+ end.
-collect_content(ChannelPid, ClassId) ->
+collect_content(ChannelPid, ClassId, Protocol) ->
case read_frame(ChannelPid) of
{content_header, ClassId, 0, BodySize, PropertiesBin} ->
Payload = collect_content_payload(ChannelPid, BodySize, []),
#content{class_id = ClassId,
properties = none,
properties_bin = PropertiesBin,
+ protocol = Protocol,
payload_fragments_rev = Payload};
{content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]);
+ unexpected_frame("expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]);
_ ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId])
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead",
+ [ClassId])
end.
collect_content_payload(_ChannelPid, 0, Acc) ->
@@ -114,8 +120,10 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
RemainingByteCount - size(FragmentBin),
[FragmentBin | Acc]);
_ ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content body, got non content body frame instead",
- [])
+ unexpected_frame("expected content body, "
+ "got non content body frame instead",
+ [])
end.
+
+unexpected_frame(ExplanationFormat, Params) ->
+ rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 1989fb7b..faddffc1 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,14 +31,17 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2]).
+-export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) ->
- rabbit_types:maybe({pid(), pid()})).
+-type(pids() :: rabbit_types:maybe({pid(), pid()})).
+
+-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()).
+-spec(pause_monitor/1 :: (pids()) -> 'ok').
+-spec(resume_monitor/1 :: (pids()) -> 'ok').
-endif.
@@ -70,20 +73,43 @@ start_heartbeat(Sock, TimeoutSec) ->
end}, Parent) end),
{Sender, Receiver}.
+pause_monitor(none) ->
+ ok;
+pause_monitor({_Sender, Receiver}) ->
+ Receiver ! pause,
+ ok.
+
+resume_monitor(none) ->
+ ok;
+resume_monitor({_Sender, Receiver}) ->
+ Receiver ! resume,
+ ok.
+
+%%----------------------------------------------------------------------------
+
heartbeater(Params, Parent) ->
heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
MonitorRef, {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
receive
{'DOWN', MonitorRef, process, _Object, _Info} ->
ok;
+ pause ->
+ receive
+ {'DOWN', MonitorRef, process, _Object, _Info} ->
+ ok;
+ resume ->
+ Recurse({0, 0});
+ Other ->
+ exit({unexpected_message, Other})
+ end;
Other ->
exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
if NewStatVal =/= StatVal ->
Recurse({NewStatVal, 0});
SameCount < Threshold ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 878af029..813ccc75 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -45,7 +45,7 @@
-type(maybe_pid() :: pid() | 'undefined').
--spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
+-spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
@@ -74,8 +74,7 @@
%%----------------------------------------------------------------------------
start_link(ChPid, UnackedMsgCount) ->
- {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []),
- Pid.
+ gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []).
shutdown(undefined) ->
ok;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index af800072..f0c2bffb 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -64,7 +64,7 @@
-export([version_compare/2, version_compare/3]).
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
--export([get_real_sock/1]).
+-export([get_real_sock/1, get_options/2]).
-import(mnesia).
-import(lists).
@@ -82,14 +82,16 @@
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
-type(resource_name() :: binary()).
+-type(optdef() :: {flag, string()} | {option, string(), any()}).
+-type(channel_or_connection_exit()
+ :: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
-spec(die/1 ::
- (rabbit_framing:amqp_exception())
- -> rabbit_types:channel_exit() | rabbit_types:connection_exit()).
+ (rabbit_framing:amqp_exception()) -> channel_or_connection_exit()).
-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
-> rabbit_types:connection_exit()).
-spec(amqp_error/4 ::
@@ -97,14 +99,12 @@
rabbit_framing:amqp_method_name())
-> rabbit_types:amqp_error()).
-spec(protocol_error/3 :: (rabbit_framing:amqp_exception(), string(), [any()])
- -> rabbit_types:channel_exit() |
- rabbit_types:connection_exit()).
+ -> channel_or_connection_exit()).
-spec(protocol_error/4 ::
(rabbit_framing:amqp_exception(), string(), [any()],
- rabbit_framing:amqp_method_name())
- -> rabbit_types:channel_exit() |
- rabbit_types:connection_exit()).
--spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()).
+ rabbit_framing:amqp_method_name()) -> channel_or_connection_exit()).
+-spec(protocol_error/1 ::
+ (rabbit_types:amqp_error()) -> channel_or_connection_exit()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
@@ -190,6 +190,8 @@
orddict:dictionary()).
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_real_sock/1 :: (#sslsocket{}) -> any()).
+-spec(get_options/2 :: ([optdef()], [string()])
+ -> {[string()], [{string(), any()}]}).
-endif.
@@ -236,9 +238,9 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
{Same, Same} -> ok;
{Orig1, New1} -> protocol_error(
not_allowed,
- "cannot redeclare ~s with inequivalent args for ~s: "
+ "inequivalent arg '~s' for ~s: "
"required ~w, received ~w",
- [rabbit_misc:rs(Name), Key, New1, Orig1])
+ [Key, rabbit_misc:rs(Name), New1, Orig1])
end.
get_config(Key) ->
@@ -714,3 +716,36 @@ get_real_sock(#sslsocket{pid = {Sock, _}}) ->
Sock;
get_real_sock(Sock) ->
Sock.
+
+% Separate flags and options from arguments.
+% get_options([{flag, "-q"}, {option, "-p", "/"}],
+% ["set_permissions","-p","/","guest",
+% "-q",".*",".*",".*"])
+% == {["set_permissions","guest",".*",".*",".*"],
+% [{"-q",true},{"-p","/"}]}
+get_options(Defs, As) ->
+ lists:foldl(fun(Def, {AsIn, RsIn}) ->
+ {AsOut, Value} = case Def of
+ {flag, Key} ->
+ get_flag(Key, AsIn);
+ {option, Key, Default} ->
+ get_option(Key, Default, AsIn)
+ end,
+ {AsOut, [{Key, Value} | RsIn]}
+ end, {As, []}, Defs).
+
+get_option(K, _Default, [K, V | As]) ->
+ {As, V};
+get_option(K, Default, [Nk | As]) ->
+ {As1, V} = get_option(K, Default, As),
+ {[Nk | As1], V};
+get_option(_, Default, As) ->
+ {As, Default}.
+
+get_flag(K, [K | As]) ->
+ {As, true};
+get_flag(K, [Nk | As]) ->
+ {As1, V} = get_flag(K, As),
+ {[Nk | As1], V};
+get_flag(_, []) ->
+ {[], false}.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index ea3768d4..9257ec82 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -49,6 +49,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok(pid())).
-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
+-spec(shutdown/1 :: (pid()) -> 'ok').
-endif.
@@ -64,7 +65,7 @@ delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
shutdown(CollectorPid) ->
- gen_server:call(CollectorPid, shutdown, infinity).
+ gen_server:cast(CollectorPid, shutdown).
%%----------------------------------------------------------------------------
@@ -87,13 +88,10 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
rabbit_amqqueue:delete(Q, false, false)
end)
|| {MonitorRef, Q} <- dict:to_list(Queues)],
- {reply, ok, State};
+ {reply, ok, State}.
-handle_call(shutdown, _From, State) ->
- {stop, normal, ok, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
+handle_cast(shutdown, State) ->
+ {stop, normal, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
State = #state{queues = Queues}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b5514c82..f947cd90 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -39,9 +39,11 @@
-export([init/1, mainloop/3]).
--export([server_properties/0]).
+-export([conserve_memory/2, server_properties/0]).
--export([analyze_frame/2]).
+-export([analyze_frame/3]).
+
+-export([emit_stats/1]).
-import(gen_tcp).
-import(fprof).
@@ -57,13 +59,17 @@
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector}).
+-record(v1, {sock, connection, callback, recv_length, recv_ref,
+ connection_state, queue_collector, heartbeater, stats_timer}).
+
+-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
+ send_pend, state, channels]).
--define(INFO_KEYS,
- [pid, address, port, peer_address, peer_port,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max, client_properties]).
+-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port,
+ protocol, user, vhost, timeout, frame_max,
+ client_properties]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
%% connection lifecycle
%%
@@ -101,6 +107,17 @@
%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
+%% conserve_memory=true -> *blocking*
+%% blocking:
+%% conserve_memory=true -> *blocking*
+%% conserve_memory=false -> *running*
+%% receive a method frame for a content-bearing method
+%% -> process, stop receiving, *blocked*
+%% ...rest same as 'running'
+%% blocked:
+%% conserve_memory=true -> *blocked*
+%% conserve_memory=false -> resume receiving, *running*
+%% ...rest same as 'running'
%% closing:
%% socket close -> *terminate*
%% receive connection.close -> send connection.close_ok,
@@ -134,6 +151,11 @@
%%
%% TODO: refactor the code so that the above is obvious
+-define(IS_RUNNING(State),
+ (State#v1.connection_state =:= running orelse
+ State#v1.connection_state =:= blocking orelse
+ State#v1.connection_state =:= blocked)).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -141,7 +163,9 @@
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
-endif.
@@ -181,6 +205,9 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
+emit_stats(Pid) ->
+ gen_server:cast(Pid, emit_stats).
+
setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
@@ -208,6 +235,10 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
end.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
+
server_properties() ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -249,11 +280,16 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
- client_properties = none},
+ client_properties = none,
+ protocol = none},
callback = uninitialized_callback,
+ recv_length = 0,
recv_ref = none,
connection_state = pre_init,
- queue_collector = Collector},
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
+ rabbit_event:init_stats_timer()},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -272,8 +308,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
+ rabbit_misc:unlink_and_capture_exit(Collector),
rabbit_queue_collector:shutdown(Collector),
- rabbit_misc:unlink_and_capture_exit(Collector)
+ rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
@@ -294,6 +331,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
+ {conserve_memory, Conserve} ->
+ mainloop(Parent, Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -315,7 +354,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
terminate_connection ->
State;
handshake_timeout ->
- if State#v1.connection_state =:= running orelse
+ if ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
mainloop(Parent, Deb, State);
@@ -339,6 +378,12 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
catch Error -> {error, Error}
end),
mainloop(Parent, Deb, State);
+ {'$gen_cast', emit_stats} ->
+ internal_emit_stats(State),
+ mainloop(Parent, Deb,
+ State#v1{stats_timer =
+ rabbit_event:reset_stats_timer_after(
+ State#v1.stats_timer)});
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -347,21 +392,44 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
end.
-switch_callback(OldState, NewCallback, Length) ->
+switch_callback(State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater}, Callback, Length) ->
+ ok = rabbit_heartbeat:pause_monitor(Heartbeater),
+ State#v1{callback = Callback, recv_length = Length, recv_ref = none};
+switch_callback(State, Callback, Length) ->
Ref = inet_op(fun () -> rabbit_net:async_recv(
- OldState#v1.sock, Length, infinity) end),
- OldState#v1{callback = NewCallback,
- recv_ref = Ref}.
+ State#v1.sock, Length, infinity) end),
+ State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}.
-terminate(Explanation, State = #v1{connection_state = running}) ->
+terminate(Explanation, State) when ?IS_RUNNING(State) ->
{normal, send_exception(State, 0,
rabbit_misc:amqp_error(
connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
-close_connection(State = #v1{connection = #connection{
+internal_conserve_memory(true, State = #v1{connection_state = running}) ->
+ State#v1{connection_state = blocking};
+internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
+ State#v1{connection_state = running};
+internal_conserve_memory(false, State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater,
+ callback = Callback,
+ recv_length = Length,
+ recv_ref = none}) ->
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ switch_callback(State#v1{connection_state = running}, Callback, Length);
+internal_conserve_memory(_Conserve, State) ->
+ State.
+
+close_connection(State = #v1{queue_collector = Collector,
+ connection = #connection{
timeout_sec = TimeoutSec}}) ->
+ %% The spec says "Exclusive queues may only be accessed by the
+ %% current connection, and are deleted when that connection
+ %% closes." This does not strictly imply synchrony, but in
+ %% practice it seems to be what people assume.
+ rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
TimeoutMillisec =
@@ -437,24 +505,23 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector}) ->
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
case all_channels() of
[] ->
- %% Spec says "Exclusive queues may only be accessed by the current
- %% connection, and are deleted when that connection closes."
- %% This does not strictly imply synchrony, but in practice it seems
- %% to be what people assume.
- rabbit_queue_collector:delete_all(Collector),
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
- close_connection(State);
+ NewState = close_connection(State),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ NewState;
_ -> State
end;
maybe_close(State) ->
State.
-handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
+handle_frame(Type, 0, Payload,
+ State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload) of
+ case analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -462,31 +529,38 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
State;
-handle_frame(Type, 0, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
end;
-handle_frame(Type, Channel, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, Channel, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
- erase({channel, Channel});
- _ -> ok
- end,
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
- State;
+ erase({channel, Channel}),
+ State;
+ {method, MethodName, _} ->
+ case (State#v1.connection_state =:= blocking andalso
+ Protocol:method_has_content(MethodName)) of
+ true -> State#v1{connection_state = blocked};
+ false -> State
+ end;
+ _ ->
+ State
+ end;
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except
@@ -506,32 +580,37 @@ handle_frame(Type, Channel, Payload, State) ->
end,
State;
undefined ->
- case State#v1.connection_state of
- running -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
- Other -> throw({channel_frame_while_starting,
- Channel, Other, AnalyzedFrame})
+ case ?IS_RUNNING(State) of
+ true -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
+ State;
+ false -> throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state,
+ AnalyzedFrame})
end
end
end.
-analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) ->
- {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields};
-analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) ->
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
{content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body) ->
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
-analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
heartbeat;
-analyze_frame(_Type, _Body) ->
+analyze_frame(_Type, _Body, _Protocol) ->
error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
%%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1};
+ {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1};
handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
case PayloadAndMarker of
@@ -543,54 +622,76 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
- case check_version({ProtocolMajor, ProtocolMinor},
- {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
- true ->
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ?PROTOCOL_VERSION_MAJOR,
- version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
- connection_state = starting},
- frame_header, 7};
- false ->
- throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
+
+%% This is the protocol header for 0-9, which we can safely treat as
+%% though it were 0-9-1.
+handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
+
+%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
+%% defines the version as 8-0.
+handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+%% The 0-8 spec as on the AMQP web site actually has this as the
+%% protocol header; some libraries e.g., py-amqplib, send it when they
+%% want 0-8.
+handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, A, B, C, D});
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
- throw({bad_header, Other});
+ refuse_connection(Sock, {bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
-%% the 0-8 spec, confusingly, defines the version as 8-0
-adjust_version({8,0}) -> {0,8};
-adjust_version(Version) -> Version.
-check_version(ClientVersion, ServerVersion) ->
- {ClientMajor, ClientMinor} = adjust_version(ClientVersion),
- {ServerMajor, ServerMinor} = adjust_version(ServerVersion),
- ClientMajor > ServerMajor
- orelse
- (ClientMajor == ServerMajor andalso
- ClientMinor >= ServerMinor).
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
+ Protocol,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ Start = #'connection.start'{ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> },
+ ok = send_on_channel0(Sock, Start, Protocol),
+ {State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7}.
+
+refuse_connection(Sock, Exception) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+ throw(Exception).
+
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ Self = self(),
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ StatsTimer,
+ fun() -> emit_stats(Self) end)}.
%%--------------------------------------------------------------------------
-handle_method0(MethodName, FieldsBin, State) ->
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
try
- handle_method0(rabbit_framing:decode_method_fields(
- MethodName, FieldsBin),
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:Reason ->
CompleteReason = case Reason of
@@ -598,13 +699,14 @@ handle_method0(MethodName, FieldsBin, State) ->
Reason#amqp_error{method = MethodName};
OtherReason -> OtherReason
end,
- case State#v1.connection_state of
- running -> send_exception(State, 0, CompleteReason);
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, CompleteReason);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
- Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, Other, CompleteReason})
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state,
+ CompleteReason})
end
end.
@@ -612,14 +714,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State = #v1{connection_state = starting,
- connection = Connection,
+ connection = Connection =
+ #connection{protocol = Protocol},
sock = Sock}) ->
User = rabbit_access_control:check_login(Mechanism, Response),
- ok = send_on_channel0(
- Sock,
- #'connection.tune'{channel_max = 0,
+ Tune = #'connection.tune'{channel_max = 0,
frame_max = ?FRAME_MAX,
- heartbeat = 0}),
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
@@ -638,53 +740,43 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ Heartbeater = rabbit_heartbeat:start_heartbeat(
+ Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}}
+ frame_max = FrameMax},
+ heartbeater = Heartbeater}
end;
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#'connection.open'{virtual_host = VHostPath},
+
State = #v1{connection_state = opening,
connection = Connection = #connection{
- user = User},
+ user = User,
+ protocol = Protocol},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
-handle_method0(#'connection.close'{},
- State = #v1{connection_state = running}) ->
+ ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State1 = State#v1{connection_state = running,
+ connection = NewConnection},
+ rabbit_event:notify(
+ connection_created,
+ [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
+ State1;
+handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol},
sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(Sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
@@ -697,23 +789,8 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-send_on_channel0(Sock, Method) ->
- ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
+send_on_channel0(Sock, Method, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
%%--------------------------------------------------------------------------
@@ -747,6 +824,10 @@ i(state, #v1{connection_state = S}) ->
S;
i(channels, #v1{}) ->
length(all_channels());
+i(protocol, #v1{connection = #connection{protocol = none}}) ->
+ none;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
@@ -770,11 +851,13 @@ send_to_new_channel(Channel, AnalyzedFrame,
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector]),
+ vhost = VHost,
+ protocol = Protocol}} = State,
+ {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
+ {ok, ChPid} = rabbit_framing_channel:start_link(
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector],
+ Protocol),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
@@ -790,25 +873,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
log_channel_error(CS, Channel, Reason),
send_exception(State, Channel, Reason).
-send_exception(State, Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason),
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {ShouldClose, CloseChannel, CloseMethod} =
+ map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
true -> terminate_channels(),
close_connection(State);
false -> close_channel(Channel, State)
end,
ok = rabbit_writer:internal_send_command(
- NewState#v1.sock, CloseChannel, CloseMethod),
+ NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
NewState.
-map_exception(Channel, Reason) ->
+map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
- lookup_amqp_exception(Reason),
+ lookup_amqp_exception(Reason, Protocol),
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ _ -> Protocol:method_id(FailedMethod)
end,
{CloseChannel, CloseMethod} =
case ShouldClose of
@@ -823,22 +908,16 @@ map_exception(Channel, Reason) ->
end,
{ShouldClose, CloseChannel, CloseMethod}.
-%% FIXME: this clause can go when we move to AMQP spec >=8.1
-lookup_amqp_exception(#amqp_error{name = precondition_failed,
- explanation = Expl,
- method = Method}) ->
- ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
- {false, 406, ExplBin, Method};
lookup_amqp_exception(#amqp_error{name = Name,
explanation = Expl,
- method = Method}) ->
- {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ method = Method},
+ Protocol) ->
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name),
ExplBin = amqp_exception_explanation(Text, Expl),
{ShouldClose, Code, ExplBin, Method};
-lookup_amqp_exception(Other) ->
+lookup_amqp_exception(Other, Protocol) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} =
- rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
{ShouldClose, Code, Text, none}.
amqp_exception_explanation(Text, Expl) ->
@@ -847,3 +926,7 @@ amqp_exception_explanation(Text, Expl) ->
if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
true -> CompleteTextBin
end.
+
+internal_emit_stats(State) ->
+ rabbit_event:notify(connection_stats,
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d50b9f31..ec049a1a 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -69,8 +69,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
+ fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 090c714b..6812b8d4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -67,7 +67,8 @@ all_tests() ->
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
- passed = test_memory_pressure(),
+ passed = test_statistics(),
+ passed = test_option_parser(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -504,8 +505,10 @@ test_content_framing(FrameMax, BodyBin) ->
rabbit_binary_generator:build_simple_content_frames(
1,
rabbit_binary_generator:ensure_content_encoded(
- rabbit_basic:build_content(#'P_basic'{}, BodyBin)),
- FrameMax),
+ rabbit_basic:build_content(#'P_basic'{}, BodyBin),
+ rabbit_framing_amqp_0_9_1),
+ FrameMax,
+ rabbit_framing_amqp_0_9_1),
%% header is formatted correctly and the size is the total of the
%% fragments
<<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
@@ -723,6 +726,30 @@ test_log_management_during_startup() ->
ok = control_action(start_app, []),
passed.
+test_option_parser() ->
+ % command and arguments should just pass through
+ ok = check_get_options({["mock_command", "arg1", "arg2"], []},
+ [], ["mock_command", "arg1", "arg2"]),
+
+ % get flags
+ ok = check_get_options(
+ {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]},
+ [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]),
+
+ % get options
+ ok = check_get_options(
+ {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]},
+ [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}],
+ ["mock_command", "-foo", "bar"]),
+
+ % shuffled and interleaved arguments and options
+ ok = check_get_options(
+ {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]},
+ [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}],
+ ["-f", "a1", "-o1", "hello", "a2", "a3"]),
+
+ passed.
+
test_cluster_management() ->
%% 'cluster' and 'reset' should only work if the app is stopped
@@ -858,7 +885,7 @@ test_cluster_management2(SecondaryNode) ->
%% attempt to leave cluster when no other node is alive
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
- ok = control_action(stop_app, SecondaryNode, []),
+ ok = control_action(stop_app, SecondaryNode, [], []),
ok = control_action(stop_app, []),
{error, {no_running_cluster_nodes, _, _}} =
control_action(reset, []),
@@ -866,9 +893,9 @@ test_cluster_management2(SecondaryNode) ->
%% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
ok = control_action(start_app, []),
- ok = control_action(force_reset, SecondaryNode, []),
- ok = control_action(cluster, SecondaryNode, [NodeS]),
- ok = control_action(start_app, SecondaryNode, []),
+ ok = control_action(force_reset, SecondaryNode, [], []),
+ ok = control_action(cluster, SecondaryNode, [NodeS], []),
+ ok = control_action(start_app, SecondaryNode, [], []),
passed.
@@ -888,9 +915,12 @@ test_user_management() ->
{error, {no_such_user, _}} =
control_action(list_user_permissions, ["foo"]),
{error, {no_such_vhost, _}} =
- control_action(list_permissions, ["-p", "/testhost"]),
+ control_action(list_permissions, [], [{"-p", "/testhost"}]),
{error, {invalid_regexp, _, _}} =
control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
+ {error, {invalid_scope, _}} =
+ control_action(set_permissions, ["guest", "foo", ".*", ".*"],
+ [{"-s", "cilent"}]),
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
@@ -906,16 +936,21 @@ test_user_management() ->
ok = control_action(list_vhosts, []),
%% user/vhost mapping
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
- ok = control_action(list_permissions, ["-p", "/testhost"]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}, {"-s", "client"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}, {"-s", "all"}]),
+ ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
+ ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
ok = control_action(list_user_permissions, ["foo"]),
%% user/vhost unmapping
- ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
- ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
+ ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]),
+ ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]),
%% vhost deletion
ok = control_action(delete_vhost, ["/testhost"]),
@@ -924,8 +959,8 @@ test_user_management() ->
%% deleting a populated vhost
ok = control_action(add_vhost, ["/testhost"]),
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion
@@ -938,8 +973,8 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
+ <<"user">>, <<"/">>, self()),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1030,46 +1065,11 @@ test_hooks() ->
end,
passed.
-test_memory_pressure_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- ok = case Method of
- #'channel.flow'{} -> ok;
- #'basic.qos_ok'{} -> ok;
- #'channel.open_ok'{} -> ok
- end,
- Pid ! Method,
- test_memory_pressure_receiver(Pid);
- sync ->
- Pid ! sync,
- test_memory_pressure_receiver(Pid)
- end.
-
-test_memory_pressure_receive_flow(Active) ->
- receive #'channel.flow'{active = Active} -> ok
- after 1000 -> throw(failed_to_receive_channel_flow)
- end,
- receive #'channel.flow'{} ->
- throw(pipelining_sync_commands_detected)
- after 0 ->
- ok
- end.
-
-test_memory_pressure_sync(Ch, Writer) ->
- ok = rabbit_channel:do(Ch, #'basic.qos'{}),
- Writer ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'basic.qos_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_basic_qos_ok)
- end.
-
-test_memory_pressure_spawn() ->
+test_spawn(Receiver) ->
Me = self(),
- Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
- self()),
+ Writer = spawn(fun () -> Receiver(Me) end),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>,
+ <<"/">>, self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
@@ -1077,89 +1077,94 @@ test_memory_pressure_spawn() ->
end,
{Writer, Ch, MRef}.
-expect_normal_channel_termination(MRef, Ch) ->
- receive {'DOWN', MRef, process, Ch, normal} -> ok
- after 1000 -> throw(channel_failed_to_exit)
+test_statistics_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_statistics_receiver(Pid)
end.
-gobble_channel_exit() ->
- receive {channel_exit, _, _} -> ok
- after 1000 -> throw(channel_exit_not_received)
+test_statistics_event_receiver(Pid) ->
+ receive
+ Foo ->
+ Pid ! Foo,
+ test_statistics_event_receiver(Pid)
end.
-test_memory_pressure() ->
- {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
- [ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
- Conserve <- [false, false, true, false, true, true, false]],
- ok = test_memory_pressure_sync(Ch0, Writer0),
- receive {'DOWN', MRef0, process, Ch0, Info0} ->
- throw({channel_died_early, Info0})
- after 0 -> ok
- end,
-
- %% we should have just 1 active=false waiting for us
- ok = test_memory_pressure_receive_flow(false),
-
- %% if we reply with flow_ok, we should immediately get an
- %% active=true back
- ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}),
- ok = test_memory_pressure_receive_flow(true),
-
- %% if we publish at this point, the channel should die
- Content = rabbit_basic:build_content(#'P_basic'{}, <<>>),
- ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
- expect_normal_channel_termination(MRef0, Ch0),
- gobble_channel_exit(),
-
- {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
- ok = rabbit_channel:conserve_memory(Ch1, true),
- ok = test_memory_pressure_receive_flow(false),
- ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
- ok = test_memory_pressure_sync(Ch1, Writer1),
- ok = rabbit_channel:conserve_memory(Ch1, false),
- ok = test_memory_pressure_receive_flow(true),
- %% send back the wrong flow_ok. Channel should die.
- ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
- expect_normal_channel_termination(MRef1, Ch1),
- gobble_channel_exit(),
-
- {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
- %% just out of the blue, send a flow_ok. Life should end.
- ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
- expect_normal_channel_termination(MRef2, Ch2),
- gobble_channel_exit(),
-
- {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
- ok = rabbit_channel:conserve_memory(Ch3, true),
- ok = test_memory_pressure_receive_flow(false),
- receive {'DOWN', MRef3, process, Ch3, _} ->
- ok
- after 12000 ->
- throw(channel_failed_to_exit)
- end,
- gobble_channel_exit(),
-
- alarm_handler:set_alarm({vm_memory_high_watermark, []}),
- Me = self(),
- Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
- self()),
- ok = rabbit_channel:do(Ch4, #'channel.open'{}),
- MRef4 = erlang:monitor(process, Ch4),
- Writer4 ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
- after 0 -> ok
- end,
- alarm_handler:clear_alarm(vm_memory_high_watermark),
- Writer4 ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'channel.open_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_channel_open_ok)
- end,
- rabbit_channel:shutdown(Ch4),
- expect_normal_channel_termination(MRef4, Ch4),
+test_statistics_receive_event(Ch, Matcher) ->
+ rabbit_channel:flush(Ch),
+ rabbit_channel:emit_stats(Ch),
+ test_statistics_receive_event1(Ch, Matcher).
+
+test_statistics_receive_event1(Ch, Matcher) ->
+ receive #event{type = channel_stats, props = Props} ->
+ case Matcher(Props) of
+ true -> Props;
+ _ -> test_statistics_receive_event1(Ch, Matcher)
+ end
+ after 1000 -> throw(failed_to_receive_event)
+ end.
+test_statistics() ->
+ application:set_env(rabbit, collect_statistics, fine),
+
+ %% ATM this just tests the queue / exchange stats in channels. That's
+ %% by far the most complex code though.
+
+ %% Set up a channel and queue
+ {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{}),
+ QName = receive #'queue.declare_ok'{queue = Q0} ->
+ Q0
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
+ QPid = Q#amqqueue.pid,
+ X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
+
+ rabbit_tests_event_receiver:start(self()),
+
+ %% Check stats empty
+ Event = test_statistics_receive_event(Ch, fun (_) -> true end),
+ [] = proplists:get_value(channel_queue_stats, Event),
+ [] = proplists:get_value(channel_exchange_stats, Event),
+ [] = proplists:get_value(channel_queue_exchange_stats, Event),
+
+ %% Publish and get a message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{}, <<"">>)),
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName}),
+
+ %% Check the stats reflect that
+ Event2 = test_statistics_receive_event(
+ Ch,
+ fun (E) ->
+ length(proplists:get_value(
+ channel_queue_exchange_stats, E)) > 0
+ end),
+ [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
+ [{{QPid,X},[{publish,1}]}] =
+ proplists:get_value(channel_queue_exchange_stats, Event2),
+
+ %% Check the stats remove stuff on queue deletion
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
+ Event3 = test_statistics_receive_event(
+ Ch,
+ fun (E) ->
+ length(proplists:get_value(
+ channel_queue_exchange_stats, E)) == 0
+ end),
+
+ [] = proplists:get_value(channel_queue_stats, Event3),
+ [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3),
+ [] = proplists:get_value(channel_queue_exchange_stats, Event3),
+
+ rabbit_channel:shutdown(Ch),
+ rabbit_tests_event_receiver:stop(),
passed.
test_delegates_async(SecondaryNode) ->
@@ -1251,11 +1256,16 @@ test_delegates_sync(SecondaryNode) ->
%---------------------------------------------------------------------
-control_action(Command, Args) -> control_action(Command, node(), Args).
+control_action(Command, Args) ->
+ control_action(Command, node(), Args, default_options()).
+
+control_action(Command, Args, NewOpts) ->
+ control_action(Command, node(), Args,
+ expand_options(default_options(), NewOpts)).
-control_action(Command, Node, Args) ->
+control_action(Command, Node, Args, Opts) ->
case catch rabbit_control:action(
- Command, Node, Args,
+ Command, Node, Args, Opts,
fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
end) of
@@ -1269,13 +1279,28 @@ control_action(Command, Node, Args) ->
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
- if CheckVHost -> ok = control_action(Command, ["-p", "/"]);
+ if CheckVHost -> ok = control_action(Command, []);
true -> ok
end,
ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
{bad_argument, dummy} = control_action(Command, ["dummy"]),
ok.
+default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}].
+
+expand_options(As, Bs) ->
+ lists:foldl(fun({K, _}=A, R) ->
+ case proplists:is_defined(K, R) of
+ true -> R;
+ false -> [A | R]
+ end
+ end, Bs, As).
+
+check_get_options({ExpArgs, ExpOpts}, Defs, Args) ->
+ {ExpArgs, ResOpts} = rabbit_misc:get_options(Defs, Args),
+ true = lists:sort(ExpOpts) == lists:sort(ResOpts), % don't care about the order
+ ok.
+
empty_files(Files) ->
[case file:read_file_info(File) of
{ok, FInfo} -> FInfo#file_info.size == 0;
@@ -1763,7 +1788,7 @@ with_fresh_variable_queue(Fun) ->
{len, 0}]),
_ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
passed.
-
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
new file mode 100644
index 00000000..a92e3da7
--- /dev/null
+++ b/src/rabbit_tests_event_receiver.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_tests_event_receiver).
+
+-export([start/1, stop/0]).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+start(Pid) ->
+ gen_event:add_handler(rabbit_event, ?MODULE, [Pid]).
+
+stop() ->
+ gen_event:delete_handler(rabbit_event, ?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init([Pid]) ->
+ {ok, Pid}.
+
+handle_call(_Request, Pid) ->
+ {ok, not_understood, Pid}.
+
+handle_event(Event, Pid) ->
+ Pid ! Event,
+ {ok, Pid}.
+
+handle_info(_Info, Pid) ->
+ {ok, Pid}.
+
+terminate(_Arg, _Pid) ->
+ ok.
+
+code_change(_OldVsn, Pid, _Extra) ->
+ {ok, Pid}.
+
+%%----------------------------------------------------------------------------
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 1f182593..a9313503 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -39,8 +39,8 @@
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
- binding/0, amqqueue/0, exchange/0, connection/0, user/0,
- error/1, ok_or_error/1, ok_or_error2/2, ok/1,
+ binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
+ user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1,
channel_exit/0, connection_exit/0]).
-type(channel_exit() :: no_return()).
@@ -137,6 +137,8 @@
-type(connection() :: pid()).
+-type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1').
+
-type(user() ::
#user{username :: rabbit_access_control:username(),
password :: rabbit_access_control:password()}).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 80602038..f90ee734 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,14 +33,14 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/3, start_link/3, shutdown/1, mainloop/1]).
+-export([start/4, start_link/4, shutdown/1, mainloop/1]).
-export([send_command/2, send_command/3, send_command_and_signal_back/3,
send_command_and_signal_back/4, send_command_and_notify/5]).
--export([internal_send_command/3, internal_send_command/5]).
+-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
--record(wstate, {sock, channel, frame_max}).
+-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).
@@ -48,14 +48,14 @@
-ifdef(use_specs).
--spec(start/3 ::
+-spec(start/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer())
- -> pid()).
--spec(start_link/3 ::
+ non_neg_integer(), rabbit_types:protocol())
+ -> rabbit_types:ok(pid())).
+-spec(start_link/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer())
- -> pid()).
+ non_neg_integer(), rabbit_types:protocol())
+ -> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
@@ -70,29 +70,31 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
--spec(internal_send_command/3 ::
+-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- rabbit_framing:amqp_method_record())
+ rabbit_framing:amqp_method_record(), rabbit_types:protocol())
-> 'ok').
--spec(internal_send_command/5 ::
+-spec(internal_send_command/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record(), rabbit_types:content(),
- non_neg_integer())
+ non_neg_integer(), rabbit_types:protocol())
-> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax) ->
- spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}]).
-
-start_link(Sock, Channel, FrameMax) ->
- spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+start(Sock, Channel, FrameMax, Protocol) ->
+ {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}]).
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
+start_link(Sock, Channel, FrameMax, Protocol) ->
+ {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
mainloop(State) ->
receive
@@ -102,35 +104,40 @@ mainloop(State) ->
end.
handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ State = #wstate{sock = Sock, channel = Channel,
+ protocol = Protocol}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
State;
handle_message({send_command, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
State;
handle_message({send_command_and_signal_back, MethodRecord, Parent},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ State = #wstate{sock = Sock, channel = Channel,
+ protocol = Protocol}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
Parent ! rabbit_writer_send_command_signal,
State;
handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
Parent ! rabbit_writer_send_command_signal,
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
@@ -171,30 +178,32 @@ shutdown(W) ->
%---------------------------------------------------------------------------
-assemble_frames(Channel, MethodRecord) ->
+assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord).
+ rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord,
+ Protocol).
-assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
+assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- true = rabbit_framing:method_has_content(MethodName), % assertion
+ true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
- Channel, MethodRecord),
+ Channel, MethodRecord, Protocol),
ContentFrames = rabbit_binary_generator:build_simple_content_frames(
- Channel, Content, FrameMax),
+ Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
-internal_send_command(Sock, Channel, MethodRecord) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
+internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)).
-internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)).
+ Content, FrameMax, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -214,13 +223,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)),
+internal_send_command_async(Sock, Channel, MethodRecord, Protocol) ->
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)),
+ Content, FrameMax, Protocol)),
ok.
port_cmd(Sock, Data) ->
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index bbc3a8c0..3cbc80f8 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -78,7 +78,7 @@
rabbit_types:ok(pid())).
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
--spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')).
+-spec(get_vm_limit/0 :: () -> non_neg_integer()).
-spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').