diff options
29 files changed, 885 insertions, 602 deletions
@@ -24,18 +24,6 @@ from amqp_codegen import * import string import re -erlangTypeMap = { - 'octet': 'octet', - 'shortstr': 'shortstr', - 'longstr': 'longstr', - 'short': 'shortint', - 'long': 'longint', - 'longlong': 'longlongint', - 'bit': 'bit', - 'table': 'table', - 'timestamp': 'timestamp', -} - # Coming up with a proper encoding of AMQP tables in JSON is too much # hassle at this stage. Given that the only default value we are # interested in is for the empty table, we only support that. @@ -123,7 +111,7 @@ def printFileHeader(): def genErl(spec): def erlType(domain): - return erlangTypeMap[spec.resolveDomain(domain)] + return erlangize(spec.resolveDomain(domain)) def fieldTypeList(fields): return '[' + ', '.join([erlType(f.domain) for f in fields]) + ']' @@ -186,11 +174,11 @@ def genErl(spec): return p+'Len:32/unsigned, '+p+':'+p+'Len/binary' elif type == 'octet': return p+':8/unsigned' - elif type == 'shortint': + elif type == 'short': return p+':16/unsigned' - elif type == 'longint': + elif type == 'long': return p+':32/unsigned' - elif type == 'longlongint': + elif type == 'longlong': return p+':64/unsigned' elif type == 'timestamp': return p+':64/unsigned' @@ -233,29 +221,23 @@ def genErl(spec): def presentBin(fields): ps = ', '.join(['P' + str(f.index) + ':1' for f in fields]) return '<<' + ps + ', _:%d, R0/binary>>' % (16 - len(fields),) - def mkMacroName(field): - return '?' + field.domain.upper() + '_PROP' - def writePropFieldLine(field, bin_next = None): + def writePropFieldLine(field): i = str(field.index) - if not bin_next: - bin_next = 'R' + str(field.index + 1) - if field.domain in ['octet', 'timestamp']: - print (" {%s, %s} = %s(%s, %s, %s, %s)," % - ('F' + i, bin_next, mkMacroName(field), 'P' + i, - 'R' + i, 'I' + i, 'X' + i)) + if field.domain == 'bit': + print " {F%s, R%s} = {P%s =/= 0, R%s}," % \ + (i, str(field.index + 1), i, i) else: - print (" {%s, %s} = %s(%s, %s, %s, %s, %s)," % - ('F' + i, bin_next, mkMacroName(field), 'P' + i, - 'R' + i, 'L' + i, 'S' + i, 'X' + i)) + print " {F%s, R%s} = if P%s =:= 0 -> {undefined, R%s}; true -> ?%s_VAL(R%s, L%s, V%s, X%s) end," % \ + (i, str(field.index + 1), i, i, erlType(field.domain).upper(), i, i, i, i) if len(c.fields) == 0: - print "decode_properties(%d, _) ->" % (c.index,) + print "decode_properties(%d, <<>>) ->" % (c.index,) else: print ("decode_properties(%d, %s) ->" % (c.index, presentBin(c.fields))) - for field in c.fields[:-1]: + for field in c.fields: writePropFieldLine(field) - writePropFieldLine(c.fields[-1], "<<>>") + print " <<>> = %s," % ('R' + str(len(c.fields))) print " #'P_%s'{%s};" % (erlangize(c.name), fieldMapList(c.fields)) def genFieldPreprocessing(packed): @@ -283,9 +265,27 @@ def genErl(spec): print " <<%s>>;" % (', '.join([methodFieldFragment(f) for f in packedFields])) def genEncodeProperties(c): + def presentBin(fields): + ps = ', '.join(['P' + str(f.index) + ':1' for f in fields]) + return '<<' + ps + ', 0:%d>>' % (16 - len(fields),) + def writePropFieldLine(field): + i = str(field.index) + if field.domain == 'bit': + print " {P%s, R%s} = {F%s =:= 1, R%s}," % \ + (i, str(field.index + 1), i, i) + else: + print " {P%s, R%s} = if F%s =:= undefined -> {0, R%s}; true -> {1, [?%s_PROP(F%s, L%s) | R%s]} end," % \ + (i, str(field.index + 1), i, i, erlType(field.domain).upper(), i, i, i) + print "encode_properties(#'P_%s'{%s}) ->" % (erlangize(c.name), fieldMapList(c.fields)) - print " rabbit_binary_generator:encode_properties(%s, %s);" % \ - (fieldTypeList(c.fields), fieldTempList(c.fields)) + if len(c.fields) == 0: + print " <<>>;" + else: + print " R0 = [<<>>]," + for field in c.fields: + writePropFieldLine(field) + print " list_to_binary([%s | lists:reverse(R%s)]);" % \ + (presentBin(c.fields), str(len(c.fields))) def messageConstantClass(cls): # We do this because 0.8 uses "soft error" and 8.1 uses "soft-error". @@ -350,8 +350,8 @@ def genErl(spec): 'table' | 'byte' | 'double' | 'float' | 'long' | 'short' | 'bool' | 'binary' | 'void' | 'array'). -type(amqp_property_type() :: - 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' | - 'longlongint' | 'timestamp' | 'bit' | 'table'). + 'shortstr' | 'longstr' | 'octet' | 'short' | 'long' | + 'longlong' | 'timestamp' | 'bit' | 'table'). -type(amqp_table() :: [{binary(), amqp_field_type(), amqp_value()}]). -type(amqp_array() :: [{amqp_field_type(), amqp_value()}]). @@ -429,25 +429,78 @@ shortstr_size(S) -> _ -> exit(method_field_shortstr_overflow) end. --define(SHORTSTR_PROP(P, R, L, S, X), - if P =:= 0 -> {undefined, R}; - true -> <<L:8/unsigned, S:L/binary, X/binary>> = R, - {S, X} +-define(SHORTSTR_VAL(R, L, V, X), + begin + <<L:8/unsigned, V:L/binary, X/binary>> = R, + {V, X} + end). + +-define(LONGSTR_VAL(R, L, V, X), + begin + <<L:32/unsigned, V:L/binary, X/binary>> = R, + {V, X} + end). + +-define(SHORT_VAL(R, L, V, X), + begin + <<V:8/unsigned, X/binary>> = R, + {V, X} + end). + +-define(LONG_VAL(R, L, V, X), + begin + <<V:32/unsigned, X/binary>> = R, + {V, X} + end). + +-define(LONGLONG_VAL(R, L, V, X), + begin + <<V:64/unsigned, X/binary>> = R, + {V, X} + end). + +-define(OCTET_VAL(R, L, V, X), + begin + <<V:8/unsigned, X/binary>> = R, + {V, X} end). --define(TABLE_PROP(P, R, L, T, X), - if P =:= 0 -> {undefined, R}; - true -> <<L:32/unsigned, T:L/binary, X/binary>> = R, - {rabbit_binary_parser:parse_table(T), X} + +-define(TABLE_VAL(R, L, V, X), + begin + <<L:32/unsigned, V:L/binary, X/binary>> = R, + {rabbit_binary_parser:parse_table(V), X} end). --define(OCTET_PROP(P, R, I, X), - if P =:= 0 -> {undefined, R}; - true -> <<I:8/unsigned, X/binary>> = R, - {I, X} + +-define(TIMESTAMP_VAL(R, L, V, X), + begin + <<V:64/unsigned, X/binary>> = R, + {V, X} end). --define(TIMESTAMP_PROP(P, R, I, X), - if P =:= 0 -> {undefined, R}; - true -> <<I:64/unsigned, X/binary>> = R, - {I, X} + +-define(SHORTSTR_PROP(X, L), + begin + L = size(X), + if L < 256 -> <<L:8, X:L/binary>>; + true -> exit(content_properties_shortstr_overflow) + end + end). + +-define(LONGSTR_PROP(X, L), + begin + L = size(X), + <<L:32, X:L/binary>> + end). + +-define(OCTET_PROP(X, L), <<X:8/unsigned>>). +-define(SHORT_PROP(X, L), <<X:16/unsigned>>). +-define(LONG_PROP(X, L), <<X:32/unsigned>>). +-define(LONGLONG_PROP(X, L), <<X:64/unsigned>>). +-define(TIMESTAMP_PROP(X, L), <<X:64/unsigned>>). + +-define(TABLE_PROP(X, T), + begin + T = rabbit_binary_generator:generate_table(X), + <<(size(T)):32, T/binary>> end). """ version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision) @@ -497,9 +550,6 @@ shortstr_size(S) -> print "amqp_exception(_Code) -> undefined." def genHrl(spec): - def erlType(domain): - return erlangTypeMap[spec.resolveDomain(domain)] - def fieldNameList(fields): return ', '.join([erlangize(f.name) for f in fields]) diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml index 5d74c6e1..8ecb4fc8 100644 --- a/docs/rabbitmq-plugins.1.xml +++ b/docs/rabbitmq-plugins.1.xml @@ -96,11 +96,13 @@ </varlistentry> </variablelist> <para> - Lists available plugins, their versions, dependencies and + Lists all plugins, their versions, dependencies and descriptions. Each plugin is prefixed with a status indicator - [ ] to indicate that the plugin is not enabled, [E] to indicate that it is explicitly enabled, - and [e] to indicate that it is implicitly enabled. + [e] to indicate that it is implicitly enabled, and [!] to + indicate that it is enabled but missing and thus not + operational. </para> <para> If the optional pattern is given, only plugins whose @@ -109,16 +111,15 @@ <para role="example-prefix">For example:</para> <screen role="example">rabbitmq-plugins list</screen> <para role="example"> - This command lists all the plugins available, on one line each. + This command lists all plugins, on one line each. </para> <screen role="example">rabbitmq-plugins list -v </screen> <para role="example"> - This command lists all the plugins available. + This command lists all plugins. </para> <screen role="example">rabbitmq-plugins list -v management</screen> <para role="example"> - This command lists all the plugins available, but does not - display plugins whose name does not contain "management". + This command lists all plugins whose name contains "management". </para> <screen role="example">rabbitmq-plugins list -e rabbit</screen> <para role="example"> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 3082fe14..34947b66 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1282,22 +1282,26 @@ <listitem><para>Readable name for the connection.</para></listitem> </varlistentry> <varlistentry> - <term>address</term> - <listitem><para>Server IP address.</para></listitem> - </varlistentry> - <varlistentry> <term>port</term> <listitem><para>Server port.</para></listitem> </varlistentry> <varlistentry> - <term>peer_address</term> - <listitem><para>Peer address.</para></listitem> + <term>host</term> + <listitem><para>Server hostname obtained via reverse + DNS, or its IP address if reverse DNS failed or was + not enabled.</para></listitem> </varlistentry> <varlistentry> <term>peer_port</term> <listitem><para>Peer port.</para></listitem> </varlistentry> <varlistentry> + <term>peer_host</term> + <listitem><para>Peer hostname obtained via reverse + DNS, or its IP address if reverse DNS failed or was + not enabled.</para></listitem> + </varlistentry> + <varlistentry> <term>ssl</term> <listitem><para>Boolean indicating whether the connection is secured with SSL.</para></listitem> @@ -1414,7 +1418,7 @@ </variablelist> <para> If no <command>connectioninfoitem</command>s are - specified then user, peer address, peer port, time since + specified then user, peer host, peer port, time since flow control and memory block state are displayed. </para> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9b1ff8bd..16dfd196 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -43,6 +43,7 @@ {trace_vhosts, []}, {log_levels, [{connection, info}]}, {ssl_cert_login_from, distinguished_name}, + {reverse_dns_lookups, false}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index adbb6102..b2832b45 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -110,5 +110,6 @@ -define(DESIRED_HIBERNATE, 10000). -define(CREDIT_DISC_BOUND, {2000, 500}). +-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/src/background_gc.erl b/src/background_gc.erl new file mode 100644 index 00000000..7c68a177 --- /dev/null +++ b/src/background_gc.erl @@ -0,0 +1,78 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(background_gc). + +-behaviour(gen_server2). + +-export([start_link/0, run/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(MAX_RATIO, 0.01). +-define(IDEAL_INTERVAL, 60000). + +-record(state, {last_interval}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(run/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], + [{timeout, infinity}]). + +run() -> gen_server2:cast(?MODULE, run). + +%%---------------------------------------------------------------------------- + +init([]) -> {ok, interval_gc(#state{last_interval = ?IDEAL_INTERVAL})}. + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. + +handle_cast(run, State) -> gc(), {noreply, State}; + +handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. + +handle_info(run, State) -> {noreply, interval_gc(State)}; + +handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> {ok, State}. + +terminate(_Reason, State) -> State. + +%%---------------------------------------------------------------------------- + +interval_gc(State = #state{last_interval = LastInterval}) -> + {ok, Interval} = rabbit_misc:interval_operation( + fun gc/0, ?MAX_RATIO, ?IDEAL_INTERVAL, LastInterval), + erlang:send_after(Interval, self(), run), + State#state{last_interval = Interval}. + +gc() -> + [garbage_collect(P) || P <- processes(), + {status, waiting} == process_info(P, status)], + garbage_collect(), %% since we will never be waiting... + ok. diff --git a/src/rabbit.erl b/src/rabbit.erl index c52c296a..c3a6d283 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -179,6 +179,12 @@ {mfa, {rabbit_node_monitor, notify_node_up, []}}, {requires, networking}]}). +-rabbit_boot_step({background_gc, + [{description, "background garbage collection"}, + {mfa, {rabbit_sup, start_restartable_child, + [background_gc]}}, + {enables, networking}]}). + %%--------------------------------------------------------------------------- -include("rabbit_framing.hrl"). @@ -570,7 +576,10 @@ boot_delegate() -> rabbit_sup:start_supervisor_child(delegate_sup, [Count]). recover() -> - rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()). + Qs = rabbit_amqqueue:recover(), + ok = rabbit_binding:recover(rabbit_exchange:recover(), + [QName || #amqqueue{name = QName} <- Qs]), + rabbit_amqqueue:start(Qs). maybe_insert_default_data() -> case rabbit_table:is_empty() of diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index e6625b2b..d7d4d82a 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -55,8 +55,12 @@ start() -> ok = gen_event:add_handler(?SERVER, ?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), rabbit_sup:start_restartable_child( - vm_memory_monitor, [MemoryWatermark, fun rabbit_alarm:set_alarm/1, - fun rabbit_alarm:clear_alarm/1]), + vm_memory_monitor, [MemoryWatermark, + fun (Alarm) -> + background_gc:run(), + set_alarm(Alarm) + end, + fun clear_alarm/1]), {ok, DiskLimit} = application:get_env(disk_free_limit), rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]), ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6ad85b24..9fb453c1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -16,9 +16,11 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). +-export([recover/0, stop/0, start/1, declare/5, + delete_immediately/1, delete/3, purge/1]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, +-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, + assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). @@ -40,8 +42,6 @@ -define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). --define(MAX_EXPIRY_TIMER, 4294967295). - -define(MORE_CONSUMER_CREDIT_AFTER, 50). -define(FAILOVER_WAIT_MILLIS, 100). @@ -61,18 +61,21 @@ -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -type(routing_result() :: 'routed' | 'unroutable'). --type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). - --spec(start/0 :: () -> [name()]). +-type(queue_or_absent() :: rabbit_types:amqqueue() | + {'absent', rabbit_types:amqqueue()}). +-type(not_found_or_absent() :: 'not_found' | + {'absent', rabbit_types:amqqueue()}). +-spec(recover/0 :: () -> [rabbit_types:amqqueue()]). -spec(stop/0 :: () -> 'ok'). +-spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok'). -spec(declare/5 :: (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) - -> {'new' | 'existing', rabbit_types:amqqueue()} | + -> {'new' | 'existing' | 'absent', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) - -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). + -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok'). @@ -80,7 +83,10 @@ (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found'); ([name()]) -> [rabbit_types:amqqueue()]). --spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). +-spec(not_found_or_absent/1 :: (name()) -> not_found_or_absent()). +-spec(with/2 :: (name(), qfun(A)) -> + A | rabbit_types:error(not_found_or_absent())). +-spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(assert_equivalence/5 :: @@ -175,7 +181,7 @@ -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required]). -start() -> +recover() -> %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. on_node_down(node()), @@ -195,6 +201,14 @@ stop() -> {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:stop(). +start(Qs) -> + %% At this point all recovered queues and their bindings are + %% visible to routing, so now it is safe for them to complete + %% their initialisation (which may involve interacting with other + %% queues). + [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], + ok. + find_durable_queues() -> Node = node(), %% TODO: use dirty ops instead @@ -207,8 +221,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], - [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, - gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. + [Q || Q = #amqqueue{pid = Pid} <- Qs, + gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), @@ -223,10 +237,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> gm_pids = []}), {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), - case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of - not_found -> rabbit_misc:not_found(QueueName); - Q2 -> Q2 - end. + gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity). internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -236,13 +247,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> - case mnesia:read({rabbit_durable_queue, QueueName}) of - [] -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), - B = add_default_binding(Q1), - fun () -> B(), Q1 end; - %% Q exists on stopped node - [_] -> rabbit_misc:const(not_found) + case not_found_or_absent(QueueName) of + not_found -> Q1 = rabbit_policy:set(Q), + ok = store_queue(Q1), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; + {absent, _Q} = R -> rabbit_misc:const(R) end; [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of @@ -296,28 +306,47 @@ lookup(Names) when is_list(Names) -> lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). +not_found_or_absent(Name) -> + %% NB: we assume that the caller has already performed a lookup on + %% rabbit_queue and not found anything + case mnesia:read({rabbit_durable_queue, Name}) of + [] -> not_found; + [Q] -> {absent, Q} %% Q exists on stopped node + end. + +not_found_or_absent_dirty(Name) -> + %% We should read from both tables inside a tx, to get a + %% consistent view. But the chances of an inconsistency are small, + %% and only affect the error kind. + case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of + {error, not_found} -> not_found; + {ok, Q} -> {absent, Q} + end. + with(Name, F, E) -> case lookup(Name) of {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do %% with the QPid. - E1 = fun () -> - case rabbit_misc:is_process_alive(QPid) of - true -> E(); - false -> timer:sleep(25), - with(Name, F, E) - end - end, - rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end); + rabbit_misc:with_exit_handler( + fun () -> + case rabbit_misc:is_process_alive(QPid) of + true -> E(not_found_or_absent_dirty(Name)); + false -> timer:sleep(25), + with(Name, F, E) + end + end, fun () -> F(Q) end); {error, not_found} -> - E() + E(not_found_or_absent_dirty(Name)) end. -with(Name, F) -> - with(Name, F, fun () -> {error, not_found} end). +with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). + with_or_die(Name, F) -> - with(Name, F, fun () -> rabbit_misc:not_found(Name) end). + with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); + ({absent, Q}) -> rabbit_misc:absent(Q) + end). assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, @@ -352,8 +381,8 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). check_declare_arguments(QueueName, Args) -> - Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, - {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, + Checks = [{<<"x-expires">>, fun check_expires_arg/2}, + {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of @@ -380,20 +409,17 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. -check_positive_int_arg({Type, Val}, Args) -> +check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of - ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}}; - ok when Val > 0 -> ok; - ok -> {error, {value_zero_or_less, Val}}; - Error -> Error + ok when Val == 0 -> {error, {value_zero, Val}}; + ok -> rabbit_misc:check_expiry(Val); + Error -> Error end. -check_non_neg_int_arg({Type, Val}, Args) -> +check_message_ttl_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of - ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}}; - ok when Val >= 0 -> ok; - ok -> {error, {value_less_than_zero, Val}}; - Error -> Error + ok -> rabbit_misc:check_expiry(Val); + Error -> Error end. check_dlxrk_arg({longstr, _}, Args) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8d05a78c..92b00db0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -195,10 +195,8 @@ code_change(_OldVsn, State, _Extra) -> declare(Recover, From, State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined}) -> - case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> - {stop, normal, not_found, State}; - Q1 -> + case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of + #amqqueue{} = Q1 -> case matches(Recover, Q, Q1) of true -> gen_server2:reply(From, {new, Q}), @@ -208,6 +206,7 @@ declare(Recover, From, State = #q{q = Q, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = bq_init(BQ, Q, Recover), + recovery_barrier(Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -216,28 +215,39 @@ declare(Recover, From, State = #q{q = Q, noreply(State1); false -> {stop, normal, {existing, Q1}, State} - end - end. + end; + Err -> + {stop, normal, Err, State} + end. -matches(true, Q, Q) -> true; -matches(true, _Q, _Q1) -> false; -matches(false, Q1, Q2) -> +matches(new, Q1, Q2) -> %% i.e. not policy - Q1#amqqueue.name =:= Q2#amqqueue.name andalso - Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso - Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso + Q1#amqqueue.name =:= Q2#amqqueue.name andalso + Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso + Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso - Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso - Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. + Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso + Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; +matches(_, Q, Q) -> true; +matches(_, _Q, _Q1) -> false. bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(Q, Recover, + BQ:init(Q, Recover =/= new, fun (Mod, Fun) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). +recovery_barrier(new) -> + ok; +recovery_barrier(BarrierPid) -> + MRef = erlang:monitor(process, BarrierPid), + receive + {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, _, _} -> ok + end. + process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> lists:foldl( fun({Arg, Fun}, State1) -> @@ -247,9 +257,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> end end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, - {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]). + {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, + {<<"x-message-ttl">>, fun init_ttl/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -553,7 +563,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Confirm, Delivered, State), + Props = message_properties(Message, Confirm, Delivered, State), case attempt_delivery(Delivery, Props, State1) of {true, State2} -> State2; @@ -680,16 +690,21 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -message_properties(Confirm, Delivered, #q{ttl = TTL}) -> - #message_properties{expiry = calculate_msg_expiry(TTL), +message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) -> + #message_properties{expiry = calculate_msg_expiry(Message, TTL), needs_confirming = Confirm == eventually, delivered = Delivered}. -calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). +calculate_msg_expiry(#basic_message{content = Content}, TTL) -> + #content{properties = Props} = + rabbit_binary_parser:ensure_content_decoded(Content), + %% We assert that the expiration must be valid - we check in the channel. + {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), + case lists:min([TTL, MsgTTL]) of + undefined -> undefined; + T -> now_micros() + T * 1000 + end. -drop_expired_messages(State = #q{ttl = undefined}) -> - State; drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> Now = now_micros(), @@ -711,8 +726,6 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, ensure_ttl_timer(undefined, State) -> State; -ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) -> - State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> After = (case Expiry - now_micros() of V when V > 0 -> V + 999; %% always fire later @@ -853,8 +866,8 @@ make_dead_letter_msg(Reason, {<<"time">>, timestamp, TimeSec}, {<<"exchange">>, longstr, Exchange#resource.name}, {<<"routing-keys">>, array, RKs1}], - HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, - Info, Headers)) + HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, + Info, Headers)) end, Content1 = rabbit_basic:map_headers(HeadersFun2, Content), Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), @@ -998,9 +1011,9 @@ handle_call({init, Recover}, From, q = #amqqueue{name = QName} = Q} = State, gen_server2:reply(From, not_found), case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]) + new -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]); + _ -> ok end, BQS = bq_init(BQ, Q, Recover), %% Rely on terminate to delete the queue. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index db2b7e95..9bd1fad9 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -19,8 +19,9 @@ -include("rabbit_framing.hrl"). -export([publish/4, publish/5, publish/1, - message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, map_headers/2, delivery/3, header_routes/1]). + message/3, message/4, properties/1, prepend_table_header/3, + extract_headers/1, map_headers/2, delivery/3, header_routes/1, + parse_expiration/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -58,7 +59,7 @@ -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). --spec(append_table_header/3 :: +-spec(prepend_table_header/3 :: (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). @@ -72,6 +73,9 @@ binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). +-spec(parse_expiration/1 :: + (rabbit_framing:amqp_property_record()) + -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())). -endif. @@ -177,15 +181,45 @@ properties(P) when is_list(P) -> end end, #'P_basic'{}, P). -append_table_header(Name, Info, undefined) -> - append_table_header(Name, Info, []); -append_table_header(Name, Info, Headers) -> - Prior = case rabbit_misc:table_lookup(Headers, Name) of - undefined -> []; - {array, Existing} -> Existing - end, +prepend_table_header(Name, Info, undefined) -> + prepend_table_header(Name, Info, []); +prepend_table_header(Name, Info, Headers) -> + case rabbit_misc:table_lookup(Headers, Name) of + {array, Existing} -> + prepend_table(Name, Info, Existing, Headers); + undefined -> + prepend_table(Name, Info, [], Headers); + Other -> + Headers2 = prepend_table(Name, Info, [], Headers), + set_invalid_header(Name, Other, Headers2) + end. + +prepend_table(Name, Info, Prior, Headers) -> rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). +set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) -> + case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of + undefined -> + set_invalid([{Name, array, [Value]}], Headers); + {table, ExistingHdr} -> + update_invalid(Name, Value, ExistingHdr, Headers); + Other -> + %% somehow the x-invalid-headers header is corrupt + Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}], + set_invalid_header(Name, Value, set_invalid(Invalid, Headers)) + end. + +set_invalid(NewHdr, Headers) -> + rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr). + +update_invalid(Name, Value, ExistingHdr, Header) -> + Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of + undefined -> [Value]; + {array, Prior} -> [Value | Prior] + end, + NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), + set_invalid(NewHdr, Header). + extract_headers(Content) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), @@ -224,3 +258,19 @@ header_routes(HeadersTable) -> {Type, _Val} -> throw({error, {unacceptable_type_in_header, binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). + +parse_expiration(#'P_basic'{expiration = undefined}) -> + {ok, undefined}; +parse_expiration(#'P_basic'{expiration = Expiration}) -> + case string:to_integer(binary_to_list(Expiration)) of + {error, no_integer} = E -> + E; + {N, ""} -> + case rabbit_misc:check_expiry(N) of + ok -> {ok, N}; + E = {error, _} -> E + end; + {_, S} -> + {error, {leftover_string, S}} + end. + diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 4700fa31..a333c1ce 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -21,7 +21,7 @@ -export([build_simple_method_frame/3, build_simple_content_frames/4, build_heartbeat_frame/0]). --export([generate_table/1, encode_properties/2]). +-export([generate_table/1]). -export([check_empty_frame_size/0]). -export([ensure_content_encoded/2, clear_encoded_content/1]). -export([map_exception/3]). @@ -42,8 +42,6 @@ -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). -spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()). --spec(encode_properties/2 :: - ([rabbit_framing:amqp_property_type()], [any()]) -> binary()). -spec(check_empty_frame_size/0 :: () -> 'ok'). -spec(ensure_content_encoded/2 :: (rabbit_types:content(), rabbit_types:protocol()) -> @@ -118,51 +116,24 @@ create_frame(TypeInt, ChannelInt, Payload) -> %% table_field_to_binary supports the AMQP 0-8/0-9 standard types, S, %% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x, %% and V. - -table_field_to_binary({FName, Type, Value}) -> - [short_string_to_binary(FName) | field_value_to_binary(Type, Value)]. - -field_value_to_binary(longstr, Value) -> - ["S", long_string_to_binary(Value)]; - -field_value_to_binary(signedint, Value) -> - ["I", <<Value:32/signed>>]; - -field_value_to_binary(decimal, {Before, After}) -> - ["D", Before, <<After:32>>]; - -field_value_to_binary(timestamp, Value) -> - ["T", <<Value:64>>]; - -field_value_to_binary(table, Value) -> - ["F", table_to_binary(Value)]; - -field_value_to_binary(array, Value) -> - ["A", array_to_binary(Value)]; - -field_value_to_binary(byte, Value) -> - ["b", <<Value:8/unsigned>>]; - -field_value_to_binary(double, Value) -> - ["d", <<Value:64/float>>]; - -field_value_to_binary(float, Value) -> - ["f", <<Value:32/float>>]; - -field_value_to_binary(long, Value) -> - ["l", <<Value:64/signed>>]; - -field_value_to_binary(short, Value) -> - ["s", <<Value:16/signed>>]; - -field_value_to_binary(bool, Value) -> - ["t", if Value -> 1; true -> 0 end]; - -field_value_to_binary(binary, Value) -> - ["x", long_string_to_binary(Value)]; - -field_value_to_binary(void, _Value) -> - ["V"]. +table_field_to_binary({FName, T, V}) -> + [short_string_to_binary(FName) | field_value_to_binary(T, V)]. + +field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)]; +field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>]; +field_value_to_binary(decimal, V) -> {Before, After} = V, + ["D", Before, <<After:32>>]; +field_value_to_binary(timestamp, V) -> ["T", <<V:64>>]; +field_value_to_binary(table, V) -> ["F", table_to_binary(V)]; +field_value_to_binary(array, V) -> ["A", array_to_binary(V)]; +field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>]; +field_value_to_binary(double, V) -> ["d", <<V:64/float>>]; +field_value_to_binary(float, V) -> ["f", <<V:32/float>>]; +field_value_to_binary(long, V) -> ["l", <<V:64/signed>>]; +field_value_to_binary(short, V) -> ["s", <<V:16/signed>>]; +field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end]; +field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)]; +field_value_to_binary(void, _V) -> ["V"]. table_to_binary(Table) when is_list(Table) -> BinTable = generate_table(Table), @@ -176,9 +147,8 @@ generate_table(Table) when is_list(Table) -> list_to_binary(lists:map(fun table_field_to_binary/1, Table)). generate_array(Array) when is_list(Array) -> - list_to_binary(lists:map( - fun ({Type, Value}) -> field_value_to_binary(Type, Value) end, - Array)). + list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, + Array)). short_string_to_binary(String) when is_binary(String) -> Len = size(String), @@ -196,63 +166,12 @@ long_string_to_binary(String) when is_binary(String) -> long_string_to_binary(String) -> [<<(length(String)):32>>, String]. -encode_properties([], []) -> - <<0, 0>>; -encode_properties(TypeList, ValueList) -> - encode_properties(0, TypeList, ValueList, 0, [], []). - -encode_properties(_Bit, [], [], FirstShortAcc, FlagsAcc, PropsAcc) -> - list_to_binary([lists:reverse(FlagsAcc), <<FirstShortAcc:16>>, lists:reverse(PropsAcc)]); -encode_properties(_Bit, [], _ValueList, _FirstShortAcc, _FlagsAcc, _PropsAcc) -> - exit(content_properties_values_overflow); -encode_properties(15, TypeList, ValueList, FirstShortAcc, FlagsAcc, PropsAcc) -> - NewFlagsShort = FirstShortAcc bor 1, % set the continuation low bit - encode_properties(0, TypeList, ValueList, 0, [<<NewFlagsShort:16>> | FlagsAcc], PropsAcc); -encode_properties(Bit, [bit | TypeList], [Value | ValueList], FirstShortAcc, FlagsAcc, PropsAcc) -> - case Value of - true -> encode_properties(Bit + 1, TypeList, ValueList, - FirstShortAcc bor (1 bsl (15 - Bit)), FlagsAcc, PropsAcc); - false -> encode_properties(Bit + 1, TypeList, ValueList, - FirstShortAcc, FlagsAcc, PropsAcc); - Other -> exit({content_properties_illegal_bit_value, Other}) - end; -encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, FlagsAcc, PropsAcc) -> - case Value of - undefined -> encode_properties(Bit + 1, TypeList, ValueList, - FirstShortAcc, FlagsAcc, PropsAcc); - _ -> encode_properties(Bit + 1, TypeList, ValueList, - FirstShortAcc bor (1 bsl (15 - Bit)), - FlagsAcc, - [encode_property(T, Value) | PropsAcc]) - end. - -encode_property(shortstr, String) -> - Len = size(String), - if Len < 256 -> <<Len:8, String:Len/binary>>; - true -> exit(content_properties_shortstr_overflow) - end; -encode_property(longstr, String) -> - Len = size(String), <<Len:32, String:Len/binary>>; -encode_property(octet, Int) -> - <<Int:8/unsigned>>; -encode_property(shortint, Int) -> - <<Int:16/unsigned>>; -encode_property(longint, Int) -> - <<Int:32/unsigned>>; -encode_property(longlongint, Int) -> - <<Int:64/unsigned>>; -encode_property(timestamp, Int) -> - <<Int:64/unsigned>>; -encode_property(table, Table) -> - table_to_binary(Table). - check_empty_frame_size() -> %% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly. - ComputedSize = iolist_size(create_frame(?FRAME_BODY, 0, <<>>)), - if ComputedSize == ?EMPTY_FRAME_SIZE -> - ok; - true -> - exit({incorrect_empty_frame_size, ComputedSize, ?EMPTY_FRAME_SIZE}) + case iolist_size(create_frame(?FRAME_BODY, 0, <<>>)) of + ?EMPTY_FRAME_SIZE -> ok; + ComputedSize -> exit({incorrect_empty_frame_size, + ComputedSize, ?EMPTY_FRAME_SIZE}) end. ensure_content_encoded(Content = #content{properties_bin = PropBin, diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 5f0016b6..53878d6a 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -50,47 +50,36 @@ parse_array(<<ValueAndRest/binary>>) -> {Type, Value, Rest} = parse_field_value(ValueAndRest), [{Type, Value} | parse_array(Rest)]. -parse_field_value(<<"S", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) -> - {longstr, ValueString, Rest}; +parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) -> + {longstr, V, R}; -parse_field_value(<<"I", Value:32/signed, Rest/binary>>) -> - {signedint, Value, Rest}; +parse_field_value(<<"I", V:32/signed, R/binary>>) -> + {signedint, V, R}; -parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> - {decimal, {Before, After}, Rest}; +parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) -> + {decimal, {Before, After}, R}; -parse_field_value(<<"T", Value:64/unsigned, Rest/binary>>) -> - {timestamp, Value, Rest}; +parse_field_value(<<"T", V:64/unsigned, R/binary>>) -> + {timestamp, V, R}; -parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, Rest/binary>>) -> - {table, parse_table(Table), Rest}; +parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> + {table, parse_table(Table), R}; -parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, Rest/binary>>) -> - {array, parse_array(Array), Rest}; +parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> + {array, parse_array(Array), R}; -parse_field_value(<<"b", Value:8/unsigned, Rest/binary>>) -> - {byte, Value, Rest}; +parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R}; +parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R}; +parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R}; +parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R}; +parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R}; +parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; -parse_field_value(<<"d", Value:64/float, Rest/binary>>) -> - {double, Value, Rest}; +parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) -> + {binary, V, R}; -parse_field_value(<<"f", Value:32/float, Rest/binary>>) -> - {float, Value, Rest}; - -parse_field_value(<<"l", Value:64/signed, Rest/binary>>) -> - {long, Value, Rest}; - -parse_field_value(<<"s", Value:16/signed, Rest/binary>>) -> - {short, Value, Rest}; - -parse_field_value(<<"t", Value:8/unsigned, Rest/binary>>) -> - {bool, (Value /= 0), Rest}; - -parse_field_value(<<"x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) -> - {binary, ValueString, Rest}; - -parse_field_value(<<"V", Rest/binary>>) -> - {void, undefined, Rest}. +parse_field_value(<<"V", R/binary>>) -> + {void, undefined, R}. ensure_content_decoded(Content = #content{properties = Props}) when Props =/= none -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 0d23f716..2d486651 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -35,9 +35,11 @@ -type(key() :: binary()). --type(bind_errors() :: rabbit_types:error('source_not_found' | - 'destination_not_found' | - 'source_and_destination_not_found')). +-type(bind_errors() :: rabbit_types:error( + {'resources_missing', + [{'not_found', (rabbit_types:binding_source() | + rabbit_types:binding_destination())} | + {'absent', rabbit_types:amqqueue()}]})). -type(bind_ok_or_error() :: 'ok' | bind_errors() | rabbit_types:error('binding_not_found')). -type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())). @@ -330,21 +332,32 @@ sync_transient_route(Route, Fun) -> call_with_source_and_destination(SrcName, DstName, Fun) -> SrcTable = table_for_resource(SrcName), DstTable = table_for_resource(DstName), - ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end, + ErrFun = fun (Names) -> + Errs = [not_found_or_absent(Name) || Name <- Names], + rabbit_misc:const({error, {resources_missing, Errs}}) + end, rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case {mnesia:read({SrcTable, SrcName}), mnesia:read({DstTable, DstName})} of {[Src], [Dst]} -> Fun(Src, Dst); - {[], [_] } -> ErrFun(source_not_found); - {[_], [] } -> ErrFun(destination_not_found); - {[], [] } -> ErrFun(source_and_destination_not_found) - end + {[], [_] } -> ErrFun([SrcName]); + {[_], [] } -> ErrFun([DstName]); + {[], [] } -> ErrFun([SrcName, DstName]) + end end). table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; table_for_resource(#resource{kind = queue}) -> rabbit_queue. +not_found_or_absent(#resource{kind = exchange} = Name) -> + {not_found, Name}; +not_found_or_absent(#resource{kind = queue} = Name) -> + case rabbit_amqqueue:not_found_or_absent(Name) of + not_found -> {not_found, Name}; + {absent, _Q} = R -> R + end. + contains(Table, MatchHead) -> continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 54427206..a94d2ab5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -474,6 +474,13 @@ check_user_id_header(#'P_basic'{user_id = Claimed}, "'~s'", [Claimed, Actual]) end. +check_expiration_header(Props) -> + case rabbit_basic:parse_expiration(Props) of + {ok, _} -> ok; + {error, E} -> precondition_failed("invalid expiration '~s': ~p", + [Props#'P_basic'.expiration, E]) + end. + check_internal_exchange(#exchange{name = Name, internal = true}) -> rabbit_misc:protocol_error(access_refused, "cannot publish to internal ~s", @@ -614,8 +621,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - check_user_id_header(DecodedContent#content.properties, State), + DecodedContent = #content {properties = Props} = + rabbit_binary_parser:ensure_content_decoded(Content), + check_user_id_header(Props, State), + check_expiration_header(Props), {MsgSeqNo, State1} = case {TxStatus, ConfirmEnabled} of {none, false} -> {undefined, State}; @@ -960,8 +969,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, none, State) - end + handle_method(Declare, none, State); + {absent, Q} -> + rabbit_misc:absent(Q) + end; + {error, {absent, Q}} -> + rabbit_misc:absent(Q) end; handle_method(#'queue.declare'{queue = QueueNameBin, @@ -1170,14 +1183,10 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, (_X, #exchange{}) -> ok end) of - {error, source_not_found} -> - rabbit_misc:not_found(ExchangeName); - {error, destination_not_found} -> - rabbit_misc:not_found(DestinationName); - {error, source_and_destination_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(DestinationName)]); + {error, {resources_missing, [{not_found, Name} | _]}} -> + rabbit_misc:not_found(Name); + {error, {resources_missing, [{absent, Q} | _]}} -> + rabbit_misc:absent(Q); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index bcb83851..42459833 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -83,7 +83,7 @@ init(Type) -> child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) -> [{writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid]}, + [Sock, Channel, FrameMax, Protocol, ReaderPid, true]}, intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)]; child_specs(direct) -> [{limiter, {rabbit_limiter, start_link, []}, diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 25f7d758..669a0787 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -386,7 +386,7 @@ action(list_bindings, Node, Args, Opts, Inform) -> action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), + ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -611,7 +611,7 @@ display_info_list(Results, InfoItemKeys) when is_list(Results) -> fun (Result) -> display_row( [format_info_item(proplists:get_value(X, Result)) || X <- InfoItemKeys]) - end, Results), + end, lists:sort(Results)), ok; display_info_list(Other, _) -> Other. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 3f1b20fe..7d91b6fa 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -19,8 +19,8 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2]). --export([reset_stats_timer/2]). +-export([init_stats_timer/2, init_disabled_stats_timer/2, + ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). -export([notify/2, notify_if/3]). @@ -51,6 +51,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/2 :: (container(), pos()) -> container()). +-spec(init_disabled_stats_timer/2 :: (container(), pos()) -> container()). -spec(ensure_stats_timer/3 :: (container(), pos(), term()) -> container()). -spec(stop_stats_timer/2 :: (container(), pos()) -> container()). -spec(reset_stats_timer/2 :: (container(), pos()) -> container()). @@ -90,10 +91,13 @@ start_link() -> init_stats_timer(C, P) -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), - {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), + {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), setelement(P, C, #state{level = StatsLevel, interval = Interval, timer = undefined}). +init_disabled_stats_timer(C, P) -> + setelement(P, C, #state{level = none, interval = 0, timer = undefined}). + ensure_stats_timer(C, P, Msg) -> case element(P, C) of #state{level = Level, interval = Interval, timer = undefined} = State diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 6a7a28f2..df733546 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -229,7 +229,10 @@ dropwhile(Pred, AckRequired, {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), Dropped = Len - Len1, - ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}), + case Dropped of + 0 -> ok; + _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}) + end, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), {Next, Msgs, State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 } }. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ec00ecef..2f75ef2e 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -137,7 +137,7 @@ on_node_up() -> ok. drop_mirrors(QName, Nodes) -> - [ok = drop_mirror(QName, Node) || Node <- Nodes], + [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes], ok. drop_mirror(QName, MirrorNode) -> @@ -154,7 +154,7 @@ drop_mirror(QName, MirrorNode) -> "Dropping queue mirror on node ~p for ~s~n", [MirrorNode, rabbit_misc:rs(Name)]), exit(Pid, {shutdown, dropped}), - ok + {ok, dropped} end end). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ab9a9ceb..137ccf20 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -21,7 +21,7 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, quit/1, protocol_error/3, protocol_error/4, protocol_error/1]). --export([not_found/1, assert_args_equivalence/4]). +-export([not_found/1, absent/1, assert_args_equivalence/4]). -export([dirty_read/1]). -export([table_lookup/2, set_table_value/4]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -63,13 +63,18 @@ -export([version/0]). -export([sequence_error/1]). -export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]). +-export([check_expiry/1]). -export([base64url/1]). +-export([interval_operation/4]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal; R =:= shutdown). +%% This is dictated by `erlang:send_after' on which we depend to implement TTL. +-define(MAX_EXPIRY_TIMER, 4294967295). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -111,6 +116,7 @@ -spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> channel_or_connection_exit()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()). +-spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()). -spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), rabbit_types:r(any()), [binary()]) -> @@ -228,7 +234,11 @@ -spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error'). -spec(json_to_term/1 :: (any()) -> any()). -spec(term_to_json/1 :: (any()) -> any()). +-spec(check_expiry/1 :: (integer()) -> rabbit_types:ok_or_error(any())). -spec(base64url/1 :: (binary()) -> string()). +-spec(interval_operation/4 :: + (thunk(A), float(), non_neg_integer(), non_neg_integer()) + -> {A, non_neg_integer()}). -endif. @@ -266,6 +276,15 @@ protocol_error(#amqp_error{} = Error) -> not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). +absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) -> + %% The assertion of durability is mainly there because we mention + %% durability in the error message. That way we will hopefully + %% notice if at some future point our logic changes s.t. we get + %% here with non-durable queues. + protocol_error(not_found, + "home node '~s' of durable ~s is down or inaccessible", + [node(QPid), rs(QueueName)]). + type_class(byte) -> int; type_class(short) -> int; type_class(signedint) -> int; @@ -990,9 +1009,28 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. +check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}}; +check_expiry(N) when N < 0 -> {error, {value_negative, N}}; +check_expiry(_N) -> ok. + base64url(In) -> lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; ($\/, Acc) -> [$\_ | Acc]; ($\=, Acc) -> Acc; (Chr, Acc) -> [Chr | Acc] end, [], base64:encode_to_string(In))). + +%% Ideally, you'd want Fun to run every IdealInterval. but you don't +%% want it to take more than MaxRatio of IdealInterval. So if it takes +%% more then you want to run it less often. So we time how long it +%% takes to run, and then suggest how long you should wait before +%% running it again. Times are in millis. +interval_operation(Fun, MaxRatio, IdealInterval, LastInterval) -> + {Micros, Res} = timer:tc(Fun), + {Res, case {Micros > 1000 * (MaxRatio * IdealInterval), + Micros > 1000 * (MaxRatio * LastInterval)} of + {true, true} -> round(LastInterval * 1.5); + {true, false} -> LastInterval; + {false, false} -> lists:max([IdealInterval, + round(LastInterval / 1.5)]) + end}. diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 038154c3..562fc197 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -20,7 +20,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1, peercert/1, - tune_buffer_size/1, connection_string/2]). + tune_buffer_size/1, connection_string/2, socket_ends/2]). %%--------------------------------------------------------------------------- @@ -36,7 +36,7 @@ -type(socket() :: port() | #ssl_socket{}). -type(opts() :: [{atom(), any()} | {raw, non_neg_integer(), non_neg_integer(), binary()}]). - +-type(host_or_ip() :: binary() | inet:ip_address()). -spec(is_ssl/1 :: (socket()) -> boolean()). -spec(ssl_info/1 :: (socket()) -> 'nossl' | ok_val_or_error( @@ -72,6 +72,10 @@ -spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()). -spec(connection_string/2 :: (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). +-spec(socket_ends/2 :: + (socket(), 'inbound' | 'outbound') + -> ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(), + host_or_ip(), rabbit_networking:ip_port()})). -endif. @@ -193,17 +197,37 @@ tune_buffer_size(Sock) -> end. connection_string(Sock, Direction) -> - {From, To} = case Direction of - inbound -> {fun peername/1, fun sockname/1}; - outbound -> {fun sockname/1, fun peername/1} - end, + case socket_ends(Sock, Direction) of + {ok, {FromAddress, FromPort, ToAddress, ToPort}} -> + {ok, rabbit_misc:format( + "~s:~p -> ~s:~p", + [maybe_ntoab(FromAddress), FromPort, + maybe_ntoab(ToAddress), ToPort])}; + Error -> + Error + end. + +socket_ends(Sock, Direction) -> + {From, To} = sock_funs(Direction), case {From(Sock), To(Sock)} of {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> - {ok, rabbit_misc:format("~s:~p -> ~s:~p", - [rabbit_misc:ntoab(FromAddress), FromPort, - rabbit_misc:ntoab(ToAddress), ToPort])}; + {ok, {rdns(FromAddress), FromPort, + rdns(ToAddress), ToPort}}; {{error, _Reason} = Error, _} -> Error; {_, {error, _Reason} = Error} -> Error end. + +maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr); +maybe_ntoab(Host) -> Host. + +rdns(Addr) -> + {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups), + case Lookup of + true -> list_to_binary(rabbit_networking:tcp_host(Addr)); + _ -> Addr + end. + +sock_funs(inbound) -> {fun peername/1, fun sockname/1}; +sock_funs(outbound) -> {fun sockname/1, fun peername/1}. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 5cf8d1ae..31eeef73 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -21,7 +21,7 @@ node_listeners/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, - close_connection/2, force_connection_event_refresh/0]). + close_connection/2, force_connection_event_refresh/0, tcp_host/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/6, diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index b11c9d04..8d0e4456 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -85,10 +85,10 @@ cluster_status_filename() -> prepare_cluster_status_files() -> rabbit_mnesia:ensure_mnesia_dir(), - CorruptFiles = fun () -> throw({error, corrupt_cluster_status_files}) end, + Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end, RunningNodes1 = case try_read_file(running_nodes_filename()) of {ok, [Nodes]} when is_list(Nodes) -> Nodes; - {ok, _ } -> CorruptFiles(); + {ok, Other} -> Corrupt(Other); {error, enoent} -> [] end, ThisNode = [node()], @@ -102,8 +102,8 @@ prepare_cluster_status_files() -> {ok, [AllNodes0]} when is_list(AllNodes0) -> {legacy_cluster_nodes(AllNodes0), legacy_should_be_disc_node(AllNodes0)}; - {ok, _} -> - CorruptFiles(); + {ok, Files} -> + Corrupt(Files); {error, enoent} -> {legacy_cluster_nodes([]), true} end, @@ -134,8 +134,8 @@ read_cluster_status() -> try_read_file(running_nodes_filename())} of {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) -> {All, Disc, Running}; - {_, _} -> - throw({error, corrupt_or_missing_cluster_files}) + {Stat, Run} -> + throw({error, {corrupt_or_missing_cluster_files, Stat, Run}}) end. update_cluster_status() -> @@ -184,6 +184,11 @@ partitions() -> %%---------------------------------------------------------------------------- init([]) -> + %% We trap exits so that the supervisor will not just kill us. We + %% want to be sure that we are not going to be killed while + %% writing out the cluster status files - bad things can then + %% happen. + process_flag(trap_exit, true), {ok, _} = mnesia:subscribe(system), {ok, #state{monitors = pmon:new(), partitions = []}}. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index ecb19611..9f94af7d 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -19,18 +19,6 @@ -export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]). --define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). --define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). --define(ENABLED_DEF, {?ENABLED_OPT, flag}). --define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}). - --define(GLOBAL_DEFS, []). - --define(COMMANDS, - [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, - enable, - disable]). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -100,8 +88,13 @@ dependencies(Reverse, Sources, AllPlugins) -> {ok, G} = rabbit_misc:build_acyclic_graph( fun (App, _Deps) -> [{App, App}] end, fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end, - [{Name, Deps} - || #plugin{name = Name, dependencies = Deps} <- AllPlugins]), + lists:ukeysort( + 1, [{Name, Deps} || + #plugin{name = Name, + dependencies = Deps} <- AllPlugins] ++ + [{Dep, []} || + #plugin{dependencies = Deps} <- AllPlugins, + Dep <- Deps])), Dests = case Reverse of false -> digraph_utils:reachable(Sources, G); true -> digraph_utils:reaching(Sources, G) diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 572cf150..2158d1da 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -108,16 +108,19 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> Enabled, AllPlugins), ToEnable = [list_to_atom(Name) || Name <- ToEnable0], Missing = ToEnable -- plugin_names(AllPlugins), - case Missing of - [] -> ok; - _ -> throw({error_string, - fmt_list("The following plugins could not be found:", - Missing)}) - end, NewEnabled = lists:usort(Enabled ++ ToEnable), - write_enabled_plugins(PluginsFile, NewEnabled), NewImplicitlyEnabled = rabbit_plugins:dependencies(false, NewEnabled, AllPlugins), + MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing, + case {Missing, MissingDeps} of + {[], []} -> ok; + {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)}); + {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)}); + {_, _} -> throw({error_string, + fmt_missing("plugins", Missing) ++ + fmt_missing("dependencies", MissingDeps)}) + end, + write_enabled_plugins(PluginsFile, NewEnabled), maybe_warn_mochiweb(NewImplicitlyEnabled), case NewEnabled -- ImplicitlyEnabled of [] -> io:format("Plugin configuration unchanged.~n"); @@ -183,9 +186,12 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> EnabledImplicitly = rabbit_plugins:dependencies(false, EnabledExplicitly, AvailablePlugins) -- EnabledExplicitly, + Missing = [#plugin{name = Name, dependencies = []} || + Name <- ((EnabledExplicitly ++ EnabledImplicitly) -- + plugin_names(AvailablePlugins))], {ok, RE} = re:compile(Pattern), Plugins = [ Plugin || - Plugin = #plugin{name = Name} <- AvailablePlugins, + Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, if OnlyEnabled -> lists:member(Name, EnabledExplicitly); OnlyEnabledAll -> (lists:member(Name, @@ -196,30 +202,35 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> Plugins1 = usort_plugins(Plugins), MaxWidth = lists:max([length(atom_to_list(Name)) || #plugin{name = Name} <- Plugins1] ++ [0]), - [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format, - MaxWidth) || P <- Plugins1], + [format_plugin(P, EnabledExplicitly, EnabledImplicitly, + plugin_names(Missing), Format, MaxWidth) || P <- Plugins1], ok. format_plugin(#plugin{name = Name, version = Version, description = Description, dependencies = Deps}, - EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) -> + EnabledExplicitly, EnabledImplicitly, Missing, + Format, MaxWidth) -> Glyph = case {lists:member(Name, EnabledExplicitly), - lists:member(Name, EnabledImplicitly)} of - {true, false} -> "[E]"; - {false, true} -> "[e]"; - _ -> "[ ]" + lists:member(Name, EnabledImplicitly), + lists:member(Name, Missing)} of + {true, false, false} -> "[E]"; + {false, true, false} -> "[e]"; + {_, _, true} -> "[!]"; + _ -> "[ ]" end, + Opt = fun (_F, A, A) -> ok; + ( F, A, _) -> io:format(F, [A]) + end, case Format of minimal -> io:format("~s~n", [Name]); - normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ - "w ~s~n", [Glyph, Name, Version]); + normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ "w ", + [Glyph, Name]), + Opt("~s", Version, undefined), + io:format("~n"); verbose -> io:format("~s ~w~n", [Glyph, Name]), - io:format(" Version: \t~s~n", [Version]), - case Deps of - [] -> ok; - _ -> io:format(" Dependencies:\t~p~n", [Deps]) - end, - io:format(" Description:\t~s~n", [Description]), + Opt(" Version: \t~s~n", Version, undefined), + Opt(" Dependencies:\t~p~n", Deps, []), + Opt(" Description: \t~s~n", Description, undefined), io:format("~n") end. @@ -230,6 +241,9 @@ fmt_list(Header, Plugins) -> lists:flatten( [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]). +fmt_missing(Desc, Missing) -> + fmt_list("The following " ++ Desc ++ " could not be found:", Missing). + usort_plugins(Plugins) -> lists:usort(fun plugins_cmp/2, Plugins). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f4e6865b..928786e9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -35,23 +35,23 @@ %%-------------------------------------------------------------------------- --record(v1, {parent, sock, connection, callback, recv_len, pending_recv, +-record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, auth_mechanism, auth_state, conserve_resources, - last_blocked_by, last_blocked_at}). + last_blocked_by, last_blocked_at, host, peer_host, + port, peer_port}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, channels]). --define(CREATION_EVENT_KEYS, [pid, name, address, port, peer_address, peer_port, - ssl, peer_cert_subject, peer_cert_issuer, - peer_cert_validity, auth_mechanism, - ssl_protocol, ssl_key_exchange, - ssl_cipher, ssl_hash, - protocol, user, vhost, timeout, frame_max, - client_properties]). +-define(CREATION_EVENT_KEYS, + [pid, name, port, peer_port, host, + peer_host, ssl, peer_cert_subject, peer_cert_issuer, + peer_cert_validity, auth_mechanism, ssl_protocol, + ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, + timeout, frame_max, client_properties]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -192,16 +192,20 @@ socket_op(Sock, Fun) -> name(Sock) -> socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end). +socket_ends(Sock) -> + socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end). + start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - ConnStr = name(Sock), - log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]), + Name = name(Sock), + log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), ClientSock = socket_op(Sock, SockTransform), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), - handshake_timeout), + erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), + {PeerHost, PeerPort, Host, Port} = socket_ends(Sock), State = #v1{parent = Parent, sock = ClientSock, + name = list_to_binary(Name), connection = #connection{ protocol = none, user = none, @@ -224,19 +228,23 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, auth_state = none, conserve_resources = false, last_blocked_by = none, - last_blocked_at = never}, + last_blocked_at = never, + host = Host, + peer_host = PeerHost, + port = Port, + peer_port = PeerPort}, try ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), handshake, 8)), - log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr]) + log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of connection_closed_abruptly -> warning; _ -> error end, "closing AMQP connection ~p (~s):~n~p~n", - [self(), ConnStr, Ex]) + [self(), Name, Ex]) after %% We don't call gen_tcp:close/1 here since it waits for %% pending output to be sent, which results in unnecessary @@ -341,6 +349,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State) handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> %% Ignore, we will emit a created event once we start running. mainloop(Deb, State); +handle_other(ensure_stats, Deb, State) -> + mainloop(Deb, ensure_stats_timer(State)); handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> @@ -491,6 +501,14 @@ handle_exception(State, Channel, Reason) -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), throw({handshake_error, State#v1.connection_state, Channel, Reason}). +%% we've "lost sync" with the client and hence must not accept any +%% more input +fatal_frame_error(Error, Type, Channel, Payload, State) -> + frame_error(Error, Type, Channel, Payload, State), + %% grace period to allow transmission of error + timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw(fatal_frame_error). + frame_error(Error, Type, Channel, Payload, State) -> {Str, Bin} = payload_snippet(Payload), handle_exception(State, Channel, @@ -513,7 +531,7 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) -> %%-------------------------------------------------------------------------- create_channel(Channel, State) -> - #v1{sock = Sock, queue_collector = Collector, + #v1{sock = Sock, name = Name, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, connection = #connection{protocol = Protocol, frame_max = FrameMax, @@ -522,7 +540,7 @@ create_channel(Channel, State) -> capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), @@ -621,8 +639,9 @@ handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State = #v1{connection = #connection{frame_max = FrameMax}}) when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> - frame_error({frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, - Type, Channel, <<>>, State); + fatal_frame_error( + {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, + Type, Channel, <<>>, State); handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, @@ -633,8 +652,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> case EndMarker of ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), switch_callback(State1, frame_header, 7); - _ -> frame_error({invalid_frame_end_marker, EndMarker}, - Type, Channel, Payload, State) + _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, + Type, Channel, Payload, State) end; %% The two rules pertaining to version negotiation: @@ -881,82 +900,66 @@ auth_phase(Response, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(pid, #v1{}) -> - self(); -i(name, #v1{sock = Sock}) -> - list_to_binary(name(Sock)); -i(address, #v1{sock = Sock}) -> - socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock); -i(port, #v1{sock = Sock}) -> - socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock); -i(peer_address, #v1{sock = Sock}) -> - socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock); -i(peer_port, #v1{sock = Sock}) -> - socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); -i(ssl, #v1{sock = Sock}) -> - rabbit_net:is_ssl(Sock); -i(ssl_protocol, #v1{sock = Sock}) -> - ssl_info(fun ({P, _}) -> P end, Sock); -i(ssl_key_exchange, #v1{sock = Sock}) -> - ssl_info(fun ({_, {K, _, _}}) -> K end, Sock); -i(ssl_cipher, #v1{sock = Sock}) -> - ssl_info(fun ({_, {_, C, _}}) -> C end, Sock); -i(ssl_hash, #v1{sock = Sock}) -> - ssl_info(fun ({_, {_, _, H}}) -> H end, Sock); -i(peer_cert_issuer, #v1{sock = Sock}) -> - cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); -i(peer_cert_subject, #v1{sock = Sock}) -> - cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock); -i(peer_cert_validity, #v1{sock = Sock}) -> - cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock); -i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; - SockStat =:= recv_cnt; - SockStat =:= send_oct; - SockStat =:= send_cnt; - SockStat =:= send_pend -> - socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end, - fun ([{_, I}]) -> I end, Sock); -i(state, #v1{connection_state = S}) -> - S; -i(last_blocked_by, #v1{last_blocked_by = By}) -> - By; -i(last_blocked_age, #v1{last_blocked_at = never}) -> +i(pid, #v1{}) -> self(); +i(name, #v1{name = Name}) -> Name; +i(host, #v1{host = Host}) -> Host; +i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost; +i(port, #v1{port = Port}) -> Port; +i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort; +i(SockStat, S) when SockStat =:= recv_oct; + SockStat =:= recv_cnt; + SockStat =:= send_oct; + SockStat =:= send_cnt; + SockStat =:= send_pend -> + socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end, + fun ([{_, I}]) -> I end, S); +i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock); +i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S); +i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S); +i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S); +i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); +i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); +i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); +i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); +i(state, #v1{connection_state = CS}) -> CS; +i(last_blocked_by, #v1{last_blocked_by = By}) -> By; +i(last_blocked_age, #v1{last_blocked_at = never}) -> infinity; -i(last_blocked_age, #v1{last_blocked_at = T}) -> +i(last_blocked_age, #v1{last_blocked_at = T}) -> timer:now_diff(erlang:now(), T) / 1000000; -i(channels, #v1{}) -> - length(all_channels()); -i(protocol, #v1{connection = #connection{protocol = none}}) -> - none; -i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> - Protocol:version(); -i(auth_mechanism, #v1{auth_mechanism = none}) -> +i(channels, #v1{}) -> length(all_channels()); +i(auth_mechanism, #v1{auth_mechanism = none}) -> none; -i(auth_mechanism, #v1{auth_mechanism = Mechanism}) -> +i(auth_mechanism, #v1{auth_mechanism = Mechanism}) -> proplists:get_value(name, Mechanism:description()); -i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> - Username; -i(user, #v1{connection = #connection{user = none}}) -> +i(protocol, #v1{connection = #connection{protocol = none}}) -> + none; +i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> + Protocol:version(); +i(user, #v1{connection = #connection{user = none}}) -> ''; -i(vhost, #v1{connection = #connection{vhost = VHost}}) -> +i(user, #v1{connection = #connection{user = #user{ + username = Username}}}) -> + Username; +i(vhost, #v1{connection = #connection{vhost = VHost}}) -> VHost; -i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> +i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> Timeout; -i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> +i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> FrameMax; -i(client_properties, #v1{connection = #connection{ - client_properties = ClientProperties}}) -> +i(client_properties, #v1{connection = #connection{client_properties = + ClientProperties}}) -> ClientProperties; i(Item, #v1{}) -> throw({bad_argument, Item}). -socket_info(Get, Select, Sock) -> +socket_info(Get, Select, #v1{sock = Sock}) -> case Get(Sock) of {ok, T} -> Select(T); {error, _} -> '' end. -ssl_info(F, Sock) -> +ssl_info(F, #v1{sock = Sock}) -> %% The first ok form is R14 %% The second is R13 - the extra term is exportability (by inspection, %% the docs are wrong) @@ -967,7 +970,7 @@ ssl_info(F, Sock) -> {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}}) end. -cert_info(F, Sock) -> +cert_info(F, #v1{sock = Sock}) -> case rabbit_net:peercert(Sock) of nossl -> ''; {error, no_peercert} -> ''; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index afc10b9f..096f9490 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -18,7 +18,7 @@ -compile([export_all]). --export([all_tests/0, test_parsing/0]). +-export([all_tests/0]). -import(rabbit_misc, [pget/2]). @@ -41,11 +41,12 @@ all_tests() -> passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), + passed = test_rabbit_basic_header_handling(), passed = test_priority_queue(), passed = test_pg_local(), passed = test_unfold(), passed = test_supervisor_delayed_restart(), - passed = test_parsing(), + passed = test_table_codec(), passed = test_content_framing(), passed = test_content_transcoding(), passed = test_topic_matching(), @@ -71,6 +72,7 @@ all_tests() -> passed = test_configurable_server_properties(), passed. + do_if_secondary_node(Up, Down) -> SecondaryNode = rabbit_nodes:make("hare"), @@ -159,6 +161,78 @@ test_multi_call() -> exit(Pid3, bang), passed. +test_rabbit_basic_header_handling() -> + passed = write_table_with_invalid_existing_type_test(), + passed = invalid_existing_headers_test(), + passed = disparate_invalid_header_entries_accumulate_separately_test(), + passed = corrupt_or_invalid_headers_are_overwritten_test(), + passed = invalid_same_header_entry_accumulation_test(), + passed. + +-define(XDEATH_TABLE, + [{<<"reason">>, longstr, <<"blah">>}, + {<<"queue">>, longstr, <<"foo.bar.baz">>}, + {<<"exchange">>, longstr, <<"my-exchange">>}, + {<<"routing-keys">>, array, []}]). + +-define(ROUTE_TABLE, [{<<"redelivered">>, bool, <<"true">>}]). + +-define(BAD_HEADER(K), {<<K>>, longstr, <<"bad ", K>>}). +-define(BAD_HEADER2(K, Suf), {<<K>>, longstr, <<"bad ", K, Suf>>}). +-define(FOUND_BAD_HEADER(K), {<<K>>, array, [{longstr, <<"bad ", K>>}]}). + +write_table_with_invalid_existing_type_test() -> + prepend_check(<<"header1">>, ?XDEATH_TABLE, [?BAD_HEADER("header1")]), + passed. + +invalid_existing_headers_test() -> + Headers = + prepend_check(<<"header2">>, ?ROUTE_TABLE, [?BAD_HEADER("header2")]), + {array, [{table, ?ROUTE_TABLE}]} = + rabbit_misc:table_lookup(Headers, <<"header2">>), + passed. + +disparate_invalid_header_entries_accumulate_separately_test() -> + BadHeaders = [?BAD_HEADER("header2")], + Headers = prepend_check(<<"header2">>, ?ROUTE_TABLE, BadHeaders), + Headers2 = prepend_check(<<"header1">>, ?XDEATH_TABLE, + [?BAD_HEADER("header1") | Headers]), + {table, [?FOUND_BAD_HEADER("header1"), + ?FOUND_BAD_HEADER("header2")]} = + rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY), + passed. + +corrupt_or_invalid_headers_are_overwritten_test() -> + Headers0 = [?BAD_HEADER("header1"), + ?BAD_HEADER("x-invalid-headers")], + Headers1 = prepend_check(<<"header1">>, ?XDEATH_TABLE, Headers0), + {table,[?FOUND_BAD_HEADER("header1"), + ?FOUND_BAD_HEADER("x-invalid-headers")]} = + rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY), + passed. + +invalid_same_header_entry_accumulation_test() -> + BadHeader1 = ?BAD_HEADER2("header1", "a"), + Headers = prepend_check(<<"header1">>, ?ROUTE_TABLE, [BadHeader1]), + Headers2 = prepend_check(<<"header1">>, ?ROUTE_TABLE, + [?BAD_HEADER2("header1", "b") | Headers]), + {table, InvalidHeaders} = + rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY), + {array, [{longstr,<<"bad header1b">>}, + {longstr,<<"bad header1a">>}]} = + rabbit_misc:table_lookup(InvalidHeaders, <<"header1">>), + passed. + +prepend_check(HeaderKey, HeaderTable, Headers) -> + Headers1 = rabbit_basic:prepend_table_header( + HeaderKey, HeaderTable, Headers), + {table, Invalid} = + rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY), + {Type, Value} = rabbit_misc:table_lookup(Headers, HeaderKey), + {array, [{Type, Value} | _]} = + rabbit_misc:table_lookup(Invalid, HeaderKey), + Headers1. + test_priority_queue() -> false = priority_queue:is_queue(not_a_queue), @@ -350,113 +424,45 @@ test_unfold() -> end, 10), passed. -test_parsing() -> - passed = test_content_properties(), - passed = test_field_values(), - passed. - -test_content_prop_encoding(Datum, Binary) -> - Types = [element(1, E) || E <- Datum], - Values = [element(2, E) || E <- Datum], - Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion - -test_content_properties() -> - test_content_prop_encoding([], <<0, 0>>), - test_content_prop_encoding([{bit, true}, {bit, false}, {bit, true}, {bit, false}], - <<16#A0, 0>>), - test_content_prop_encoding([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined}, - {bit, true}], - <<16#E8,0,123>>), - test_content_prop_encoding([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}], - <<16#F0,0,123,123>>), - test_content_prop_encoding([{bit, true}, {shortstr, <<"hi">>}, {bit, true}, - {shortint, 54321}, {bit, true}], - <<16#F8,0,2,"hi",16#D4,16#31>>), - test_content_prop_encoding([{bit, true}, {shortstr, undefined}, {bit, true}, - {shortint, 54321}, {bit, true}], - <<16#B8,0,16#D4,16#31>>), - test_content_prop_encoding([{table, [{<<"a signedint">>, signedint, 12345678}, - {<<"a longstr">>, longstr, <<"yes please">>}, - {<<"a decimal">>, decimal, {123, 12345678}}, - {<<"a timestamp">>, timestamp, 123456789012345}, - {<<"a nested table">>, table, - [{<<"one">>, signedint, 1}, - {<<"two">>, signedint, 2}]}]}], - << - %% property-flags - 16#8000:16, - - %% property-list: - - %% table - 117:32, % table length in bytes - - 11,"a signedint", % name - "I",12345678:32, % type and value - - 9,"a longstr", - "S",10:32,"yes please", - - 9,"a decimal", - "D",123,12345678:32, - - 11,"a timestamp", - "T", 123456789012345:64, - - 14,"a nested table", - "F", - 18:32, - - 3,"one", - "I",1:32, - - 3,"two", - "I",2:32 >>), - passed. - -test_field_values() -> +test_table_codec() -> %% FIXME this does not test inexact numbers (double and float) yet, %% because they won't pass the equality assertions - test_content_prop_encoding( - [{table, [{<<"longstr">>, longstr, <<"Here is a long string">>}, - {<<"signedint">>, signedint, 12345}, - {<<"decimal">>, decimal, {3, 123456}}, - {<<"timestamp">>, timestamp, 109876543209876}, - {<<"table">>, table, [{<<"one">>, signedint, 54321}, - {<<"two">>, longstr, <<"A long string">>}]}, - {<<"byte">>, byte, 255}, - {<<"long">>, long, 1234567890}, - {<<"short">>, short, 655}, - {<<"bool">>, bool, true}, - {<<"binary">>, binary, <<"a binary string">>}, - {<<"void">>, void, undefined}, - {<<"array">>, array, [{signedint, 54321}, - {longstr, <<"A long string">>}]} - - ]}], - << - %% property-flags - 16#8000:16, - %% table length in bytes - 228:32, - - 7,"longstr", "S", 21:32, "Here is a long string", % = 34 - 9,"signedint", "I", 12345:32/signed, % + 15 = 49 - 7,"decimal", "D", 3, 123456:32, % + 14 = 63 - 9,"timestamp", "T", 109876543209876:64, % + 19 = 82 - 5,"table", "F", 31:32, % length of table % + 11 = 93 - 3,"one", "I", 54321:32, % + 9 = 102 - 3,"two", "S", 13:32, "A long string", % + 22 = 124 - 4,"byte", "b", 255:8, % + 7 = 131 - 4,"long", "l", 1234567890:64, % + 14 = 145 - 5,"short", "s", 655:16, % + 9 = 154 - 4,"bool", "t", 1, % + 7 = 161 - 6,"binary", "x", 15:32, "a binary string", % + 27 = 188 - 4,"void", "V", % + 6 = 194 - 5,"array", "A", 23:32, % + 11 = 205 - "I", 54321:32, % + 5 = 210 - "S", 13:32, "A long string" % + 18 = 228 - >>), + Table = [{<<"longstr">>, longstr, <<"Here is a long string">>}, + {<<"signedint">>, signedint, 12345}, + {<<"decimal">>, decimal, {3, 123456}}, + {<<"timestamp">>, timestamp, 109876543209876}, + {<<"table">>, table, [{<<"one">>, signedint, 54321}, + {<<"two">>, longstr, + <<"A long string">>}]}, + {<<"byte">>, byte, 255}, + {<<"long">>, long, 1234567890}, + {<<"short">>, short, 655}, + {<<"bool">>, bool, true}, + {<<"binary">>, binary, <<"a binary string">>}, + {<<"void">>, void, undefined}, + {<<"array">>, array, [{signedint, 54321}, + {longstr, <<"A long string">>}]} + ], + Binary = << + 7,"longstr", "S", 21:32, "Here is a long string", + 9,"signedint", "I", 12345:32/signed, + 7,"decimal", "D", 3, 123456:32, + 9,"timestamp", "T", 109876543209876:64, + 5,"table", "F", 31:32, % length of table + 3,"one", "I", 54321:32, + 3,"two", "S", 13:32, "A long string", + 4,"byte", "b", 255:8, + 4,"long", "l", 1234567890:64, + 5,"short", "s", 655:16, + 4,"bool", "t", 1, + 6,"binary", "x", 15:32, "a binary string", + 4,"void", "V", + 5,"array", "A", 23:32, + "I", 54321:32, + "S", 13:32, "A long string" + >>, + Binary = rabbit_binary_generator:generate_table(Table), + Table = rabbit_binary_parser:parse_table(Binary), passed. %% Test that content frames don't exceed frame-max @@ -1125,6 +1131,9 @@ test_server_status() -> HWM = vm_memory_monitor:get_vm_memory_high_watermark(), ok = control_action(set_vm_memory_high_watermark, ["1"]), ok = control_action(set_vm_memory_high_watermark, ["1.0"]), + %% this will trigger an alarm + ok = control_action(set_vm_memory_high_watermark, ["0.0"]), + %% reset ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]), %% eval @@ -2518,7 +2527,7 @@ test_queue_recover() -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(), - rabbit_amqqueue:start(), + rabbit_amqqueue:start(rabbit_amqqueue:recover()), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index f3a8cacf..a7ea3d99 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,13 +18,17 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --record(wstate, {sock, channel, frame_max, protocol, pending}). +%% internal +-export([mainloop/1, mainloop1/1]). + +-record(wstate, {sock, channel, frame_max, protocol, reader, + stats_timer, pending}). -define(HIBERNATE_AFTER, 5000). @@ -40,6 +44,14 @@ (rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). +-spec(start/6 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + -> rabbit_types:ok(pid())). +-spec(start_link/6 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -67,50 +79,58 @@ non_neg_integer(), rabbit_types:protocol()) -> 'ok'). --spec(mainloop/2 :: (_,_) -> 'done'). --spec(mainloop1/2 :: (_,_) -> any()). - -endif. %%--------------------------------------------------------------------------- start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - {ok, - proc_lib:spawn(?MODULE, mainloop, [ReaderPid, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = []}])}. + start(Sock, Channel, FrameMax, Protocol, ReaderPid, false). start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - {ok, - proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = []}])}. - -mainloop(ReaderPid, State) -> + start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false). + +start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, + ReaderWantsStats), + {ok, proc_lib:spawn(?MODULE, mainloop, [State])}. + +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, + ReaderWantsStats), + {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}. + +initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> + (case ReaderWantsStats of + true -> fun rabbit_event:init_stats_timer/2; + false -> fun rabbit_event:init_disabled_stats_timer/2 + end)(#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + reader = ReaderPid, + pending = []}, + #wstate.stats_timer). + +mainloop(State) -> try - mainloop1(ReaderPid, State) + mainloop1(State) catch - exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State, + ReaderPid ! {channel_exit, Channel, Error} end, done. -mainloop1(ReaderPid, State = #wstate{pending = []}) -> +mainloop1(State = #wstate{pending = []}) -> receive - Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + Message -> ?MODULE:mainloop1(handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) + erlang:hibernate(?MODULE, mainloop, [State]) end; -mainloop1(ReaderPid, State) -> +mainloop1(State) -> receive - Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(ReaderPid, flush(State)) + ?MODULE:mainloop1(flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -139,9 +159,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QPid), State; handle_message({inet_reply, _, ok}, State) -> - State; + rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats); handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); +handle_message(emit_stats, State = #wstate{reader = ReaderPid}) -> + ReaderPid ! ensure_stats, + rabbit_event:reset_stats_timer(State, #wstate.stats_timer); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). |