summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--codegen.py158
-rw-r--r--docs/rabbitmq-plugins.1.xml13
-rw-r--r--docs/rabbitmqctl.1.xml18
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl1
-rw-r--r--src/background_gc.erl78
-rw-r--r--src/rabbit.erl11
-rw-r--r--src/rabbit_alarm.erl8
-rw-r--r--src/rabbit_amqqueue.erl122
-rw-r--r--src/rabbit_amqqueue_process.erl77
-rw-r--r--src/rabbit_basic.erl70
-rw-r--r--src/rabbit_binary_generator.erl131
-rw-r--r--src/rabbit_binary_parser.erl55
-rw-r--r--src/rabbit_binding.erl29
-rw-r--r--src/rabbit_channel.erl33
-rw-r--r--src/rabbit_channel_sup.erl2
-rw-r--r--src/rabbit_control_main.erl4
-rw-r--r--src/rabbit_event.erl10
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_mirror_queue_misc.erl4
-rw-r--r--src/rabbit_misc.erl40
-rw-r--r--src/rabbit_net.erl42
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_node_monitor.erl17
-rw-r--r--src/rabbit_plugins.erl21
-rw-r--r--src/rabbit_plugins_main.erl60
-rw-r--r--src/rabbit_reader.erl167
-rw-r--r--src/rabbit_tests.erl225
-rw-r--r--src/rabbit_writer.erl83
29 files changed, 885 insertions, 602 deletions
diff --git a/codegen.py b/codegen.py
index 9483e854..5624658b 100644
--- a/codegen.py
+++ b/codegen.py
@@ -24,18 +24,6 @@ from amqp_codegen import *
import string
import re
-erlangTypeMap = {
- 'octet': 'octet',
- 'shortstr': 'shortstr',
- 'longstr': 'longstr',
- 'short': 'shortint',
- 'long': 'longint',
- 'longlong': 'longlongint',
- 'bit': 'bit',
- 'table': 'table',
- 'timestamp': 'timestamp',
-}
-
# Coming up with a proper encoding of AMQP tables in JSON is too much
# hassle at this stage. Given that the only default value we are
# interested in is for the empty table, we only support that.
@@ -123,7 +111,7 @@ def printFileHeader():
def genErl(spec):
def erlType(domain):
- return erlangTypeMap[spec.resolveDomain(domain)]
+ return erlangize(spec.resolveDomain(domain))
def fieldTypeList(fields):
return '[' + ', '.join([erlType(f.domain) for f in fields]) + ']'
@@ -186,11 +174,11 @@ def genErl(spec):
return p+'Len:32/unsigned, '+p+':'+p+'Len/binary'
elif type == 'octet':
return p+':8/unsigned'
- elif type == 'shortint':
+ elif type == 'short':
return p+':16/unsigned'
- elif type == 'longint':
+ elif type == 'long':
return p+':32/unsigned'
- elif type == 'longlongint':
+ elif type == 'longlong':
return p+':64/unsigned'
elif type == 'timestamp':
return p+':64/unsigned'
@@ -233,29 +221,23 @@ def genErl(spec):
def presentBin(fields):
ps = ', '.join(['P' + str(f.index) + ':1' for f in fields])
return '<<' + ps + ', _:%d, R0/binary>>' % (16 - len(fields),)
- def mkMacroName(field):
- return '?' + field.domain.upper() + '_PROP'
- def writePropFieldLine(field, bin_next = None):
+ def writePropFieldLine(field):
i = str(field.index)
- if not bin_next:
- bin_next = 'R' + str(field.index + 1)
- if field.domain in ['octet', 'timestamp']:
- print (" {%s, %s} = %s(%s, %s, %s, %s)," %
- ('F' + i, bin_next, mkMacroName(field), 'P' + i,
- 'R' + i, 'I' + i, 'X' + i))
+ if field.domain == 'bit':
+ print " {F%s, R%s} = {P%s =/= 0, R%s}," % \
+ (i, str(field.index + 1), i, i)
else:
- print (" {%s, %s} = %s(%s, %s, %s, %s, %s)," %
- ('F' + i, bin_next, mkMacroName(field), 'P' + i,
- 'R' + i, 'L' + i, 'S' + i, 'X' + i))
+ print " {F%s, R%s} = if P%s =:= 0 -> {undefined, R%s}; true -> ?%s_VAL(R%s, L%s, V%s, X%s) end," % \
+ (i, str(field.index + 1), i, i, erlType(field.domain).upper(), i, i, i, i)
if len(c.fields) == 0:
- print "decode_properties(%d, _) ->" % (c.index,)
+ print "decode_properties(%d, <<>>) ->" % (c.index,)
else:
print ("decode_properties(%d, %s) ->" %
(c.index, presentBin(c.fields)))
- for field in c.fields[:-1]:
+ for field in c.fields:
writePropFieldLine(field)
- writePropFieldLine(c.fields[-1], "<<>>")
+ print " <<>> = %s," % ('R' + str(len(c.fields)))
print " #'P_%s'{%s};" % (erlangize(c.name), fieldMapList(c.fields))
def genFieldPreprocessing(packed):
@@ -283,9 +265,27 @@ def genErl(spec):
print " <<%s>>;" % (', '.join([methodFieldFragment(f) for f in packedFields]))
def genEncodeProperties(c):
+ def presentBin(fields):
+ ps = ', '.join(['P' + str(f.index) + ':1' for f in fields])
+ return '<<' + ps + ', 0:%d>>' % (16 - len(fields),)
+ def writePropFieldLine(field):
+ i = str(field.index)
+ if field.domain == 'bit':
+ print " {P%s, R%s} = {F%s =:= 1, R%s}," % \
+ (i, str(field.index + 1), i, i)
+ else:
+ print " {P%s, R%s} = if F%s =:= undefined -> {0, R%s}; true -> {1, [?%s_PROP(F%s, L%s) | R%s]} end," % \
+ (i, str(field.index + 1), i, i, erlType(field.domain).upper(), i, i, i)
+
print "encode_properties(#'P_%s'{%s}) ->" % (erlangize(c.name), fieldMapList(c.fields))
- print " rabbit_binary_generator:encode_properties(%s, %s);" % \
- (fieldTypeList(c.fields), fieldTempList(c.fields))
+ if len(c.fields) == 0:
+ print " <<>>;"
+ else:
+ print " R0 = [<<>>],"
+ for field in c.fields:
+ writePropFieldLine(field)
+ print " list_to_binary([%s | lists:reverse(R%s)]);" % \
+ (presentBin(c.fields), str(len(c.fields)))
def messageConstantClass(cls):
# We do this because 0.8 uses "soft error" and 8.1 uses "soft-error".
@@ -350,8 +350,8 @@ def genErl(spec):
'table' | 'byte' | 'double' | 'float' | 'long' |
'short' | 'bool' | 'binary' | 'void' | 'array').
-type(amqp_property_type() ::
- 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
- 'longlongint' | 'timestamp' | 'bit' | 'table').
+ 'shortstr' | 'longstr' | 'octet' | 'short' | 'long' |
+ 'longlong' | 'timestamp' | 'bit' | 'table').
-type(amqp_table() :: [{binary(), amqp_field_type(), amqp_value()}]).
-type(amqp_array() :: [{amqp_field_type(), amqp_value()}]).
@@ -429,25 +429,78 @@ shortstr_size(S) ->
_ -> exit(method_field_shortstr_overflow)
end.
--define(SHORTSTR_PROP(P, R, L, S, X),
- if P =:= 0 -> {undefined, R};
- true -> <<L:8/unsigned, S:L/binary, X/binary>> = R,
- {S, X}
+-define(SHORTSTR_VAL(R, L, V, X),
+ begin
+ <<L:8/unsigned, V:L/binary, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(LONGSTR_VAL(R, L, V, X),
+ begin
+ <<L:32/unsigned, V:L/binary, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(SHORT_VAL(R, L, V, X),
+ begin
+ <<V:8/unsigned, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(LONG_VAL(R, L, V, X),
+ begin
+ <<V:32/unsigned, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(LONGLONG_VAL(R, L, V, X),
+ begin
+ <<V:64/unsigned, X/binary>> = R,
+ {V, X}
+ end).
+
+-define(OCTET_VAL(R, L, V, X),
+ begin
+ <<V:8/unsigned, X/binary>> = R,
+ {V, X}
end).
--define(TABLE_PROP(P, R, L, T, X),
- if P =:= 0 -> {undefined, R};
- true -> <<L:32/unsigned, T:L/binary, X/binary>> = R,
- {rabbit_binary_parser:parse_table(T), X}
+
+-define(TABLE_VAL(R, L, V, X),
+ begin
+ <<L:32/unsigned, V:L/binary, X/binary>> = R,
+ {rabbit_binary_parser:parse_table(V), X}
end).
--define(OCTET_PROP(P, R, I, X),
- if P =:= 0 -> {undefined, R};
- true -> <<I:8/unsigned, X/binary>> = R,
- {I, X}
+
+-define(TIMESTAMP_VAL(R, L, V, X),
+ begin
+ <<V:64/unsigned, X/binary>> = R,
+ {V, X}
end).
--define(TIMESTAMP_PROP(P, R, I, X),
- if P =:= 0 -> {undefined, R};
- true -> <<I:64/unsigned, X/binary>> = R,
- {I, X}
+
+-define(SHORTSTR_PROP(X, L),
+ begin
+ L = size(X),
+ if L < 256 -> <<L:8, X:L/binary>>;
+ true -> exit(content_properties_shortstr_overflow)
+ end
+ end).
+
+-define(LONGSTR_PROP(X, L),
+ begin
+ L = size(X),
+ <<L:32, X:L/binary>>
+ end).
+
+-define(OCTET_PROP(X, L), <<X:8/unsigned>>).
+-define(SHORT_PROP(X, L), <<X:16/unsigned>>).
+-define(LONG_PROP(X, L), <<X:32/unsigned>>).
+-define(LONGLONG_PROP(X, L), <<X:64/unsigned>>).
+-define(TIMESTAMP_PROP(X, L), <<X:64/unsigned>>).
+
+-define(TABLE_PROP(X, T),
+ begin
+ T = rabbit_binary_generator:generate_table(X),
+ <<(size(T)):32, T/binary>>
end).
"""
version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision)
@@ -497,9 +550,6 @@ shortstr_size(S) ->
print "amqp_exception(_Code) -> undefined."
def genHrl(spec):
- def erlType(domain):
- return erlangTypeMap[spec.resolveDomain(domain)]
-
def fieldNameList(fields):
return ', '.join([erlangize(f.name) for f in fields])
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml
index 5d74c6e1..8ecb4fc8 100644
--- a/docs/rabbitmq-plugins.1.xml
+++ b/docs/rabbitmq-plugins.1.xml
@@ -96,11 +96,13 @@
</varlistentry>
</variablelist>
<para>
- Lists available plugins, their versions, dependencies and
+ Lists all plugins, their versions, dependencies and
descriptions. Each plugin is prefixed with a status
indicator - [ ] to indicate that the plugin is not
enabled, [E] to indicate that it is explicitly enabled,
- and [e] to indicate that it is implicitly enabled.
+ [e] to indicate that it is implicitly enabled, and [!] to
+ indicate that it is enabled but missing and thus not
+ operational.
</para>
<para>
If the optional pattern is given, only plugins whose
@@ -109,16 +111,15 @@
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmq-plugins list</screen>
<para role="example">
- This command lists all the plugins available, on one line each.
+ This command lists all plugins, on one line each.
</para>
<screen role="example">rabbitmq-plugins list -v </screen>
<para role="example">
- This command lists all the plugins available.
+ This command lists all plugins.
</para>
<screen role="example">rabbitmq-plugins list -v management</screen>
<para role="example">
- This command lists all the plugins available, but does not
- display plugins whose name does not contain "management".
+ This command lists all plugins whose name contains "management".
</para>
<screen role="example">rabbitmq-plugins list -e rabbit</screen>
<para role="example">
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 3082fe14..34947b66 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1282,22 +1282,26 @@
<listitem><para>Readable name for the connection.</para></listitem>
</varlistentry>
<varlistentry>
- <term>address</term>
- <listitem><para>Server IP address.</para></listitem>
- </varlistentry>
- <varlistentry>
<term>port</term>
<listitem><para>Server port.</para></listitem>
</varlistentry>
<varlistentry>
- <term>peer_address</term>
- <listitem><para>Peer address.</para></listitem>
+ <term>host</term>
+ <listitem><para>Server hostname obtained via reverse
+ DNS, or its IP address if reverse DNS failed or was
+ not enabled.</para></listitem>
</varlistentry>
<varlistentry>
<term>peer_port</term>
<listitem><para>Peer port.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>peer_host</term>
+ <listitem><para>Peer hostname obtained via reverse
+ DNS, or its IP address if reverse DNS failed or was
+ not enabled.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>ssl</term>
<listitem><para>Boolean indicating whether the
connection is secured with SSL.</para></listitem>
@@ -1414,7 +1418,7 @@
</variablelist>
<para>
If no <command>connectioninfoitem</command>s are
- specified then user, peer address, peer port, time since
+ specified then user, peer host, peer port, time since
flow control and memory block state are displayed.
</para>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9b1ff8bd..16dfd196 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -43,6 +43,7 @@
{trace_vhosts, []},
{log_levels, [{connection, info}]},
{ssl_cert_login_from, distinguished_name},
+ {reverse_dns_lookups, false},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index adbb6102..b2832b45 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -110,5 +110,6 @@
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
+-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/src/background_gc.erl b/src/background_gc.erl
new file mode 100644
index 00000000..7c68a177
--- /dev/null
+++ b/src/background_gc.erl
@@ -0,0 +1,78 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(background_gc).
+
+-behaviour(gen_server2).
+
+-export([start_link/0, run/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(MAX_RATIO, 0.01).
+-define(IDEAL_INTERVAL, 60000).
+
+-record(state, {last_interval}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(run/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [],
+ [{timeout, infinity}]).
+
+run() -> gen_server2:cast(?MODULE, run).
+
+%%----------------------------------------------------------------------------
+
+init([]) -> {ok, interval_gc(#state{last_interval = ?IDEAL_INTERVAL})}.
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}.
+
+handle_cast(run, State) -> gc(), {noreply, State};
+
+handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(run, State) -> {noreply, interval_gc(State)};
+
+handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+terminate(_Reason, State) -> State.
+
+%%----------------------------------------------------------------------------
+
+interval_gc(State = #state{last_interval = LastInterval}) ->
+ {ok, Interval} = rabbit_misc:interval_operation(
+ fun gc/0, ?MAX_RATIO, ?IDEAL_INTERVAL, LastInterval),
+ erlang:send_after(Interval, self(), run),
+ State#state{last_interval = Interval}.
+
+gc() ->
+ [garbage_collect(P) || P <- processes(),
+ {status, waiting} == process_info(P, status)],
+ garbage_collect(), %% since we will never be waiting...
+ ok.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c52c296a..c3a6d283 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -179,6 +179,12 @@
{mfa, {rabbit_node_monitor, notify_node_up, []}},
{requires, networking}]}).
+-rabbit_boot_step({background_gc,
+ [{description, "background garbage collection"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [background_gc]}},
+ {enables, networking}]}).
+
%%---------------------------------------------------------------------------
-include("rabbit_framing.hrl").
@@ -570,7 +576,10 @@ boot_delegate() ->
rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
- rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
+ Qs = rabbit_amqqueue:recover(),
+ ok = rabbit_binding:recover(rabbit_exchange:recover(),
+ [QName || #amqqueue{name = QName} <- Qs]),
+ rabbit_amqqueue:start(Qs).
maybe_insert_default_data() ->
case rabbit_table:is_empty() of
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index e6625b2b..d7d4d82a 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -55,8 +55,12 @@ start() ->
ok = gen_event:add_handler(?SERVER, ?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
rabbit_sup:start_restartable_child(
- vm_memory_monitor, [MemoryWatermark, fun rabbit_alarm:set_alarm/1,
- fun rabbit_alarm:clear_alarm/1]),
+ vm_memory_monitor, [MemoryWatermark,
+ fun (Alarm) ->
+ background_gc:run(),
+ set_alarm(Alarm)
+ end,
+ fun clear_alarm/1]),
{ok, DiskLimit} = application:get_env(disk_free_limit),
rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6ad85b24..9fb453c1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -16,9 +16,11 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
+-export([recover/0, stop/0, start/1, declare/5,
+ delete_immediately/1, delete/3, purge/1]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
+-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
+ assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -40,8 +42,6 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
-define(FAILOVER_WAIT_MILLIS, 100).
@@ -61,18 +61,21 @@
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-type(routing_result() :: 'routed' | 'unroutable').
--type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-
--spec(start/0 :: () -> [name()]).
+-type(queue_or_absent() :: rabbit_types:amqqueue() |
+ {'absent', rabbit_types:amqqueue()}).
+-type(not_found_or_absent() :: 'not_found' |
+ {'absent', rabbit_types:amqqueue()}).
+-spec(recover/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
+-spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
- -> {'new' | 'existing', rabbit_types:amqqueue()} |
+ -> {'new' | 'existing' | 'absent', rabbit_types:amqqueue()} |
rabbit_types:channel_exit()).
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
- -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
+ -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
-spec(update/2 ::
(name(),
fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
@@ -80,7 +83,10 @@
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
([name()]) -> [rabbit_types:amqqueue()]).
--spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
+-spec(not_found_or_absent/1 :: (name()) -> not_found_or_absent()).
+-spec(with/2 :: (name(), qfun(A)) ->
+ A | rabbit_types:error(not_found_or_absent())).
+-spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(assert_equivalence/5 ::
@@ -175,7 +181,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required]).
-start() ->
+recover() ->
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
on_node_down(node()),
@@ -195,6 +201,14 @@ stop() ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop().
+start(Qs) ->
+ %% At this point all recovered queues and their bindings are
+ %% visible to routing, so now it is safe for them to complete
+ %% their initialisation (which may involve interacting with other
+ %% queues).
+ [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
+ ok.
+
find_durable_queues() ->
Node = node(),
%% TODO: use dirty ops instead
@@ -207,8 +221,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
- gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
+ [Q || Q = #amqqueue{pid = Pid} <- Qs,
+ gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -223,10 +237,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
gm_pids = []}),
{Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
- case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
- not_found -> rabbit_misc:not_found(QueueName);
- Q2 -> Q2
- end.
+ gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity).
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -236,13 +247,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
- case mnesia:read({rabbit_durable_queue, QueueName}) of
- [] -> Q1 = rabbit_policy:set(Q),
- ok = store_queue(Q1),
- B = add_default_binding(Q1),
- fun () -> B(), Q1 end;
- %% Q exists on stopped node
- [_] -> rabbit_misc:const(not_found)
+ case not_found_or_absent(QueueName) of
+ not_found -> Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
+ {absent, _Q} = R -> rabbit_misc:const(R)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
@@ -296,28 +306,47 @@ lookup(Names) when is_list(Names) ->
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
+not_found_or_absent(Name) ->
+ %% NB: we assume that the caller has already performed a lookup on
+ %% rabbit_queue and not found anything
+ case mnesia:read({rabbit_durable_queue, Name}) of
+ [] -> not_found;
+ [Q] -> {absent, Q} %% Q exists on stopped node
+ end.
+
+not_found_or_absent_dirty(Name) ->
+ %% We should read from both tables inside a tx, to get a
+ %% consistent view. But the chances of an inconsistency are small,
+ %% and only affect the error kind.
+ case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
+ {error, not_found} -> not_found;
+ {ok, Q} -> {absent, Q}
+ end.
+
with(Name, F, E) ->
case lookup(Name) of
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid.
- E1 = fun () ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> E();
- false -> timer:sleep(25),
- with(Name, F, E)
- end
- end,
- rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
+ rabbit_misc:with_exit_handler(
+ fun () ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> E(not_found_or_absent_dirty(Name));
+ false -> timer:sleep(25),
+ with(Name, F, E)
+ end
+ end, fun () -> F(Q) end);
{error, not_found} ->
- E()
+ E(not_found_or_absent_dirty(Name))
end.
-with(Name, F) ->
- with(Name, F, fun () -> {error, not_found} end).
+with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
+
with_or_die(Name, F) ->
- with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+ with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
+ ({absent, Q}) -> rabbit_misc:absent(Q)
+ end).
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
@@ -352,8 +381,8 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]).
check_declare_arguments(QueueName, Args) ->
- Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
- {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
+ Checks = [{<<"x-expires">>, fun check_expires_arg/2},
+ {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
@@ -380,20 +409,17 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
-check_positive_int_arg({Type, Val}, Args) ->
+check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
- ok when Val > 0 -> ok;
- ok -> {error, {value_zero_or_less, Val}};
- Error -> Error
+ ok when Val == 0 -> {error, {value_zero, Val}};
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
end.
-check_non_neg_int_arg({Type, Val}, Args) ->
+check_message_ttl_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
- ok when Val >= 0 -> ok;
- ok -> {error, {value_less_than_zero, Val}};
- Error -> Error
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8d05a78c..92b00db0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -195,10 +195,8 @@ code_change(_OldVsn, State, _Extra) ->
declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q1 ->
+ case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
+ #amqqueue{} = Q1 ->
case matches(Recover, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
@@ -208,6 +206,7 @@ declare(Recover, From, State = #q{q = Q,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = bq_init(BQ, Q, Recover),
+ recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -216,28 +215,39 @@ declare(Recover, From, State = #q{q = Q,
noreply(State1);
false ->
{stop, normal, {existing, Q1}, State}
- end
- end.
+ end;
+ Err ->
+ {stop, normal, Err, State}
+ end.
-matches(true, Q, Q) -> true;
-matches(true, _Q, _Q1) -> false;
-matches(false, Q1, Q2) ->
+matches(new, Q1, Q2) ->
%% i.e. not policy
- Q1#amqqueue.name =:= Q2#amqqueue.name andalso
- Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
- Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
- Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
- Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids.
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
+matches(_, Q, Q) -> true;
+matches(_, _Q, _Q1) -> false.
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover,
+ BQ:init(Q, Recover =/= new,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
+recovery_barrier(new) ->
+ ok;
+recovery_barrier(BarrierPid) ->
+ MRef = erlang:monitor(process, BarrierPid),
+ receive
+ {BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
+ {'DOWN', MRef, process, _, _} -> ok
+ end.
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(
fun({Arg, Fun}, State1) ->
@@ -247,9 +257,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
end
end, State,
[{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
- {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]).
+ {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
+ {<<"x-message-ttl">>, fun init_ttl/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -553,7 +563,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Confirm, Delivered, State),
+ Props = message_properties(Message, Confirm, Delivered, State),
case attempt_delivery(Delivery, Props, State1) of
{true, State2} ->
State2;
@@ -680,16 +690,21 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
- #message_properties{expiry = calculate_msg_expiry(TTL),
+message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(Message, TTL),
needs_confirming = Confirm == eventually,
delivered = Delivered}.
-calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
+calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
+ #content{properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ %% We assert that the expiration must be valid - we check in the channel.
+ {ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
+ case lists:min([TTL, MsgTTL]) of
+ undefined -> undefined;
+ T -> now_micros() + T * 1000
+ end.
-drop_expired_messages(State = #q{ttl = undefined}) ->
- State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
@@ -711,8 +726,6 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
ensure_ttl_timer(undefined, State) ->
State;
-ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
- State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
After = (case Expiry - now_micros() of
V when V > 0 -> V + 999; %% always fire later
@@ -853,8 +866,8 @@ make_dead_letter_msg(Reason,
{<<"time">>, timestamp, TimeSec},
{<<"exchange">>, longstr, Exchange#resource.name},
{<<"routing-keys">>, array, RKs1}],
- HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
- Info, Headers))
+ HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
+ Info, Headers))
end,
Content1 = rabbit_basic:map_headers(HeadersFun2, Content),
Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
@@ -998,9 +1011,9 @@ handle_call({init, Recover}, From,
q = #amqqueue{name = QName} = Q} = State,
gen_server2:reply(From, not_found),
case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
+ new -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName]);
+ _ -> ok
end,
BQS = bq_init(BQ, Q, Recover),
%% Rely on terminate to delete the queue.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index db2b7e95..9bd1fad9 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -19,8 +19,9 @@
-include("rabbit_framing.hrl").
-export([publish/4, publish/5, publish/1,
- message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, map_headers/2, delivery/3, header_routes/1]).
+ message/3, message/4, properties/1, prepend_table_header/3,
+ extract_headers/1, map_headers/2, delivery/3, header_routes/1,
+ parse_expiration/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -58,7 +59,7 @@
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
--spec(append_table_header/3 ::
+-spec(prepend_table_header/3 ::
(binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
@@ -72,6 +73,9 @@
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
+-spec(parse_expiration/1 ::
+ (rabbit_framing:amqp_property_record())
+ -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
-endif.
@@ -177,15 +181,45 @@ properties(P) when is_list(P) ->
end
end, #'P_basic'{}, P).
-append_table_header(Name, Info, undefined) ->
- append_table_header(Name, Info, []);
-append_table_header(Name, Info, Headers) ->
- Prior = case rabbit_misc:table_lookup(Headers, Name) of
- undefined -> [];
- {array, Existing} -> Existing
- end,
+prepend_table_header(Name, Info, undefined) ->
+ prepend_table_header(Name, Info, []);
+prepend_table_header(Name, Info, Headers) ->
+ case rabbit_misc:table_lookup(Headers, Name) of
+ {array, Existing} ->
+ prepend_table(Name, Info, Existing, Headers);
+ undefined ->
+ prepend_table(Name, Info, [], Headers);
+ Other ->
+ Headers2 = prepend_table(Name, Info, [], Headers),
+ set_invalid_header(Name, Other, Headers2)
+ end.
+
+prepend_table(Name, Info, Prior, Headers) ->
rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) ->
+ case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of
+ undefined ->
+ set_invalid([{Name, array, [Value]}], Headers);
+ {table, ExistingHdr} ->
+ update_invalid(Name, Value, ExistingHdr, Headers);
+ Other ->
+ %% somehow the x-invalid-headers header is corrupt
+ Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}],
+ set_invalid_header(Name, Value, set_invalid(Invalid, Headers))
+ end.
+
+set_invalid(NewHdr, Headers) ->
+ rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr).
+
+update_invalid(Name, Value, ExistingHdr, Header) ->
+ Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of
+ undefined -> [Value];
+ {array, Prior} -> [Value | Prior]
+ end,
+ NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
+ set_invalid(NewHdr, Header).
+
extract_headers(Content) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
@@ -224,3 +258,19 @@ header_routes(HeadersTable) ->
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
+
+parse_expiration(#'P_basic'{expiration = undefined}) ->
+ {ok, undefined};
+parse_expiration(#'P_basic'{expiration = Expiration}) ->
+ case string:to_integer(binary_to_list(Expiration)) of
+ {error, no_integer} = E ->
+ E;
+ {N, ""} ->
+ case rabbit_misc:check_expiry(N) of
+ ok -> {ok, N};
+ E = {error, _} -> E
+ end;
+ {_, S} ->
+ {error, {leftover_string, S}}
+ end.
+
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 4700fa31..a333c1ce 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -21,7 +21,7 @@
-export([build_simple_method_frame/3,
build_simple_content_frames/4,
build_heartbeat_frame/0]).
--export([generate_table/1, encode_properties/2]).
+-export([generate_table/1]).
-export([check_empty_frame_size/0]).
-export([ensure_content_encoded/2, clear_encoded_content/1]).
-export([map_exception/3]).
@@ -42,8 +42,6 @@
-> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
--spec(encode_properties/2 ::
- ([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_frame_size/0 :: () -> 'ok').
-spec(ensure_content_encoded/2 ::
(rabbit_types:content(), rabbit_types:protocol()) ->
@@ -118,51 +116,24 @@ create_frame(TypeInt, ChannelInt, Payload) ->
%% table_field_to_binary supports the AMQP 0-8/0-9 standard types, S,
%% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x,
%% and V.
-
-table_field_to_binary({FName, Type, Value}) ->
- [short_string_to_binary(FName) | field_value_to_binary(Type, Value)].
-
-field_value_to_binary(longstr, Value) ->
- ["S", long_string_to_binary(Value)];
-
-field_value_to_binary(signedint, Value) ->
- ["I", <<Value:32/signed>>];
-
-field_value_to_binary(decimal, {Before, After}) ->
- ["D", Before, <<After:32>>];
-
-field_value_to_binary(timestamp, Value) ->
- ["T", <<Value:64>>];
-
-field_value_to_binary(table, Value) ->
- ["F", table_to_binary(Value)];
-
-field_value_to_binary(array, Value) ->
- ["A", array_to_binary(Value)];
-
-field_value_to_binary(byte, Value) ->
- ["b", <<Value:8/unsigned>>];
-
-field_value_to_binary(double, Value) ->
- ["d", <<Value:64/float>>];
-
-field_value_to_binary(float, Value) ->
- ["f", <<Value:32/float>>];
-
-field_value_to_binary(long, Value) ->
- ["l", <<Value:64/signed>>];
-
-field_value_to_binary(short, Value) ->
- ["s", <<Value:16/signed>>];
-
-field_value_to_binary(bool, Value) ->
- ["t", if Value -> 1; true -> 0 end];
-
-field_value_to_binary(binary, Value) ->
- ["x", long_string_to_binary(Value)];
-
-field_value_to_binary(void, _Value) ->
- ["V"].
+table_field_to_binary({FName, T, V}) ->
+ [short_string_to_binary(FName) | field_value_to_binary(T, V)].
+
+field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)];
+field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>];
+field_value_to_binary(decimal, V) -> {Before, After} = V,
+ ["D", Before, <<After:32>>];
+field_value_to_binary(timestamp, V) -> ["T", <<V:64>>];
+field_value_to_binary(table, V) -> ["F", table_to_binary(V)];
+field_value_to_binary(array, V) -> ["A", array_to_binary(V)];
+field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>];
+field_value_to_binary(double, V) -> ["d", <<V:64/float>>];
+field_value_to_binary(float, V) -> ["f", <<V:32/float>>];
+field_value_to_binary(long, V) -> ["l", <<V:64/signed>>];
+field_value_to_binary(short, V) -> ["s", <<V:16/signed>>];
+field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end];
+field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)];
+field_value_to_binary(void, _V) -> ["V"].
table_to_binary(Table) when is_list(Table) ->
BinTable = generate_table(Table),
@@ -176,9 +147,8 @@ generate_table(Table) when is_list(Table) ->
list_to_binary(lists:map(fun table_field_to_binary/1, Table)).
generate_array(Array) when is_list(Array) ->
- list_to_binary(lists:map(
- fun ({Type, Value}) -> field_value_to_binary(Type, Value) end,
- Array)).
+ list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end,
+ Array)).
short_string_to_binary(String) when is_binary(String) ->
Len = size(String),
@@ -196,63 +166,12 @@ long_string_to_binary(String) when is_binary(String) ->
long_string_to_binary(String) ->
[<<(length(String)):32>>, String].
-encode_properties([], []) ->
- <<0, 0>>;
-encode_properties(TypeList, ValueList) ->
- encode_properties(0, TypeList, ValueList, 0, [], []).
-
-encode_properties(_Bit, [], [], FirstShortAcc, FlagsAcc, PropsAcc) ->
- list_to_binary([lists:reverse(FlagsAcc), <<FirstShortAcc:16>>, lists:reverse(PropsAcc)]);
-encode_properties(_Bit, [], _ValueList, _FirstShortAcc, _FlagsAcc, _PropsAcc) ->
- exit(content_properties_values_overflow);
-encode_properties(15, TypeList, ValueList, FirstShortAcc, FlagsAcc, PropsAcc) ->
- NewFlagsShort = FirstShortAcc bor 1, % set the continuation low bit
- encode_properties(0, TypeList, ValueList, 0, [<<NewFlagsShort:16>> | FlagsAcc], PropsAcc);
-encode_properties(Bit, [bit | TypeList], [Value | ValueList], FirstShortAcc, FlagsAcc, PropsAcc) ->
- case Value of
- true -> encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc bor (1 bsl (15 - Bit)), FlagsAcc, PropsAcc);
- false -> encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc, FlagsAcc, PropsAcc);
- Other -> exit({content_properties_illegal_bit_value, Other})
- end;
-encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, FlagsAcc, PropsAcc) ->
- case Value of
- undefined -> encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc, FlagsAcc, PropsAcc);
- _ -> encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc bor (1 bsl (15 - Bit)),
- FlagsAcc,
- [encode_property(T, Value) | PropsAcc])
- end.
-
-encode_property(shortstr, String) ->
- Len = size(String),
- if Len < 256 -> <<Len:8, String:Len/binary>>;
- true -> exit(content_properties_shortstr_overflow)
- end;
-encode_property(longstr, String) ->
- Len = size(String), <<Len:32, String:Len/binary>>;
-encode_property(octet, Int) ->
- <<Int:8/unsigned>>;
-encode_property(shortint, Int) ->
- <<Int:16/unsigned>>;
-encode_property(longint, Int) ->
- <<Int:32/unsigned>>;
-encode_property(longlongint, Int) ->
- <<Int:64/unsigned>>;
-encode_property(timestamp, Int) ->
- <<Int:64/unsigned>>;
-encode_property(table, Table) ->
- table_to_binary(Table).
-
check_empty_frame_size() ->
%% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly.
- ComputedSize = iolist_size(create_frame(?FRAME_BODY, 0, <<>>)),
- if ComputedSize == ?EMPTY_FRAME_SIZE ->
- ok;
- true ->
- exit({incorrect_empty_frame_size, ComputedSize, ?EMPTY_FRAME_SIZE})
+ case iolist_size(create_frame(?FRAME_BODY, 0, <<>>)) of
+ ?EMPTY_FRAME_SIZE -> ok;
+ ComputedSize -> exit({incorrect_empty_frame_size,
+ ComputedSize, ?EMPTY_FRAME_SIZE})
end.
ensure_content_encoded(Content = #content{properties_bin = PropBin,
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 5f0016b6..53878d6a 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -50,47 +50,36 @@ parse_array(<<ValueAndRest/binary>>) ->
{Type, Value, Rest} = parse_field_value(ValueAndRest),
[{Type, Value} | parse_array(Rest)].
-parse_field_value(<<"S", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) ->
- {longstr, ValueString, Rest};
+parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+ {longstr, V, R};
-parse_field_value(<<"I", Value:32/signed, Rest/binary>>) ->
- {signedint, Value, Rest};
+parse_field_value(<<"I", V:32/signed, R/binary>>) ->
+ {signedint, V, R};
-parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
- {decimal, {Before, After}, Rest};
+parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) ->
+ {decimal, {Before, After}, R};
-parse_field_value(<<"T", Value:64/unsigned, Rest/binary>>) ->
- {timestamp, Value, Rest};
+parse_field_value(<<"T", V:64/unsigned, R/binary>>) ->
+ {timestamp, V, R};
-parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, Rest/binary>>) ->
- {table, parse_table(Table), Rest};
+parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
+ {table, parse_table(Table), R};
-parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, Rest/binary>>) ->
- {array, parse_array(Array), Rest};
+parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
+ {array, parse_array(Array), R};
-parse_field_value(<<"b", Value:8/unsigned, Rest/binary>>) ->
- {byte, Value, Rest};
+parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R};
+parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R};
+parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R};
+parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R};
+parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R};
+parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
-parse_field_value(<<"d", Value:64/float, Rest/binary>>) ->
- {double, Value, Rest};
+parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+ {binary, V, R};
-parse_field_value(<<"f", Value:32/float, Rest/binary>>) ->
- {float, Value, Rest};
-
-parse_field_value(<<"l", Value:64/signed, Rest/binary>>) ->
- {long, Value, Rest};
-
-parse_field_value(<<"s", Value:16/signed, Rest/binary>>) ->
- {short, Value, Rest};
-
-parse_field_value(<<"t", Value:8/unsigned, Rest/binary>>) ->
- {bool, (Value /= 0), Rest};
-
-parse_field_value(<<"x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) ->
- {binary, ValueString, Rest};
-
-parse_field_value(<<"V", Rest/binary>>) ->
- {void, undefined, Rest}.
+parse_field_value(<<"V", R/binary>>) ->
+ {void, undefined, R}.
ensure_content_decoded(Content = #content{properties = Props})
when Props =/= none ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 0d23f716..2d486651 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -35,9 +35,11 @@
-type(key() :: binary()).
--type(bind_errors() :: rabbit_types:error('source_not_found' |
- 'destination_not_found' |
- 'source_and_destination_not_found')).
+-type(bind_errors() :: rabbit_types:error(
+ {'resources_missing',
+ [{'not_found', (rabbit_types:binding_source() |
+ rabbit_types:binding_destination())} |
+ {'absent', rabbit_types:amqqueue()}]})).
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error('binding_not_found')).
-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
@@ -330,21 +332,32 @@ sync_transient_route(Route, Fun) ->
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end,
+ ErrFun = fun (Names) ->
+ Errs = [not_found_or_absent(Name) || Name <- Names],
+ rabbit_misc:const({error, {resources_missing, Errs}})
+ end,
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case {mnesia:read({SrcTable, SrcName}),
mnesia:read({DstTable, DstName})} of
{[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> ErrFun(source_not_found);
- {[_], [] } -> ErrFun(destination_not_found);
- {[], [] } -> ErrFun(source_and_destination_not_found)
- end
+ {[], [_] } -> ErrFun([SrcName]);
+ {[_], [] } -> ErrFun([DstName]);
+ {[], [] } -> ErrFun([SrcName, DstName])
+ end
end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+not_found_or_absent(#resource{kind = exchange} = Name) ->
+ {not_found, Name};
+not_found_or_absent(#resource{kind = queue} = Name) ->
+ case rabbit_amqqueue:not_found_or_absent(Name) of
+ not_found -> {not_found, Name};
+ {absent, _Q} = R -> R
+ end.
+
contains(Table, MatchHead) ->
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 54427206..a94d2ab5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -474,6 +474,13 @@ check_user_id_header(#'P_basic'{user_id = Claimed},
"'~s'", [Claimed, Actual])
end.
+check_expiration_header(Props) ->
+ case rabbit_basic:parse_expiration(Props) of
+ {ok, _} -> ok;
+ {error, E} -> precondition_failed("invalid expiration '~s': ~p",
+ [Props#'P_basic'.expiration, E])
+ end.
+
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
"cannot publish to internal ~s",
@@ -614,8 +621,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- check_user_id_header(DecodedContent#content.properties, State),
+ DecodedContent = #content {properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ check_user_id_header(Props, State),
+ check_expiration_header(Props),
{MsgSeqNo, State1} =
case {TxStatus, ConfirmEnabled} of
{none, false} -> {undefined, State};
@@ -960,8 +969,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, none, State)
- end
+ handle_method(Declare, none, State);
+ {absent, Q} ->
+ rabbit_misc:absent(Q)
+ end;
+ {error, {absent, Q}} ->
+ rabbit_misc:absent(Q)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
@@ -1170,14 +1183,10 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
(_X, #exchange{}) ->
ok
end) of
- {error, source_not_found} ->
- rabbit_misc:not_found(ExchangeName);
- {error, destination_not_found} ->
- rabbit_misc:not_found(DestinationName);
- {error, source_and_destination_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(DestinationName)]);
+ {error, {resources_missing, [{not_found, Name} | _]}} ->
+ rabbit_misc:not_found(Name);
+ {error, {resources_missing, [{absent, Q} | _]}} ->
+ rabbit_misc:absent(Q);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index bcb83851..42459833 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -83,7 +83,7 @@ init(Type) ->
child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) ->
[{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid]},
+ [Sock, Channel, FrameMax, Protocol, ReaderPid, true]},
intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)];
child_specs(direct) ->
[{limiter, {rabbit_limiter, start_link, []},
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 25f7d758..669a0787 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -386,7 +386,7 @@ action(list_bindings, Node, Args, Opts, Inform) ->
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
+ ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@@ -611,7 +611,7 @@ display_info_list(Results, InfoItemKeys) when is_list(Results) ->
fun (Result) -> display_row(
[format_info_item(proplists:get_value(X, Result)) ||
X <- InfoItemKeys])
- end, Results),
+ end, lists:sort(Results)),
ok;
display_info_list(Other, _) ->
Other.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 3f1b20fe..7d91b6fa 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -19,8 +19,8 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2]).
--export([reset_stats_timer/2]).
+-export([init_stats_timer/2, init_disabled_stats_timer/2,
+ ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify_if/3]).
@@ -51,6 +51,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/2 :: (container(), pos()) -> container()).
+-spec(init_disabled_stats_timer/2 :: (container(), pos()) -> container()).
-spec(ensure_stats_timer/3 :: (container(), pos(), term()) -> container()).
-spec(stop_stats_timer/2 :: (container(), pos()) -> container()).
-spec(reset_stats_timer/2 :: (container(), pos()) -> container()).
@@ -90,10 +91,13 @@ start_link() ->
init_stats_timer(C, P) ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
- {ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
+ {ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
setelement(P, C, #state{level = StatsLevel, interval = Interval,
timer = undefined}).
+init_disabled_stats_timer(C, P) ->
+ setelement(P, C, #state{level = none, interval = 0, timer = undefined}).
+
ensure_stats_timer(C, P, Msg) ->
case element(P, C) of
#state{level = Level, interval = Interval, timer = undefined} = State
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 6a7a28f2..df733546 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -229,7 +229,10 @@ dropwhile(Pred, AckRequired,
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
Dropped = Len - Len1,
- ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}),
+ case Dropped of
+ 0 -> ok;
+ _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
+ end,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Next, Msgs, State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 } }.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ec00ecef..2f75ef2e 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -137,7 +137,7 @@ on_node_up() ->
ok.
drop_mirrors(QName, Nodes) ->
- [ok = drop_mirror(QName, Node) || Node <- Nodes],
+ [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes],
ok.
drop_mirror(QName, MirrorNode) ->
@@ -154,7 +154,7 @@ drop_mirror(QName, MirrorNode) ->
"Dropping queue mirror on node ~p for ~s~n",
[MirrorNode, rabbit_misc:rs(Name)]),
exit(Pid, {shutdown, dropped}),
- ok
+ {ok, dropped}
end
end).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ab9a9ceb..137ccf20 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -21,7 +21,7 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4, quit/1,
protocol_error/3, protocol_error/4, protocol_error/1]).
--export([not_found/1, assert_args_equivalence/4]).
+-export([not_found/1, absent/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
-export([table_lookup/2, set_table_value/4]).
-export([r/3, r/2, r_arg/4, rs/1]).
@@ -63,13 +63,18 @@
-export([version/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
+-export([check_expiry/1]).
-export([base64url/1]).
+-export([interval_operation/4]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
R =:= shutdown).
+%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
+-define(MAX_EXPIRY_TIMER, 4294967295).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -111,6 +116,7 @@
-spec(protocol_error/1 ::
(rabbit_types:amqp_error()) -> channel_or_connection_exit()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
+-spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
@@ -228,7 +234,11 @@
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
-spec(term_to_json/1 :: (any()) -> any()).
+-spec(check_expiry/1 :: (integer()) -> rabbit_types:ok_or_error(any())).
-spec(base64url/1 :: (binary()) -> string()).
+-spec(interval_operation/4 ::
+ (thunk(A), float(), non_neg_integer(), non_neg_integer())
+ -> {A, non_neg_integer()}).
-endif.
@@ -266,6 +276,15 @@ protocol_error(#amqp_error{} = Error) ->
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
+absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) ->
+ %% The assertion of durability is mainly there because we mention
+ %% durability in the error message. That way we will hopefully
+ %% notice if at some future point our logic changes s.t. we get
+ %% here with non-durable queues.
+ protocol_error(not_found,
+ "home node '~s' of durable ~s is down or inaccessible",
+ [node(QPid), rs(QueueName)]).
+
type_class(byte) -> int;
type_class(short) -> int;
type_class(signedint) -> int;
@@ -990,9 +1009,28 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
+check_expiry(N) when N < 0 -> {error, {value_negative, N}};
+check_expiry(_N) -> ok.
+
base64url(In) ->
lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
($\/, Acc) -> [$\_ | Acc];
($\=, Acc) -> Acc;
(Chr, Acc) -> [Chr | Acc]
end, [], base64:encode_to_string(In))).
+
+%% Ideally, you'd want Fun to run every IdealInterval. but you don't
+%% want it to take more than MaxRatio of IdealInterval. So if it takes
+%% more then you want to run it less often. So we time how long it
+%% takes to run, and then suggest how long you should wait before
+%% running it again. Times are in millis.
+interval_operation(Fun, MaxRatio, IdealInterval, LastInterval) ->
+ {Micros, Res} = timer:tc(Fun),
+ {Res, case {Micros > 1000 * (MaxRatio * IdealInterval),
+ Micros > 1000 * (MaxRatio * LastInterval)} of
+ {true, true} -> round(LastInterval * 1.5);
+ {true, false} -> LastInterval;
+ {false, false} -> lists:max([IdealInterval,
+ round(LastInterval / 1.5)])
+ end}.
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 038154c3..562fc197 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -20,7 +20,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
close/1, fast_close/1, sockname/1, peername/1, peercert/1,
- tune_buffer_size/1, connection_string/2]).
+ tune_buffer_size/1, connection_string/2, socket_ends/2]).
%%---------------------------------------------------------------------------
@@ -36,7 +36,7 @@
-type(socket() :: port() | #ssl_socket{}).
-type(opts() :: [{atom(), any()} |
{raw, non_neg_integer(), non_neg_integer(), binary()}]).
-
+-type(host_or_ip() :: binary() | inet:ip_address()).
-spec(is_ssl/1 :: (socket()) -> boolean()).
-spec(ssl_info/1 :: (socket())
-> 'nossl' | ok_val_or_error(
@@ -72,6 +72,10 @@
-spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
+-spec(socket_ends/2 ::
+ (socket(), 'inbound' | 'outbound')
+ -> ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(),
+ host_or_ip(), rabbit_networking:ip_port()})).
-endif.
@@ -193,17 +197,37 @@ tune_buffer_size(Sock) ->
end.
connection_string(Sock, Direction) ->
- {From, To} = case Direction of
- inbound -> {fun peername/1, fun sockname/1};
- outbound -> {fun sockname/1, fun peername/1}
- end,
+ case socket_ends(Sock, Direction) of
+ {ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
+ {ok, rabbit_misc:format(
+ "~s:~p -> ~s:~p",
+ [maybe_ntoab(FromAddress), FromPort,
+ maybe_ntoab(ToAddress), ToPort])};
+ Error ->
+ Error
+ end.
+
+socket_ends(Sock, Direction) ->
+ {From, To} = sock_funs(Direction),
case {From(Sock), To(Sock)} of
{{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
- {ok, rabbit_misc:format("~s:~p -> ~s:~p",
- [rabbit_misc:ntoab(FromAddress), FromPort,
- rabbit_misc:ntoab(ToAddress), ToPort])};
+ {ok, {rdns(FromAddress), FromPort,
+ rdns(ToAddress), ToPort}};
{{error, _Reason} = Error, _} ->
Error;
{_, {error, _Reason} = Error} ->
Error
end.
+
+maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
+maybe_ntoab(Host) -> Host.
+
+rdns(Addr) ->
+ {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups),
+ case Lookup of
+ true -> list_to_binary(rabbit_networking:tcp_host(Addr));
+ _ -> Addr
+ end.
+
+sock_funs(inbound) -> {fun peername/1, fun sockname/1};
+sock_funs(outbound) -> {fun sockname/1, fun peername/1}.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 5cf8d1ae..31eeef73 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -21,7 +21,7 @@
node_listeners/1, connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
- close_connection/2, force_connection_event_refresh/0]).
+ close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/6,
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index b11c9d04..8d0e4456 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -85,10 +85,10 @@ cluster_status_filename() ->
prepare_cluster_status_files() ->
rabbit_mnesia:ensure_mnesia_dir(),
- CorruptFiles = fun () -> throw({error, corrupt_cluster_status_files}) end,
+ Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end,
RunningNodes1 = case try_read_file(running_nodes_filename()) of
{ok, [Nodes]} when is_list(Nodes) -> Nodes;
- {ok, _ } -> CorruptFiles();
+ {ok, Other} -> Corrupt(Other);
{error, enoent} -> []
end,
ThisNode = [node()],
@@ -102,8 +102,8 @@ prepare_cluster_status_files() ->
{ok, [AllNodes0]} when is_list(AllNodes0) ->
{legacy_cluster_nodes(AllNodes0),
legacy_should_be_disc_node(AllNodes0)};
- {ok, _} ->
- CorruptFiles();
+ {ok, Files} ->
+ Corrupt(Files);
{error, enoent} ->
{legacy_cluster_nodes([]), true}
end,
@@ -134,8 +134,8 @@ read_cluster_status() ->
try_read_file(running_nodes_filename())} of
{{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
{All, Disc, Running};
- {_, _} ->
- throw({error, corrupt_or_missing_cluster_files})
+ {Stat, Run} ->
+ throw({error, {corrupt_or_missing_cluster_files, Stat, Run}})
end.
update_cluster_status() ->
@@ -184,6 +184,11 @@ partitions() ->
%%----------------------------------------------------------------------------
init([]) ->
+ %% We trap exits so that the supervisor will not just kill us. We
+ %% want to be sure that we are not going to be killed while
+ %% writing out the cluster status files - bad things can then
+ %% happen.
+ process_flag(trap_exit, true),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
partitions = []}}.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index ecb19611..9f94af7d 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -19,18 +19,6 @@
-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
--define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
--define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
--define(ENABLED_DEF, {?ENABLED_OPT, flag}).
--define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
-
--define(GLOBAL_DEFS, []).
-
--define(COMMANDS,
- [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
- enable,
- disable]).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -100,8 +88,13 @@ dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
fun (App, _Deps) -> [{App, App}] end,
fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- [{Name, Deps}
- || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
+ lists:ukeysort(
+ 1, [{Name, Deps} ||
+ #plugin{name = Name,
+ dependencies = Deps} <- AllPlugins] ++
+ [{Dep, []} ||
+ #plugin{dependencies = Deps} <- AllPlugins,
+ Dep <- Deps])),
Dests = case Reverse of
false -> digraph_utils:reachable(Sources, G);
true -> digraph_utils:reaching(Sources, G)
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 572cf150..2158d1da 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -108,16 +108,19 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
Enabled, AllPlugins),
ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
Missing = ToEnable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> throw({error_string,
- fmt_list("The following plugins could not be found:",
- Missing)})
- end,
NewEnabled = lists:usort(Enabled ++ ToEnable),
- write_enabled_plugins(PluginsFile, NewEnabled),
NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
+ MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing,
+ case {Missing, MissingDeps} of
+ {[], []} -> ok;
+ {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)});
+ {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)});
+ {_, _} -> throw({error_string,
+ fmt_missing("plugins", Missing) ++
+ fmt_missing("dependencies", MissingDeps)})
+ end,
+ write_enabled_plugins(PluginsFile, NewEnabled),
maybe_warn_mochiweb(NewImplicitlyEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
@@ -183,9 +186,12 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
EnabledImplicitly =
rabbit_plugins:dependencies(false, EnabledExplicitly,
AvailablePlugins) -- EnabledExplicitly,
+ Missing = [#plugin{name = Name, dependencies = []} ||
+ Name <- ((EnabledExplicitly ++ EnabledImplicitly) --
+ plugin_names(AvailablePlugins))],
{ok, RE} = re:compile(Pattern),
Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins,
+ Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
OnlyEnabledAll -> (lists:member(Name,
@@ -196,30 +202,35 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
#plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
- MaxWidth) || P <- Plugins1],
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly,
+ plugin_names(Missing), Format, MaxWidth) || P <- Plugins1],
ok.
format_plugin(#plugin{name = Name, version = Version,
description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
+ EnabledExplicitly, EnabledImplicitly, Missing,
+ Format, MaxWidth) ->
Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly)} of
- {true, false} -> "[E]";
- {false, true} -> "[e]";
- _ -> "[ ]"
+ lists:member(Name, EnabledImplicitly),
+ lists:member(Name, Missing)} of
+ {true, false, false} -> "[E]";
+ {false, true, false} -> "[e]";
+ {_, _, true} -> "[!]";
+ _ -> "[ ]"
end,
+ Opt = fun (_F, A, A) -> ok;
+ ( F, A, _) -> io:format(F, [A])
+ end,
case Format of
minimal -> io:format("~s~n", [Name]);
- normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
- "w ~s~n", [Glyph, Name, Version]);
+ normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ "w ",
+ [Glyph, Name]),
+ Opt("~s", Version, undefined),
+ io:format("~n");
verbose -> io:format("~s ~w~n", [Glyph, Name]),
- io:format(" Version: \t~s~n", [Version]),
- case Deps of
- [] -> ok;
- _ -> io:format(" Dependencies:\t~p~n", [Deps])
- end,
- io:format(" Description:\t~s~n", [Description]),
+ Opt(" Version: \t~s~n", Version, undefined),
+ Opt(" Dependencies:\t~p~n", Deps, []),
+ Opt(" Description: \t~s~n", Description, undefined),
io:format("~n")
end.
@@ -230,6 +241,9 @@ fmt_list(Header, Plugins) ->
lists:flatten(
[Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
+fmt_missing(Desc, Missing) ->
+ fmt_list("The following " ++ Desc ++ " could not be found:", Missing).
+
usort_plugins(Plugins) ->
lists:usort(fun plugins_cmp/2, Plugins).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f4e6865b..928786e9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,23 +35,23 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
+-record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at}).
+ last_blocked_by, last_blocked_at, host, peer_host,
+ port, peer_port}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
channels]).
--define(CREATION_EVENT_KEYS, [pid, name, address, port, peer_address, peer_port,
- ssl, peer_cert_subject, peer_cert_issuer,
- peer_cert_validity, auth_mechanism,
- ssl_protocol, ssl_key_exchange,
- ssl_cipher, ssl_hash,
- protocol, user, vhost, timeout, frame_max,
- client_properties]).
+-define(CREATION_EVENT_KEYS,
+ [pid, name, port, peer_port, host,
+ peer_host, ssl, peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity, auth_mechanism, ssl_protocol,
+ ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
+ timeout, frame_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -192,16 +192,20 @@ socket_op(Sock, Fun) ->
name(Sock) ->
socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
+socket_ends(Sock) ->
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
+
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- ConnStr = name(Sock),
- log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
+ Name = name(Sock),
+ log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- handshake_timeout),
+ erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ {PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
State = #v1{parent = Parent,
sock = ClientSock,
+ name = list_to_binary(Name),
connection = #connection{
protocol = none,
user = none,
@@ -224,19 +228,23 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
auth_state = none,
conserve_resources = false,
last_blocked_by = none,
- last_blocked_at = never},
+ last_blocked_at = never,
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
handshake, 8)),
- log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
+ log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
connection_closed_abruptly -> warning;
_ -> error
end, "closing AMQP connection ~p (~s):~n~p~n",
- [self(), ConnStr, Ex])
+ [self(), Name, Ex])
after
%% We don't call gen_tcp:close/1 here since it waits for
%% pending output to be sent, which results in unnecessary
@@ -341,6 +349,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State)
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
%% Ignore, we will emit a created event once we start running.
mainloop(Deb, State);
+handle_other(ensure_stats, Deb, State) ->
+ mainloop(Deb, ensure_stats_timer(State));
handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
@@ -491,6 +501,14 @@ handle_exception(State, Channel, Reason) ->
timer:sleep(?SILENT_CLOSE_DELAY * 1000),
throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+%% we've "lost sync" with the client and hence must not accept any
+%% more input
+fatal_frame_error(Error, Type, Channel, Payload, State) ->
+ frame_error(Error, Type, Channel, Payload, State),
+ %% grace period to allow transmission of error
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw(fatal_frame_error).
+
frame_error(Error, Type, Channel, Payload, State) ->
{Str, Bin} = payload_snippet(Payload),
handle_exception(State, Channel,
@@ -513,7 +531,7 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
+ #v1{sock = Sock, name = Name, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
frame_max = FrameMax,
@@ -522,7 +540,7 @@ create_channel(Channel, State) ->
capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
@@ -621,8 +639,9 @@ handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
State = #v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
- frame_error({frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
- Type, Channel, <<>>, State);
+ fatal_frame_error(
+ {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
+ Type, Channel, <<>>, State);
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -633,8 +652,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
switch_callback(State1, frame_header, 7);
- _ -> frame_error({invalid_frame_end_marker, EndMarker},
- Type, Channel, Payload, State)
+ _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
end;
%% The two rules pertaining to version negotiation:
@@ -881,82 +900,66 @@ auth_phase(Response,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, #v1{}) ->
- self();
-i(name, #v1{sock = Sock}) ->
- list_to_binary(name(Sock));
-i(address, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock);
-i(port, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock);
-i(peer_address, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock);
-i(peer_port, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
-i(ssl, #v1{sock = Sock}) ->
- rabbit_net:is_ssl(Sock);
-i(ssl_protocol, #v1{sock = Sock}) ->
- ssl_info(fun ({P, _}) -> P end, Sock);
-i(ssl_key_exchange, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
-i(ssl_cipher, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
-i(ssl_hash, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
-i(peer_cert_issuer, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
-i(peer_cert_subject, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock);
-i(peer_cert_validity, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock);
-i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
- SockStat =:= recv_cnt;
- SockStat =:= send_oct;
- SockStat =:= send_cnt;
- SockStat =:= send_pend ->
- socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end,
- fun ([{_, I}]) -> I end, Sock);
-i(state, #v1{connection_state = S}) ->
- S;
-i(last_blocked_by, #v1{last_blocked_by = By}) ->
- By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(pid, #v1{}) -> self();
+i(name, #v1{name = Name}) -> Name;
+i(host, #v1{host = Host}) -> Host;
+i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
+i(port, #v1{port = Port}) -> Port;
+i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
+i(SockStat, S) when SockStat =:= recv_oct;
+ SockStat =:= recv_cnt;
+ SockStat =:= send_oct;
+ SockStat =:= send_cnt;
+ SockStat =:= send_pend ->
+ socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
+ fun ([{_, I}]) -> I end, S);
+i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
+i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S);
+i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
+i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
+i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
+i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
+i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
+i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
+i(state, #v1{connection_state = CS}) -> CS;
+i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
+i(last_blocked_age, #v1{last_blocked_at = never}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{last_blocked_at = T}) ->
timer:now_diff(erlang:now(), T) / 1000000;
-i(channels, #v1{}) ->
- length(all_channels());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
-i(auth_mechanism, #v1{auth_mechanism = none}) ->
+i(channels, #v1{}) -> length(all_channels());
+i(auth_mechanism, #v1{auth_mechanism = none}) ->
none;
-i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
proplists:get_value(name, Mechanism:description());
-i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
- Username;
-i(user, #v1{connection = #connection{user = none}}) ->
+i(protocol, #v1{connection = #connection{protocol = none}}) ->
+ none;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
+i(user, #v1{connection = #connection{user = none}}) ->
'';
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
+i(user, #v1{connection = #connection{user = #user{
+ username = Username}}}) ->
+ Username;
+i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
+i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
+i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
FrameMax;
-i(client_properties, #v1{connection = #connection{
- client_properties = ClientProperties}}) ->
+i(client_properties, #v1{connection = #connection{client_properties =
+ ClientProperties}}) ->
ClientProperties;
i(Item, #v1{}) ->
throw({bad_argument, Item}).
-socket_info(Get, Select, Sock) ->
+socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
-ssl_info(F, Sock) ->
+ssl_info(F, #v1{sock = Sock}) ->
%% The first ok form is R14
%% The second is R13 - the extra term is exportability (by inspection,
%% the docs are wrong)
@@ -967,7 +970,7 @@ ssl_info(F, Sock) ->
{ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
end.
-cert_info(F, Sock) ->
+cert_info(F, #v1{sock = Sock}) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
{error, no_peercert} -> '';
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index afc10b9f..096f9490 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -18,7 +18,7 @@
-compile([export_all]).
--export([all_tests/0, test_parsing/0]).
+-export([all_tests/0]).
-import(rabbit_misc, [pget/2]).
@@ -41,11 +41,12 @@ all_tests() ->
passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
+ passed = test_rabbit_basic_header_handling(),
passed = test_priority_queue(),
passed = test_pg_local(),
passed = test_unfold(),
passed = test_supervisor_delayed_restart(),
- passed = test_parsing(),
+ passed = test_table_codec(),
passed = test_content_framing(),
passed = test_content_transcoding(),
passed = test_topic_matching(),
@@ -71,6 +72,7 @@ all_tests() ->
passed = test_configurable_server_properties(),
passed.
+
do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
@@ -159,6 +161,78 @@ test_multi_call() ->
exit(Pid3, bang),
passed.
+test_rabbit_basic_header_handling() ->
+ passed = write_table_with_invalid_existing_type_test(),
+ passed = invalid_existing_headers_test(),
+ passed = disparate_invalid_header_entries_accumulate_separately_test(),
+ passed = corrupt_or_invalid_headers_are_overwritten_test(),
+ passed = invalid_same_header_entry_accumulation_test(),
+ passed.
+
+-define(XDEATH_TABLE,
+ [{<<"reason">>, longstr, <<"blah">>},
+ {<<"queue">>, longstr, <<"foo.bar.baz">>},
+ {<<"exchange">>, longstr, <<"my-exchange">>},
+ {<<"routing-keys">>, array, []}]).
+
+-define(ROUTE_TABLE, [{<<"redelivered">>, bool, <<"true">>}]).
+
+-define(BAD_HEADER(K), {<<K>>, longstr, <<"bad ", K>>}).
+-define(BAD_HEADER2(K, Suf), {<<K>>, longstr, <<"bad ", K, Suf>>}).
+-define(FOUND_BAD_HEADER(K), {<<K>>, array, [{longstr, <<"bad ", K>>}]}).
+
+write_table_with_invalid_existing_type_test() ->
+ prepend_check(<<"header1">>, ?XDEATH_TABLE, [?BAD_HEADER("header1")]),
+ passed.
+
+invalid_existing_headers_test() ->
+ Headers =
+ prepend_check(<<"header2">>, ?ROUTE_TABLE, [?BAD_HEADER("header2")]),
+ {array, [{table, ?ROUTE_TABLE}]} =
+ rabbit_misc:table_lookup(Headers, <<"header2">>),
+ passed.
+
+disparate_invalid_header_entries_accumulate_separately_test() ->
+ BadHeaders = [?BAD_HEADER("header2")],
+ Headers = prepend_check(<<"header2">>, ?ROUTE_TABLE, BadHeaders),
+ Headers2 = prepend_check(<<"header1">>, ?XDEATH_TABLE,
+ [?BAD_HEADER("header1") | Headers]),
+ {table, [?FOUND_BAD_HEADER("header1"),
+ ?FOUND_BAD_HEADER("header2")]} =
+ rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY),
+ passed.
+
+corrupt_or_invalid_headers_are_overwritten_test() ->
+ Headers0 = [?BAD_HEADER("header1"),
+ ?BAD_HEADER("x-invalid-headers")],
+ Headers1 = prepend_check(<<"header1">>, ?XDEATH_TABLE, Headers0),
+ {table,[?FOUND_BAD_HEADER("header1"),
+ ?FOUND_BAD_HEADER("x-invalid-headers")]} =
+ rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY),
+ passed.
+
+invalid_same_header_entry_accumulation_test() ->
+ BadHeader1 = ?BAD_HEADER2("header1", "a"),
+ Headers = prepend_check(<<"header1">>, ?ROUTE_TABLE, [BadHeader1]),
+ Headers2 = prepend_check(<<"header1">>, ?ROUTE_TABLE,
+ [?BAD_HEADER2("header1", "b") | Headers]),
+ {table, InvalidHeaders} =
+ rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY),
+ {array, [{longstr,<<"bad header1b">>},
+ {longstr,<<"bad header1a">>}]} =
+ rabbit_misc:table_lookup(InvalidHeaders, <<"header1">>),
+ passed.
+
+prepend_check(HeaderKey, HeaderTable, Headers) ->
+ Headers1 = rabbit_basic:prepend_table_header(
+ HeaderKey, HeaderTable, Headers),
+ {table, Invalid} =
+ rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY),
+ {Type, Value} = rabbit_misc:table_lookup(Headers, HeaderKey),
+ {array, [{Type, Value} | _]} =
+ rabbit_misc:table_lookup(Invalid, HeaderKey),
+ Headers1.
+
test_priority_queue() ->
false = priority_queue:is_queue(not_a_queue),
@@ -350,113 +424,45 @@ test_unfold() ->
end, 10),
passed.
-test_parsing() ->
- passed = test_content_properties(),
- passed = test_field_values(),
- passed.
-
-test_content_prop_encoding(Datum, Binary) ->
- Types = [element(1, E) || E <- Datum],
- Values = [element(2, E) || E <- Datum],
- Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
-
-test_content_properties() ->
- test_content_prop_encoding([], <<0, 0>>),
- test_content_prop_encoding([{bit, true}, {bit, false}, {bit, true}, {bit, false}],
- <<16#A0, 0>>),
- test_content_prop_encoding([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined},
- {bit, true}],
- <<16#E8,0,123>>),
- test_content_prop_encoding([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}],
- <<16#F0,0,123,123>>),
- test_content_prop_encoding([{bit, true}, {shortstr, <<"hi">>}, {bit, true},
- {shortint, 54321}, {bit, true}],
- <<16#F8,0,2,"hi",16#D4,16#31>>),
- test_content_prop_encoding([{bit, true}, {shortstr, undefined}, {bit, true},
- {shortint, 54321}, {bit, true}],
- <<16#B8,0,16#D4,16#31>>),
- test_content_prop_encoding([{table, [{<<"a signedint">>, signedint, 12345678},
- {<<"a longstr">>, longstr, <<"yes please">>},
- {<<"a decimal">>, decimal, {123, 12345678}},
- {<<"a timestamp">>, timestamp, 123456789012345},
- {<<"a nested table">>, table,
- [{<<"one">>, signedint, 1},
- {<<"two">>, signedint, 2}]}]}],
- <<
- %% property-flags
- 16#8000:16,
-
- %% property-list:
-
- %% table
- 117:32, % table length in bytes
-
- 11,"a signedint", % name
- "I",12345678:32, % type and value
-
- 9,"a longstr",
- "S",10:32,"yes please",
-
- 9,"a decimal",
- "D",123,12345678:32,
-
- 11,"a timestamp",
- "T", 123456789012345:64,
-
- 14,"a nested table",
- "F",
- 18:32,
-
- 3,"one",
- "I",1:32,
-
- 3,"two",
- "I",2:32 >>),
- passed.
-
-test_field_values() ->
+test_table_codec() ->
%% FIXME this does not test inexact numbers (double and float) yet,
%% because they won't pass the equality assertions
- test_content_prop_encoding(
- [{table, [{<<"longstr">>, longstr, <<"Here is a long string">>},
- {<<"signedint">>, signedint, 12345},
- {<<"decimal">>, decimal, {3, 123456}},
- {<<"timestamp">>, timestamp, 109876543209876},
- {<<"table">>, table, [{<<"one">>, signedint, 54321},
- {<<"two">>, longstr, <<"A long string">>}]},
- {<<"byte">>, byte, 255},
- {<<"long">>, long, 1234567890},
- {<<"short">>, short, 655},
- {<<"bool">>, bool, true},
- {<<"binary">>, binary, <<"a binary string">>},
- {<<"void">>, void, undefined},
- {<<"array">>, array, [{signedint, 54321},
- {longstr, <<"A long string">>}]}
-
- ]}],
- <<
- %% property-flags
- 16#8000:16,
- %% table length in bytes
- 228:32,
-
- 7,"longstr", "S", 21:32, "Here is a long string", % = 34
- 9,"signedint", "I", 12345:32/signed, % + 15 = 49
- 7,"decimal", "D", 3, 123456:32, % + 14 = 63
- 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
- 5,"table", "F", 31:32, % length of table % + 11 = 93
- 3,"one", "I", 54321:32, % + 9 = 102
- 3,"two", "S", 13:32, "A long string", % + 22 = 124
- 4,"byte", "b", 255:8, % + 7 = 131
- 4,"long", "l", 1234567890:64, % + 14 = 145
- 5,"short", "s", 655:16, % + 9 = 154
- 4,"bool", "t", 1, % + 7 = 161
- 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
- 4,"void", "V", % + 6 = 194
- 5,"array", "A", 23:32, % + 11 = 205
- "I", 54321:32, % + 5 = 210
- "S", 13:32, "A long string" % + 18 = 228
- >>),
+ Table = [{<<"longstr">>, longstr, <<"Here is a long string">>},
+ {<<"signedint">>, signedint, 12345},
+ {<<"decimal">>, decimal, {3, 123456}},
+ {<<"timestamp">>, timestamp, 109876543209876},
+ {<<"table">>, table, [{<<"one">>, signedint, 54321},
+ {<<"two">>, longstr,
+ <<"A long string">>}]},
+ {<<"byte">>, byte, 255},
+ {<<"long">>, long, 1234567890},
+ {<<"short">>, short, 655},
+ {<<"bool">>, bool, true},
+ {<<"binary">>, binary, <<"a binary string">>},
+ {<<"void">>, void, undefined},
+ {<<"array">>, array, [{signedint, 54321},
+ {longstr, <<"A long string">>}]}
+ ],
+ Binary = <<
+ 7,"longstr", "S", 21:32, "Here is a long string",
+ 9,"signedint", "I", 12345:32/signed,
+ 7,"decimal", "D", 3, 123456:32,
+ 9,"timestamp", "T", 109876543209876:64,
+ 5,"table", "F", 31:32, % length of table
+ 3,"one", "I", 54321:32,
+ 3,"two", "S", 13:32, "A long string",
+ 4,"byte", "b", 255:8,
+ 4,"long", "l", 1234567890:64,
+ 5,"short", "s", 655:16,
+ 4,"bool", "t", 1,
+ 6,"binary", "x", 15:32, "a binary string",
+ 4,"void", "V",
+ 5,"array", "A", 23:32,
+ "I", 54321:32,
+ "S", 13:32, "A long string"
+ >>,
+ Binary = rabbit_binary_generator:generate_table(Table),
+ Table = rabbit_binary_parser:parse_table(Binary),
passed.
%% Test that content frames don't exceed frame-max
@@ -1125,6 +1131,9 @@ test_server_status() ->
HWM = vm_memory_monitor:get_vm_memory_high_watermark(),
ok = control_action(set_vm_memory_high_watermark, ["1"]),
ok = control_action(set_vm_memory_high_watermark, ["1.0"]),
+ %% this will trigger an alarm
+ ok = control_action(set_vm_memory_high_watermark, ["0.0"]),
+ %% reset
ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
%% eval
@@ -2518,7 +2527,7 @@ test_queue_recover() ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(),
- rabbit_amqqueue:start(),
+ rabbit_amqqueue:start(rabbit_amqqueue:recover()),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f3a8cacf..a7ea3d99 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,13 +18,17 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([start/5, start_link/5, start/6, start_link/6]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--record(wstate, {sock, channel, frame_max, protocol, pending}).
+%% internal
+-export([mainloop/1, mainloop1/1]).
+
+-record(wstate, {sock, channel, frame_max, protocol, reader,
+ stats_timer, pending}).
-define(HIBERNATE_AFTER, 5000).
@@ -40,6 +44,14 @@
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
+-spec(start/6 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ -> rabbit_types:ok(pid())).
+-spec(start_link/6 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ -> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
@@ -67,50 +79,58 @@
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
--spec(mainloop/2 :: (_,_) -> 'done').
--spec(mainloop1/2 :: (_,_) -> any()).
-
-endif.
%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
+ start(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
-
-mainloop(ReaderPid, State) ->
+ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
+ ReaderWantsStats),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
+
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
+ ReaderWantsStats),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+
+initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ (case ReaderWantsStats of
+ true -> fun rabbit_event:init_stats_timer/2;
+ false -> fun rabbit_event:init_disabled_stats_timer/2
+ end)(#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ reader = ReaderPid,
+ pending = []},
+ #wstate.stats_timer).
+
+mainloop(State) ->
try
- mainloop1(ReaderPid, State)
+ mainloop1(State)
catch
- exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
+ ReaderPid ! {channel_exit, Channel, Error}
end,
done.
-mainloop1(ReaderPid, State = #wstate{pending = []}) ->
+mainloop1(State = #wstate{pending = []}) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
+ erlang:hibernate(?MODULE, mainloop, [State])
end;
-mainloop1(ReaderPid, State) ->
+mainloop1(State) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(ReaderPid, flush(State))
+ ?MODULE:mainloop1(flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -139,9 +159,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
- State;
+ rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
+handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
+ ReaderPid ! ensure_stats,
+ rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).