diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-09 12:30:29 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-09 12:30:29 +0100 |
commit | f0e94a8cefeefd582e732cfe4ff928578eef73d4 (patch) | |
tree | 4f145083efc44cd2dfe525fd0563b180f73898cd | |
parent | fa440d982ce2a4ecfdcbd5cf984a594ba9f87836 (diff) | |
parent | bf0d78abd2f6757f054dcb06048f6a7eabade0f4 (diff) | |
download | rabbitmq-server-bug23045.tar.gz |
merged default into bug23045bug23045
29 files changed, 1305 insertions, 742 deletions
@@ -11,7 +11,7 @@ syntax: regexp ^dist/ ^include/rabbit_framing\.hrl$ ^include/rabbit_framing_spec\.hrl$ -^src/rabbit_framing\.erl$ +^src/rabbit_framing_amqp.*\.erl$ ^src/.*\_usage.erl$ ^rabbit\.plt$ ^basic.plt$ @@ -12,7 +12,7 @@ EBIN_DIR=ebin INCLUDE_DIR=include DOCS_DIR=docs INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://www.rabbitmq.com/ @@ -56,7 +56,8 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json +AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json +AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -99,11 +100,14 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ +$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) + $(PYTHON) codegen.py --ignore-conflicts header $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) $@ -$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ +$(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_9_1) $@ + +$(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_8) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ @@ -128,7 +132,7 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel - rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc + rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing_amqp_*.erl codegen.pyc rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -175,6 +179,14 @@ stop-rabbit-on-node: all force-snapshot: all echo "rabbit_persister:force_snapshot()." | $(ERL_CALL) +set-memory-alarm: all + echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \ + $(ERL_CALL) + +clear-memory-alarm: all + echo "alarm_handler:clear_alarm(vm_memory_high_watermark)." | \ + $(ERL_CALL) + stop-node: -$(ERL_CALL) -q @@ -315,11 +315,16 @@ def genErl(spec): methods = spec.allMethods() printFileHeader() - print """-module(rabbit_framing). --include("rabbit_framing.hrl"). - + module = "rabbit_framing_amqp_%d_%d" % (spec.major, spec.minor) + if spec.revision != 0: + module = "%s_%d" % (module, spec.revision) + if module == "rabbit_framing_amqp_8_0": + module = "rabbit_framing_amqp_0_8" + print "-module(%s)." % module + print """-include("rabbit_framing.hrl"). + +-export([version/0]). -export([lookup_method_name/1]). - -export([method_id/1]). -export([method_has_content/1]). -export([is_method_synchronous/1]). @@ -395,6 +400,7 @@ def genErl(spec): print """ %% Method signatures -ifdef(use_specs). +-spec(version/0 :: () -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()). -spec(method_id/1 :: (amqp_method_name()) -> amqp_method()). -spec(method_has_content/1 :: (amqp_method_name()) -> boolean()). @@ -414,6 +420,10 @@ bitvalue(true) -> 1; bitvalue(false) -> 0; bitvalue(undefined) -> 0. """ + version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision) + if version == '{8, 0, 0}': version = '{0, 8, 0}' + print "version() -> %s." % (version) + for m in methods: genLookupMethodName(m) print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})." @@ -472,8 +482,6 @@ def genHrl(spec): methods = spec.allMethods() printFileHeader() - print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) - print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) print "-define(PROTOCOL_PORT, %d)." % (spec.port) for (c,v,cls) in spec.constants: diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index e53a97c2..33552e17 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -88,9 +88,6 @@ </listitem> </varlistentry> </variablelist> - <para> - Flags must precede all other parameters to <command>rabbitmqctl</command>. - </para> </refsect1> <refsect1> @@ -271,7 +268,7 @@ <variablelist> <varlistentry id="cluster"> - <term><cmdsynopsis><command>cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> + <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> @@ -336,7 +333,7 @@ </listitem> </varlistentry> <varlistentry id="force_cluster"> - <term><cmdsynopsis><command>force_cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> + <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> @@ -547,7 +544,7 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>configure</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> @@ -555,11 +552,21 @@ <listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem> </varlistentry> <varlistentry> - <term>username</term> + <term>scope</term> + <listitem><para>Scope of the permissions: either + <command>client</command> (the default) or + <command>all</command>. This determines whether + permissions are checked for server-generated resource + names (<command>all</command>) or only for + client-specified resource names + (<command>client</command>).</para></listitem> + </varlistentry> + <varlistentry> + <term>user</term> <listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem> </varlistentry> <varlistentry> - <term>configure</term> + <term>conf</term> <listitem><para>A regular expression matching resource names for which the user is granted configure permissions.</para></listitem> </varlistentry> <varlistentry> @@ -888,6 +895,10 @@ <listitem><para>Number of channels using the connection.</para></listitem> </varlistentry> <varlistentry> + <term>protocol</term> + <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem> + </varlistentry> + <varlistentry> <term>user</term> <listitem><para>Username associated with the connection.</para></listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 2cd28abb..48e19ff8 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -27,4 +27,5 @@ {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, - {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}. + {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {collect_statistics, none}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 3fd52568..b9abd788 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -30,13 +30,14 @@ %% -record(user, {username, password}). --record(permission, {configure, write, read}). +-record(permission, {scope, configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). -record(vhost, {virtual_host, dummy}). --record(connection, {user, timeout_sec, frame_max, vhost, client_properties}). +-record(connection, {protocol, user, timeout_sec, frame_max, vhost, + client_properties}). -record(content, {class_id, @@ -44,6 +45,7 @@ properties_bin, %% either 'none', or an encoded properties binary %% Note: at most one of properties and properties_bin can be %% 'none' at once. + protocol, %% The protocol under which properties_bin was encoded payload_fragments_rev %% list of binaries, in reverse order (!) }). @@ -70,16 +72,20 @@ -record(delivery, {mandatory, immediate, txn, sender, message}). -record(amqp_error, {name, explanation, method = none}). +-record(event, {type, props, timestamp}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). +-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8"). -define(ERTS_MINIMUM, "5.6.3"). -define(MAX_WAIT, 16#ffffffff). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(STATS_INTERVAL, 5000). -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). diff --git a/src/rabbit.erl b/src/rabbit.erl index 18045b94..41c628a0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -89,6 +89,13 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_event, + [{description, "statistics event manager"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_event]}}, + {requires, external_infrastructure}, + {enables, kernel_ready}]}). + -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}, {requires, external_infrastructure}]}). @@ -426,9 +433,9 @@ print_banner() -> "| ~s +---+ |~n" "| |~n" "+-------------------+~n" - "AMQP ~p-~p~n~s~n~s~n~n", + "~s~n~s~n~s~n~n", [Product, string:right([$v|Version], ProductLen), - ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, + ?PROTOCOL_VERSION, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, {"app descriptor", app_location()}, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 55d738ff..8d00f591 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -38,7 +38,7 @@ -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). -export([add_vhost/1, delete_vhost/1, list_vhosts/0]). --export([set_permissions/5, clear_permissions/2, +-export([set_permissions/5, set_permissions/6, clear_permissions/2, list_vhost_permissions/1, list_user_permissions/1]). %%---------------------------------------------------------------------------- @@ -51,11 +51,14 @@ -type(username() :: binary()). -type(password() :: binary()). -type(regexp() :: binary()). +-type(scope() :: binary()). -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | rabbit_types:channel_exit()). --spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user()). +-spec(user_pass_login/2 :: + (username(), password()) + -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> 'ok' | rabbit_types:channel_exit()). @@ -70,12 +73,14 @@ (username()) -> rabbit_types:ok(rabbit_types:user()) | rabbit_types:error('not_found')). -spec(add_vhost/1 :: - (rabbit_types:vhost()) -> 'ok' | rabbit_types:connection_exit()). + (rabbit_types:vhost()) -> 'ok'). -spec(delete_vhost/1 :: - (rabbit_types:vhost()) -> 'ok' | rabbit_types:connection_exit()). + (rabbit_types:vhost()) -> 'ok'). -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). +-spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), + regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). -spec(list_vhost_permissions/1 :: (rabbit_types:vhost()) @@ -155,6 +160,7 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. +permission_index(scope) -> #permission.scope; permission_index(configure) -> #permission.configure; permission_index(write) -> #permission.write; permission_index(read) -> #permission.read. @@ -167,7 +173,7 @@ check_resource_access(Username, Permission); check_resource_access(_Username, #resource{name = <<"amq.gen",_/binary>>}, - _Permission) -> + #permission{scope = client}) -> ok; check_resource_access(Username, R = #resource{virtual_host = VHostPath, name = Name}, @@ -298,7 +304,7 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _}) -> + lists:foreach(fun ({Username, _, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), @@ -316,7 +322,16 @@ validate_regexp(RegexpBin) -> end. set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm, + WritePerm, ReadPerm). + +set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), + Scope = case ScopeBin of + <<"client">> -> client; + <<"all">> -> all; + _ -> throw({error, {invalid_scope, ScopeBin}}) + end, rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -326,12 +341,14 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> username = Username, virtual_host = VHostPath}, permission = #permission{ + scope = Scope, configure = ConfigurePerm, write = WritePerm, read = ReadPerm}}, write) end)). + clear_permissions(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( @@ -343,22 +360,23 @@ clear_permissions(Username, VHostPath) -> end)). list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- + [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- list_permissions(rabbit_misc:with_vhost( VHostPath, match_user_vhost('_', VHostPath)))]. list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, permission = #permission{ + scope = Scope, configure = ConfigurePerm, write = WritePerm, read = ReadPerm}} <- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6eace7e9..2453280e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,8 +39,9 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/4]). + stat/1, deliver/2, requeue/3, ack/4, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([emit_stats/1]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). @@ -78,7 +79,7 @@ (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> {'new' | 'existing', rabbit_types:amqqueue()} | - rabbit_types:channel_error()). + rabbit_types:channel_exit()). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found')). @@ -113,6 +114,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). +-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -130,6 +132,7 @@ -spec(ack/4 :: (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid()) -> 'ok'). +-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()). -spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). @@ -147,8 +150,7 @@ -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) - -> rabbit_types:amqqueue() | 'not_found' | - rabbit_types:connection_exit()). + -> rabbit_types:amqqueue() | 'not_found'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). @@ -158,7 +160,7 @@ -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(maybe_expire/1 :: (pid()) -> 'ok'). --spec(on_node_down/1 :: (node()) -> 'ok' | rabbit_types:connection_exit()). +-spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()). -endif. @@ -303,9 +305,8 @@ check_declare_arguments(QueueName, Args) -> ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, - "Invalid arguments in declaration of queue ~s: " - "~w (on argument: ~w)", - [rabbit_misc:rs(QueueName), Error, Key]) + "invalid arg '~s' for ~s: ~w", + [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]], ok. @@ -353,6 +354,9 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +emit_stats(#amqqueue{pid = QPid}) -> + delegate_pcast(QPid, 7, emit_stats). + delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -376,9 +380,11 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). +reject(QPid, MsgIds, Requeue, ChPid) -> + delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( - fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). @@ -388,9 +394,6 @@ rollback_all(QPids, Txn, ChPid) -> notify_down_all(QPids, ChPid) -> safe_delegate_call_ok( - %% we don't care if the queue process has terminated in the - %% meantime - fun (_) -> ok end, fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). @@ -447,7 +450,7 @@ internal_delete(QueueName) -> end. maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}, + gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun}, infinity). update_ram_duration(QPid) -> @@ -485,11 +488,11 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. -safe_delegate_call_ok(H, F, Pids) -> +safe_delegate_call_ok(F, Pids) -> {_, Bad} = delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( - fun () -> H(Pid) end, + fun () -> ok end, fun () -> F(Pid) end) end), case Bad of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 67f0fcf5..d52660c5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -59,7 +59,8 @@ expires, sync_timer_ref, rate_timer_ref, - expiry_timer_ref + expiry_timer_ref, + stats_timer }). -record(consumer, {tag, ack_required}). @@ -74,13 +75,8 @@ txn, unsent_message_count}). --define(INFO_KEYS, - [name, - durable, - auto_delete, - arguments, - pid, - owner_pid, +-define(STATISTICS_KEYS, + [pid, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, @@ -91,6 +87,17 @@ backing_queue_status ]). +-define(CREATION_EVENT_KEYS, + [pid, + name, + durable, + auto_delete, + arguments, + owner_pid + ]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). + %%---------------------------------------------------------------------------- start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). @@ -114,7 +121,8 @@ init(Q) -> expires = undefined, sync_timer_ref = undefined, rate_timer_ref = undefined, - expiry_timer_ref = undefined}, hibernate, + expiry_timer_ref = undefined, + stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -155,6 +163,10 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), + rabbit_event:notify( + queue_created, + [{Item, i(Item, State)} || + Item <- ?CREATION_EVENT_KEYS]), noreply(init_expires(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -173,6 +185,7 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -189,9 +202,10 @@ noreply(NewState) -> next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), + State2 = ensure_stats_timer(State1), case BQ:needs_idle_timeout(BQS)of - true -> {ensure_sync_timer(State1), 0}; - false -> {stop_sync_timer(State1), hibernate} + true -> {ensure_sync_timer(State2), 0}; + false -> {stop_sync_timer(State2), hibernate} end. ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> @@ -249,6 +263,18 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> State end. +ensure_stats_timer(State = #q{stats_timer = StatsTimer, + q = Q}) -> + State#q{stats_timer = rabbit_event:ensure_stats_timer( + StatsTimer, + fun() -> emit_stats(State) end, + fun() -> rabbit_amqqueue:emit_stats(Q) end)}. + +stop_stats_timer(State = #q{stats_timer = StatsTimer}) -> + State#q{stats_timer = rabbit_event:stop_stats_timer( + StatsTimer, + fun() -> emit_stats(State) end)}. + assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -560,6 +586,10 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +emit_stats(State) -> + rabbit_event:notify(queue_stats, + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + %--------------------------------------------------------------------------- handle_call({init, Recover}, From, @@ -783,6 +813,21 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State#q{backing_queue_state = BQS1}) end; +handle_cast({reject, AckTags, Requeue, ChPid}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + case lookup_ch(ChPid) of + not_found -> + noreply(State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(case Requeue of + true -> requeue_and_run(AckTags, State); + false -> BQS1 = BQ:ack(AckTags, BQS), + State #q { backing_queue_state = BQS1 } + end) + end; + handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); @@ -841,7 +886,11 @@ handle_cast(maybe_expire, State) -> true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) - end. + end; + +handle_cast(emit_stats, State) -> + emit_stats(State), + noreply(State). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -878,4 +927,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}. + {hibernate, stop_stats_timer( + stop_rate_timer(State#q{backing_queue_state = BQS2}))}. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 848c1e91..c1445a0c 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -49,12 +49,10 @@ | rabbit_types:error('not_found'))). -spec(publish/1 :: - (rabbit_types:delivery()) -> publish_result() | - rabbit_types:connection_exit()). + (rabbit_types:delivery()) -> publish_result()). -spec(delivery/4 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) - -> rabbit_types:delivery()). + rabbit_types:message()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) @@ -64,12 +62,12 @@ -spec(publish/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) - -> publish_result() | rabbit_types:connection_exit()). + -> publish_result()). -spec(publish/7 :: (rabbit_exchange:name(), rabbit_router:routing_key(), boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), properties_input(), binary()) - -> publish_result() | rabbit_types:connection_exit()). + -> publish_result()). -spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) -> rabbit_types:content()). @@ -99,18 +97,24 @@ delivery(Mandatory, Immediate, Txn, Message) -> sender = self(), message = Message}. build_content(Properties, BodyBin) -> - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), #content{class_id = ClassId, properties = Properties, properties_bin = none, + protocol = none, payload_fragments_rev = [BodyBin]}. from_content(Content) -> #content{class_id = ClassId, properties = Props, payload_fragments_rev = FragmentsRev} = - rabbit_binary_parser:ensure_content_decoded(Content), - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + rabbit_binary_parser:ensure_content_decoded(Content, + rabbit_framing_amqp_0_9_1), + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 0e6ebe57..f0ec6180 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -41,12 +41,12 @@ % See definition of check_empty_content_body_frame_size/0, an assertion called at startup. -define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8). --export([build_simple_method_frame/2, - build_simple_content_frames/3, +-export([build_simple_method_frame/3, + build_simple_content_frames/4, build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). --export([ensure_content_encoded/1, clear_encoded_content/1]). +-export([ensure_content_encoded/2, clear_encoded_content/1]). -import(lists). @@ -56,20 +56,22 @@ -type(frame() :: [binary()]). --spec(build_simple_method_frame/2 :: - (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record()) +-spec(build_simple_method_frame/3 :: + (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), + rabbit_types:protocol()) -> frame()). --spec(build_simple_content_frames/3 :: +-spec(build_simple_content_frames/4 :: (rabbit_channel:channel_number(), rabbit_types:content(), - non_neg_integer()) + non_neg_integer(), rabbit_types:protocol()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). -spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()). -spec(encode_properties/2 :: ([rabbit_framing:amqp_property_type()], [any()]) -> binary()). -spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). --spec(ensure_content_encoded/1 :: - (rabbit_types:content()) -> rabbit_types:encoded_content()). +-spec(ensure_content_encoded/2 :: + (rabbit_types:content(), rabbit_types:protocol()) -> + rabbit_types:encoded_content()). -spec(clear_encoded_content/1 :: (rabbit_types:content()) -> rabbit_types:unencoded_content()). @@ -77,30 +79,24 @@ %%---------------------------------------------------------------------------- -build_simple_method_frame(ChannelInt, MethodRecord) -> - MethodFields = rabbit_framing:encode_method_fields(MethodRecord), +build_simple_method_frame(ChannelInt, MethodRecord, Protocol) -> + MethodFields = Protocol:encode_method_fields(MethodRecord), MethodName = rabbit_misc:method_record_type(MethodRecord), - {ClassId, MethodId} = rabbit_framing:method_id(MethodName), + {ClassId, MethodId} = Protocol:method_id(MethodName), create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]). -build_simple_content_frames(ChannelInt, - #content{class_id = ClassId, - properties = ContentProperties, - properties_bin = ContentPropertiesBin, - payload_fragments_rev = PayloadFragmentsRev}, - FrameMax) -> - {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), +build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) -> + #content{class_id = ClassId, + properties_bin = ContentPropertiesBin, + payload_fragments_rev = PayloadFragmentsRev} = + ensure_content_encoded(Content, Protocol), + {BodySize, ContentFrames} = + build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), HeaderFrame = create_frame(2, ChannelInt, [<<ClassId:16, 0:16, BodySize:64>>, - maybe_encode_properties(ContentProperties, ContentPropertiesBin)]), + ContentPropertiesBin]), [HeaderFrame | ContentFrames]. -maybe_encode_properties(_ContentProperties, ContentPropertiesBin) - when is_binary(ContentPropertiesBin) -> - ContentPropertiesBin; -maybe_encode_properties(ContentProperties, none) -> - rabbit_framing:encode_properties(ContentProperties). - build_content_frames(FragsRev, FrameMax, ChannelInt) -> BodyPayloadMax = if FrameMax == 0 -> iolist_size(FragsRev); @@ -283,13 +279,16 @@ check_empty_content_body_frame_size() -> ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) end. -ensure_content_encoded(Content = #content{properties_bin = PropsBin}) +ensure_content_encoded(Content = #content{properties_bin = PropsBin, + protocol = Protocol}, Protocol) when PropsBin =/= 'none' -> Content; -ensure_content_encoded(Content = #content{properties = Props}) -> - Content #content{properties_bin = rabbit_framing:encode_properties(Props)}. +ensure_content_encoded(Content = #content{properties = Props}, Protocol) -> + Content#content{properties_bin = Protocol:encode_properties(Props), + protocol = Protocol}. -clear_encoded_content(Content = #content{properties_bin = none}) -> +clear_encoded_content(Content = #content{properties_bin = none, + protocol = none}) -> Content; clear_encoded_content(Content = #content{properties = none}) -> %% Only clear when we can rebuild the properties_bin later in @@ -297,4 +296,4 @@ clear_encoded_content(Content = #content{properties = none}) -> %% one of properties and properties_bin can be 'none' Content; clear_encoded_content(Content = #content{}) -> - Content#content{properties_bin = none}. + Content#content{properties_bin = none, protocol = none}. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 69e34440..1d0a62af 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -34,7 +34,7 @@ -include("rabbit.hrl"). -export([parse_table/1, parse_properties/2]). --export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([ensure_content_decoded/2, clear_decoded_content/1]). -import(lists). @@ -45,8 +45,9 @@ -spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()). -spec(parse_properties/2 :: ([rabbit_framing:amqp_property_type()], binary()) -> [any()]). --spec(ensure_content_decoded/1 :: - (rabbit_types:content()) -> rabbit_types:decoded_content()). +-spec(ensure_content_decoded/2 :: + (rabbit_types:content(), rabbit_types:protocol()) + -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: (rabbit_types:content()) -> rabbit_types:undecoded_content()). @@ -162,12 +163,12 @@ parse_property(bit, Rest) -> parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) -> {parse_table(Table), Rest}. -ensure_content_decoded(Content = #content{properties = Props}) +ensure_content_decoded(Content = #content{properties = Props}, _Protocol) when Props =/= 'none' -> Content; -ensure_content_decoded(Content = #content{properties_bin = PropBin}) +ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol) when is_binary(PropBin) -> - Content#content{properties = rabbit_framing:decode_properties( + Content#content{properties = Protocol:decode_properties( Content#content.class_id, PropBin)}. clear_decoded_content(Content = #content{properties = none}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dafc3075..6f244166 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,10 +36,9 @@ -behaviour(gen_server2). -export([start_link/6, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). +-export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). - --export([flow_timeout/2]). +-export([emit_stats/1, flush/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -48,37 +47,38 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, flow}). - --record(flow, {server, client, pending}). + consumer_mapping, blocking, queue_collector_pid, stats_timer}). -define(MAX_PERMISSION_CACHE_SIZE, 12). --define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds --define(INFO_KEYS, +-define(STATISTICS_KEYS, [pid, - connection, - number, - user, - vhost, transactional, consumer_count, messages_unacknowledged, acks_uncommitted, prefetch_count]). +-define(CREATION_EVENT_KEYS, + [pid, + connection, + number, + user, + vhost]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -export_type([channel_number/0]). --type(ref() :: any()). -type(channel_number() :: non_neg_integer()). -spec(start_link/6 :: (channel_number(), pid(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> pid()). + rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -87,25 +87,22 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(flow_timeout/2 :: (pid(), ref()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). -spec(info_all/0 :: () -> [[rabbit_types:info()]]). -spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). +-spec(emit_stats/1 :: (pid()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - {ok, Pid} = gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost, CollectorPid], []), - Pid. + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, + Username, VHost, CollectorPid], []). do(Pid, Method) -> do(Pid, Method, none). @@ -122,15 +119,9 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). -conserve_memory(Pid, Conserve) -> - gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). - flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -flow_timeout(Pid, Ref) -> - gen_server2:pcast(Pid, 7, {flow_timeout, Ref}). - list() -> pg_local:get_members(rabbit_channels). @@ -151,31 +142,39 @@ info_all() -> info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). +emit_stats(Pid) -> + gen_server2:pcast(Pid, 7, emit_stats). + +flush(Pid) -> + gen_server2:call(Pid, flush). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), - {ok, #ch{state = starting, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid, - flow = #flow{server = true, client = true, - pending = none}}, - hibernate, + State = #ch{state = starting, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + stats_timer = rabbit_event:init_stats_timer()}, + rabbit_event:notify( + channel_created, + [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(info, _From, State) -> @@ -187,6 +186,9 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; +handle_call(flush, _From, State) -> + reply(ok, State); + handle_call(_Request, _From, State) -> noreply(State). @@ -225,26 +227,16 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), + {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, + maybe_incr_stats([{QPid, 1}], + case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> - noreply(State); -handle_cast({conserve_memory, false}, State = #ch{state = starting}) -> - ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}), - noreply(State#ch{state = running}); -handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) -> - flow_control(not Conserve, State); -handle_cast({conserve_memory, _Conserve}, State) -> - noreply(State); - -handle_cast({flow_timeout, Ref}, - State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> - {stop, normal, terminating( - rabbit_misc:amqp_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", - [not Flow], none), State)}; -handle_cast({flow_timeout, _Ref}, State) -> +handle_cast(emit_stats, State) -> + internal_emit_stats(State), {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, @@ -254,11 +246,12 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {hibernate, State}. + {hibernate, stop_stats_timer(State)}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -276,9 +269,23 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. +reply(Reply, NewState) -> + {reply, Reply, ensure_stats_timer(NewState), hibernate}. + +noreply(NewState) -> + {noreply, ensure_stats_timer(NewState), hibernate}. -noreply(NewState) -> {noreply, NewState, hibernate}. +ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> + ChPid = self(), + State#ch{stats_timer = rabbit_event:ensure_stats_timer( + StatsTimer, + fun() -> internal_emit_stats(State) end, + fun() -> emit_stats(ChPid) end)}. + +stop_stats_timer(State = #ch{stats_timer = StatsTimer}) -> + State#ch{stats_timer = rabbit_event:stop_stats_timer( + StatsTimer, + fun() -> internal_emit_stats(State) end)}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -385,10 +392,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of - true -> {noreply, State}; - false -> {reply, #'channel.open_ok'{}, State#ch{state = running}} - end; + {reply, #'channel.open_ok'{}, State#ch{state = running}}; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -405,10 +409,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> - rabbit_misc:protocol_error( - command_invalid, - "basic.publish received after channel.flow_ok{active=false}", []); handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, @@ -421,7 +421,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + DecodedContent = rabbit_binary_parser:ensure_content_decoded( + Content, rabbit_framing_amqp_0_9_1), IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -437,6 +438,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, + maybe_incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish, State), {noreply, case TxnKey of none -> State; _ -> add_tx_participants(DeliveredQPids, State) @@ -447,7 +451,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, _, State = #ch{transaction_id = TxnKey, unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(TxnKey, Acked), + QIncs = ack(TxnKey, Acked), + Participants = [QPid || {QPid, _} <- QIncs], + maybe_incr_stats(QIncs, ack, State), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; @@ -470,11 +476,16 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}}} -> State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + maybe_incr_stats([{QPid, 1}], + case NoAck of + true -> get_no_ack; + false -> get + end, State), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -638,6 +649,17 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), {noreply, State2}; +handle_method(#'basic.reject'{delivery_tag = DeliveryTag, + requeue = Requeue}, + _, State = #ch{ unacked_message_q = UAMQ}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, ok, Acked), + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}; + handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = false, @@ -735,7 +757,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% the connection shuts down. ok = case Owner of none -> ok; - _ -> rabbit_queue_collector:register(CollectorPid, Q) + _ -> rabbit_queue_collector:register( + CollectorPid, Q) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> @@ -853,48 +876,12 @@ handle_method(#'channel.flow'{active = false}, _, blocking = dict:from_list(Queues)}} end; -handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{flow = #flow{server = Active, client = Flow, - pending = {_Ref, TRef}} = F}) - when Flow =:= not Active -> - {ok, cancel} = timer:cancel(TRef), - {noreply, State#ch{flow = F#flow{client = Active, pending = none}}}; -handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{flow = #flow{server = Flow, client = Flow, - pending = {_Ref, TRef}}}) - when Flow =:= not Active -> - {ok, cancel} = timer:cancel(TRef), - {noreply, issue_flow(Flow, State)}; -handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) -> - rabbit_misc:protocol_error( - command_invalid, "unsolicited channel.flow_ok", []); -handle_method(#'channel.flow_ok'{active = Active}, _, _State) -> - rabbit_misc:protocol_error( - command_invalid, - "received channel.flow_ok{active=~w} has incorrect polarity", [Active]); - handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). %%---------------------------------------------------------------------------- -flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}}) - when Flow =:= not Active -> - ok = clear_permission_cache(), - noreply(issue_flow(Active, State)); -flow_control(Active, State = #ch{flow = F}) -> - noreply(State#ch{flow = F#flow{server = Active}}). - -issue_flow(Active, State) -> - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = Active}), - Ref = make_ref(), - {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, - [self(), Ref]), - State#ch{flow = #flow{server = Active, client = not Active, - pending = {Ref, TRef}}}. - binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -938,7 +925,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, content = Content}, WriterPid, Reason) -> {_Close, ReplyCode, ReplyText} = - rabbit_framing:lookup_amqp_exception(Reason), + rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, @@ -978,7 +965,7 @@ ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), - [QPid | L] + [{QPid, length(MsgIds)} | L] end, [], UAQ). make_tx_id() -> rabbit_guid:guid(). @@ -1031,7 +1018,7 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). start_limiter(State = #ch{unacked_message_q = UAMQ}) -> - LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), + {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid. @@ -1105,6 +1092,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]), rabbit_writer:shutdown(WriterPid), rabbit_limiter:shutdown(LimiterPid). @@ -1127,3 +1115,60 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); i(Item, _) -> throw({bad_argument, Item}). + +maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> + case rabbit_event:stats_level(StatsTimer) of + fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; + _ -> ok + end. + +incr_stats({QPid, _} = QX, Inc, Measure) -> + maybe_monitor(QPid), + update_measures(queue_exchange_stats, QX, Inc, Measure); +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + maybe_monitor(QPid), + update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). + +maybe_monitor(QPid) -> + case get({monitoring, QPid}) of + undefined -> erlang:monitor(process, QPid), + put({monitoring, QPid}, true); + _ -> ok + end. + +update_measures(Type, QX, Inc, Measure) -> + Measures = case get({Type, QX}) of + undefined -> []; + D -> D + end, + Cur = case orddict:find(Measure, Measures) of + error -> 0; + {ok, C} -> C + end, + put({Type, QX}, + orddict:store(Measure, Cur + Inc, Measures)). + +internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> + CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS], + case rabbit_event:stats_level(StatsTimer) of + coarse -> + rabbit_event:notify(channel_stats, CoarseStats); + fine -> + FineStats = + [{channel_queue_stats, + [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, + {channel_exchange_stats, + [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, CoarseStats ++ FineStats) + end. + +erase_queue_stats(QPid) -> + erase({monitoring, QPid}), + erase({queue_stats, QPid}), + [erase({queue_exchange_stats, QX}) || + {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6e6ad06c..f0b623c2 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -32,20 +32,25 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/4]). - --record(params, {quiet, node, command, args}). +-export([start/0, stop/0, action/5]). -define(RPC_TIMEOUT, infinity). +-define(QUIET_OPT, "-q"). +-define(NODE_OPT, "-n"). +-define(VHOST_OPT, "-p"). +-define(SCOPE_OPT, "-s"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). --spec(action/4 :: (atom(), node(), [string()], - fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(action/5 :: + (atom(), node(), [string()], [{string(), any()}], + fun ((string(), [any()]) -> 'ok')) + -> 'ok'). -spec(usage/0 :: () -> no_return()). -endif. @@ -55,18 +60,33 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), FullCommand = init:get_plain_arguments(), - #params{quiet = Quiet, node = Node, command = Command, args = Args} = - parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:makenode(NodeStr)}), + case FullCommand of + [] -> usage(); + _ -> ok + end, + {[Command0 | Args], Opts} = + rabbit_misc:get_options( + [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, + {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}], + FullCommand), + Opts1 = lists:map(fun({K, V}) -> + case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)}; + _ -> {K, V} + end + end, Opts), + Command = list_to_atom(Command0), + Quiet = proplists:get_bool(?QUIET_OPT, Opts1), + Node = proplists:get_value(?NODE_OPT, Opts1), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) - end + end end, %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values - case catch action(Command, Node, Args, Inform) of + case catch action(Command, Node, Args, Opts, Inform) of ok -> case Quiet of true -> ok; @@ -118,15 +138,6 @@ print_badrpc_diagnostics(Node) -> fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), ok. -parse_args(["-n", NodeS | Args], Params) -> - parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)}); -parse_args(["-q" | Args], Params) -> - parse_args(Args, Params#params{quiet = true}); -parse_args([Command | Args], Params) -> - Params#params{command = list_to_atom(Command), args = Args}; -parse_args([], _) -> - usage(). - stop() -> ok. @@ -134,39 +145,39 @@ usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), halt(1). -action(stop, Node, [], Inform) -> +action(stop, Node, [], _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), call(Node, {rabbit, stop_and_halt, []}); -action(stop_app, Node, [], Inform) -> +action(stop_app, Node, [], _Opts, Inform) -> Inform("Stopping node ~p", [Node]), call(Node, {rabbit, stop, []}); -action(start_app, Node, [], Inform) -> +action(start_app, Node, [], _Opts, Inform) -> Inform("Starting node ~p", [Node]), call(Node, {rabbit, start, []}); -action(reset, Node, [], Inform) -> +action(reset, Node, [], _Opts, Inform) -> Inform("Resetting node ~p", [Node]), call(Node, {rabbit_mnesia, reset, []}); -action(force_reset, Node, [], Inform) -> +action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), call(Node, {rabbit_mnesia, force_reset, []}); -action(cluster, Node, ClusterNodeSs, Inform) -> +action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Clustering node ~p with ~p", [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); -action(force_cluster, Node, ClusterNodeSs, Inform) -> +action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); -action(status, Node, [], Inform) -> +action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), case call(Node, {rabbit, status, []}) of {badrpc, _} = Res -> Res; @@ -174,129 +185,117 @@ action(status, Node, [], Inform) -> ok end; -action(rotate_logs, Node, [], Inform) -> +action(rotate_logs, Node, [], _Opts, Inform) -> Inform("Reopening logs for node ~p", [Node]), call(Node, {rabbit, rotate_logs, [""]}); -action(rotate_logs, Node, Args = [Suffix], Inform) -> +action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) -> Inform("Rotating logs to files with suffix ~p", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); -action(close_connection, Node, [PidStr, Explanation], Inform) -> +action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> Inform("Closing connection ~s", [PidStr]), rpc_call(Node, rabbit_networking, close_connection, [rabbit_misc:string_to_pid(PidStr), Explanation]); -action(add_user, Node, Args = [Username, _Password], Inform) -> +action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> Inform("Creating user ~p", [Username]), call(Node, {rabbit_access_control, add_user, Args}); -action(delete_user, Node, Args = [_Username], Inform) -> +action(delete_user, Node, Args = [_Username], _Opts, Inform) -> Inform("Deleting user ~p", Args), call(Node, {rabbit_access_control, delete_user, Args}); -action(change_password, Node, Args = [Username, _Newpassword], Inform) -> +action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), call(Node, {rabbit_access_control, change_password, Args}); -action(list_users, Node, [], Inform) -> +action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), display_list(call(Node, {rabbit_access_control, list_users, []})); -action(add_vhost, Node, Args = [_VHostPath], Inform) -> +action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost ~p", Args), call(Node, {rabbit_access_control, add_vhost, Args}); -action(delete_vhost, Node, Args = [_VHostPath], Inform) -> +action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost ~p", Args), call(Node, {rabbit_access_control, delete_vhost, Args}); -action(list_vhosts, Node, [], Inform) -> +action(list_vhosts, Node, [], _Opts, Inform) -> Inform("Listing vhosts", []), display_list(call(Node, {rabbit_access_control, list_vhosts, []})); -action(list_user_permissions, Node, Args = [_Username], Inform) -> +action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> Inform("Listing permissions for user ~p", Args), display_list(call(Node, {rabbit_access_control, list_user_permissions, Args})); -action(list_queues, Node, Args, Inform) -> +action(list_queues, Node, Args, Opts, Inform) -> Inform("Listing queues", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, messages]), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); -action(list_exchanges, Node, Args, Inform) -> +action(list_exchanges, Node, Args, Opts, Inform) -> Inform("Listing exchanges", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, type]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, type]), display_info_list(rpc_call(Node, rabbit_exchange, info_all, [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, Args, Inform) -> +action(list_bindings, Node, _Args, Opts, Inform) -> Inform("Listing bindings", []), - {VHostArg, _} = parse_vhost_flag_bin(Args), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), InfoKeys = [exchange_name, queue_name, routing_key, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], InfoKeys); -action(list_connections, Node, Args, Inform) -> +action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); -action(list_channels, Node, Args, Inform) -> +action(list_channels, Node, Args, _Opts, Inform) -> Inform("Listing channels", []), ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count, messages_unacknowledged]), display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), ArgAtoms); -action(list_consumers, Node, Args, Inform) -> +action(list_consumers, Node, _Args, Opts, Inform) -> Inform("Listing consumers", []), - {VHostArg, _} = parse_vhost_flag_bin(Args), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])], InfoKeys); -action(Command, Node, Args, Inform) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - action(Command, Node, VHost, RemainingArgs, Inform). - -action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> +action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Scope = proplists:get_value(?SCOPE_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Username, VHost, CPerm, WPerm, RPerm]}); + [Scope, Username, VHost, CPerm, WPerm, RPerm]}); -action(clear_permissions, Node, VHost, [Username], Inform) -> +action(clear_permissions, Node, [Username], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); -action(list_permissions, Node, VHost, [], Inform) -> +action(list_permissions, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), display_list(call(Node, {rabbit_access_control, list_vhost_permissions, [VHost]})). -parse_vhost_flag(Args) when is_list(Args) -> - case Args of - ["-p", VHost | RemainingArgs] -> - {VHost, RemainingArgs}; - RemainingArgs -> - {"/", RemainingArgs} - end. - -parse_vhost_flag_bin(Args) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - {list_to_binary(VHost), RemainingArgs}. - default_if_empty(List, Default) when is_list(List) -> if List == [] -> Default; @@ -357,6 +356,8 @@ rpc_call(Node, Mod, Fun, Args) -> %% characters. We don't escape characters above 127, since they may %% form part of UTF-8 strings. +escape(Atom) when is_atom(Atom) -> + escape(atom_to_list(Atom)); escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); escape(L) when is_list(L) -> diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl new file mode 100644 index 00000000..113ffcb4 --- /dev/null +++ b/src/rabbit_event.erl @@ -0,0 +1,138 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_event). + +-include("rabbit.hrl"). + +-export([start_link/0]). +-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). +-export([ensure_stats_timer_after/2, reset_stats_timer_after/1]). +-export([stats_level/1]). +-export([notify/2]). + +%%---------------------------------------------------------------------------- + +-record(state, {level, timer}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([event_type/0, event_props/0, event_timestamp/0, event/0]). + +-type(event_type() :: atom()). +-type(event_props() :: term()). +-type(event_timestamp() :: + {non_neg_integer(), non_neg_integer(), non_neg_integer()}). + +-type(event() :: #event { + type :: event_type(), + props :: event_props(), + timestamp :: event_timestamp() + }). + +-type(level() :: 'none' | 'coarse' | 'fine'). + +-opaque(state() :: #state { + level :: level(), + timer :: atom() + }). + +-type(timer_fun() :: fun (() -> 'ok')). + +-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())). +-spec(init_stats_timer/0 :: () -> state()). +-spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). +-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()). +-spec(reset_stats_timer_after/1 :: (state()) -> state()). +-spec(stats_level/1 :: (state()) -> level()). +-spec(notify/2 :: (event_type(), event_props()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_event:start_link({local, ?MODULE}). + +init_stats_timer() -> + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), + #state{level = StatsLevel, timer = undefined}. + +ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) -> + State; +ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) -> + NowFun(), + {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, + erlang, apply, [TimerFun, []]), + State#state{timer = TRef}; +ensure_stats_timer(State, _NowFun, _TimerFun) -> + State. + +stop_stats_timer(State = #state{level = none}, _NowFun) -> + State; +stop_stats_timer(State = #state{timer = undefined}, _NowFun) -> + State; +stop_stats_timer(State = #state{timer = TRef}, NowFun) -> + {ok, cancel} = timer:cancel(TRef), + NowFun(), + State#state{timer = undefined}. + +ensure_stats_timer_after(State = #state{level = none}, _TimerFun) -> + State; +ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + erlang, apply, [TimerFun, []]), + State#state{timer = TRef}; +ensure_stats_timer_after(State, _TimerFun) -> + State. + +reset_stats_timer_after(State) -> + State#state{timer = undefined}. + +stats_level(#state{level = Level}) -> + Level. + +notify(Type, Props) -> + try + %% TODO: switch to os:timestamp() when we drop support for + %% Erlang/OTP < R13B01 + gen_event:notify(rabbit_event, #event{type = Type, + props = Props, + timestamp = now()}) + catch error:badarg -> + %% badarg means rabbit_event is no longer registered. We never + %% unregister it so the great likelihood is that we're shutting + %% down the broker but some events were backed up. Ignore it. + ok + end. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index b4b2ae68..af4eb1bd 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -67,10 +67,10 @@ fun((rabbit_types:exchange(), queue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). --spec(recover/0 :: () -> 'ok' | rabbit_types:connection_exit()). +-spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table()) - -> rabbit_types:exchange() | rabbit_types:connection_exit()). + -> rabbit_types:exchange()). -spec(check_type/1 :: (binary()) -> atom() | rabbit_types:connection_exit()). -spec(assert_equivalence/5 :: @@ -96,32 +96,26 @@ -spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) -> [[rabbit_types:info()]]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> {rabbit_router:routing_result(), [pid()]} | - rabbit_types:connection_exit()). + -> {rabbit_router:routing_result(), [pid()]}). -spec(add_binding/5 :: (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table(), inner_fun()) - -> bind_res() | rabbit_types:connection_exit()). + rabbit_framing:amqp_table(), inner_fun()) -> bind_res()). -spec(delete_binding/5 :: (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), rabbit_framing:amqp_table(), inner_fun()) - -> bind_res() | rabbit_types:error('binding_not_found') | - rabbit_types:connection_exit()). + -> bind_res() | rabbit_types:error('binding_not_found')). -spec(list_bindings/1 :: (rabbit_types:vhost()) -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), rabbit_framing:amqp_table()}]). -spec(delete_queue_bindings/1 :: - (rabbit_amqqueue:name()) - -> fun (() -> none()) | rabbit_types:connection_exit()). + (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(delete_transient_queue_bindings/1 :: - (rabbit_amqqueue:name()) - -> fun (() -> none()) | rabbit_types:connection_exit()). + (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(delete/2 :: (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | - rabbit_types:error('in_use') | - rabbit_types:connection_exit()). + rabbit_types:error('in_use')). -spec(list_queue_bindings/1 :: (rabbit_amqqueue:name()) -> [{name(), rabbit_router:routing_key(), @@ -198,6 +192,9 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> end end) of {new, X} -> TypeModule:create(X), + rabbit_event:notify( + exchange_created, + [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]), X; {existing, X} -> X; Err -> Err @@ -205,12 +202,8 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - case rabbit_exchange_type_registry:lookup_module(T) of - {ok, Module} -> Module; - {error, not_found} -> rabbit_misc:protocol_error( - command_invalid, - "invalid exchange type '~s'", [T]) - end. + {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + Module. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> @@ -219,8 +212,12 @@ check_type(TypeBin) -> rabbit_misc:protocol_error( command_invalid, "unknown exchange type '~s'", [TypeBin]); T -> - _Module = type_to_module(T), - T + case rabbit_exchange_type_registry:lookup_module(T) of + {error, not_found} -> rabbit_misc:protocol_error( + command_invalid, + "invalid exchange type '~s'", [T]); + {ok, _Module} -> T + end end. assert_equivalence(X = #exchange{ durable = Durable, @@ -386,7 +383,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> [X] = mnesia:read({rabbit_exchange, ExchangeName}), {maybe_auto_delete(X), Bindings}. - delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). @@ -435,6 +431,12 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> X#exchange.durable andalso Q#amqqueue.durable, fun mnesia:write/3), + rabbit_event:notify( + binding_created, + [{exchange_name, ExchangeName}, + {queue_name, QueueName}, + {routing_key, RoutingKey}, + {arguments, Arguments}]), {new, X, B}; [_R] -> {existing, X, B} @@ -467,6 +469,10 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> X#exchange.durable andalso Q#amqqueue.durable, fun mnesia:delete_object/3), + rabbit_event:notify( + binding_deleted, + [{exchange_name, ExchangeName}, + {queue_name, QueueName}]), {maybe_auto_delete(X), B}; {error, _} = E -> E @@ -585,6 +591,7 @@ unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> Bindings = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}), + rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), {deleted, Exchange, Bindings}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index bc1a2a08..00b74ad0 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,21 +32,22 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/2, process/2, shutdown/1]). +-export([start_link/3, process/2, shutdown/1]). %% internal --export([mainloop/1]). +-export([mainloop/2]). %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs) -> - spawn_link( - fun () -> - %% we trap exits so that a normal termination of the - %% channel or reader process terminates us too. - process_flag(trap_exit, true), - mainloop(apply(StartFun, StartArgs)) - end). +start_link(StartFun, StartArgs, Protocol) -> + {ok, spawn_link( + fun () -> + %% we trap exits so that a normal termination of + %% the channel or reader process terminates us too. + process_flag(trap_exit, true), + {ok, ChannelPid} = apply(StartFun, StartArgs), + mainloop(ChannelPid, Protocol) + end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -72,37 +73,42 @@ read_frame(ChannelPid) -> Msg -> exit({unexpected_message, Msg}) end. -mainloop(ChannelPid) -> - {method, MethodName, FieldsBin} = read_frame(ChannelPid), - Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), - case rabbit_framing:method_has_content(MethodName) of - true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), - rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, ClassId)); - false -> rabbit_channel:do(ChannelPid, Method) - end, - ?MODULE:mainloop(ChannelPid). +mainloop(ChannelPid, Protocol) -> + case read_frame(ChannelPid) of + {method, MethodName, FieldsBin} -> + Method = Protocol:decode_method_fields(MethodName, FieldsBin), + case Protocol:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), + rabbit_channel:do(ChannelPid, Method, + collect_content(ChannelPid, + ClassId, + Protocol)); + false -> rabbit_channel:do(ChannelPid, Method) + end, + ?MODULE:mainloop(ChannelPid, Protocol); + _ -> + unexpected_frame("expected method frame, " + "got non method frame instead", + []) + end. -collect_content(ChannelPid, ClassId) -> +collect_content(ChannelPid, ClassId, Protocol) -> case read_frame(ChannelPid) of {content_header, ClassId, 0, BodySize, PropertiesBin} -> Payload = collect_content_payload(ChannelPid, BodySize, []), #content{class_id = ClassId, properties = none, properties_bin = PropertiesBin, + protocol = Protocol, payload_fragments_rev = Payload}; {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]); + unexpected_frame("expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId]); _ -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got non content header frame instead", - [ClassId]) + unexpected_frame("expected content header for class ~w, " + "got non content header frame instead", + [ClassId]) end. collect_content_payload(_ChannelPid, 0, Acc) -> @@ -114,8 +120,10 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> RemainingByteCount - size(FragmentBin), [FragmentBin | Acc]); _ -> - rabbit_misc:protocol_error( - command_invalid, - "expected content body, got non content body frame instead", - []) + unexpected_frame("expected content body, " + "got non content body frame instead", + []) end. + +unexpected_frame(ExplanationFormat, Params) -> + rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 1989fb7b..faddffc1 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,14 +31,17 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2]). +-export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> - rabbit_types:maybe({pid(), pid()})). +-type(pids() :: rabbit_types:maybe({pid(), pid()})). + +-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()). +-spec(pause_monitor/1 :: (pids()) -> 'ok'). +-spec(resume_monitor/1 :: (pids()) -> 'ok'). -endif. @@ -70,20 +73,43 @@ start_heartbeat(Sock, TimeoutSec) -> end}, Parent) end), {Sender, Receiver}. +pause_monitor(none) -> + ok; +pause_monitor({_Sender, Receiver}) -> + Receiver ! pause, + ok. + +resume_monitor(none) -> + ok; +resume_monitor({_Sender, Receiver}) -> + Receiver ! resume, + ok. + +%%---------------------------------------------------------------------------- + heartbeater(Params, Parent) -> heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, MonitorRef, {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, receive {'DOWN', MonitorRef, process, _Object, _Info} -> ok; + pause -> + receive + {'DOWN', MonitorRef, process, _Object, _Info} -> + ok; + resume -> + Recurse({0, 0}); + Other -> + exit({unexpected_message, Other}) + end; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, if NewStatVal =/= StatVal -> Recurse({NewStatVal, 0}); SameCount < Threshold -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 878af029..813ccc75 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -45,7 +45,7 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). @@ -74,8 +74,7 @@ %%---------------------------------------------------------------------------- start_link(ChPid, UnackedMsgCount) -> - {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []), - Pid. + gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). shutdown(undefined) -> ok; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index af800072..f0c2bffb 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -64,7 +64,7 @@ -export([version_compare/2, version_compare/3]). -export([recursive_delete/1, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). --export([get_real_sock/1]). +-export([get_real_sock/1, get_options/2]). -import(mnesia). -import(lists). @@ -82,14 +82,16 @@ -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). -type(resource_name() :: binary()). +-type(optdef() :: {flag, string()} | {option, string(), any()}). +-type(channel_or_connection_exit() + :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). -spec(die/1 :: - (rabbit_framing:amqp_exception()) - -> rabbit_types:channel_exit() | rabbit_types:connection_exit()). + (rabbit_framing:amqp_exception()) -> channel_or_connection_exit()). -spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary()) -> rabbit_types:connection_exit()). -spec(amqp_error/4 :: @@ -97,14 +99,12 @@ rabbit_framing:amqp_method_name()) -> rabbit_types:amqp_error()). -spec(protocol_error/3 :: (rabbit_framing:amqp_exception(), string(), [any()]) - -> rabbit_types:channel_exit() | - rabbit_types:connection_exit()). + -> channel_or_connection_exit()). -spec(protocol_error/4 :: (rabbit_framing:amqp_exception(), string(), [any()], - rabbit_framing:amqp_method_name()) - -> rabbit_types:channel_exit() | - rabbit_types:connection_exit()). --spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()). + rabbit_framing:amqp_method_name()) -> channel_or_connection_exit()). +-spec(protocol_error/1 :: + (rabbit_types:amqp_error()) -> channel_or_connection_exit()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()). -spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), @@ -190,6 +190,8 @@ orddict:dictionary()). -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -spec(get_real_sock/1 :: (#sslsocket{}) -> any()). +-spec(get_options/2 :: ([optdef()], [string()]) + -> {[string()], [{string(), any()}]}). -endif. @@ -236,9 +238,9 @@ assert_args_equivalence1(Orig, New, Name, Key) -> {Same, Same} -> ok; {Orig1, New1} -> protocol_error( not_allowed, - "cannot redeclare ~s with inequivalent args for ~s: " + "inequivalent arg '~s' for ~s: " "required ~w, received ~w", - [rabbit_misc:rs(Name), Key, New1, Orig1]) + [Key, rabbit_misc:rs(Name), New1, Orig1]) end. get_config(Key) -> @@ -714,3 +716,36 @@ get_real_sock(#sslsocket{pid = {Sock, _}}) -> Sock; get_real_sock(Sock) -> Sock. + +% Separate flags and options from arguments. +% get_options([{flag, "-q"}, {option, "-p", "/"}], +% ["set_permissions","-p","/","guest", +% "-q",".*",".*",".*"]) +% == {["set_permissions","guest",".*",".*",".*"], +% [{"-q",true},{"-p","/"}]} +get_options(Defs, As) -> + lists:foldl(fun(Def, {AsIn, RsIn}) -> + {AsOut, Value} = case Def of + {flag, Key} -> + get_flag(Key, AsIn); + {option, Key, Default} -> + get_option(Key, Default, AsIn) + end, + {AsOut, [{Key, Value} | RsIn]} + end, {As, []}, Defs). + +get_option(K, _Default, [K, V | As]) -> + {As, V}; +get_option(K, Default, [Nk | As]) -> + {As1, V} = get_option(K, Default, As), + {[Nk | As1], V}; +get_option(_, Default, As) -> + {As, Default}. + +get_flag(K, [K | As]) -> + {As, true}; +get_flag(K, [Nk | As]) -> + {As1, V} = get_flag(K, As), + {[Nk | As1], V}; +get_flag(_, []) -> + {[], false}. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index ea3768d4..9257ec82 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -49,6 +49,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok(pid())). -spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). +-spec(shutdown/1 :: (pid()) -> 'ok'). -endif. @@ -64,7 +65,7 @@ delete_all(CollectorPid) -> gen_server:call(CollectorPid, delete_all, infinity). shutdown(CollectorPid) -> - gen_server:call(CollectorPid, shutdown, infinity). + gen_server:cast(CollectorPid, shutdown). %%---------------------------------------------------------------------------- @@ -87,13 +88,10 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> rabbit_amqqueue:delete(Q, false, false) end) || {MonitorRef, Q} <- dict:to_list(Queues)], - {reply, ok, State}; + {reply, ok, State}. -handle_call(shutdown, _From, State) -> - {stop, normal, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(shutdown, State) -> + {stop, normal, State}. handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State = #state{queues = Queues}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b5514c82..f947cd90 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -39,9 +39,11 @@ -export([init/1, mainloop/3]). --export([server_properties/0]). +-export([conserve_memory/2, server_properties/0]). --export([analyze_frame/2]). +-export([analyze_frame/3]). + +-export([emit_stats/1]). -import(gen_tcp). -import(fprof). @@ -57,13 +59,17 @@ %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector}). +-record(v1, {sock, connection, callback, recv_length, recv_ref, + connection_state, queue_collector, heartbeater, stats_timer}). + +-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, + send_pend, state, channels]). --define(INFO_KEYS, - [pid, address, port, peer_address, peer_port, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max, client_properties]). +-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, + protocol, user, vhost, timeout, frame_max, + client_properties]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). %% connection lifecycle %% @@ -101,6 +107,17 @@ %% -> log error, mark channel as closing, *running* %% handshake_timeout -> ignore, *running* %% heartbeat timeout -> *throw* +%% conserve_memory=true -> *blocking* +%% blocking: +%% conserve_memory=true -> *blocking* +%% conserve_memory=false -> *running* +%% receive a method frame for a content-bearing method +%% -> process, stop receiving, *blocked* +%% ...rest same as 'running' +%% blocked: +%% conserve_memory=true -> *blocked* +%% conserve_memory=false -> resume receiving, *running* +%% ...rest same as 'running' %% closing: %% socket close -> *terminate* %% receive connection.close -> send connection.close_ok, @@ -134,6 +151,11 @@ %% %% TODO: refactor the code so that the above is obvious +-define(IS_RUNNING(State), + (State#v1.connection_state =:= running orelse + State#v1.connection_state =:= blocking orelse + State#v1.connection_state =:= blocked)). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -141,7 +163,9 @@ -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). +-spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). -endif. @@ -181,6 +205,9 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. +emit_stats(Pid) -> + gen_server:cast(Pid, emit_stats). + setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of @@ -208,6 +235,10 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, + ok. + server_properties() -> {ok, Product} = application:get_key(rabbit, id), {ok, Version} = application:get_key(rabbit, vsn), @@ -249,11 +280,16 @@ start_connection(Parent, Deb, Sock, SockTransform) -> timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, vhost = none, - client_properties = none}, + client_properties = none, + protocol = none}, callback = uninitialized_callback, + recv_length = 0, recv_ref = none, connection_state = pre_init, - queue_collector = Collector}, + queue_collector = Collector, + heartbeater = none, + stats_timer = + rabbit_event:init_stats_timer()}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -272,8 +308,9 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), + rabbit_misc:unlink_and_capture_exit(Collector), rabbit_queue_collector:shutdown(Collector), - rabbit_misc:unlink_and_capture_exit(Collector) + rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. @@ -294,6 +331,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); + {conserve_memory, Conserve} -> + mainloop(Parent, Deb, internal_conserve_memory(Conserve, State)); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -315,7 +354,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> terminate_connection -> State; handshake_timeout -> - if State#v1.connection_state =:= running orelse + if ?IS_RUNNING(State) orelse State#v1.connection_state =:= closing orelse State#v1.connection_state =:= closed -> mainloop(Parent, Deb, State); @@ -339,6 +378,12 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> catch Error -> {error, Error} end), mainloop(Parent, Deb, State); + {'$gen_cast', emit_stats} -> + internal_emit_stats(State), + mainloop(Parent, Deb, + State#v1{stats_timer = + rabbit_event:reset_stats_timer_after( + State#v1.stats_timer)}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -347,21 +392,44 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit({unexpected_message, Other}) end. -switch_callback(OldState, NewCallback, Length) -> +switch_callback(State = #v1{connection_state = blocked, + heartbeater = Heartbeater}, Callback, Length) -> + ok = rabbit_heartbeat:pause_monitor(Heartbeater), + State#v1{callback = Callback, recv_length = Length, recv_ref = none}; +switch_callback(State, Callback, Length) -> Ref = inet_op(fun () -> rabbit_net:async_recv( - OldState#v1.sock, Length, infinity) end), - OldState#v1{callback = NewCallback, - recv_ref = Ref}. + State#v1.sock, Length, infinity) end), + State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}. -terminate(Explanation, State = #v1{connection_state = running}) -> +terminate(Explanation, State) when ?IS_RUNNING(State) -> {normal, send_exception(State, 0, rabbit_misc:amqp_error( connection_forced, Explanation, [], none))}; terminate(_Explanation, State) -> {force, State}. -close_connection(State = #v1{connection = #connection{ +internal_conserve_memory(true, State = #v1{connection_state = running}) -> + State#v1{connection_state = blocking}; +internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> + State#v1{connection_state = running}; +internal_conserve_memory(false, State = #v1{connection_state = blocked, + heartbeater = Heartbeater, + callback = Callback, + recv_length = Length, + recv_ref = none}) -> + ok = rabbit_heartbeat:resume_monitor(Heartbeater), + switch_callback(State#v1{connection_state = running}, Callback, Length); +internal_conserve_memory(_Conserve, State) -> + State. + +close_connection(State = #v1{queue_collector = Collector, + connection = #connection{ timeout_sec = TimeoutSec}}) -> + %% The spec says "Exclusive queues may only be accessed by the + %% current connection, and are deleted when that connection + %% closes." This does not strictly imply synchrony, but in + %% practice it seems to be what people assume. + rabbit_queue_collector:delete_all(Collector), %% We terminate the connection after the specified interval, but %% no later than ?CLOSING_TIMEOUT seconds. TimeoutMillisec = @@ -437,24 +505,23 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, - queue_collector = Collector}) -> + connection = #connection{protocol = Protocol}, + sock = Sock}) -> case all_channels() of [] -> - %% Spec says "Exclusive queues may only be accessed by the current - %% connection, and are deleted when that connection closes." - %% This does not strictly imply synchrony, but in practice it seems - %% to be what people assume. - rabbit_queue_collector:delete_all(Collector), - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), - close_connection(State); + NewState = close_connection(State), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), + NewState; _ -> State end; maybe_close(State) -> State. -handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) +handle_frame(Type, 0, Payload, + State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload) of + case analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -462,31 +529,38 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> State; -handle_frame(Type, 0, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, 0, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; - trace -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) end; -handle_frame(Type, Channel, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, Channel, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), case AnalyzedFrame of {method, 'channel.close', _} -> - erase({channel, Channel}); - _ -> ok - end, - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), - State; + erase({channel, Channel}), + State; + {method, MethodName, _} -> + case (State#v1.connection_state =:= blocking andalso + Protocol:method_has_content(MethodName)) of + true -> State#v1{connection_state = blocked}; + false -> State + end; + _ -> + State + end; closing -> %% According to the spec, after sending a %% channel.close we must ignore all frames except @@ -506,32 +580,37 @@ handle_frame(Type, Channel, Payload, State) -> end, State; undefined -> - case State#v1.connection_state of - running -> ok = send_to_new_channel( - Channel, AnalyzedFrame, State), - State; - Other -> throw({channel_frame_while_starting, - Channel, Other, AnalyzedFrame}) + case ?IS_RUNNING(State) of + true -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), + State; + false -> throw({channel_frame_while_starting, + Channel, State#v1.connection_state, + AnalyzedFrame}) end end end. -analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) -> - {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields}; -analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) -> +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body) -> +analyze_frame(?FRAME_BODY, Body, _Protocol) -> {content_body, Body}; -analyze_frame(?FRAME_TRACE, _Body) -> - trace; -analyze_frame(?FRAME_HEARTBEAT, <<>>) -> +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> heartbeat; -analyze_frame(_Type, _Body) -> +analyze_frame(_Type, _Body, _Protocol) -> error. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]), - {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1}; + {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1}; handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of @@ -543,54 +622,76 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat throw({bad_payload, PayloadAndMarker}) end; -handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, - State = #v1{sock = Sock, connection = Connection}) -> - case check_version({ProtocolMajor, ProtocolMinor}, - {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of - true -> - ok = send_on_channel0( - Sock, - #'connection.start'{ - version_major = ?PROTOCOL_VERSION_MAJOR, - version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT}, - connection_state = starting}, - frame_header, 7}; - false -> - throw({bad_version, ProtocolMajor, ProtocolMinor}) - end; +%% The two rules pertaining to version negotiation: +%% +%% * If the server cannot support the protocol specified in the +%% protocol header, it MUST respond with a valid protocol header and +%% then close the socket connection. +%% +%% * The server MUST provide a protocol version that is lower than or +%% equal to that requested by the client in the protocol header. +handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> + start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); + +%% This is the protocol header for 0-9, which we can safely treat as +%% though it were 0-9-1. +handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> + start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); + +%% This is what most clients send for 0-8. The 0-8 spec, confusingly, +%% defines the version as 8-0. +handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> + start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); + +%% The 0-8 spec as on the AMQP web site actually has this as the +%% protocol header; some libraries e.g., py-amqplib, send it when they +%% want 0-8. +handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> + start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); + +handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_version, A, B, C, D}); handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> rabbit_net:send( - Sock, <<"AMQP",1,1, - ?PROTOCOL_VERSION_MAJOR, - ?PROTOCOL_VERSION_MINOR>>) end), - throw({bad_header, Other}); + refuse_connection(Sock, {bad_header, Other}); handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). -%% the 0-8 spec, confusingly, defines the version as 8-0 -adjust_version({8,0}) -> {0,8}; -adjust_version(Version) -> Version. -check_version(ClientVersion, ServerVersion) -> - {ClientMajor, ClientMinor} = adjust_version(ClientVersion), - {ServerMajor, ServerMinor} = adjust_version(ServerVersion), - ClientMajor > ServerMajor - orelse - (ClientMajor == ServerMajor andalso - ClientMinor >= ServerMinor). +%% Offer a protocol version to the client. Connection.start only +%% includes a major and minor version number, Luckily 0-9 and 0-9-1 +%% are similar enough that clients will be happy with either. +start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, + Protocol, + State = #v1{sock = Sock, connection = Connection}) -> + Start = #'connection.start'{ version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = <<"PLAIN AMQPLAIN">>, + locales = <<"en_US">> }, + ok = send_on_channel0(Sock, Start, Protocol), + {State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, + connection_state = starting}, + frame_header, 7}. + +refuse_connection(Sock, Exception) -> + ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), + throw(Exception). + +ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) -> + Self = self(), + State#v1{stats_timer = rabbit_event:ensure_stats_timer_after( + StatsTimer, + fun() -> emit_stats(Self) end)}. %%-------------------------------------------------------------------------- -handle_method0(MethodName, FieldsBin, State) -> +handle_method0(MethodName, FieldsBin, + State = #v1{connection = #connection{protocol = Protocol}}) -> try - handle_method0(rabbit_framing:decode_method_fields( - MethodName, FieldsBin), + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch exit:Reason -> CompleteReason = case Reason of @@ -598,13 +699,14 @@ handle_method0(MethodName, FieldsBin, State) -> Reason#amqp_error{method = MethodName}; OtherReason -> OtherReason end, - case State#v1.connection_state of - running -> send_exception(State, 0, CompleteReason); + case ?IS_RUNNING(State) of + true -> send_exception(State, 0, CompleteReason); %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. - Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, Other, CompleteReason}) + false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, State#v1.connection_state, + CompleteReason}) end end. @@ -612,14 +714,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, State = #v1{connection_state = starting, - connection = Connection, + connection = Connection = + #connection{protocol = Protocol}, sock = Sock}) -> User = rabbit_access_control:check_login(Mechanism, Response), - ok = send_on_channel0( - Sock, - #'connection.tune'{channel_max = 0, + Tune = #'connection.tune'{channel_max = 0, frame_max = ?FRAME_MAX, - heartbeat = 0}), + heartbeat = 0}, + ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, @@ -638,53 +740,43 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + Heartbeater = rabbit_heartbeat:start_heartbeat( + Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, - frame_max = FrameMax}} + frame_max = FrameMax}, + heartbeater = Heartbeater} end; -handle_method0(#'connection.open'{virtual_host = VHostPath, - insist = Insist}, +handle_method0(#'connection.open'{virtual_host = VHostPath}, + State = #v1{connection_state = opening, connection = Connection = #connection{ - user = User}, + user = User, + protocol = Protocol}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - KnownHosts = format_listeners(rabbit_networking:active_listeners()), - Redirects = compute_redirects(Insist), - if Redirects == [] -> - ok = send_on_channel0( - Sock, - #'connection.open_ok'{known_hosts = KnownHosts}), - State#v1{connection_state = running, - connection = NewConnection}; - true -> - %% FIXME: 'host' is supposed to only contain one - %% address; but which one do we pick? This is - %% really a problem with the spec. - Host = format_listeners(Redirects), - rabbit_log:info("connection ~p redirecting to ~p~n", - [self(), Host]), - ok = send_on_channel0( - Sock, - #'connection.redirect'{host = Host, - known_hosts = KnownHosts}), - close_connection(State#v1{connection = NewConnection}) - end; -handle_method0(#'connection.close'{}, - State = #v1{connection_state = running}) -> + ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State1 = State#v1{connection_state = running, + connection = NewConnection}, + rabbit_event:notify( + connection_created, + [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), + State1; +handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}, sock = Sock}) when CS =:= closing; CS =:= closed -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = send_on_channel0(Sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -697,23 +789,8 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -send_on_channel0(Sock, Method) -> - ok = rabbit_writer:internal_send_command(Sock, 0, Method). - -format_listeners(Listeners) -> - list_to_binary( - rabbit_misc:intersperse( - $,, - [io_lib:format("~s:~w", [Host, Port]) || - #listener{host = Host, port = Port} <- Listeners])). - -compute_redirects(true) -> []; -compute_redirects(false) -> - Node = node(), - LNode = rabbit_load:pick(), - if Node == LNode -> []; - true -> rabbit_networking:node_listeners(LNode) - end. +send_on_channel0(Sock, Method, Protocol) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). %%-------------------------------------------------------------------------- @@ -747,6 +824,10 @@ i(state, #v1{connection_state = S}) -> S; i(channels, #v1{}) -> length(all_channels()); +i(protocol, #v1{connection = #connection{protocol = none}}) -> + none; +i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> + Protocol:version(); i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> @@ -770,11 +851,13 @@ send_to_new_channel(Channel, AnalyzedFrame, #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector]), + vhost = VHost, + protocol = Protocol}} = State, + {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), + {ok, ChPid} = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/6, + [Channel, self(), WriterPid, Username, VHost, Collector], + Protocol), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). @@ -790,25 +873,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> log_channel_error(CS, Channel, Reason), send_exception(State, Channel, Reason). -send_exception(State, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason), +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> + {ShouldClose, CloseChannel, CloseMethod} = + map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); false -> close_channel(Channel, State) end, ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod), + NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. -map_exception(Channel, Reason) -> +map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason), + lookup_amqp_exception(Reason, Protocol), ShouldClose = SuggestedClose or (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; - _ -> rabbit_framing:method_id(FailedMethod) + _ -> Protocol:method_id(FailedMethod) end, {CloseChannel, CloseMethod} = case ShouldClose of @@ -823,22 +908,16 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -%% FIXME: this clause can go when we move to AMQP spec >=8.1 -lookup_amqp_exception(#amqp_error{name = precondition_failed, - explanation = Expl, - method = Method}) -> - ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), - {false, 406, ExplBin, Method}; lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, - method = Method}) -> - {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), + method = Method}, + Protocol) -> + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), ExplBin = amqp_exception_explanation(Text, Expl), {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other) -> +lookup_amqp_exception(Other, Protocol) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = - rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. amqp_exception_explanation(Text, Expl) -> @@ -847,3 +926,7 @@ amqp_exception_explanation(Text, Expl) -> if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; true -> CompleteTextBin end. + +internal_emit_stats(State) -> + rabbit_event:notify(connection_stats, + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index d50b9f31..ec049a1a 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -69,8 +69,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false, deliver(QPids, Delivery) -> {Success, _} = delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) + fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 090c714b..6812b8d4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -67,7 +67,8 @@ all_tests() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), - passed = test_memory_pressure(), + passed = test_statistics(), + passed = test_option_parser(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -504,8 +505,10 @@ test_content_framing(FrameMax, BodyBin) -> rabbit_binary_generator:build_simple_content_frames( 1, rabbit_binary_generator:ensure_content_encoded( - rabbit_basic:build_content(#'P_basic'{}, BodyBin)), - FrameMax), + rabbit_basic:build_content(#'P_basic'{}, BodyBin), + rabbit_framing_amqp_0_9_1), + FrameMax, + rabbit_framing_amqp_0_9_1), %% header is formatted correctly and the size is the total of the %% fragments <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, @@ -723,6 +726,30 @@ test_log_management_during_startup() -> ok = control_action(start_app, []), passed. +test_option_parser() -> + % command and arguments should just pass through + ok = check_get_options({["mock_command", "arg1", "arg2"], []}, + [], ["mock_command", "arg1", "arg2"]), + + % get flags + ok = check_get_options( + {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]}, + [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]), + + % get options + ok = check_get_options( + {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]}, + [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}], + ["mock_command", "-foo", "bar"]), + + % shuffled and interleaved arguments and options + ok = check_get_options( + {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]}, + [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}], + ["-f", "a1", "-o1", "hello", "a2", "a3"]), + + passed. + test_cluster_management() -> %% 'cluster' and 'reset' should only work if the app is stopped @@ -858,7 +885,7 @@ test_cluster_management2(SecondaryNode) -> %% attempt to leave cluster when no other node is alive ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), - ok = control_action(stop_app, SecondaryNode, []), + ok = control_action(stop_app, SecondaryNode, [], []), ok = control_action(stop_app, []), {error, {no_running_cluster_nodes, _, _}} = control_action(reset, []), @@ -866,9 +893,9 @@ test_cluster_management2(SecondaryNode) -> %% leave system clustered, with the secondary node as a ram node ok = control_action(force_reset, []), ok = control_action(start_app, []), - ok = control_action(force_reset, SecondaryNode, []), - ok = control_action(cluster, SecondaryNode, [NodeS]), - ok = control_action(start_app, SecondaryNode, []), + ok = control_action(force_reset, SecondaryNode, [], []), + ok = control_action(cluster, SecondaryNode, [NodeS], []), + ok = control_action(start_app, SecondaryNode, [], []), passed. @@ -888,9 +915,12 @@ test_user_management() -> {error, {no_such_user, _}} = control_action(list_user_permissions, ["foo"]), {error, {no_such_vhost, _}} = - control_action(list_permissions, ["-p", "/testhost"]), + control_action(list_permissions, [], [{"-p", "/testhost"}]), {error, {invalid_regexp, _, _}} = control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), + {error, {invalid_scope, _}} = + control_action(set_permissions, ["guest", "foo", ".*", ".*"], + [{"-s", "cilent"}]), %% user creation ok = control_action(add_user, ["foo", "bar"]), @@ -906,16 +936,21 @@ test_user_management() -> ok = control_action(list_vhosts, []), %% user/vhost mapping - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), - ok = control_action(list_permissions, ["-p", "/testhost"]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}, {"-s", "client"}]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}, {"-s", "all"}]), + ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), + ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_user_permissions, ["foo"]), %% user/vhost unmapping - ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), - ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), + ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]), + ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]), %% vhost deletion ok = control_action(delete_vhost, ["/testhost"]), @@ -924,8 +959,8 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), + ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], + [{"-p", "/testhost"}]), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion @@ -938,8 +973,8 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, + <<"user">>, <<"/">>, self()), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1030,46 +1065,11 @@ test_hooks() -> end, passed. -test_memory_pressure_receiver(Pid) -> - receive - shutdown -> - ok; - {send_command, Method} -> - ok = case Method of - #'channel.flow'{} -> ok; - #'basic.qos_ok'{} -> ok; - #'channel.open_ok'{} -> ok - end, - Pid ! Method, - test_memory_pressure_receiver(Pid); - sync -> - Pid ! sync, - test_memory_pressure_receiver(Pid) - end. - -test_memory_pressure_receive_flow(Active) -> - receive #'channel.flow'{active = Active} -> ok - after 1000 -> throw(failed_to_receive_channel_flow) - end, - receive #'channel.flow'{} -> - throw(pipelining_sync_commands_detected) - after 0 -> - ok - end. - -test_memory_pressure_sync(Ch, Writer) -> - ok = rabbit_channel:do(Ch, #'basic.qos'{}), - Writer ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, - receive #'basic.qos_ok'{} -> ok - after 1000 -> throw(failed_to_receive_basic_qos_ok) - end. - -test_memory_pressure_spawn() -> +test_spawn(Receiver) -> Me = self(), - Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + Writer = spawn(fun () -> Receiver(Me) end), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, + <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), MRef = erlang:monitor(process, Ch), receive #'channel.open_ok'{} -> ok @@ -1077,89 +1077,94 @@ test_memory_pressure_spawn() -> end, {Writer, Ch, MRef}. -expect_normal_channel_termination(MRef, Ch) -> - receive {'DOWN', MRef, process, Ch, normal} -> ok - after 1000 -> throw(channel_failed_to_exit) +test_statistics_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_statistics_receiver(Pid) end. -gobble_channel_exit() -> - receive {channel_exit, _, _} -> ok - after 1000 -> throw(channel_exit_not_received) +test_statistics_event_receiver(Pid) -> + receive + Foo -> + Pid ! Foo, + test_statistics_event_receiver(Pid) end. -test_memory_pressure() -> - {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), - [ok = rabbit_channel:conserve_memory(Ch0, Conserve) || - Conserve <- [false, false, true, false, true, true, false]], - ok = test_memory_pressure_sync(Ch0, Writer0), - receive {'DOWN', MRef0, process, Ch0, Info0} -> - throw({channel_died_early, Info0}) - after 0 -> ok - end, - - %% we should have just 1 active=false waiting for us - ok = test_memory_pressure_receive_flow(false), - - %% if we reply with flow_ok, we should immediately get an - %% active=true back - ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}), - ok = test_memory_pressure_receive_flow(true), - - %% if we publish at this point, the channel should die - Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), - ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), - expect_normal_channel_termination(MRef0, Ch0), - gobble_channel_exit(), - - {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), - ok = rabbit_channel:conserve_memory(Ch1, true), - ok = test_memory_pressure_receive_flow(false), - ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - ok = test_memory_pressure_sync(Ch1, Writer1), - ok = rabbit_channel:conserve_memory(Ch1, false), - ok = test_memory_pressure_receive_flow(true), - %% send back the wrong flow_ok. Channel should die. - ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - expect_normal_channel_termination(MRef1, Ch1), - gobble_channel_exit(), - - {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), - %% just out of the blue, send a flow_ok. Life should end. - ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), - expect_normal_channel_termination(MRef2, Ch2), - gobble_channel_exit(), - - {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), - ok = rabbit_channel:conserve_memory(Ch3, true), - ok = test_memory_pressure_receive_flow(false), - receive {'DOWN', MRef3, process, Ch3, _} -> - ok - after 12000 -> - throw(channel_failed_to_exit) - end, - gobble_channel_exit(), - - alarm_handler:set_alarm({vm_memory_high_watermark, []}), - Me = self(), - Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, - self()), - ok = rabbit_channel:do(Ch4, #'channel.open'{}), - MRef4 = erlang:monitor(process, Ch4), - Writer4 ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, - receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) - after 0 -> ok - end, - alarm_handler:clear_alarm(vm_memory_high_watermark), - Writer4 ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, - receive #'channel.open_ok'{} -> ok - after 1000 -> throw(failed_to_receive_channel_open_ok) - end, - rabbit_channel:shutdown(Ch4), - expect_normal_channel_termination(MRef4, Ch4), +test_statistics_receive_event(Ch, Matcher) -> + rabbit_channel:flush(Ch), + rabbit_channel:emit_stats(Ch), + test_statistics_receive_event1(Ch, Matcher). + +test_statistics_receive_event1(Ch, Matcher) -> + receive #event{type = channel_stats, props = Props} -> + case Matcher(Props) of + true -> Props; + _ -> test_statistics_receive_event1(Ch, Matcher) + end + after 1000 -> throw(failed_to_receive_event) + end. +test_statistics() -> + application:set_env(rabbit, collect_statistics, fine), + + %% ATM this just tests the queue / exchange stats in channels. That's + %% by far the most complex code though. + + %% Set up a channel and queue + {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1), + rabbit_channel:do(Ch, #'queue.declare'{}), + QName = receive #'queue.declare_ok'{queue = Q0} -> + Q0 + after 1000 -> throw(failed_to_receive_queue_declare_ok) + end, + {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), + QPid = Q#amqqueue.pid, + X = rabbit_misc:r(<<"/">>, exchange, <<"">>), + + rabbit_tests_event_receiver:start(self()), + + %% Check stats empty + Event = test_statistics_receive_event(Ch, fun (_) -> true end), + [] = proplists:get_value(channel_queue_stats, Event), + [] = proplists:get_value(channel_exchange_stats, Event), + [] = proplists:get_value(channel_queue_exchange_stats, Event), + + %% Publish and get a message + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>, + routing_key = QName}, + rabbit_basic:build_content(#'P_basic'{}, <<"">>)), + rabbit_channel:do(Ch, #'basic.get'{queue = QName}), + + %% Check the stats reflect that + Event2 = test_statistics_receive_event( + Ch, + fun (E) -> + length(proplists:get_value( + channel_queue_exchange_stats, E)) > 0 + end), + [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), + [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), + [{{QPid,X},[{publish,1}]}] = + proplists:get_value(channel_queue_exchange_stats, Event2), + + %% Check the stats remove stuff on queue deletion + rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), + Event3 = test_statistics_receive_event( + Ch, + fun (E) -> + length(proplists:get_value( + channel_queue_exchange_stats, E)) == 0 + end), + + [] = proplists:get_value(channel_queue_stats, Event3), + [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3), + [] = proplists:get_value(channel_queue_exchange_stats, Event3), + + rabbit_channel:shutdown(Ch), + rabbit_tests_event_receiver:stop(), passed. test_delegates_async(SecondaryNode) -> @@ -1251,11 +1256,16 @@ test_delegates_sync(SecondaryNode) -> %--------------------------------------------------------------------- -control_action(Command, Args) -> control_action(Command, node(), Args). +control_action(Command, Args) -> + control_action(Command, node(), Args, default_options()). + +control_action(Command, Args, NewOpts) -> + control_action(Command, node(), Args, + expand_options(default_options(), NewOpts)). -control_action(Command, Node, Args) -> +control_action(Command, Node, Args, Opts) -> case catch rabbit_control:action( - Command, Node, Args, + Command, Node, Args, Opts, fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) end) of @@ -1269,13 +1279,28 @@ control_action(Command, Node, Args) -> info_action(Command, Args, CheckVHost) -> ok = control_action(Command, []), - if CheckVHost -> ok = control_action(Command, ["-p", "/"]); + if CheckVHost -> ok = control_action(Command, []); true -> ok end, ok = control_action(Command, lists:map(fun atom_to_list/1, Args)), {bad_argument, dummy} = control_action(Command, ["dummy"]), ok. +default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}]. + +expand_options(As, Bs) -> + lists:foldl(fun({K, _}=A, R) -> + case proplists:is_defined(K, R) of + true -> R; + false -> [A | R] + end + end, Bs, As). + +check_get_options({ExpArgs, ExpOpts}, Defs, Args) -> + {ExpArgs, ResOpts} = rabbit_misc:get_options(Defs, Args), + true = lists:sort(ExpOpts) == lists:sort(ResOpts), % don't care about the order + ok. + empty_files(Files) -> [case file:read_file_info(File) of {ok, FInfo} -> FInfo#file_info.size == 0; @@ -1763,7 +1788,7 @@ with_fresh_variable_queue(Fun) -> {len, 0}]), _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)), passed. - + test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl new file mode 100644 index 00000000..a92e3da7 --- /dev/null +++ b/src/rabbit_tests_event_receiver.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_tests_event_receiver). + +-export([start/1, stop/0]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +start(Pid) -> + gen_event:add_handler(rabbit_event, ?MODULE, [Pid]). + +stop() -> + gen_event:delete_handler(rabbit_event, ?MODULE, []). + +%%---------------------------------------------------------------------------- + +init([Pid]) -> + {ok, Pid}. + +handle_call(_Request, Pid) -> + {ok, not_understood, Pid}. + +handle_event(Event, Pid) -> + Pid ! Event, + {ok, Pid}. + +handle_info(_Info, Pid) -> + {ok, Pid}. + +terminate(_Arg, _Pid) -> + ok. + +code_change(_OldVsn, Pid, _Extra) -> + {ok, Pid}. + +%%---------------------------------------------------------------------------- diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 1f182593..a9313503 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -39,8 +39,8 @@ delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, - binding/0, amqqueue/0, exchange/0, connection/0, user/0, - error/1, ok_or_error/1, ok_or_error2/2, ok/1, + binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, + user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1, channel_exit/0, connection_exit/0]). -type(channel_exit() :: no_return()). @@ -137,6 +137,8 @@ -type(connection() :: pid()). +-type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1'). + -type(user() :: #user{username :: rabbit_access_control:username(), password :: rabbit_access_control:password()}). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 80602038..f90ee734 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,14 +33,14 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([start/4, start_link/4, shutdown/1, mainloop/1]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). --export([internal_send_command/3, internal_send_command/5]). +-export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). --record(wstate, {sock, channel, frame_max}). +-record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -48,14 +48,14 @@ -ifdef(use_specs). --spec(start/3 :: +-spec(start/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) - -> pid()). --spec(start_link/3 :: + non_neg_integer(), rabbit_types:protocol()) + -> rabbit_types:ok(pid())). +-spec(start_link/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) - -> pid()). + non_neg_integer(), rabbit_types:protocol()) + -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -70,29 +70,31 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(internal_send_command/3 :: +-spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - rabbit_framing:amqp_method_record()) + rabbit_framing:amqp_method_record(), rabbit_types:protocol()) -> 'ok'). --spec(internal_send_command/5 :: +-spec(internal_send_command/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:content(), - non_neg_integer()) + non_neg_integer(), rabbit_types:protocol()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax) -> - spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). - -start_link(Sock, Channel, FrameMax) -> - spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, +start(Sock, Channel, FrameMax, Protocol) -> + {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}]). + frame_max = FrameMax, + protocol = Protocol}])}. + +start_link(Sock, Channel, FrameMax, Protocol) -> + {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}])}. mainloop(State) -> receive @@ -102,35 +104,40 @@ mainloop(State) -> end. handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), State; handle_message({send_command, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), State; handle_message({send_command_and_signal_back, MethodRecord, Parent}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> @@ -171,30 +178,32 @@ shutdown(W) -> %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord) -> +assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord). + rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord, + Protocol). -assemble_frames(Channel, MethodRecord, Content, FrameMax) -> +assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), - true = rabbit_framing:method_has_content(MethodName), % assertion + true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord), + Channel, MethodRecord, Protocol), ContentFrames = rabbit_binary_generator:build_simple_content_frames( - Channel, Content, FrameMax), + Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). -internal_send_command(Sock, Channel, MethodRecord) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). +internal_send_command(Sock, Channel, MethodRecord, Protocol) -> + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). -internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -214,13 +223,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)), +internal_send_command_async(Sock, Channel, MethodRecord, Protocol) -> + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)), + Content, FrameMax, Protocol)), ok. port_cmd(Sock, Data) -> diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index bbc3a8c0..3cbc80f8 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -78,7 +78,7 @@ rabbit_types:ok(pid())). -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). --spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')). +-spec(get_vm_limit/0 :: () -> non_neg_integer()). -spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). |