summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarek Majkowski <marek@rabbitmq.com>2010-07-01 11:14:05 +0100
committerMarek Majkowski <marek@rabbitmq.com>2010-07-01 11:14:05 +0100
commitc4873b1f14b8272bbbf1c6ceda0badad29b3ac86 (patch)
tree88f993f76e6541360f006429caaf7926046f477a
parent91fcaa3520f00ba0ed3d3cea870b2c1f34cb496b (diff)
parent084e69a6adf36365d253826239d11ff90f5587aa (diff)
downloadrabbitmq-server-bug22906.tar.gz
default merged into bug22906bug22906
-rw-r--r--Makefile16
-rw-r--r--docs/html-to-website-xml.xsl4
-rw-r--r--ebin/rabbit_app.in4
-rw-r--r--include/rabbit.hrl1
-rw-r--r--packaging/macports/Portfile.in6
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl142
-rw-r--r--src/rabbit_exchange.erl16
-rw-r--r--src/rabbit_memory_monitor.erl3
-rw-r--r--src/rabbit_mnesia.erl18
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_reader.erl49
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl47
16 files changed, 208 insertions, 116 deletions
diff --git a/Makefile b/Makefile
index 725f20a6..5935f034 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
@@ -76,6 +76,18 @@ SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
endif
endif
+# Versions prior to this are not supported
+NEED_MAKE := 3.80
+ifneq "$(NEED_MAKE)" "$(firstword $(sort $(NEED_MAKE) $(MAKE_VERSION)))"
+$(error Versions of make prior to $(NEED_MAKE) are not supported)
+endif
+
+# .DEFAULT_GOAL introduced in 3.81
+DEFAULT_GOAL_MAKE := 3.81
+ifneq "$(DEFAULT_GOAL_MAKE)" "$(firstword $(sort $(DEFAULT_GOAL_MAKE) $(MAKE_VERSION)))"
+.DEFAULT_GOAL=all
+endif
+
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -268,7 +280,7 @@ install_dirs:
mkdir -p $(SBIN_DIR)
mkdir -p $(MAN_DIR)
-$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML))))
+$(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML))))
# Note that all targets which depend on clean must have clean in their
# name. Also any target that doesn't depend on clean should not have
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index f2117e26..662dbea0 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -58,13 +58,13 @@
<!-- Specific instructions to revert the DocBook HTML to be more like our ad-hoc XML schema -->
<xsl:template match="div[@class='refsect1'] | div[@class='refnamediv'] | div[@class='refsynopsisdiv']">
- <doc:section name="{@title}">
+ <doc:section name="{h2}">
<xsl:apply-templates select="node()"/>
</doc:section>
</xsl:template>
<xsl:template match="div[@class='refsect2']">
- <doc:subsection name="{@title}">
+ <doc:subsection name="{h3}">
<xsl:apply-templates select="node()"/>
</doc:subsection>
</xsl:template>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index bdf407eb..ce94cafe 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -11,8 +11,8 @@
rabbit_sup,
rabbit_tcp_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
-%% we also depend on ssl but it shouldn't be in here as we don't
-%% actually want to start it
+%% we also depend on crypto, public_key and ssl but they shouldn't be
+%% in here as we don't actually want to start it
{mod, {rabbit, []}},
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
{ssl_listeners, []},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0d75310b..06297c69 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -79,6 +79,7 @@
-type(maybe(T) :: T | 'none').
-type(erlang_node() :: atom()).
+-type(node_type() :: disc_only | disc | ram | unknown).
-type(ssl_socket() :: #ssl_socket{}).
-type(socket() :: port() | ssl_socket()).
-type(thunk(T) :: fun(() -> T)).
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index 35e438d9..059092bf 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -4,7 +4,7 @@
PortSystem 1.0
name rabbitmq-server
version @VERSION@
-revision 2
+revision 2
categories net
maintainers paperplanes.de:meyer rabbitmq.com:tonyg openmaintainer
platforms darwin
@@ -24,8 +24,8 @@ checksums \
rmd160 @rmd160@
patchfiles erlang-r14a-build-fix.diff
-depends_build port:erlang port:xmlto port:libxslt
-depends_run port:erlang
+depends_lib port:erlang
+depends_build port:xmlto port:libxslt
platform darwin 7 {
depends_build-append port:py25-simplejson
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c389178a..6cf6d7d5 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -193,7 +193,7 @@
-spec(rotate_logs/1 :: (file_suffix()) -> 'ok' | {'error', any()}).
-spec(status/0 :: () ->
[{running_applications, [{atom(), string(), string()}]} |
- {nodes, [erlang_node()]} |
+ {nodes, [{node_type(), [erlang_node()]}]} |
{running_nodes, [erlang_node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3c9c41bd..378d0cbc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -66,7 +66,7 @@
-spec(start/0 :: () -> 'ok').
-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
- maybe(pid())) -> amqqueue()).
+ maybe(pid())) -> {'new' | 'existing', amqqueue()}).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
@@ -110,7 +110,7 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
--spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok').
+-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -395,7 +395,7 @@ delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid,
+ delegate:invoke(Pid,
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
delegate_pcast(Pid, Pri, Msg) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5fdf0ffa..70e6e755 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -137,7 +137,7 @@ declare(Recover, From,
backing_queue = BQ, backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, Q),
+ Q -> gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use,
[self()]),
@@ -146,7 +146,7 @@ declare(Recover, From,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
noreply(State#q{backing_queue_state = BQS});
- Q1 -> {stop, normal, Q1, State}
+ Q1 -> {stop, normal, {existing, Q1}, State}
end.
terminate_shutdown(Fun, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f4434ade..1d91494b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
Reader ! {channel_exit, Channel, Reason},
State#ch{state = terminating}.
-return_queue_declare_ok(State, NoWait, Q) ->
- NewState = State#ch{most_recently_declared_queue =
- (Q#amqqueue.name)#resource.name},
+return_queue_declare_ok(#resource{name = ActualName},
+ NoWait, MessageCount, ConsumerCount, State) ->
+ NewState = State#ch{most_recently_declared_queue = ActualName},
case NoWait of
true -> {noreply, NewState};
- false ->
- {ok, ActualName, MessageCount, ConsumerCount} =
- rabbit_misc:with_exit_handler(
- fun () -> {ok, Q#amqqueue.name, 0, 0} end,
- fun () -> rabbit_amqqueue:stat(Q) end),
- Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
- message_count = MessageCount,
- consumer_count = ConsumerCount},
- {reply, Reply, NewState}
+ false -> Reply = #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount},
+ {reply, Reply, NewState}
end.
check_resource_access(Username, Resource, Perm) ->
@@ -344,7 +339,7 @@ with_exclusive_access_or_die(QName, ReaderPid, F) ->
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath,
most_recently_declared_queue = MRDQ }) ->
rabbit_misc:r(VHostPath, queue, MRDQ);
@@ -354,7 +349,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) ->
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = MRDQ }) ->
MRDQ;
@@ -447,13 +442,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routed ->
ok;
unroutable ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
+ ok = basic_return(Message, WriterPid, no_route);
not_delivered ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
+ ok = basic_return(Message, WriterPid, no_consumers)
end,
{noreply, case TxnKey of
none -> State;
@@ -608,9 +599,8 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
-handle_method(#'basic.recover'{requeue = true},
- _, State = #ch{ transaction_id = none,
- unacked_message_q = UAMQ }) ->
+handle_method(#'basic.recover_async'{requeue = true},
+ _, State = #ch{ unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
@@ -620,12 +610,12 @@ handle_method(#'basic.recover'{requeue = true},
rabbit_amqqueue:requeue(
QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover'{requeue = false},
- _, State = #ch{ transaction_id = none,
- writer_pid = WriterPid,
+handle_method(#'basic.recover_async'{requeue = false},
+ _, State = #ch{ writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
ok = rabbit_misc:queue_fold(
fun ({_DeliveryTag, none, _Msg}, ok) ->
@@ -645,12 +635,17 @@ handle_method(#'basic.recover'{requeue = false},
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State};
-handle_method(#'basic.recover'{}, _, _State) ->
- rabbit_misc:protocol_error(
- not_allowed, "attempt to recover a transactional channel",[]);
+handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
+ {noreply, State2 = #ch{writer_pid = WriterPid}} =
+ handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content,
+ State),
+ ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
+ {noreply, State2};
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -716,7 +711,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
- arguments = Args},
+ arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid,
queue_collector_pid = CollectorPid}) ->
@@ -724,46 +719,43 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
true -> ReaderPid;
false -> none
end,
- %% We use this in both branches, because queue_declare may yet return an
- %% existing queue.
- Finish = fun (#amqqueue{name = QueueName,
- durable = Durable1,
- auto_delete = AutoDelete1} = Q)
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q, Owner, strict),
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
- end,
- Q;
- %% non-equivalence trumps exclusivity arbitrarily
- (#amqqueue{name = QueueName}) ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end,
- Q = case rabbit_amqqueue:with(
- rabbit_misc:r(VHostPath, queue, QueueNameBin),
- Finish) of
- {error, not_found} ->
- ActualNameBin =
- case QueueNameBin of
+ ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
- QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner));
- #amqqueue{} = Other ->
- Other
- end,
- return_queue_declare_ok(State, NoWait, Q);
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, State),
+ case rabbit_amqqueue:with(QueueName,
+ fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of
+ {{ok, QueueName, MessageCount, ConsumerCount},
+ #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q}
+ when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
+ check_exclusive_access(Q, Owner, strict),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+ {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]);
+ {error, not_found} ->
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ {new, Q = #amqqueue{}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case Owner of
+ none -> ok;
+ _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, none, State)
+ end
+ end;
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
@@ -772,8 +764,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
- return_queue_declare_ok(State, NoWait, Q);
+ {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ check_exclusive_access(Q, ReaderPid, lax),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
+ State);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
@@ -946,7 +942,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
- WriterPid, ReplyCode, ReplyText) ->
+ WriterPid, Reason) ->
+ {_Close, ReplyCode, ReplyText} =
+ rabbit_framing:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index c5149b08..d77bf833 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -100,7 +100,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+-define(INFO_KEYS, [name, type, durable, auto_delete, arguments]).
recover() ->
Exs = rabbit_misc:table_fold(
@@ -335,7 +335,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
Module = type_to_module(Type),
case IsDeleted of
auto_deleted -> Module:delete(X, Bs);
- no_delete -> Module:remove_bindings(X, Bs)
+ not_deleted -> Module:remove_bindings(X, Bs)
end
end, Cleanup)
end.
@@ -438,11 +438,11 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
end) of
Err = {error, _} ->
Err;
- {{Action, X = #exchange{ type = Type }}, B} ->
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
Module = type_to_module(Type),
- case Action of
- auto_delete -> Module:delete(X, [B]);
- no_delete -> Module:remove_bindings(X, [B])
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, [B]);
+ not_deleted -> Module:remove_bindings(X, [B])
end
end.
@@ -526,10 +526,10 @@ delete(ExchangeName, IfUnused) ->
end.
maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {no_delete, Exchange};
+ {not_deleted, Exchange};
maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
case conditional_delete(Exchange) of
- {error, in_use} -> {no_delete, Exchange};
+ {error, in_use} -> {not_deleted, Exchange};
{deleted, Exchange, []} -> {auto_deleted, Exchange}
end.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 91e97ffe..e78b59f1 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -90,7 +90,8 @@
-spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
-spec(deregister/1 :: (pid()) -> 'ok').
--spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()).
+-spec(report_ram_duration/2 :: (pid(), float() | 'infinity') ->
+ number() | 'infinity').
-spec(stop/0 :: () -> 'ok').
-endif.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a0b7aa4e..d4b29943 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -47,7 +47,8 @@
-ifdef(use_specs).
--spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]).
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [erlang_node()]}]} |
+ {'running_nodes', [erlang_node()]}]).
-spec(dir/0 :: () -> file_path()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
@@ -64,7 +65,20 @@
%%----------------------------------------------------------------------------
status() ->
- [{nodes, mnesia:system_info(db_nodes)},
+ [{nodes, case mnesia:system_info(is_running) of
+ yes -> [{Key, Nodes} ||
+ {Key, CopyType} <- [{disc_only, disc_only_copies},
+ {disc, disc_copies},
+ {ram, ram_copies}],
+ begin
+ Nodes = mnesia:table_info(schema, CopyType),
+ Nodes =/= []
+ end];
+ no -> case mnesia:system_info(db_nodes) of
+ [] -> [];
+ Nodes -> [{unknown, Nodes}]
+ end
+ end},
{running_nodes, mnesia:system_info(running_db_nodes)}].
init() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index c3d0b7b7..68ffc98a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -102,7 +102,7 @@ boot_ssl() ->
{ok, []} ->
ok;
{ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
+ ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOpts} = application:get_env(ssl_options),
[start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 73a58f13..f2a903dc 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -53,6 +53,7 @@
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
%---------------------------------------------------------------------------
@@ -102,6 +103,8 @@
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closing*
%% receive frame -> ignore, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
@@ -118,6 +121,8 @@
%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closed*
%% receive connection.close_ok -> self() ! terminate_connection,
%% *closed*
%% receive frame -> ignore, *closed*
@@ -485,10 +490,18 @@ handle_frame(Type, Channel, Payload, State) ->
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except
+ %% channel.close and channel.close_ok. In the
+ %% event of a channel.close, we should send back a
%% channel.close_ok.
case AnalyzedFrame of
{method, 'channel.close_ok', _} ->
erase({channel, Channel});
+ {method, 'channel.close', _} ->
+ %% We're already closing this channel, so
+ %% there's no cleanup to do (notify
+ %% queues, etc.)
+ ok = rabbit_writer:send_command(State#v1.sock,
+ #'channel.close_ok'{});
_ -> ok
end,
State;
@@ -605,27 +618,33 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
ok = send_on_channel0(
Sock,
#'connection.tune'{channel_max = 0,
- %% set to zero once QPid fix their negotiation
- frame_max = 131072,
+ frame_max = ?FRAME_MAX,
heartbeat = 0}),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
client_properties = ClientProperties}};
-handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
- frame_max = FrameMax,
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
sock = Sock}) ->
- %% if we have a channel_max limit that the client wishes to
- %% exceed, die as per spec. Not currently a problem, so we ignore
- %% the client's channel_max parameter.
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}};
+ if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w < ~w min size",
+ [FrameMax, ?FRAME_MIN_SIZE]);
+ (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w > ~w max size",
+ [FrameMax, ?FRAME_MAX]);
+ true ->
+ rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ timeout_sec = ClientHeartbeat,
+ frame_max = FrameMax}}
+ end;
+
handle_method0(#'connection.open'{virtual_host = VHostPath,
insist = Insist},
State = #v1{connection_state = opening,
@@ -659,6 +678,12 @@ handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
+handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+ when CS =:= closing; CS =:= closed ->
+ %% We're already closed or closing, so we don't need to cleanup
+ %% anything.
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 5cd15a94..75196bc0 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -90,13 +90,13 @@ match_routing_key(Name, RoutingKey) ->
lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
- sets:fold(
+ lists:foldl(
fun (Key, Acc) ->
case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
end
- end, [], sets:from_list(Queues)).
+ end, [], lists:usort(Queues)).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ecc2613d..34eec121 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -55,6 +55,7 @@ all_tests() ->
passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
+ passed = test_content_framing(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -353,6 +354,45 @@ test_field_values() ->
>>),
passed.
+%% Test that content frames don't exceed frame-max
+test_content_framing(FrameMax, Fragments) ->
+ [Header | Frames] =
+ rabbit_binary_generator:build_simple_content_frames(
+ 1,
+ #content{class_id = 0, properties_bin = <<>>,
+ payload_fragments_rev = Fragments},
+ FrameMax),
+ %% header is formatted correctly and the size is the total of the
+ %% fragments
+ <<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
+ BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header),
+ BodySize = size(list_to_binary(Fragments)),
+ false = lists:any(
+ fun (ContentFrame) ->
+ FrameBinary = list_to_binary(ContentFrame),
+ %% assert
+ <<_TypeAndChannel:3/binary,
+ Size:32/unsigned,
+ _Payload:Size/binary,
+ 16#CE>> = FrameBinary,
+ size(FrameBinary) > FrameMax
+ end,
+ Frames),
+ passed.
+
+test_content_framing() ->
+ %% no content
+ passed = test_content_framing(4096, []),
+ passed = test_content_framing(4096, [<<>>]),
+ %% easily fit in one frame
+ passed = test_content_framing(4096, [<<"Easy">>]),
+ %% exactly one frame (empty frame = 8 bytes)
+ passed = test_content_framing(11, [<<"One">>]),
+ %% more than one frame
+ passed = test_content_framing(20, [<<"into more than one frame">>,
+ <<"This will have to go">>]),
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).
@@ -752,10 +792,11 @@ test_server_status() ->
Writer = spawn(fun () -> receive shutdown -> ok end end),
Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
self()),
- [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
+ [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
+ {new, Queue = #amqqueue{}} <-
+ [rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, [], none) ||
- Name <- [<<"foo">>, <<"bar">>]],
+ false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),