summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Jones <paulj@lshift.net>2010-01-22 13:03:38 +0000
committerPaul Jones <paulj@lshift.net>2010-01-22 13:03:38 +0000
commit0c435c5f542ede6ed2771058febdc5682ee10a24 (patch)
treecab73efdd06b6d251c4a3de7fa6ccd843ab0d7a3
parent7a4b0f522abb04fee32525a7790f63eaa397116b (diff)
parent9898bc1ba8ca17b8d7b858f937c550179db58b2f (diff)
downloadrabbitmq-server-bug19230.tar.gz
Merge default into bug19230bug19230
-rw-r--r--.hgignore14
-rw-r--r--Makefile71
-rw-r--r--codegen.py61
-rw-r--r--docs/rabbitmq-activate-plugins.1.pod2
-rw-r--r--docs/rabbitmq-deactivate-plugins.1.pod37
-rw-r--r--docs/rabbitmqctl.1.pod15
-rw-r--r--ebin/rabbit_app.in7
-rw-r--r--generate_app6
-rw-r--r--include/rabbit.hrl27
-rw-r--r--include/rabbit_framing_spec.hrl2
-rw-r--r--packaging/RPMS/Fedora/Makefile2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec7
-rw-r--r--packaging/common/rabbitmq-asroot-script-wrapper18
-rw-r--r--packaging/common/rabbitmq-script-wrapper12
-rw-r--r--packaging/common/rabbitmq-server.init13
-rw-r--r--packaging/debs/Debian/Makefile2
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/postrm.in (renamed from packaging/debs/Debian/debian/postrm)8
-rw-r--r--packaging/debs/Debian/debian/rules3
-rw-r--r--packaging/macports/Makefile58
-rw-r--r--packaging/macports/Portfile.in (renamed from packaging/macports/net/rabbitmq-server/Portfile)63
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper12
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper13
-rw-r--r--packaging/macports/patch-org.macports.rabbitmq-server.plist.diff (renamed from packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff)0
-rw-r--r--packaging/windows/Makefile1
-rw-r--r--scripts/rabbitmq-activate-plugins.bat19
-rwxr-xr-xscripts/rabbitmq-deactivate-plugins37
-rw-r--r--scripts/rabbitmq-deactivate-plugins.bat39
-rwxr-xr-xscripts/rabbitmq-multi25
-rwxr-xr-xscripts/rabbitmq-multi.bat37
-rwxr-xr-xscripts/rabbitmq-server34
-rwxr-xr-xscripts/rabbitmq-server.bat60
-rwxr-xr-xscripts/rabbitmq-service.bat75
-rwxr-xr-xscripts/rabbitmqctl5
-rwxr-xr-xscripts/rabbitmqctl.bat10
-rw-r--r--src/gen_server2.erl19
-rw-r--r--src/priority_queue.erl4
-rw-r--r--src/rabbit.erl317
-rw-r--r--src/rabbit_alarm.erl118
-rw-r--r--src/rabbit_amqqueue.erl36
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_basic.erl5
-rw-r--r--src/rabbit_binary_generator.erl95
-rw-r--r--src/rabbit_binary_parser.erl65
-rw-r--r--src/rabbit_channel.erl83
-rw-r--r--src/rabbit_control.erl115
-rw-r--r--src/rabbit_dialyzer.erl91
-rw-r--r--src/rabbit_error_logger.erl6
-rw-r--r--src/rabbit_exchange.erl8
-rw-r--r--src/rabbit_guid.erl9
-rw-r--r--src/rabbit_heartbeat.erl4
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_load.erl17
-rw-r--r--src/rabbit_memsup.erl142
-rw-r--r--src/rabbit_memsup_darwin.erl88
-rw-r--r--src/rabbit_memsup_linux.erl101
-rw-r--r--src/rabbit_misc.erl70
-rw-r--r--src/rabbit_mnesia.erl22
-rw-r--r--src/rabbit_multi.erl105
-rw-r--r--src/rabbit_net.erl132
-rw-r--r--src/rabbit_networking.erl94
-rw-r--r--src/rabbit_plugin_activator.erl112
-rw-r--r--src/rabbit_reader.erl241
-rw-r--r--src/rabbit_sup.erl11
-rw-r--r--src/rabbit_tests.erl60
-rw-r--r--src/rabbit_writer.erl4
-rw-r--r--src/tcp_acceptor.erl27
-rw-r--r--src/tcp_listener.erl29
-rw-r--r--src/tcp_listener_sup.erl14
-rw-r--r--src/vm_memory_monitor.erl325
70 files changed, 2204 insertions, 1197 deletions
diff --git a/.hgignore b/.hgignore
index 3323e2fc..ccd0b09f 100644
--- a/.hgignore
+++ b/.hgignore
@@ -1,18 +1,18 @@
syntax: glob
*.beam
*~
+*.swp
+*.patch
erl_crash.dump
syntax: regexp
^cover/
^dist/
-^include/rabbit_framing.hrl$
-^src/rabbit_framing.erl$
-^rabbit.plt$
-^ebin/rabbit.app$
-^ebin/rabbit.rel$
-^ebin/rabbit.boot$
-^ebin/rabbit.script$
+^include/rabbit_framing\.hrl$
+^src/rabbit_framing\.erl$
+^rabbit\.plt$
+^basic.plt$
+^ebin/rabbit\.(app|rel|boot|script)$
^plugins/
^priv/plugins/
diff --git a/Makefile b/Makefile
index f0702756..db8f7001 100644
--- a/Makefile
+++ b/Makefile
@@ -1,29 +1,44 @@
-ifndef TMPDIR
-TMPDIR := /tmp
-endif
-RABBITMQ_NODENAME=rabbit
-RABBITMQ_SERVER_START_ARGS=
-RABBITMQ_MNESIA_DIR=$(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
-RABBITMQ_LOG_BASE=$(TMPDIR)
+TMPDIR ?= /tmp
+
+RABBITMQ_NODENAME ?= rabbit
+RABBITMQ_SERVER_START_ARGS ?=
+RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
+RABBITMQ_LOG_BASE ?= $(TMPDIR)
SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
-BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
+BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
+ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
+PYTHON=python
+else
+ifeq ($(shell python2.6 -c 'import simplejson' 2>/dev/null && echo yes),yes)
+PYTHON=python2.6
+else
+ifeq ($(shell python2.5 -c 'import simplejson' 2>/dev/null && echo yes),yes)
+PYTHON=python2.5
+else
+# Hmm. Missing simplejson?
PYTHON=python
+endif
+endif
+endif
+
+BASIC_PLT=basic.plt
+RABBIT_PLT=rabbit.plt
ifndef USE_SPECS
# our type specs rely on features / bug fixes in dialyzer that are
-# only available in R13B upwards (R13B is eshell 5.7.1)
+# only available in R13B01 upwards (R13B01 is eshell 5.7.2)
#
# NB: the test assumes that version number will only contain single digits
-USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.0" ]; then echo "true"; else echo "false"; fi)
+USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.1" ]; then echo "true"; else echo "false"; fi)
endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
@@ -39,10 +54,12 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
+ERL_EBIN=erl -noinput -pa $(EBIN_DIR)
+
all: $(TARGETS)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
- escript generate_app $(EBIN_DIR) < $< > $@
+ escript generate_app $(EBIN_DIR) $@ < $<
$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
@@ -57,17 +74,32 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p
$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
$(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
-$(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS)
- erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().'
+dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
+ $(ERL_EBIN) -eval \
+ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))."
+
+# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
+create-plt: $(RABBIT_PLT)
-dialyze: $(BEAM_TARGETS)
- dialyzer -c $?
+$(RABBIT_PLT): $(BEAM_TARGETS) $(BASIC_PLT)
+ cp $(BASIC_PLT) $@
+ $(ERL_EBIN) -eval \
+ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:add_to_plt(\"$@\", \"$(BEAM_TARGETS)\"))."
+
+$(BASIC_PLT): $(BEAM_TARGETS)
+ if [ -f $@ ]; then \
+ touch $@; \
+ else \
+ $(ERL_EBIN) -eval \
+ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:create_basic_plt(\"$@\"))."; \
+ fi
clean:
rm -f $(EBIN_DIR)/*.beam
- rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
+ rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f docs/*.[0-9].gz
+ rm -f $(RABBIT_PLT)
cleandb:
rm -rf $(RABBITMQ_MNESIA_DIR)/*
@@ -82,13 +114,14 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\
run: all
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
- RABBITMQ_NODE_ONLY=true \
- RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -s rabbit" \
+ RABBITMQ_ALLOW_INPUT=true \
+ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
./scripts/rabbitmq-server
run-node: all
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
+ RABBITMQ_ALLOW_INPUT=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
./scripts/rabbitmq-server
@@ -173,7 +206,7 @@ install: all docs_all install_dirs
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
- for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \
+ for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \
cp scripts/$$script $(TARGET_DIR)/sbin; \
[ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \
done
diff --git a/codegen.py b/codegen.py
index 84741ea2..6f39574f 100644
--- a/codegen.py
+++ b/codegen.py
@@ -92,6 +92,40 @@ class PackedMethodBitField:
def full(self):
return self.count() == 8
+
+def printFileHeader():
+ print """%% Autogenerated code. Do not edit.
+%%
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%"""
def genErl(spec):
def erlType(domain):
@@ -117,6 +151,13 @@ def genErl(spec):
def genMethodHasContent(m):
print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower())
+
+ def genMethodIsSynchronous(m):
+ hasNoWait = "nowait" in fieldNameList(m.arguments)
+ if m.isSynchronous and hasNoWait:
+ print "is_method_synchronous(#%s{nowait = NoWait}) -> not(NoWait);" % (m.erlangName())
+ else:
+ print "is_method_synchronous(#%s{}) -> %s;" % (m.erlangName(), str(m.isSynchronous).lower())
def genMethodFieldTypes(m):
"""Not currently used - may be useful in future?"""
@@ -221,12 +262,12 @@ def genErl(spec):
print " rabbit_binary_generator:encode_properties(%s, %s);" % \
(fieldTypeList(c.fields), fieldTempList(c.fields))
- def massageConstantClass(cls):
+ def messageConstantClass(cls):
# We do this because 0.8 uses "soft error" and 8.1 uses "soft-error".
return erlangConstantName(cls)
def genLookupException(c,v,cls):
- mCls = massageConstantClass(cls)
+ mCls = messageConstantClass(cls)
if mCls == 'SOFT_ERROR': genLookupException1(c,'false')
elif mCls == 'HARD_ERROR': genLookupException1(c, 'true')
elif mCls == '': pass
@@ -237,8 +278,14 @@ def genErl(spec):
print 'lookup_amqp_exception(%s) -> {%s, ?%s, <<"%s">>};' % \
(n.lower(), hardErrorBoolStr, n, n)
+ def genAmqpException(c,v,cls):
+ n = erlangConstantName(c)
+ print 'amqp_exception(?%s) -> %s;' % \
+ (n, n.lower())
+
methods = spec.allMethods()
+ printFileHeader()
print """-module(rabbit_framing).
-include("rabbit_framing.hrl").
@@ -246,12 +293,14 @@ def genErl(spec):
-export([method_id/1]).
-export([method_has_content/1]).
+-export([is_method_synchronous/1]).
-export([method_fieldnames/1]).
-export([decode_method_fields/2]).
-export([decode_properties/2]).
-export([encode_method_fields/1]).
-export([encode_properties/1]).
-export([lookup_amqp_exception/1]).
+-export([amqp_exception/1]).
bitvalue(true) -> 1;
bitvalue(false) -> 0;
@@ -266,6 +315,9 @@ bitvalue(undefined) -> 0.
for m in methods: genMethodHasContent(m)
print "method_has_content(Name) -> exit({unknown_method_name, Name})."
+ for m in methods: genMethodIsSynchronous(m)
+ print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})."
+
for m in methods: genMethodFieldNames(m)
print "method_fieldnames(Name) -> exit({unknown_method_name, Name})."
@@ -285,8 +337,10 @@ bitvalue(undefined) -> 0.
for (c,v,cls) in spec.constants: genLookupException(c,v,cls)
print "lookup_amqp_exception(Code) ->"
print " rabbit_log:warning(\"Unknown AMQP error code '~p'~n\", [Code]),"
- print " {true, ?INTERNAL_ERROR, <<\"INTERNAL_ERROR\">>}."
+ print " {true, ?INTERNAL_ERROR, <<\"INTERNAL_ERROR\">>}."
+ for(c,v,cls) in spec.constants: genAmqpException(c,v,cls)
+ print "amqp_exception(_Code) -> undefined."
def genHrl(spec):
def erlType(domain):
@@ -306,6 +360,7 @@ def genHrl(spec):
methods = spec.allMethods()
+ printFileHeader()
print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod
index 58ffea79..42f0c4d2 100644
--- a/docs/rabbitmq-activate-plugins.1.pod
+++ b/docs/rabbitmq-activate-plugins.1.pod
@@ -26,7 +26,7 @@ execute:
=head1 SEE ALSO
L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>,
-L<rabbitmqctl(1)>
+L<rabbitmqctl(1)>, L<rabbitmq-deactivate-plugins(1)>
=head1 AUTHOR
diff --git a/docs/rabbitmq-deactivate-plugins.1.pod b/docs/rabbitmq-deactivate-plugins.1.pod
new file mode 100644
index 00000000..eb4fbb90
--- /dev/null
+++ b/docs/rabbitmq-deactivate-plugins.1.pod
@@ -0,0 +1,37 @@
+=head1 NAME
+
+rabbitmq-deactivate-plugins - command line tool for deactivating plugins
+in a RabbitMQ broker
+
+=head1 SYNOPSIS
+
+rabbitmq-deactivate-plugins
+
+=head1 DESCRIPTION
+
+RabbitMQ is an implementation of AMQP, the emerging standard for high
+performance enterprise messaging. The RabbitMQ server is a robust and
+scalable implementation of an AMQP broker.
+
+rabbitmq-deactivate-plugins is a command line tool for deactivating
+plugins installed into the broker.
+
+=head1 EXAMPLES
+
+To deactivate all of the installed plugins in the current RabbitMQ install,
+execute:
+
+ rabbitmq-deactivate-plugins
+
+=head1 SEE ALSO
+
+L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>,
+L<rabbitmqctl(1)>, L<rabbitmq-activate-plugins(1)>
+
+=head1 AUTHOR
+
+The RabbitMQ Team <info@rabbitmq.com>
+
+=head1 REFERENCES
+
+RabbitMQ Web Site: L<http://www.rabbitmq.com>
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 6d4aadeb..5255be28 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -198,9 +198,9 @@ whether the queue will be deleted when no longer used
queue arguments
-=item node
+=item pid
-node on which the process associated with the queue resides
+id of the Erlang process associated with the queue
=item messages_ready
@@ -279,7 +279,7 @@ exchange arguments
=item list_bindings [-p I<vhostpath>]
List bindings by virtual host. Each line printed describes a binding,
-with the exchange name, routing key, queue name and arguments,
+with the exchange name, queue name, routing key and arguments,
separated by tab characters.
=item list_connections [I<connectioninfoitem> ...]
@@ -287,7 +287,7 @@ separated by tab characters.
List queue information by virtual host. Each line printed describes an
connection, with the requested I<connectioninfoitem> values separated
by tab characters. If no I<connectioninfoitem>s are specified then
-I<user>, I<peer_address> and I<peer_port> are assumed.
+I<user>, I<peer_address>, I<peer_port> and I<state> are assumed.
=back
@@ -297,7 +297,7 @@ I<user>, I<peer_address> and I<peer_port> are assumed.
=item node
-node on which the process associated with the connection resides
+id of the Erlang process associated with the connection
=item address
@@ -340,6 +340,11 @@ connection timeout
maximum frame size (bytes)
+=item client_properties
+
+informational properties transmitted by the client during connection
+establishment
+
=item recv_oct
octets received
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 0057ea04..3616fcbf 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -15,9 +15,10 @@
%% actually want to start it
{mod, {rabbit, []}},
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
- {extra_startup_steps, []},
+ {ssl_listeners, []},
+ {ssl_options, []},
+ {vm_memory_high_watermark, 0.4},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
- {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {memory_alarms, auto}]}]}.
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}.
diff --git a/generate_app b/generate_app
index 62301292..576b485e 100644
--- a/generate_app
+++ b/generate_app
@@ -1,10 +1,12 @@
#!/usr/bin/env escript
%% -*- erlang -*-
-main([BeamDir]) ->
+main([BeamDir, TargetFile]) ->
Modules = [list_to_atom(filename:basename(F, ".beam")) ||
F <- filelib:wildcard("*.beam", BeamDir)],
{ok, {application, Application, Properties}} = io:read(''),
NewProperties = lists:keyreplace(modules, 1, Properties,
{modules, Modules}),
- io:format("~p.", [{application, Application, NewProperties}]).
+ file:write_file(
+ TargetFile,
+ io_lib:format("~p.~n", [{application, Application, NewProperties}])).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 784c21b3..4b157cbc 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -36,7 +36,7 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {user, timeout_sec, frame_max, vhost}).
+-record(connection, {user, timeout_sec, frame_max, vhost, client_properties}).
-record(content,
{class_id,
@@ -64,8 +64,11 @@
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
+-record(amqp_error, {name, explanation, method = none}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -74,7 +77,8 @@
-type(maybe(T) :: T | 'none').
-type(erlang_node() :: atom()).
--type(socket() :: port()).
+-type(ssl_socket() :: #ssl_socket{}).
+-type(socket() :: port() | ssl_socket()).
-type(thunk(T) :: fun(() -> T)).
-type(info_key() :: atom()).
-type(info() :: {info_key(), any()}).
@@ -99,15 +103,15 @@
read :: regexp()}).
-type(amqqueue() ::
#amqqueue{name :: queue_name(),
- durable :: bool(),
- auto_delete :: bool(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
arguments :: amqp_table(),
pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
- durable :: bool(),
- auto_delete :: bool(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
@@ -137,14 +141,14 @@
persistent_key :: maybe(pkey())}).
-type(message() :: basic_message()).
-type(delivery() ::
- #delivery{mandatory :: bool(),
- immediate :: bool(),
+ #delivery{mandatory :: boolean(),
+ immediate :: boolean(),
txn :: maybe(txn()),
sender :: pid(),
message :: message()}).
%% this really should be an abstract type
-type(msg_id() :: non_neg_integer()).
--type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}).
+-type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
-type(listener() ::
#listener{node :: erlang_node(),
protocol :: atom(),
@@ -152,7 +156,10 @@
port :: non_neg_integer()}).
-type(not_found() :: {'error', 'not_found'}).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-
+-type(amqp_error() ::
+ #amqp_error{name :: atom(),
+ explanation :: string(),
+ method :: atom()}).
-endif.
%%----------------------------------------------------------------------------
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index f45fa6ca..a78c2301 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -50,8 +50,6 @@
%% TODO: make this more precise
-type(amqp_method_name() :: atom()).
-type(channel_number() :: non_neg_integer()).
-%% TODO: make this more precise
--type(amqp_error() :: {bool(), non_neg_integer(), binary()}).
-type(resource_name() :: binary()).
-type(routing_key() :: binary()).
-type(username() :: binary()).
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index fa2844fd..bc5b58ca 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -34,6 +34,8 @@ prepare:
-e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
SOURCES/rabbitmq-server.init
+ sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ SOURCES/rabbitmq-script-wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 7f442831..62fb1dfb 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -56,6 +56,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins
+install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
@@ -74,9 +75,8 @@ echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm
%pre
if [ $1 -gt 1 ]; then
- #Upgrade - stop and remove previous instance of rabbitmq-server init.d script
+ # Upgrade - stop previous instance of rabbitmq-server init.d script
/sbin/service rabbitmq-server stop
- /sbin/chkconfig --del rabbitmq-server
fi
# create rabbitmq group
@@ -118,6 +118,9 @@ fi
rm -rf %{buildroot}
%changelog
+* Mon Oct 5 2009 David Wragg <dpw@lshift.net> 1.7.0-1
+- New upstream release
+
* Wed Jun 17 2009 Matthias Radestock <matthias@lshift.net> 1.6.0-1
- New upstream release
diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper
index 0dd1c0fb..ee5947b6 100644
--- a/packaging/common/rabbitmq-asroot-script-wrapper
+++ b/packaging/common/rabbitmq-asroot-script-wrapper
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/bin/sh
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License at
@@ -30,24 +30,16 @@
## Contributor(s): ______________________________________.
##
-# Escape spaces and quotes, because shell is revolting.
-for arg in "$@" ; do
- # Escape quotes in parameters, so that they're passed through cleanly.
- arg=$(sed -e 's/"/\\"/' <<-END
- $arg
- END
- )
- CMDLINE="${CMDLINE} \"${arg}\""
-done
-
cd /var/lib/rabbitmq
SCRIPT=`basename $0`
if [ `id -u` = 0 ] ; then
- /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}
+ /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
else
- echo -e "\nOnly root should run ${SCRIPT}\n"
+ echo
+ echo "Only root should run ${SCRIPT}"
+ echo
exit 1
fi
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index f1a9b1ff..f66f8e59 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/bin/sh
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License at
@@ -33,7 +33,7 @@
# Escape spaces and quotes, because shell is revolting.
for arg in "$@" ; do
# Escape quotes in parameters, so that they're passed through cleanly.
- arg=$(sed -e 's/"/\\"/' <<-END
+ arg=$(sed -e 's/"/\\"/g' <<-END
$arg
END
)
@@ -45,10 +45,14 @@ cd /var/lib/rabbitmq
SCRIPT=`basename $0`
if [ `id -u` = 0 ] ; then
- su rabbitmq -s /bin/sh -c "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
+ @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
+elif [ `id -u` = `id -u rabbitmq` ] ; then
+ /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
else
/usr/lib/rabbitmq/bin/${SCRIPT}
- echo -e "\nOnly root should run ${SCRIPT}\n"
+ echo
+ echo "Only root or rabbitmq should run ${SCRIPT}"
+ echo
exit 1
fi
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index e71562f8..39d23983 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -23,6 +23,7 @@ DESC=rabbitmq-server
USER=rabbitmq
NODE_COUNT=1
ROTATE_SUFFIX=
+INIT_LOG_DIR=/var/log/rabbitmq
DEFAULTS_FILE= # This is filled in when building packages
LOCK_FILE= # This is filled in when building packages
@@ -39,7 +40,7 @@ set -e
start_rabbitmq () {
set +e
- $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err
+ $DAEMON start_all ${NODE_COUNT} > ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err
case "$?" in
0)
echo SUCCESS
@@ -47,11 +48,11 @@ start_rabbitmq () {
RETVAL=0
;;
1)
- echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\}
+ echo TIMEOUT - check ${INIT_LOG_DIR}/startup_\{log,err\}
RETVAL=1
;;
*)
- echo FAILED - check /var/log/rabbitmq/startup_log, _err
+ echo FAILED - check ${INIT_LOG_DIR}/startup_log, _err
RETVAL=1
;;
esac
@@ -62,14 +63,12 @@ stop_rabbitmq () {
set +e
status_rabbitmq quiet
if [ $RETVAL = 0 ] ; then
- $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err
+ $DAEMON stop_all > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err
RETVAL=$?
if [ $RETVAL = 0 ] ; then
- # Try to stop epmd if run by the rabbitmq user
- pkill -u rabbitmq epmd || :
[ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE
else
- echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
+ echo FAILED - check ${INIT_LOG_DIR}/shutdown_log, _err
fi
else
echo No nodes running
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index dafaf9ce..ab05f732 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -26,6 +26,8 @@ package: clean
-e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
$(UNPACKED_DIR)/debian/rabbitmq-server.init
+ sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ $(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
chmod a+x $(UNPACKED_DIR)/debian/rules
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index ac94c8a3..e4cfe7b5 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (1.7.0-1) intrepid; urgency=low
+
+ * New Upstream Release
+
+ -- David Wragg <dpw@lshift.net> Mon, 05 Oct 2009 13:44:41 +0100
+
rabbitmq-server (1.6.0-1) hardy; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/postrm b/packaging/debs/Debian/debian/postrm.in
index a999d95b..bfcf1f53 100644
--- a/packaging/debs/Debian/debian/postrm
+++ b/packaging/debs/Debian/debian/postrm.in
@@ -34,7 +34,15 @@ case "$1" in
if [ -d /etc/rabbitmq ]; then
rm -r /etc/rabbitmq
fi
+ # Remove traces of plugins
+ rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins
+ for ext in rel script boot ; do
+ rm -f @RABBIT_LIB@/ebin/rabbit.$ext
+ done
if getent passwd rabbitmq >/dev/null; then
+ # Stop epmd if run by the rabbitmq user
+ pkill -u rabbitmq epmd || :
+
deluser rabbitmq
fi
if getent group rabbitmq >/dev/null; then
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 365eea6e..3799c438 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -17,6 +17,7 @@ install/rabbitmq-server::
for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \
install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
- for script in rabbitmq-activate-plugins; do \
+ for script in rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \
install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
+ sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
new file mode 100644
index 00000000..53d27f9b
--- /dev/null
+++ b/packaging/macports/Makefile
@@ -0,0 +1,58 @@
+TARBALL_DIR=../../dist
+TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
+COMMON_DIR=../common
+VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
+
+# The URL at which things really get deployed
+REAL_WEB_URL=http://www.rabbitmq.com/
+
+# The user@host for an OSX machine with macports installed, which is
+# used to generate the macports index files. That step will be
+# skipped if this variable is not set. If you do set it, you might
+# also want to set SSH_OPTS, which allows adding ssh options, e.g. to
+# specify a key that will get into the OSX machine without a
+# passphrase.
+MACPORTS_USERHOST=
+
+MACPORTS_DIR=macports
+DEST=$(MACPORTS_DIR)/net/rabbitmq-server
+
+all: macports
+
+dirs:
+ mkdir -p $(DEST)/files
+
+$(DEST)/Portfile: Portfile.in
+ for algo in md5 sha1 rmd160 ; do \
+ checksum=$$(openssl $$algo $(TARBALL_DIR)/$(TARBALL) | awk '{print $$NF}') ; \
+ echo "s|@$$algo@|$$checksum|g" ; \
+ done >checksums.sed
+ sed -e "s|@VERSION@|$(VERSION)|g;s|@BASE_URL@|$(REAL_WEB_URL)|g" \
+ -f checksums.sed <$^ >$@
+
+macports: dirs $(DEST)/Portfile
+ for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \
+ cp $(COMMON_DIR)/$$f $(DEST)/files ; \
+ done
+ sed -i -e 's|@SU_RABBITMQ_SH_C@|sudo -u rabbitmq -H /bin/sh -c|' \
+ $(DEST)/files/rabbitmq-script-wrapper
+ cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files
+
+# This target ssh's into the OSX host in order to finalize the
+# macports repo
+macports_index:
+ if [ -n "$(MACPORTS_USERHOST)" ] ; then \
+ tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \
+ d="/tmp/mkportindex.$$$$" ; \
+ mkdir $$d \
+ && cd $$d \
+ && tar xf - \
+ && /opt/local/bin/portindex -a -o . >/dev/null \
+ && tar cf - . \
+ && cd \
+ && rm -rf $$d' \
+ | tar xf - -C $(MACPORTS_DIR) ; \
+ fi
+
+clean:
+ rm -rf $(DEST) checksums.sed
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/Portfile.in
index 1826d5c4..e1f58212 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/Portfile.in
@@ -3,10 +3,10 @@
PortSystem 1.0
name rabbitmq-server
-version 1.6.0
-revision 0
+version @VERSION@
+revision 1
categories net
-maintainers tonyg@rabbitmq.com
+maintainers rabbitmq.com:tonyg
platforms darwin
description The RabbitMQ AMQP Server
long_description \
@@ -15,17 +15,32 @@ long_description \
robust and scalable implementation of an AMQP broker.
-homepage http://www.rabbitmq.com/
-master_sites http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/
+homepage @BASE_URL@
+master_sites @BASE_URL@releases/rabbitmq-server/v${version}/
checksums \
- md5 af3b0d868d58e5aefb4f0837b82ca010 \
- sha1 1834c670d076fa9878223aacaa35a5a6528f1d86 \
- rmd160 d6c9de4e1fb48c6ceb1cb5d717ca2afb5e3266fe
+ md5 @md5@ \
+ sha1 @sha1@ \
+ rmd160 @rmd160@
-depends_build port:erlang port:py25-simplejson
+depends_build port:erlang
depends_run port:erlang
+platform darwin 7 {
+ depends_build-append port:py25-simplejson
+ build.args PYTHON=${prefix}/bin/python2.5
+}
+platform darwin 8 {
+ depends_build-append port:py25-simplejson
+ build.args PYTHON=${prefix}/bin/python2.5
+}
+platform darwin 9 {
+ depends_build-append port:py25-simplejson
+ build.args PYTHON=${prefix}/bin/python2.5
+}
+# no need for simplejson on Snow Leopard or higher
+
+
set serveruser rabbitmq
set servergroup rabbitmq
set serverhome ${prefix}/var/lib/rabbitmq
@@ -34,13 +49,12 @@ set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia
set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server
set sbindir ${destroot}${prefix}/lib/rabbitmq/bin
set wrappersbin ${destroot}${prefix}/sbin
+set realsbin ${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version}/sbin
use_configure no
use_parallel_build yes
-build.args PYTHON=${prefix}/bin/python2.5
-
destroot.destdir \
TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \
SBIN_DIR=${sbindir} \
@@ -61,23 +75,23 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
- ${sbindir}/rabbitmq-env
+ ${realsbin}/rabbitmq-env
reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${realsbin}/rabbitmq-multi \
+ ${realsbin}/rabbitmq-server \
+ ${realsbin}/rabbitmqctl
reinplace -E "s:(LOG_BASE)=/:\\1=${prefix}/:" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${realsbin}/rabbitmq-multi \
+ ${realsbin}/rabbitmq-server \
+ ${realsbin}/rabbitmqctl
reinplace -E "s:(MNESIA_BASE)=/:\\1=${prefix}/:" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${realsbin}/rabbitmq-multi \
+ ${realsbin}/rabbitmq-server \
+ ${realsbin}/rabbitmqctl
reinplace -E "s:(PIDS_FILE)=/:\\1=${prefix}/:" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${realsbin}/rabbitmq-multi \
+ ${realsbin}/rabbitmq-server \
+ ${realsbin}/rabbitmqctl
xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
${wrappersbin}/rabbitmq-multi
@@ -94,6 +108,7 @@ post-destroot {
${wrappersbin}/rabbitmq-activate-plugins
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
+ file copy ${wrappersbin}/rabbitmq-activate-plugins ${wrappersbin}/rabbitmq-deactivate-plugins
}
pre-install {
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper
deleted file mode 100644
index c4488dcb..00000000
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/bash
-cd /var/lib/rabbitmq
-
-SCRIPT=`basename $0`
-
-if [ `id -u` = 0 ] ; then
- /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
-else
- echo -e "\nOnly root should run ${SCRIPT}\n"
- exit 1
-fi
-
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
deleted file mode 100644
index 0d7118c4..00000000
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/bin/bash
-cd /var/lib/rabbitmq
-
-SCRIPT=`basename $0`
-
-if [ `id -u` = 0 ] ; then
- sudo -u rabbitmq -H /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
-else
- /usr/lib/rabbitmq/bin/${SCRIPT}
- echo -e "\nOnly root should run ${SCRIPT}\n"
- exit 1
-fi
-
diff --git a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff b/packaging/macports/patch-org.macports.rabbitmq-server.plist.diff
index 45b49496..45b49496 100644
--- a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff
+++ b/packaging/macports/patch-org.macports.rabbitmq-server.plist.diff
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index 387becb3..f17fe777 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -14,6 +14,7 @@ dist:
mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin
+ mv $(SOURCE_DIR)/scripts/rabbitmq-deactivate-plugins.bat $(SOURCE_DIR)/sbin
rm -rf $(SOURCE_DIR)/scripts
rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile
rm -f $(SOURCE_DIR)/README
diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat
index 3540bf2d..e7aa7095 100644
--- a/scripts/rabbitmq-activate-plugins.bat
+++ b/scripts/rabbitmq-activate-plugins.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -42,8 +44,17 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-set RABBITMQ_PLUGINS_DIR="%~dp0..\plugins"
-set RABBITMQ_PLUGINS_EXPAND_DIR="%~dp0..\priv\plugins"
-set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+set RABBITMQ_PLUGINS_DIR=%~dp0..\plugins
+set RABBITMQ_PLUGINS_EXPAND_DIR=%~dp0..\priv\plugins
+set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+
+"%ERLANG_HOME%\bin\erl.exe" ^
+-pa "%RABBITMQ_EBIN_DIR%" ^
+-noinput -hidden ^
+-s rabbit_plugin_activator ^
+-rabbit plugins_dir \""%RABBITMQ_PLUGINS_DIR:\=/%"\" ^
+-rabbit plugins_expand_dir \""%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%"\" ^
+-rabbit rabbit_ebin \""%RABBITMQ_EBIN_DIR:\=/%"\" ^
+-extra %*
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -s rabbit_plugin_activator -rabbit plugins_dir \"%RABBITMQ_PLUGINS_DIR:\=/%\" -rabbit plugins_expand_dir \"%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%\" -rabbit rabbit_ebin \"%RABBITMQ_EBIN_DIR:\=/%\" -extra %*
+endlocal
diff --git a/scripts/rabbitmq-deactivate-plugins b/scripts/rabbitmq-deactivate-plugins
new file mode 100755
index 00000000..771c4734
--- /dev/null
+++ b/scripts/rabbitmq-deactivate-plugins
@@ -0,0 +1,37 @@
+#!/bin/sh
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2009 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+. `dirname $0`/rabbitmq-env
+
+RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin
+
+rm -f ${RABBITMQ_EBIN}/rabbit.rel ${RABBITMQ_EBIN}/rabbit.script ${RABBITMQ_EBIN}/rabbit.boot
diff --git a/scripts/rabbitmq-deactivate-plugins.bat b/scripts/rabbitmq-deactivate-plugins.bat
new file mode 100644
index 00000000..40155183
--- /dev/null
+++ b/scripts/rabbitmq-deactivate-plugins.bat
@@ -0,0 +1,39 @@
+@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License at
+REM http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM License for the specific language governing rights and limitations
+REM under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developers of the Original Code are LShift Ltd,
+REM Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+REM Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Ltd. Portions created by Cohesive Financial Technologies LLC are
+REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
+REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM
+REM All Rights Reserved.
+REM
+REM Contributor(s): ______________________________________.
+REM
+
+setlocal
+
+set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+
+del /f "%RABBITMQ_EBIN_DIR%"\rabbit.rel "%RABBITMQ_EBIN_DIR%"\rabbit.script "%RABBITMQ_EBIN_DIR%"\rabbit.boot
+
+endlocal
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 7db4cb70..a6eb102a 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -30,29 +30,45 @@
## Contributor(s): ______________________________________.
##
NODENAME=rabbit
-NODE_IP_ADDRESS=0.0.0.0
-NODE_PORT=5672
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
MULTI_START_ARGS=
+CONFIG_FILE=/etc/rabbitmq/rabbitmq
. `dirname $0`/rabbitmq-env
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
+DEFAULT_NODE_IP_ADDRESS=0.0.0.0
+DEFAULT_NODE_PORT=5672
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
+if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
+then
+ if [ "x" != "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
+ fi
+else
+ if [ "x" = "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
+ fi
+fi
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME}
[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE}
[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS}
[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS}
+[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
export \
RABBITMQ_NODENAME \
RABBITMQ_NODE_IP_ADDRESS \
RABBITMQ_NODE_PORT \
RABBITMQ_SCRIPT_HOME \
- RABBITMQ_PIDS_FILE
+ RABBITMQ_PIDS_FILE \
+ RABBITMQ_CONFIG_FILE
+
+RABBITMQ_CONFIG_ARG=
+[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and
@@ -65,6 +81,7 @@ exec erl \
-hidden \
${RABBITMQ_MULTI_ERL_ARGS} \
-sname rabbitmq_multi$$ \
+ ${RABBITMQ_CONFIG_ARG} \
-s rabbit_multi \
${RABBITMQ_MULTI_START_ARGS} \
-extra "$@"
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index 8abf13f1..6dda13af 100755
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_BASE%"=="" (
set RABBITMQ_BASE=%APPDATA%\RabbitMQ
)
@@ -39,20 +41,32 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -61,5 +75,14 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_MULTI_ERL_ARGS% -sname rabbitmq_multi -s rabbit_multi %RABBITMQ_MULTI_START_ARGS% -extra %*
+"%ERLANG_HOME%\bin\erl.exe" ^
+-pa "%~dp0..\ebin" ^
+-noinput -hidden ^
+%RABBITMQ_MULTI_ERL_ARGS% ^
+-sname rabbitmq_multi ^
+%RABBITMQ_CONFIG_ARG% ^
+-s rabbit_multi ^
+%RABBITMQ_MULTI_START_ARGS% ^
+-extra %*
+endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 547220b4..cbc295f7 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -31,23 +31,35 @@
##
NODENAME=rabbit
-NODE_IP_ADDRESS=0.0.0.0
-NODE_PORT=5672
SERVER_ERL_ARGS="+K true +A30 \
-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \
-kernel inet_default_connect_options [{nodelay,true}]"
CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
+CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia
SERVER_START_ARGS=
. `dirname $0`/rabbitmq-env
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
+DEFAULT_NODE_IP_ADDRESS=0.0.0.0
+DEFAULT_NODE_PORT=5672
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
+if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
+then
+ if [ "x" != "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
+ fi
+else
+ if [ "x" = "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
+ fi
+fi
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE}
+[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
@@ -73,16 +85,22 @@ else
fi
RABBITMQ_START_RABBIT=
-[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit'
+[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
-if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then
+if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ] && [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit"
RABBITMQ_EBIN_PATH=""
else
RABBITMQ_BOOT_FILE=start_sasl
RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}"
+ [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="${RABBITMQ_START_RABBIT} -s rabbit"
fi
+RABBITMQ_CONFIG_ARG=
+[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
+
+RABBITMQ_LISTEN_ARG=
+[ "x" != "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_LISTEN_ARG="-rabbit tcp_listeners [{\""${RABBITMQ_NODE_IP_ADDRESS}"\","${RABBITMQ_NODE_PORT}"}]"
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and
@@ -94,18 +112,16 @@ exec erl \
${RABBITMQ_START_RABBIT} \
-sname ${RABBITMQ_NODENAME} \
-boot ${RABBITMQ_BOOT_FILE} \
+ ${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
- -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \
+ ${RABBITMQ_LISTEN_ARG} \
-sasl errlog_type error \
-kernel error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
-sasl sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
-os_mon start_cpu_sup true \
-os_mon start_disksup false \
-os_mon start_memsup false \
- -os_mon start_os_sup false \
- -os_mon memsup_system_only true \
- -os_mon system_memory_high_watermark 0.95 \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
${RABBITMQ_CLUSTER_CONFIG_OPTION} \
${RABBITMQ_SERVER_START_ARGS} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index a784fee3..51102851 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_BASE%"=="" (
set RABBITMQ_BASE=%APPDATA%\RabbitMQ
)
@@ -39,17 +41,19 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -73,17 +77,17 @@ rem Log management (rotation, filtering based of size...) is left as an exercice
set BACKUP_EXTENSION=.1
-set LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log"
-set SASL_LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log"
+set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
+set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
-set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%"
-set SASL_LOGS_BAKCUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
+set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
+set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
-if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+if exist "%LOGS%" (
+ type "%LOGS%" >> "%LOGS_BACKUP%"
)
-if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
+if exist "%SASL_LOGS%" (
+ type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
)
rem End of log management
@@ -103,24 +107,41 @@ if "%RABBITMQ_MNESIA_DIR%"=="" (
set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
- set RABBITMQ_BOOT_FILE="%RABBITMQ_EBIN_ROOT%\rabbit"
+ set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
set RABBITMQ_EBIN_PATH=
) else (
set RABBITMQ_BOOT_FILE=start_sasl
set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
)
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""%RABBITMQ_NODE_IP_ADDRESS%"\","%RABBITMQ_NODE_PORT%"}]
+ )
+)
"%ERLANG_HOME%\bin\erl.exe" ^
%RABBITMQ_EBIN_PATH% ^
-noinput ^
--boot %RABBITMQ_BOOT_FILE% ^
+-boot "%RABBITMQ_BOOT_FILE%" ^
+%RABBITMQ_CONFIG_ARG% ^
-sname %RABBITMQ_NODENAME% ^
-s rabbit ^
+W w ^
+A30 ^
-kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
@@ -128,10 +149,9 @@ if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--os_mon start_os_sup false ^
--os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
%*
+
+endlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 29be1742..46681125 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -30,6 +30,8 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
if "%RABBITMQ_SERVICENAME%"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
@@ -43,15 +45,17 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
- set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.5.5\erts-5.5.5\bin
+ set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.6.5\erts-5.6.5\bin
)
set CONSOLE_FLAG=
@@ -69,7 +73,7 @@ if not exist "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" (
echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
echo **********************************************
echo.
- echo %ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe not found!
+ echo "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" not found!
echo Please set ERLANG_SERVICE_MANAGER_PATH to the folder containing "erlsrv.exe".
echo.
exit /B 1
@@ -91,17 +95,17 @@ rem Log management (rotation, filtering based on size...) is left as an exercise
set BACKUP_EXTENSION=.1
-set LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log"
-set SASL_LOGS="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log"
+set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
+set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
-set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%"
-set SASL_LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
+set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
+set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
-if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+if exist "%LOGS%" (
+ type "%LOGS%" >> "%LOGS_BACKUP%"
)
-if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BACKUP%
+if exist "%SASL_LOGS%" (
+ type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
)
rem End of log management
@@ -156,26 +160,49 @@ if errorlevel 1 (
echo %RABBITMQ_SERVICENAME% service is already present - only updating service parameters
)
-set RABBIT_EBIN=%~dp0..\ebin
+set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
+if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
+ echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
+ set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
+ set RABBITMQ_EBIN_PATH=
+) else (
+ set RABBITMQ_BOOT_FILE=start_sasl
+ set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+)
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]"
+ )
+)
set ERLANG_SERVICE_ARGUMENTS= ^
--pa "%RABBIT_EBIN%" ^
--boot start_sasl ^
+%RABBITMQ_EBIN_PATH% ^
+-boot "%RABBITMQ_BOOT_FILE%" ^
+%RABBITMQ_CONFIG_ARG% ^
-s rabbit ^
+W w ^
+A30 ^
-kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\",%RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
+%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
-sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--os_mon start_os_sup false ^
--os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
@@ -186,7 +213,7 @@ set ERLANG_SERVICE_ARGUMENTS=%ERLANG_SERVICE_ARGUMENTS:"=\"%
"%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" set %RABBITMQ_SERVICENAME% ^
-machine "%ERLANG_SERVICE_MANAGER_PATH%\erl.exe" ^
--env ERL_CRASH_DUMP="%RABBITMQ_BASE_UNIX%/log" ^
+-env ERL_CRASH_DUMP="%RABBITMQ_BASE_UNIX%/erl_crash.dump" ^
-workdir "%RABBITMQ_BASE%" ^
-stopaction "rabbit:stop_and_halt()." ^
-sname %RABBITMQ_NODENAME% ^
@@ -202,3 +229,5 @@ goto END
:END
+
+endlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 9c45e73d..a332afc6 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,8 +30,11 @@
## Contributor(s): ______________________________________.
##
+NODENAME=rabbit
+
. `dirname $0`/rabbitmq-env
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
exec erl \
@@ -41,4 +44,6 @@ exec erl \
${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
-s rabbit_control \
+ -nodename $RABBITMQ_NODENAME \
-extra "$@"
+
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 5111724f..512e8587 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -30,6 +30,12 @@ REM
REM Contributor(s): ______________________________________.
REM
+setlocal
+
+if "%RABBITMQ_NODENAME%"=="" (
+ set RABBITMQ_NODENAME=rabbit
+)
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -42,4 +48,6 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -nodename %RABBITMQ_NODENAME% -extra %*
+
+endlocal
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 36fb4fa8..53edf8de 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -180,6 +180,20 @@
-import(error_logger, [format/2]).
%%%=========================================================================
+%%% Specs. These exist only to shut up dialyzer's warnings
+%%%=========================================================================
+
+-ifdef(use_specs).
+
+-spec(handle_common_termination/6 ::
+ (any(), any(), any(), atom(), any(), any()) -> no_return()).
+
+-spec(hibernate/7 ::
+ (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()).
+
+-endif.
+
+%%%=========================================================================
%%% API
%%%=========================================================================
@@ -437,7 +451,10 @@ unregister_name({local,Name}) ->
unregister_name({global,Name}) ->
_ = global:unregister_name(Name);
unregister_name(Pid) when is_pid(Pid) ->
- Pid.
+ Pid;
+% Under R12 let's just ignore it, as we have a single term as Name.
+% On R13 it will never get here, as we get tuple with 'local/global' atom.
+unregister_name(_Name) -> ok.
extend_backoff(undefined) ->
undefined;
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index c74b39a9..74b41a91 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -67,8 +67,8 @@
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-spec(new/0 :: () -> pqueue()).
--spec(is_queue/1 :: (any()) -> bool()).
--spec(is_empty/1 :: (pqueue()) -> bool()).
+-spec(is_queue/1 :: (any()) -> boolean()).
+-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 02dd00a3..88b8e7a4 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -33,12 +33,110 @@
-behaviour(application).
--export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
+-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
-export([start/2, stop/1]).
-export([log_location/1]).
+%%---------------------------------------------------------------------------
+%% Boot steps.
+-export([maybe_insert_default_data/0]).
+
+-rabbit_boot_step({codec_correctness_check,
+ [{description, "codec correctness check"},
+ {mfa, {rabbit_binary_generator,
+ check_empty_content_body_frame_size,
+ []}}]}).
+
+-rabbit_boot_step({database,
+ [{mfa, {rabbit_mnesia, init, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_log,
+ [{description, "logging server"},
+ {mfa, {rabbit_sup, start_child, [rabbit_log]}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_hooks,
+ [{description, "internal event notification system"},
+ {mfa, {rabbit_hooks, start, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({kernel_ready,
+ [{description, "kernel ready"}]}).
+
+-rabbit_boot_step({rabbit_alarm,
+ [{description, "alarm handler"},
+ {mfa, {rabbit_alarm, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_amqqueue_sup,
+ [{description, "queue supervisor"},
+ {mfa, {rabbit_amqqueue, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_router,
+ [{description, "cluster router"},
+ {mfa, {rabbit_sup, start_child, [rabbit_router]}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_node_monitor,
+ [{description, "node monitor"},
+ {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}},
+ {post, kernel_ready},
+ {post, rabbit_amqqueue_sup},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({core_initialized,
+ [{description, "core initialized"}]}).
+
+-rabbit_boot_step({empty_db_check,
+ [{description, "empty DB check"},
+ {mfa, {?MODULE, maybe_insert_default_data, []}},
+ {post, core_initialized}]}).
+
+-rabbit_boot_step({exchange_recovery,
+ [{description, "exchange recovery"},
+ {mfa, {rabbit_exchange, recover, []}},
+ {post, empty_db_check}]}).
+
+-rabbit_boot_step({queue_recovery,
+ [{description, "queue recovery"},
+ {mfa, {rabbit_amqqueue, recover, []}},
+ {post, exchange_recovery}]}).
+
+-rabbit_boot_step({persister,
+ [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
+ {post, queue_recovery}]}).
+
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_child, [rabbit_guid]}},
+ {post, persister},
+ {pre, routing_ready}]}).
+
+-rabbit_boot_step({routing_ready,
+ [{description, "message delivery logic ready"}]}).
+
+-rabbit_boot_step({log_relay,
+ [{description, "error log relay"},
+ {mfa, {rabbit_error_logger, boot, []}},
+ {post, routing_ready}]}).
+
+-rabbit_boot_step({networking,
+ [{mfa, {rabbit_networking, boot, []}},
+ {post, log_relay},
+ {pre, networking_listening}]}).
+
+-rabbit_boot_step({networking_listening,
+ [{description, "network listeners available"}]}).
+
+%%---------------------------------------------------------------------------
+
-import(application).
-import(mnesia).
-import(lists).
@@ -57,6 +155,7 @@
-type(log_location() :: 'tty' | 'undefined' | string()).
-type(file_suffix() :: binary()).
+-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> 'ok').
@@ -71,10 +170,13 @@
%%----------------------------------------------------------------------------
+prepare() ->
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_mnesia:ensure_mnesia_dir().
+
start() ->
try
- ok = ensure_working_log_handlers(),
- ok = rabbit_mnesia:ensure_mnesia_dir(),
+ ok = prepare(),
ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
@@ -111,88 +213,126 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
start(normal, []) ->
-
{ok, SupPid} = rabbit_sup:start_link(),
print_banner(),
-
- {ok, ExtraSteps} = application:get_env(extra_startup_steps),
-
- lists:foreach(
- fun ({Msg, Thunk}) ->
- io:format("starting ~-20s ...", [Msg]),
- Thunk(),
- io:format("done~n");
- ({Msg, M, F, A}) ->
- io:format("starting ~-20s ...", [Msg]),
- apply(M, F, A),
- io:format("done~n")
- end,
- [{"database",
- fun () -> ok = rabbit_mnesia:init() end},
- {"core processes",
- fun () ->
- ok = start_child(rabbit_log),
- ok = rabbit_hooks:start(),
-
- ok = rabbit_amqqueue:start(),
-
- {ok, MemoryAlarms} = application:get_env(memory_alarms),
- ok = rabbit_alarm:start(MemoryAlarms),
-
- ok = rabbit_binary_generator:
- check_empty_content_body_frame_size(),
-
- ok = start_child(rabbit_presence),
- ok = start_child(rabbit_router),
- ok = start_child(rabbit_node_monitor)
- end},
- {"recovery",
- fun () ->
- ok = maybe_insert_default_data(),
- ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover()
- end},
- {"persister",
- fun () ->
- ok = start_child(rabbit_persister)
- end},
- {"guid generator",
- fun () ->
- ok = start_child(rabbit_guid)
- end},
- {"builtin applications",
- fun () ->
- {ok, DefaultVHost} = application:get_env(default_vhost),
- ok = error_logger:add_report_handler(
- rabbit_error_logger, [DefaultVHost]),
- ok = start_builtin_amq_applications()
- end},
- {"TCP listeners",
- fun () ->
- ok = rabbit_networking:start(),
- {ok, TCPListeners} = application:get_env(tcp_listeners),
- lists:foreach(
- fun ({Host, Port}) ->
- ok = rabbit_networking:start_tcp_listener(Host, Port)
- end,
- TCPListeners)
- end}]
- ++ ExtraSteps),
-
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
{ok, SupPid}.
+
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
+ ok = case rabbit_mnesia:is_clustered() of
+ true -> rabbit_amqqueue:on_node_down(node());
+ false -> rabbit_mnesia:empty_ram_only_tables()
+ end,
ok.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
+
+boot_error(Format, Args) ->
+ io:format("BOOT ERROR: " ++ Format, Args),
+ error_logger:error_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot}).
+
+run_boot_step({StepName, Attributes}) ->
+ Description = case lists:keysearch(description, 1, Attributes) of
+ {value, {_, D}} -> D;
+ false -> StepName
+ end,
+ case [MFA || {mfa, MFA} <- Attributes] of
+ [] ->
+ io:format("-- ~s~n", [Description]);
+ MFAs ->
+ io:format("starting ~-40s ...", [Description]),
+ [case catch apply(M,F,A) of
+ {'EXIT', Reason} ->
+ boot_error("FAILED~nReason: ~p~n", [Reason]);
+ ok ->
+ ok
+ end || {M,F,A} <- MFAs],
+ io:format("done~n"),
+ ok
+ end.
+
+boot_steps() ->
+ AllApps = [App || {App, _, _} <- application:loaded_applications()],
+ Modules = lists:usort(
+ lists:append([Modules
+ || {ok, Modules} <-
+ [application:get_key(App, modules)
+ || App <- AllApps]])),
+ UnsortedSteps =
+ lists:flatmap(fun (Module) ->
+ [{StepName, Attributes}
+ || {rabbit_boot_step, [{StepName, Attributes}]}
+ <- Module:module_info(attributes)]
+ end, Modules),
+ sort_boot_steps(UnsortedSteps).
+
+sort_boot_steps(UnsortedSteps) ->
+ G = digraph:new([acyclic]),
+
+ %% Add vertices, with duplicate checking.
+ [case digraph:vertex(G, StepName) of
+ false -> digraph:add_vertex(G, StepName, Step);
+ _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
+ end || Step = {StepName, _Attrs} <- UnsortedSteps],
+
+ %% Add edges, detecting cycles and missing vertices.
+ lists:foreach(fun ({StepName, Attributes}) ->
+ [add_boot_step_dep(G, StepName, PrecedingStepName)
+ || {post, PrecedingStepName} <- Attributes],
+ [add_boot_step_dep(G, SucceedingStepName, StepName)
+ || {pre, SucceedingStepName} <- Attributes]
+ end, UnsortedSteps),
+
+ %% Use topological sort to find a consistent ordering (if there is
+ %% one, otherwise fail).
+ SortedStepsRev = [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)],
+ SortedSteps = lists:reverse(SortedStepsRev),
+
+ digraph:delete(G),
+
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}}
+ || {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end.
+
+add_boot_step_dep(G, RunsSecond, RunsFirst) ->
+ case digraph:add_edge(G, RunsSecond, RunsFirst) of
+ {error, Reason} ->
+ boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
+ [RunsSecond, RunsFirst,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next])
+ || Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end]);
+ _ ->
+ ok
+ end.
+
+%%---------------------------------------------------------------------------
log_location(Type) ->
- case application:get_env(Type, case Type of
+ case application:get_env(Type, case Type of
kernel -> error_logger;
sasl -> sasl_error_logger
end) of
@@ -204,6 +344,16 @@ log_location(Type) ->
_ -> undefined
end.
+app_location() ->
+ {ok, Application} = application:get_application(),
+ filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+
+home_dir() ->
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> Home;
+ Other -> Other
+ end.
+
%---------------------------------------------------------------------------
print_banner() ->
@@ -226,21 +376,18 @@ print_banner() ->
[Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
- Settings = [{"node", node()},
- {"log", log_location(kernel)},
- {"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()}],
+ Settings = [{"node", node()},
+ {"app descriptor", app_location()},
+ {"home dir", home_dir()},
+ {"cookie hash", rabbit_misc:cookie_hash()},
+ {"log", log_location(kernel)},
+ {"sasl log", log_location(sasl)},
+ {"database dir", rabbit_mnesia:dir()}],
DescrLen = lists:max([length(K) || {K, _V} <- Settings]),
Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
io:nl().
-start_child(Mod) ->
- {ok,_} = supervisor:start_child(rabbit_sup,
- {Mod, {Mod, start_link, []},
- transient, 100, worker, [Mod]}),
- ok.
-
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
ok = ensure_working_log_handler(error_logger_file_h,
@@ -266,7 +413,7 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
throw({error, {cannot_log_to_tty,
TTYHandler, not_installed}})
end;
- _ -> case lists:member(NewFHandler, Handlers) of
+ _ -> case lists:member(NewFHandler, Handlers) of
true -> ok;
false -> case rotate_logs(LogLocation, "",
OldFHandler, NewFHandler) of
@@ -298,12 +445,6 @@ insert_default_data() ->
DefaultReadPerm),
ok.
-start_builtin_amq_applications() ->
- %%TODO: we may want to create a separate supervisor for these so
- %%they don't bring down the entire app when they die and fail to
- %%restart
- ok.
-
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 309c9a0e..534409aa 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -33,24 +33,19 @@
-behaviour(gen_event).
--export([start/1, stop/0, register/2]).
+-export([start/0, stop/0, register/2]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--define(MEMSUP_CHECK_INTERVAL, 1000).
-
-%% OSes on which we know memory alarms to be trustworthy
--define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]).
-
--record(alarms, {alertees, system_memory_high_watermark = false}).
+-record(alarms, {alertees, vm_memory_high_watermark = false}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mfa_tuple() :: {atom(), atom(), list()}).
--spec(start/1 :: (bool() | 'auto') -> 'ok').
+-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
@@ -58,20 +53,16 @@
%%----------------------------------------------------------------------------
-start(MemoryAlarms) ->
- EnableAlarms = case MemoryAlarms of
- true -> true;
- false -> false;
- auto -> lists:member(os:type(), ?SUPPORTED_OS)
- end,
- ok = alarm_handler:add_alarm_handler(?MODULE, [EnableAlarms]),
- case whereis(memsup) of
- undefined -> if EnableAlarms -> ok = start_memsup(),
- ok = adjust_memsup_interval();
- true -> ok
- end;
- _ -> ok = adjust_memsup_interval()
- end.
+start() ->
+ ok = alarm_handler:add_alarm_handler(?MODULE, []),
+ {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
+ ok = case MemoryWatermark == 0 of
+ true ->
+ ok;
+ false ->
+ rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark])
+ end,
+ ok.
stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
@@ -83,43 +74,33 @@ register(Pid, HighMemMFA) ->
%%----------------------------------------------------------------------------
-init([MemoryAlarms]) ->
- {ok, #alarms{alertees = case MemoryAlarms of
- true -> dict:new();
- false -> undefined
- end}}.
+init([]) ->
+ {ok, #alarms{alertees = dict:new()}}.
-handle_call({register, _Pid, _HighMemMFA},
- State = #alarms{alertees = undefined}) ->
- {ok, ok, State};
-handle_call({register, Pid, HighMemMFA},
+handle_call({register, Pid, {M, F, A} = HighMemMFA},
State = #alarms{alertees = Alertess}) ->
_MRef = erlang:monitor(process, Pid),
- case State#alarms.system_memory_high_watermark of
- true -> {M, F, A} = HighMemMFA,
- ok = erlang:apply(M, F, A ++ [Pid, true]);
- false -> ok
- end,
+ ok = case State#alarms.vm_memory_high_watermark of
+ true -> apply(M, F, A ++ [Pid, true]);
+ false -> ok
+ end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
{ok, ok, State#alarms{alertees = NewAlertees}};
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
+handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) ->
ok = alert(true, State#alarms.alertees),
- {ok, State#alarms{system_memory_high_watermark = true}};
+ {ok, State#alarms{vm_memory_high_watermark = true}};
-handle_event({clear_alarm, system_memory_high_watermark}, State) ->
+handle_event({clear_alarm, vm_memory_high_watermark}, State) ->
ok = alert(false, State#alarms.alertees),
- {ok, State#alarms{system_memory_high_watermark = false}};
+ {ok, State#alarms{vm_memory_high_watermark = false}};
handle_event(_Event, State) ->
{ok, State}.
-handle_info({'DOWN', _MRef, process, _Pid, _Reason},
- State = #alarms{alertees = undefined}) ->
- {ok, State};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #alarms{alertees = Alertess}) ->
{ok, State#alarms{alertees = dict:erase(Pid, Alertess)}};
@@ -134,57 +115,6 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-
-start_memsup() ->
- {Mod, Args} =
- case os:type() of
- %% memsup doesn't take account of buffers or cache when
- %% considering "free" memory - therefore on Linux we can
- %% get memory alarms very easily without any pressure
- %% existing on memory at all. Therefore we need to use
- %% our own simple memory monitor.
- %%
- {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]};
- {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]};
-
- %% Start memsup programmatically rather than via the
- %% rabbitmq-server script. This is not quite the right
- %% thing to do as os_mon checks to see if memsup is
- %% available before starting it, but as memsup is
- %% available everywhere (even on VXWorks) it should be
- %% ok.
- %%
- %% One benefit of the programmatic startup is that we
- %% can add our alarm_handler before memsup is running,
- %% thus ensuring that we notice memory alarms that go
- %% off on startup.
- %%
- _ -> {memsup, []}
- end,
- %% This is based on os_mon:childspec(memsup, true)
- {ok, _} = supervisor:start_child(
- os_mon_sup,
- {memsup, {Mod, start_link, Args},
- permanent, 2000, worker, [Mod]}),
- ok.
-
-adjust_memsup_interval() ->
- %% The default memsup check interval is 1 minute, which is way too
- %% long - rabbit can gobble up all memory in a matter of seconds.
- %% Unfortunately the memory_check_interval configuration parameter
- %% and memsup:set_check_interval/1 function only provide a
- %% granularity of minutes. So we have to peel off one layer of the
- %% API to get to the underlying layer which operates at the
- %% granularity of milliseconds.
- %%
- %% Note that the new setting will only take effect after the first
- %% check has completed, i.e. after one minute. So if rabbit eats
- %% all the memory within the first minute after startup then we
- %% are out of luck.
- ok = os_mon:call(memsup,
- {set_check_interval, ?MEMSUP_CHECK_INTERVAL},
- infinity).
-
alert(_Alert, undefined) ->
ok;
alert(Alert, Alertees) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d89779f7..aa24a99f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -63,7 +63,7 @@
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
+-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
@@ -83,8 +83,8 @@
{'error', 'in_use'} |
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), delivery()) -> bool()).
--spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
+-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
+-spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
@@ -92,16 +92,16 @@
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
--spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
+-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
- (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
+ (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
+-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -163,15 +163,23 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- case WantDefaultBinding of
- true -> add_default_binding(Q);
- false -> ok
- end,
- Q;
- [ExistingQ] -> ExistingQ
+ [] ->
+ case mnesia:read(
+ {rabbit_durable_queue, QueueName}) of
+ [] -> ok = store_queue(Q),
+ case WantDefaultBinding of
+ true -> add_default_binding(Q);
+ false -> ok
+ end,
+ Q;
+ [_] -> not_found %% existing Q on stopped node
+ end;
+ [ExistingQ] ->
+ ExistingQ
end
end) of
+ not_found -> exit(Q#amqqueue.pid, shutdown),
+ rabbit_misc:not_found(QueueName);
Q -> rabbit_hooks:trigger(queue_create, [QueueName]),
Q;
ExistingQ -> exit(Q#amqqueue.pid, shutdown),
@@ -259,7 +267,7 @@ requeue(QPid, MsgIds, ChPid) ->
gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
safe_pmap_ok(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2e8509..80b7a92c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -281,13 +281,13 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
- unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
+ unblock -> {NewBlockedConsumers, NewActiveConsumers} =
move_consumers(ChPid,
State#q.blocked_consumers,
State#q.active_consumers),
run_poke_burst(
State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedeConsumers})
+ blocked_consumers = NewBlockedConsumers})
end
end.
@@ -297,7 +297,7 @@ should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
- not_found -> noreply(State);
+ not_found -> {ok, State};
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
unacked_messages = UAM} ->
erlang:demonitor(MonitorRef),
@@ -321,8 +321,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
blocked_consumers = remove_consumers(
ChPid, State#q.blocked_consumers)}),
case should_auto_delete(NewState) of
- false -> noreply(NewState);
- true -> {stop, normal, NewState}
+ false -> {ok, NewState};
+ true -> {stop, NewState}
end
end.
@@ -576,10 +576,16 @@ handle_call({commit, Txn}, From, State) ->
erase_tx(Txn),
noreply(NewState);
-handle_call({notify_down, ChPid}, From, State) ->
- %% optimisation: we reply straight away so the sender can continue
- gen_server2:reply(From, ok),
- handle_ch_down(ChPid, State);
+handle_call({notify_down, ChPid}, _From, State) ->
+ %% we want to do this synchronously, so that auto_deleted queues
+ %% are no longer visible by the time we send a response to the
+ %% client. The queue is ultimately deleted in terminate/2; if we
+ %% return stop with a reply, terminate/2 will be called by
+ %% gen_server2 *before* the reply is sent.
+ case handle_ch_down(ChPid, State) of
+ {ok, NewState} -> reply(ok, NewState);
+ {stop, NewState} -> {stop, normal, ok, NewState}
+ end;
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
@@ -813,7 +819,10 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
NewState = State#q{owner = none},
{stop, normal, NewState};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
- handle_ch_down(DownPid, State);
+ case handle_ch_down(DownPid, State) of
+ {ok, NewState} -> noreply(NewState);
+ {stop, NewState} -> {stop, normal, NewState}
+ end;
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 4033aaaf..bec2cd08 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -45,13 +45,14 @@
-type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())).
-spec(publish/1 :: (delivery()) -> publish_result()).
--spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
+-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
+ delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> message()).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
--spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(),
+-spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(),
maybe(txn()), properties_input(), binary()) ->
publish_result()).
-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 6cfa9e6d..01ac4f02 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -132,52 +132,66 @@ create_frame(TypeInt, ChannelInt, Payload) ->
%% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x,
%% and V.
-table_field_to_binary({FName, longstr, Value}) ->
- [short_string_to_binary(FName), "S", long_string_to_binary(Value)];
+table_field_to_binary({FName, Type, Value}) ->
+ [short_string_to_binary(FName) | field_value_to_binary(Type, Value)].
-table_field_to_binary({FName, signedint, Value}) ->
- [short_string_to_binary(FName), "I", <<Value:32/signed>>];
+field_value_to_binary(longstr, Value) ->
+ ["S", long_string_to_binary(Value)];
-table_field_to_binary({FName, decimal, {Before, After}}) ->
- [short_string_to_binary(FName), "D", Before, <<After:32>>];
+field_value_to_binary(signedint, Value) ->
+ ["I", <<Value:32/signed>>];
-table_field_to_binary({FName, timestamp, Value}) ->
- [short_string_to_binary(FName), "T", <<Value:64>>];
+field_value_to_binary(decimal, {Before, After}) ->
+ ["D", Before, <<After:32>>];
-table_field_to_binary({FName, table, Value}) ->
- [short_string_to_binary(FName), "F", table_to_binary(Value)];
+field_value_to_binary(timestamp, Value) ->
+ ["T", <<Value:64>>];
-table_field_to_binary({FName, byte, Value}) ->
- [short_string_to_binary(FName), "b", <<Value:8/unsigned>>];
+field_value_to_binary(table, Value) ->
+ ["F", table_to_binary(Value)];
-table_field_to_binary({FName, double, Value}) ->
- [short_string_to_binary(FName), "d", <<Value:64/float>>];
+field_value_to_binary(array, Value) ->
+ ["A", array_to_binary(Value)];
-table_field_to_binary({FName, float, Value}) ->
- [short_string_to_binary(FName), "f", <<Value:32/float>>];
+field_value_to_binary(byte, Value) ->
+ ["b", <<Value:8/unsigned>>];
-table_field_to_binary({FName, long, Value}) ->
- [short_string_to_binary(FName), "l", <<Value:64/signed>>];
+field_value_to_binary(double, Value) ->
+ ["d", <<Value:64/float>>];
-table_field_to_binary({FName, short, Value}) ->
- [short_string_to_binary(FName), "s", <<Value:16/signed>>];
+field_value_to_binary(float, Value) ->
+ ["f", <<Value:32/float>>];
-table_field_to_binary({FName, bool, Value}) ->
- [short_string_to_binary(FName), "t", if Value -> 1; true -> 0 end];
+field_value_to_binary(long, Value) ->
+ ["l", <<Value:64/signed>>];
-table_field_to_binary({FName, binary, Value}) ->
- [short_string_to_binary(FName), "x", long_string_to_binary(Value)];
+field_value_to_binary(short, Value) ->
+ ["s", <<Value:16/signed>>];
-table_field_to_binary({FName, void, _Value}) ->
- [short_string_to_binary(FName), "V"].
+field_value_to_binary(bool, Value) ->
+ ["t", if Value -> 1; true -> 0 end];
+
+field_value_to_binary(binary, Value) ->
+ ["x", long_string_to_binary(Value)];
+
+field_value_to_binary(void, _Value) ->
+ ["V"].
table_to_binary(Table) when is_list(Table) ->
BinTable = generate_table(Table),
[<<(size(BinTable)):32>>, BinTable].
+array_to_binary(Array) when is_list(Array) ->
+ BinArray = generate_array(Array),
+ [<<(size(BinArray)):32>>, BinArray].
+
generate_table(Table) when is_list(Table) ->
list_to_binary(lists:map(fun table_field_to_binary/1, Table)).
+generate_array(Array) when is_list(Array) ->
+ list_to_binary(lists:map(
+ fun ({Type, Value}) -> field_value_to_binary(Type, Value) end,
+ Array)).
short_string_to_binary(String) when is_binary(String) and (size(String) < 256) ->
[<<(size(String)):8>>, String];
@@ -186,13 +200,11 @@ short_string_to_binary(String) ->
true = (StringLength < 256), % assertion
[<<StringLength:8>>, String].
-
long_string_to_binary(String) when is_binary(String) ->
[<<(size(String)):32>>, String];
long_string_to_binary(String) ->
[<<(length(String)):32>>, String].
-
encode_properties([], []) ->
<<0, 0>>;
encode_properties(TypeList, ValueList) ->
@@ -238,32 +250,7 @@ encode_property(longlongint, Int) ->
encode_property(timestamp, Int) ->
<<Int:64/unsigned>>;
encode_property(table, Table) ->
- encode_table(Table).
-
-
-encode_table(Table) ->
- TableBin = list_to_binary(lists:map(fun encode_table_entry/1, Table)),
- Len = size(TableBin),
- <<Len:32/unsigned, TableBin:Len/binary>>.
-
-
-encode_table_entry({Name, longstr, Value}) ->
- NLen = size(Name),
- VLen = size(Value),
- <<NLen:8/unsigned, Name:NLen/binary, "S", VLen:32/unsigned, Value:VLen/binary>>;
-encode_table_entry({Name, signedint, Value}) ->
- NLen = size(Name),
- <<NLen:8/unsigned, Name:NLen/binary, "I", Value:32/signed>>;
-encode_table_entry({Name, decimal, {Before, After}}) ->
- NLen = size(Name),
- <<NLen:8/unsigned, Name:NLen/binary, "D", Before:8/unsigned, After:32/unsigned>>;
-encode_table_entry({Name, timestamp, Value}) ->
- NLen = size(Name),
- <<NLen:8/unsigned, Name:NLen/binary, "T", Value:64/unsigned>>;
-encode_table_entry({Name, table, Value}) ->
- NLen = size(Name),
- TableBin = encode_table(Value),
- <<NLen:8/unsigned, Name:NLen/binary, "F", TableBin/binary>>.
+ table_to_binary(Table).
check_empty_content_body_frame_size() ->
%% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 4ef382aa..506e87ec 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -56,45 +56,57 @@
parse_table(<<>>) ->
[];
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary, ValueAndRest/binary>>) ->
+ {Type, Value, Rest} = parse_field_value(ValueAndRest),
+ [{NameString, Type, Value} | parse_table(Rest)].
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "S", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) ->
- [{NameString, longstr, ValueString} | parse_table(Rest)];
+parse_array(<<>>) ->
+ [];
+parse_array(<<ValueAndRest/binary>>) ->
+ {Type, Value, Rest} = parse_field_value(ValueAndRest),
+ [{Type, Value} | parse_array(Rest)].
+
+parse_field_value(<<"S", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) ->
+ {longstr, ValueString, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "I", Value:32/signed, Rest/binary>>) ->
- [{NameString, signedint, Value} | parse_table(Rest)];
+parse_field_value(<<"I", Value:32/signed, Rest/binary>>) ->
+ {signedint, Value, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "D", Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
- [{NameString, decimal, {Before, After}} | parse_table(Rest)];
+parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
+ {decimal, {Before, After}, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "T", Value:64/unsigned, Rest/binary>>) ->
- [{NameString, timestamp, Value} | parse_table(Rest)];
+parse_field_value(<<"T", Value:64/unsigned, Rest/binary>>) ->
+ {timestamp, Value, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "F", VLen:32/unsigned, Table:VLen/binary, Rest/binary>>) ->
- [{NameString, table, parse_table(Table)} | parse_table(Rest)];
+parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, Rest/binary>>) ->
+ {table, parse_table(Table), Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "b", Value:8/unsigned, Rest/binary>>) ->
- [{NameString, byte, Value} | parse_table(Rest)];
+parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, Rest/binary>>) ->
+ {array, parse_array(Array), Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "d", Value:64/float, Rest/binary>>) ->
- [{NameString, double, Value} | parse_table(Rest)];
+parse_field_value(<<"b", Value:8/unsigned, Rest/binary>>) ->
+ {byte, Value, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "f", Value:32/float, Rest/binary>>) ->
- [{NameString, float, Value} | parse_table(Rest)];
+parse_field_value(<<"d", Value:64/float, Rest/binary>>) ->
+ {double, Value, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "l", Value:64/signed, Rest/binary>>) ->
- [{NameString, long, Value} | parse_table(Rest)];
+parse_field_value(<<"f", Value:32/float, Rest/binary>>) ->
+ {float, Value, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "s", Value:16/signed, Rest/binary>>) ->
- [{NameString, short, Value} | parse_table(Rest)];
+parse_field_value(<<"l", Value:64/signed, Rest/binary>>) ->
+ {long, Value, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "t", Value:8/unsigned, Rest/binary>>) ->
- [{NameString, bool, (Value /= 0)} | parse_table(Rest)];
+parse_field_value(<<"s", Value:16/signed, Rest/binary>>) ->
+ {short, Value, Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) ->
- [{NameString, binary, ValueString} | parse_table(Rest)];
+parse_field_value(<<"t", Value:8/unsigned, Rest/binary>>) ->
+ {bool, (Value /= 0), Rest};
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "V", Rest/binary>>) ->
- [{NameString, void, undefined} | parse_table(Rest)].
+parse_field_value(<<"x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) ->
+ {binary, ValueString, Rest};
+
+parse_field_value(<<"V", Rest/binary>>) ->
+ {void, undefined, Rest}.
parse_properties([], _PropBin) ->
@@ -147,7 +159,6 @@ parse_property(bit, Rest) ->
parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) ->
{parse_table(Table), Rest}.
-
ensure_content_decoded(Content = #content{properties = Props})
when Props =/= 'none' ->
Content;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 16b7c938..7e195d2f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -60,8 +60,8 @@
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
+-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-endif.
@@ -125,11 +125,11 @@ handle_cast({method, Method, Content}, State) ->
stop ->
{stop, normal, State#ch{state = terminating}}
catch
- exit:{amqp, Error, Explanation, none} ->
- ok = notify_queues(internal_rollback(State)),
- Reason = {amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)},
- State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ exit:Reason = #amqp_error{} ->
+ ok = rollback_and_notify(State),
+ MethodName = rabbit_misc:method_record_type(Method),
+ State#ch.reader_pid ! {channel_exit, State#ch.channel,
+ Reason#amqp_error{method = MethodName}},
{stop, normal, State#ch{state = terminating}};
exit:normal ->
{stop, normal, State};
@@ -175,7 +175,7 @@ terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
terminate(Reason, State = #ch{writer_pid = WriterPid,
limiter_pid = LimiterPid}) ->
- Res = notify_queues(internal_rollback(State)),
+ Res = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
_ -> ok
@@ -260,12 +260,6 @@ expand_routing_key_shortcut(<<>>, <<>>,
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
-die_precondition_failed(Fmt, Params) ->
- %% FIXME: 406 should be replaced with precondition_failed when we
- %% move to AMQP spec >=8.1
- rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>},
- Fmt, Params).
-
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -297,7 +291,7 @@ handle_method(_Method, _, #ch{state = starting}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
- ok = notify_queues(internal_rollback(State)),
+ ok = rollback_and_notify(State),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
@@ -532,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
- lists:foreach(
- fun ({_DeliveryTag, none, _Msg}) ->
- %% Was sent as a basic.get_ok. Don't redeliver
- %% it. FIXME: appropriate?
- ok;
- ({DeliveryTag, ConsumerTag,
- {QName, QPid, MsgId, _Redelivered, Message}}) ->
- %% Was sent as a proper consumer delivery. Resend it as
- %% before.
- %%
- %% FIXME: What should happen if the consumer's been
- %% cancelled since?
- %%
- %% FIXME: should we allocate a fresh DeliveryTag?
- ok = internal_deliver(
+ ok = rabbit_misc:queue_fold(
+ fun ({_DeliveryTag, none, _Msg}, ok) ->
+ %% Was sent as a basic.get_ok. Don't redeliver
+ %% it. FIXME: appropriate?
+ ok;
+ ({DeliveryTag, ConsumerTag,
+ {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
+ %% Was sent as a proper consumer delivery. Resend
+ %% it as before.
+ %%
+ %% FIXME: What should happen if the consumer's been
+ %% cancelled since?
+ %%
+ %% FIXME: should we allocate a fresh DeliveryTag?
+ internal_deliver(
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
- end, queue:to_list(UAMQ)),
+ end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State};
@@ -610,8 +604,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
{error, not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
- die_precondition_failed(
- "~s in use", [rabbit_misc:rs(ExchangeName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
@@ -685,11 +679,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
QueueName,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
- die_precondition_failed(
- "~s in use", [rabbit_misc:rs(QueueName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
- die_precondition_failed(
- "~s not empty", [rabbit_misc:rs(QueueName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
#'queue.delete_ok'{
@@ -872,8 +866,13 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
internal_error, "rollback failed: ~w", [Errors])
end.
+rollback_and_notify(State = #ch{transaction_id = none}) ->
+ notify_queues(State);
+rollback_and_notify(State) ->
+ notify_queues(internal_rollback(State)).
+
fold_per_queue(F, Acc0, UAQ) ->
- D = lists:foldl(
+ D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
%% dict:append would be simpler and avoid the
@@ -884,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) ->
fun (MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
D)
- end, dict:new(), queue:to_list(UAQ)),
+ end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
@@ -913,9 +912,9 @@ consumer_queues(Consumers) ->
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
- case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, queue:to_list(Acked)) of
+ case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index cf20520e..ddd0c002 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -52,10 +52,11 @@
%%----------------------------------------------------------------------------
start() ->
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
#params{quiet = Quiet, node = Node, command = Command, args = Args} =
parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:localnode(rabbit)}),
+ node = rabbit_misc:makenode(NodeStr)}),
Inform = case Quiet of
true -> fun(_Format, _Args1) -> ok end;
false -> fun(Format, Args1) ->
@@ -80,20 +81,41 @@ start() ->
{error, Reason} ->
error("~p", [Reason]),
halt(2);
+ {badrpc, Reason} ->
+ error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_badrpc_diagnostics(Node),
+ halt(2);
Other ->
error("~p", [Other]),
halt(2)
end.
-error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
+
+error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
+
+print_badrpc_diagnostics(Node) ->
+ fmt_stderr("diagnostics:", []),
+ {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ fmt_stderr("- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]);
+ {ok, NamePorts} ->
+ fmt_stderr("- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]])
+ end,
+ fmt_stderr("- current node: ~w", [node()]),
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]);
+ Other -> fmt_stderr("- no current node home dir: ~p", [Other])
+ end,
+ fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
+ ok.
parse_args(["-n", NodeS | Args], Params) ->
- Node = case lists:member($@, NodeS) of
- true -> list_to_atom(NodeS);
- false -> rabbit_misc:localnode(list_to_atom(NodeS))
- end,
- parse_args(Args, Params#params{node = Node});
+ parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)});
parse_args(["-q" | Args], Params) ->
parse_args(Args, Params#params{quiet = true});
parse_args([Command | Args], Params) ->
@@ -151,7 +173,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional
virtual host parameter for which to display results. The default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
-arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
+arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted,
messages, acks_uncommitted, consumers, transactions, memory]. The default is
to display name and (number of) messages.
@@ -159,12 +181,12 @@ messages, acks_uncommitted, consumers, transactions, memory]. The default is
auto_delete, arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
-exchange name, routing key, queue name and arguments, in that order.
+exchange name, queue name, routing key and arguments, in that order.
-<ConnectionInfoItem> must be a member of the list [node, address, port,
+<ConnectionInfoItem> must be a member of the list [pid, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
-recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
-user, peer_address and peer_port.
+client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend].
+The default is to display user, peer_address, peer_port and state.
"),
halt(1).
@@ -197,9 +219,11 @@ action(cluster, Node, ClusterNodeSs, Inform) ->
action(status, Node, [], Inform) ->
Inform("Status of node ~p", [Node]),
- Res = call(Node, {rabbit, status, []}),
- io:format("~p~n", [Res]),
- ok;
+ case call(Node, {rabbit, status, []}) of
+ {badrpc, _} = Res -> Res;
+ Res -> io:format("~p~n", [Res]),
+ ok
+ end;
action(rotate_logs, Node, [], Inform) ->
Inform("Reopening logs for node ~p", [Node]),
@@ -244,8 +268,7 @@ action(list_user_permissions, Node, Args = [_Username], Inform) ->
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
{VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(RemainingArgs, [name, messages])),
+ ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
@@ -261,7 +284,7 @@ action(list_exchanges, Node, Args, Inform) ->
action(list_bindings, Node, Args, Inform) ->
Inform("Listing bindings", []),
{VHostArg, _} = parse_vhost_flag_bin(Args),
- InfoKeys = [exchange_name, routing_key, queue_name, args],
+ InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
@@ -270,8 +293,7 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(Args, [user, peer_address, peer_port])),
+ ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@@ -314,7 +336,7 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) ||
+ lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) ||
X <- InfoItemKeys])
end, Results),
ok;
@@ -325,18 +347,23 @@ display_row(Row) ->
io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
io:nl().
-format_info_item(Items, Key) ->
- {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items),
- case Info of
- {_, #resource{name = Name}} ->
+format_info_item(Key, Items) ->
+ case proplists:get_value(Key, Items) of
+ #resource{name = Name} ->
escape(Name);
- _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) ->
+ Value when Key =:= address; Key =:= peer_address andalso
+ is_tuple(Value) ->
inet_parse:ntoa(Value);
- _ when is_pid(Value) ->
- atom_to_list(node(Value));
- _ when is_binary(Value) ->
+ Value when is_pid(Value) ->
+ pid_to_string(Value);
+ Value when is_binary(Value) ->
escape(Value);
- _ ->
+ Value when is_atom(Value) ->
+ escape(atom_to_list(Value));
+ Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _]
+ when is_binary(TableEntryKey) andalso is_atom(TableEntryType) ->
+ io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+ Value ->
io_lib:format("~w", [Value])
end.
@@ -361,12 +388,14 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
-escape(Bin) when binary(Bin) ->
- escape_char(lists:reverse(binary_to_list(Bin)), []).
+escape(Bin) when is_binary(Bin) ->
+ escape(binary_to_list(Bin));
+escape(L) when is_list(L) ->
+ escape_char(lists:reverse(L), []).
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
-escape_char([X | T], Acc) when X > 32, X /= 127 ->
+escape_char([X | T], Acc) when X >= 32, X /= 127 ->
escape_char(T, [X | Acc]);
escape_char([X | T], Acc) ->
escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
@@ -374,6 +403,20 @@ escape_char([X | T], Acc) ->
escape_char([], Acc) ->
Acc.
-list_replace(Find, Replace, List) ->
- [case X of Find -> Replace; _ -> X end || X <- List].
+prettify_amqp_table(Table) ->
+ [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table].
+
+prettify_typed_amqp_value(Type, Value) ->
+ case Type of
+ longstr -> escape(Value);
+ table -> prettify_amqp_table(Value);
+ array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
+ _ -> Value
+ end.
+%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7)
+pid_to_string(Pid) ->
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
new file mode 100644
index 00000000..23e6fc44
--- /dev/null
+++ b/src/rabbit_dialyzer.erl
@@ -0,0 +1,91 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_dialyzer).
+-include("rabbit.hrl").
+
+-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(create_basic_plt/1 :: (string()) -> 'ok').
+-spec(add_to_plt/2 :: (string(), string()) -> 'ok').
+-spec(dialyze_files/2 :: (string(), string()) -> 'ok').
+-spec(halt_with_code/1 :: (atom()) -> no_return()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+create_basic_plt(BasicPltPath) ->
+ OptsRecord = dialyzer_options:build(
+ [{analysis_type, plt_build},
+ {output_plt, BasicPltPath},
+ {files_rec, otp_apps_dependencies_paths()}]),
+ dialyzer_cl:start(OptsRecord),
+ ok.
+
+add_to_plt(PltPath, FilesString) ->
+ {ok, Files} = regexp:split(FilesString, " "),
+ DialyzerWarnings = dialyzer:run([{analysis_type, plt_add},
+ {init_plt, PltPath},
+ {output_plt, PltPath},
+ {files, Files}]),
+ print_warnings(DialyzerWarnings),
+ ok.
+
+dialyze_files(PltPath, ModifiedFiles) ->
+ {ok, Files} = regexp:split(ModifiedFiles, " "),
+ DialyzerWarnings = dialyzer:run([{init_plt, PltPath},
+ {files, Files}]),
+ case DialyzerWarnings of
+ [] -> io:format("~nOk~n"),
+ ok;
+ _ -> io:format("~nFAILED with the following warnings:~n"),
+ print_warnings(DialyzerWarnings),
+ fail
+ end.
+
+print_warnings(Warnings) ->
+ [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings],
+ io:format("~n"),
+ ok.
+
+otp_apps_dependencies_paths() ->
+ [code:lib_dir(App, ebin) ||
+ App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]].
+
+halt_with_code(ok) ->
+ halt();
+halt_with_code(fail) ->
+ halt(1).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index b28574b7..b9bd71b7 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -37,8 +37,14 @@
-behaviour(gen_event).
+-export([boot/0]).
+
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]).
+boot() ->
+ {ok, DefaultVHost} = application:get_env(default_vhost),
+ ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]).
+
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index fd0f71d2..0e3b9a37 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -61,7 +61,7 @@
'exchange_not_found' |
'exchange_and_queue_not_found'}).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
+-spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
@@ -83,9 +83,9 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(topic_matches/2 :: (binary(), binary()) -> bool()).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
--spec(delete/2 :: (exchange_name(), bool()) ->
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
[{exchange_name(), routing_key(), amqp_table()}]).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index b789fbd1..ea61a679 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -104,13 +104,8 @@ guid() ->
%% generate a readable string representation of a guid. Note that any
%% monotonicity of the guid is not preserved in the encoding.
string_guid(Prefix) ->
- %% we use the (undocumented) ssl_base64 module here because it is
- %% present throughout OTP R11 and R12 whereas base64 only becomes
- %% available in R11B-4.
- %%
- %% TODO: once debian stable and EPEL have moved from R11B-2 to
- %% R11B-4 or later we should change this to use base64.
- Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ base64:encode_to_string(
+ erlang:md5(term_to_binary(guid()))).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 0a68c9ad..ed0066fe 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) ->
spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2,
send_oct, 0,
fun () ->
- catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
continue
end,
erlang:monitor(process, Parent)) end),
@@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) ->
{'DOWN', MonitorRef, process, _Object, _Info} -> ok;
Other -> exit({unexpected_message, Other})
after TimeoutMillisec ->
- case inet:getstat(Sock, [StatName]) of
+ case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
if NewStatVal =/= StatVal ->
F({NewStatVal, 0});
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9f3dcbd0..087a9f64 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -47,7 +47,7 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
+-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
index 7bf85347..6ef638cb 100644
--- a/src/rabbit_load.erl
+++ b/src/rabbit_load.erl
@@ -41,7 +41,7 @@
-ifdef(use_specs).
-type(erlang_node() :: atom()).
--type(load() :: {{non_neg_integer(), float()}, erlang_node()}).
+-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}).
-spec(local_load/0 :: () -> load()).
-spec(remote_loads/0 :: () -> [load()]).
-spec(pick/0 :: () -> erlang_node()).
@@ -52,8 +52,11 @@
local_load() ->
LoadAvg = case whereis(cpu_sup) of
- undefined -> 0.0;
- _Other -> cpu_sup:avg1()
+ undefined -> unknown;
+ _ -> case cpu_sup:avg1() of
+ L when is_integer(L) -> L;
+ {error, timeout} -> unknown
+ end
end,
{{statistics(run_queue), LoadAvg}, node()}.
@@ -65,8 +68,12 @@ remote_loads() ->
pick() ->
RemoteLoads = remote_loads(),
{{RunQ, LoadAvg}, Node} = local_load(),
- %% add bias towards current node
- AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR,
+ %% add bias towards current node; we rely on Erlang's term order
+ %% of SomeFloat < local_unknown < unknown.
+ AdjustedLoadAvg = case LoadAvg of
+ unknown -> local_unknown;
+ _ -> LoadAvg * ?FUDGE_FACTOR
+ end,
Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads],
{_, SelectedNode} = lists:min(Loads),
SelectedNode.
diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl
deleted file mode 100644
index b0d57cb2..00000000
--- a/src/rabbit_memsup.erl
+++ /dev/null
@@ -1,142 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_memsup).
-
--behaviour(gen_server).
-
--export([start_link/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([update/0]).
-
--record(state, {memory_fraction,
- timeout,
- timer,
- mod,
- mod_state,
- alarmed
- }).
-
--define(SERVER, memsup). %% must be the same as the standard memsup
-
--define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(update/0 :: () -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link(Args) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
-
-update() ->
- gen_server:cast(?SERVER, update).
-
-%%----------------------------------------------------------------------------
-
-init([Mod]) ->
- Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
- TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
- InitState = Mod:init(),
- State = #state { memory_fraction = Fraction,
- timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
- timer = TRef,
- mod = Mod,
- mod_state = InitState,
- alarmed = false },
- {ok, internal_update(State)}.
-
-start_timer(Timeout) ->
- {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
- TRef.
-
-%% Export the same API as the real memsup. Note that
-%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
-%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
-handle_call(get_sysmem_high_watermark, _From, State) ->
- {reply, trunc(100 * State#state.memory_fraction), State};
-
-handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
- {reply, ok, State#state{memory_fraction = Float}};
-
-handle_call(get_check_interval, _From, State) ->
- {reply, State#state.timeout, State};
-
-handle_call({set_check_interval, Timeout}, _From, State) ->
- {ok, cancel} = timer:cancel(State#state.timer),
- {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-
-handle_call(get_memory_data, _From,
- State = #state { mod = Mod, mod_state = ModState }) ->
- {reply, Mod:get_memory_data(ModState), State};
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast(update, State) ->
- {noreply, internal_update(State)};
-
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-internal_update(State = #state { memory_fraction = MemoryFraction,
- alarmed = Alarmed,
- mod = Mod, mod_state = ModState }) ->
- ModState1 = Mod:update(ModState),
- {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1),
- NewAlarmed = MemUsed / MemTotal > MemoryFraction,
- case {Alarmed, NewAlarmed} of
- {false, true} ->
- alarm_handler:set_alarm({system_memory_high_watermark, []});
- {true, false} ->
- alarm_handler:clear_alarm(system_memory_high_watermark);
- _ ->
- ok
- end,
- State #state { mod_state = ModState1, alarmed = NewAlarmed }.
diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl
deleted file mode 100644
index 3de2d843..00000000
--- a/src/rabbit_memsup_darwin.erl
+++ /dev/null
@@ -1,88 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_memsup_darwin).
-
--export([init/0, update/1, get_memory_data/1]).
-
--record(state, {total_memory,
- allocated_memory}).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
- allocated_memory :: ('undefined' | non_neg_integer())
- }).
-
--spec(init/0 :: () -> state()).
--spec(update/1 :: (state()) -> state()).
--spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
- ('undefined' | pid())}).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-init() ->
- #state{total_memory = undefined,
- allocated_memory = undefined}.
-
-update(State) ->
- File = os:cmd("/usr/bin/vm_stat"),
- Lines = string:tokens(File, "\n"),
- Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
- [PageSize, Inactive, Active, Free, Wired] =
- [dict:fetch(Key, Dict) ||
- Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
- 'Pages wired down']],
- MemTotal = PageSize * (Inactive + Active + Free + Wired),
- MemUsed = PageSize * (Active + Wired),
- State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
-
-get_memory_data(State) ->
- {State#state.total_memory, State#state.allocated_memory, undefined}.
-
-%%----------------------------------------------------------------------------
-
-%% A line looks like "Foo bar: 123456."
-parse_line(Line) ->
- [Name, RHS | _Rest] = string:tokens(Line, ":"),
- case Name of
- "Mach Virtual Memory Statistics" ->
- ["(page", "size", "of", PageSize, "bytes)"] =
- string:tokens(RHS, " "),
- {page_size, list_to_integer(PageSize)};
- _ ->
- [Value | _Rest1] = string:tokens(RHS, " ."),
- {list_to_atom(Name), list_to_integer(Value)}
- end.
diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl
deleted file mode 100644
index ca942d7c..00000000
--- a/src/rabbit_memsup_linux.erl
+++ /dev/null
@@ -1,101 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_memsup_linux).
-
--export([init/0, update/1, get_memory_data/1]).
-
--record(state, {total_memory,
- allocated_memory}).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
- allocated_memory :: ('undefined' | non_neg_integer())
- }).
-
--spec(init/0 :: () -> state()).
--spec(update/1 :: (state()) -> state()).
--spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
- ('undefined' | pid())}).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-init() ->
- #state{total_memory = undefined,
- allocated_memory = undefined}.
-
-update(State) ->
- File = read_proc_file("/proc/meminfo"),
- Lines = string:tokens(File, "\n"),
- Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
- [MemTotal, MemFree, Buffers, Cached] =
- [dict:fetch(Key, Dict) ||
- Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']],
- MemUsed = MemTotal - MemFree - Buffers - Cached,
- State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
-
-get_memory_data(State) ->
- {State#state.total_memory, State#state.allocated_memory, undefined}.
-
-%%----------------------------------------------------------------------------
-
--define(BUFFER_SIZE, 1024).
-
-%% file:read_file does not work on files in /proc as it seems to get
-%% the size of the file first and then read that many bytes. But files
-%% in /proc always have length 0, we just have to read until we get
-%% eof.
-read_proc_file(File) ->
- {ok, IoDevice} = file:open(File, [read, raw]),
- Res = read_proc_file(IoDevice, []),
- file:close(IoDevice),
- lists:flatten(lists:reverse(Res)).
-
-read_proc_file(IoDevice, Acc) ->
- case file:read(IoDevice, ?BUFFER_SIZE) of
- {ok, Res} -> read_proc_file(IoDevice, [Res | Acc]);
- eof -> Acc
- end.
-
-%% A line looks like "FooBar: 123456 kB"
-parse_line(Line) ->
- [Name, RHS | _Rest] = string:tokens(Line, ":"),
- [Value | UnitsRest] = string:tokens(RHS, " "),
- Value1 = case UnitsRest of
- [] -> list_to_integer(Value); %% no units
- ["kB"] -> list_to_integer(Value) * 1024
- end,
- {list_to_atom(Name), Value1}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 95a274e3..0866da3f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -35,7 +35,8 @@
-include_lib("kernel/include/file.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
--export([die/1, frame_error/2, protocol_error/3, protocol_error/4]).
+-export([die/1, frame_error/2, amqp_error/4,
+ protocol_error/3, protocol_error/4]).
-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
@@ -46,7 +47,7 @@
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
--export([localnode/1, tcp_name/3]).
+-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
@@ -54,7 +55,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([unfold/2, ceil/1]).
+-export([unfold/2, ceil/1, queue_fold/3]).
-import(mnesia).
-import(lists).
@@ -74,10 +75,9 @@
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
-spec(die/1 :: (atom()) -> no_return()).
-spec(frame_error/2 :: (atom(), binary()) -> no_return()).
--spec(protocol_error/3 ::
- (atom() | amqp_error(), string(), [any()]) -> no_return()).
--spec(protocol_error/4 ::
- (atom() | amqp_error(), string(), [any()], atom()) -> no_return()).
+-spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()).
+-spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()).
+-spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()).
-spec(not_found/1 :: (r(atom())) -> no_return()).
-spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
-spec(get_config/2 :: (atom(), A) -> A).
@@ -97,7 +97,7 @@
-spec(enable_cover/1 :: (string()) -> ok_or_error()).
-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
- (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
+ (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(with_user/2 :: (username(), thunk(A)) -> A).
@@ -105,7 +105,9 @@
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(localnode/1 :: (atom()) -> erlang_node()).
+-spec(makenode/1 :: ({string(), string()} | string()) -> erlang_node()).
+-spec(nodeparts/1 :: (erlang_node() | string()) -> {string(), string()}).
+-spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
@@ -124,6 +126,7 @@
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
+-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
-endif.
@@ -144,15 +147,17 @@ die(Error) ->
protocol_error(Error, "~w", [Error]).
frame_error(MethodName, BinaryFields) ->
- protocol_error(frame_error, "cannot decode ~w",
- [BinaryFields], MethodName).
+ protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName).
-protocol_error(Error, Explanation, Params) ->
- protocol_error(Error, Explanation, Params, none).
+amqp_error(Name, ExplanationFormat, Params, Method) ->
+ Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)),
+ #amqp_error{name = Name, explanation = Explanation, method = Method}.
-protocol_error(Error, Explanation, Params, Method) ->
- CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)),
- exit({amqp, Error, CompleteExplanation, Method}).
+protocol_error(Name, ExplanationFormat, Params) ->
+ protocol_error(Name, ExplanationFormat, Params, none).
+
+protocol_error(Name, ExplanationFormat, Params, Method) ->
+ exit(amqp_error(Name, ExplanationFormat, Params, Method)).
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
@@ -304,12 +309,22 @@ execute_mnesia_transaction(TxFun) ->
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
-localnode(Name) ->
- %% This is horrible, but there doesn't seem to be a way to split a
- %% nodename into its constituent parts.
- list_to_atom(lists:append(atom_to_list(Name),
- lists:dropwhile(fun (E) -> E =/= $@ end,
- atom_to_list(node())))).
+makenode({Prefix, Suffix}) ->
+ list_to_atom(lists:append([Prefix, "@", Suffix]));
+makenode(NodeStr) ->
+ makenode(nodeparts(NodeStr)).
+
+nodeparts(Node) when is_atom(Node) ->
+ nodeparts(atom_to_list(Node));
+nodeparts(NodeStr) ->
+ case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
+ {Prefix, []} -> {_, Suffix} = nodeparts(node()),
+ {Prefix, Suffix};
+ {Prefix, Suffix} -> {Prefix, tl(Suffix)}
+ end.
+
+cookie_hash() ->
+ base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
@@ -325,6 +340,9 @@ intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)].
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
+%%
+%% WARNING: This is is deliberately lightweight rather than robust -- if F
+%% throws, upmap will hang forever, so make sure F doesn't throw!
upmap(F, L) ->
Parent = self(),
Ref = make_ref(),
@@ -413,7 +431,7 @@ append_file(File, _, Suffix) ->
ensure_parent_dirs_exist(Filename) ->
case filelib:ensure_dir(Filename) of
ok -> ok;
- {error, Reason} ->
+ {error, Reason} ->
throw({error, {cannot_create_parent_dirs, Filename, Reason}})
end.
@@ -475,3 +493,9 @@ ceil(N) ->
0 -> N;
_ -> 1 + T
end.
+
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 37e20335..749038db 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -32,7 +32,8 @@
-module(rabbit_mnesia).
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, reset/0, force_reset/0]).
+ cluster/1, reset/0, force_reset/0, is_clustered/0,
+ empty_ram_only_tables/0]).
-export([table_names/0]).
@@ -50,10 +51,12 @@
-spec(dir/0 :: () -> string()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
--spec(is_db_empty/0 :: () -> bool()).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(cluster/1 :: ([erlang_node()]) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
+-spec(is_clustered/0 :: () -> boolean()).
+-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-endif.
@@ -98,6 +101,21 @@ cluster(ClusterNodes) ->
reset() -> reset(false).
force_reset() -> reset(true).
+is_clustered() ->
+ RunningNodes = mnesia:system_info(running_db_nodes),
+ [node()] /= RunningNodes andalso [] /= RunningNodes.
+
+empty_ram_only_tables() ->
+ Node = node(),
+ lists:foreach(
+ fun (TabName) ->
+ case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
+ true -> {atomic, ok} = mnesia:clear_table(TabName);
+ false -> ok
+ end
+ end, table_names()),
+ ok.
+
%%--------------------------------------------------------------------
table_definitions() ->
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index d9197535..dc642df4 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -99,11 +99,16 @@ Available commands:
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
- N = list_to_integer(NodeCount),
- {NodePids, Running} = start_nodes(N, N, [], true,
- getenv("RABBITMQ_NODENAME"),
- getenv("RABBITMQ_NODE_PORT"),
- RpcTimeout),
+ application:load(rabbit),
+ NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")),
+ {NodePids, Running} =
+ case list_to_integer(NodeCount) of
+ 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName),
+ RpcTimeout),
+ {[NodePid], Started};
+ N -> start_nodes(N, N, [], true, NodeName,
+ get_node_tcp_listener(), RpcTimeout)
+ end,
write_pids_file(NodePids),
case Running of
true -> ok;
@@ -114,12 +119,13 @@ action(status, [], RpcTimeout) ->
io:format("Status of all running nodes...~n", []),
call_all_nodes(
fun({Node, Pid}) ->
- Status = rpc:call(Node, rabbit, status, [], RpcTimeout),
+ RabbitRunning =
+ case is_rabbit_running(Node, RpcTimeout) of
+ false -> not_running;
+ true -> running
+ end,
io:format("Node '~p' with Pid ~p: ~p~n",
- [Node, Pid, case parse_status(Status) of
- false -> not_running;
- true -> running
- end])
+ [Node, Pid, RabbitRunning])
end);
action(stop_all, [], RpcTimeout) ->
@@ -155,26 +161,29 @@ action(rotate_logs, [Suffix], RpcTimeout) ->
%% Running is a boolean exhibiting success at some moment
start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
-start_nodes(N, Total, PNodePid, Running,
- NodeNameBase, NodePortBase, RpcTimeout) ->
+start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) ->
+ {NodePre, NodeSuff} = NodeNameBase,
NodeNumber = Total - N,
- NodeName = if NodeNumber == 0 ->
- %% For compatibility with running a single node
- NodeNameBase;
- true ->
- NodeNameBase ++ "_" ++ integer_to_list(NodeNumber)
+ NodePre1 = case NodeNumber of
+ %% For compatibility with running a single node
+ 0 -> NodePre;
+ _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber)
end,
- {NodePid, Started} = start_node(NodeName,
- list_to_integer(NodePortBase) + NodeNumber,
- RpcTimeout),
+ Node = rabbit_misc:makenode({NodePre1, NodeSuff}),
+ os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
+ case Listener of
+ {NodeIpAddress, NodePortBase} ->
+ NodePort = NodePortBase + NodeNumber,
+ os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
+ os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress);
+ undefined ->
+ ok
+ end,
+ {NodePid, Started} = start_node(Node, RpcTimeout),
start_nodes(N - 1, Total, [NodePid | PNodePid],
- Started and Running,
- NodeNameBase, NodePortBase, RpcTimeout).
+ Started and Running, NodeNameBase, Listener, RpcTimeout).
-start_node(NodeName, NodePort, RpcTimeout) ->
- os:putenv("RABBITMQ_NODENAME", NodeName),
- os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
- Node = rabbit_misc:localnode(list_to_atom(NodeName)),
+start_node(Node, RpcTimeout) ->
io:format("Starting node ~s...~n", [Node]),
case rpc:call(Node, os, getpid, []) of
{badrpc, _} ->
@@ -197,7 +206,7 @@ start_node(NodeName, NodePort, RpcTimeout) ->
wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 ->
false;
wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
- case parse_status(rpc:call(Node, rabbit, status, [])) of
+ case is_rabbit_running(Node, RpcTimeout) of
true -> true;
false -> receive
{'EXIT', Port, PosixCode} ->
@@ -211,22 +220,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
run_cmd(FullPath) ->
erlang:open_port({spawn, FullPath}, [nouse_stdio]).
-parse_status({badrpc, _}) ->
- false;
-
-parse_status(Status) ->
- case lists:keysearch(running_applications, 1, Status) of
- {value, {running_applications, Apps}} ->
- lists:keymember(rabbit, 1, Apps);
- _ ->
- false
+is_rabbit_running(Node, RpcTimeout) ->
+ case rpc:call(Node, rabbit, status, [], RpcTimeout) of
+ {badrpc, _} -> false;
+ Status -> case proplists:get_value(running_applications, Status) of
+ undefined -> false;
+ Apps -> lists:keymember(rabbit, 1, Apps)
+ end
end.
with_os(Handlers) ->
{OsFamily, _} = os:type(),
- case lists:keysearch(OsFamily, 1, Handlers) of
- {value, {_, Handler}} -> Handler();
- false -> throw({unsupported_os, OsFamily})
+ case proplists:get_value(OsFamily, Handlers) of
+ undefined -> throw({unsupported_os, OsFamily});
+ Handler -> Handler()
end.
script_filename() ->
@@ -292,7 +299,7 @@ kill_wait(Pid, TimeLeft, Forceful) ->
io:format(".", []),
is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful).
-% Test using some OS clunkiness since we shouldn't trust
+% Test using some OS clunkiness since we shouldn't trust
% rpc:call(os, getpid, []) at this point
is_dead(Pid) ->
PidS = integer_to_list(Pid),
@@ -320,3 +327,21 @@ getenv(Var) ->
false -> throw({missing_env_var, Var});
Value -> Value
end.
+
+get_node_tcp_listener() ->
+ try
+ {getenv("RABBITMQ_NODE_IP_ADDRESS"),
+ list_to_integer(getenv("RABBITMQ_NODE_PORT"))}
+ catch _ ->
+ case application:get_env(rabbit, tcp_listeners) of
+ {ok, [{_IpAddy, _Port} = Listener]} ->
+ Listener;
+ {ok, []} ->
+ undefined;
+ {ok, Other} ->
+ throw({cannot_start_multiple_nodes, multiple_tcp_listeners,
+ Other});
+ undefined ->
+ throw({missing_configuration, tcp_listeners})
+ end
+ end.
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
new file mode 100644
index 00000000..a5ccc8e9
--- /dev/null
+++ b/src/rabbit_net.erl
@@ -0,0 +1,132 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_net).
+-include("rabbit.hrl").
+-include_lib("kernel/include/inet.hrl").
+
+-export([async_recv/3, close/1, controlling_process/2,
+ getstat/2, peername/1, port_command/2,
+ send/2, sockname/1]).
+%%---------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(stat_option() ::
+ 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
+ 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
+-type(error() :: {'error', any()}).
+
+-spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}).
+-spec(close/1 :: (socket()) -> 'ok' | error()).
+-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()).
+-spec(port_command/2 :: (socket(), iolist()) -> 'true').
+-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()).
+-spec(peername/1 :: (socket()) ->
+ {'ok', {ip_address(), non_neg_integer()}} | error()).
+-spec(sockname/1 :: (socket()) ->
+ {'ok', {ip_address(), non_neg_integer()}} | error()).
+-spec(getstat/2 :: (socket(), [stat_option()]) ->
+ {'ok', [{stat_option(), integer()}]} | error()).
+
+-endif.
+
+%%---------------------------------------------------------------------------
+
+
+async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
+ Pid = self(),
+ Ref = make_ref(),
+
+ spawn(fun() -> Pid ! {inet_async, Sock, Ref,
+ ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
+ end),
+
+ {ok, Ref};
+
+async_recv(Sock, Length, infinity) when is_port(Sock) ->
+ prim_inet:async_recv(Sock, Length, -1);
+
+async_recv(Sock, Length, Timeout) when is_port(Sock) ->
+ prim_inet:async_recv(Sock, Length, Timeout).
+
+close(Sock) when is_record(Sock, ssl_socket) ->
+ ssl:close(Sock#ssl_socket.ssl);
+
+close(Sock) when is_port(Sock) ->
+ gen_tcp:close(Sock).
+
+
+controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) ->
+ ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
+
+controlling_process(Sock, Pid) when is_port(Sock) ->
+ gen_tcp:controlling_process(Sock, Pid).
+
+
+getstat(Sock, Stats) when is_record(Sock, ssl_socket) ->
+ inet:getstat(Sock#ssl_socket.tcp, Stats);
+
+getstat(Sock, Stats) when is_port(Sock) ->
+ inet:getstat(Sock, Stats).
+
+
+peername(Sock) when is_record(Sock, ssl_socket) ->
+ ssl:peername(Sock#ssl_socket.ssl);
+
+peername(Sock) when is_port(Sock) ->
+ inet:peername(Sock).
+
+
+port_command(Sock, Data) when is_record(Sock, ssl_socket) ->
+ case ssl:send(Sock#ssl_socket.ssl, Data) of
+ ok ->
+ self() ! {inet_reply, Sock, ok},
+ true;
+ {error, Reason} ->
+ erlang:error(Reason)
+ end;
+
+port_command(Sock, Data) when is_port(Sock) ->
+ erlang:port_command(Sock, Data).
+
+send(Sock, Data) when is_record(Sock, ssl_socket) ->
+ ssl:send(Sock#ssl_socket.ssl, Data);
+
+send(Sock, Data) when is_port(Sock) ->
+ gen_tcp:send(Sock, Data).
+
+
+sockname(Sock) when is_record(Sock, ssl_socket) ->
+ ssl:sockname(Sock#ssl_socket.ssl);
+
+sockname(Sock) when is_port(Sock) ->
+ inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2dbd5a5a..84658a85 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -31,18 +31,31 @@
-module(rabbit_networking).
--export([start/0, start_tcp_listener/2, stop_tcp_listener/2,
- on_node_down/1, active_listeners/0, node_listeners/1,
- connections/0, connection_info/1, connection_info/2,
- connection_info_all/0, connection_info_all/1]).
+-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
+ stop_tcp_listener/2, on_node_down/1, active_listeners/0,
+ node_listeners/1, connections/0, connection_info/1,
+ connection_info/2, connection_info_all/0,
+ connection_info_all/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
--export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]).
+-export([tcp_listener_started/2, tcp_listener_stopped/2,
+ start_client/1, start_ssl_client/2]).
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
+-define(RABBIT_TCP_OPTS, [
+ binary,
+ {packet, raw}, % no packaging
+ {reuseaddr, true}, % allow rebind without waiting
+ %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
+ %% {delay_send, true},
+ {exit_on_close, false}
+ ]).
+
+-define(SSL_TIMEOUT, 5). %% seconds
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -52,6 +65,7 @@
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
+-spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok').
-spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(active_listeners/0 :: () -> [listener()]).
-spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
@@ -68,6 +82,27 @@
%%----------------------------------------------------------------------------
+boot() ->
+ ok = start(),
+ ok = boot_tcp(),
+ ok = boot_ssl().
+
+boot_tcp() ->
+ {ok, TcpListeners} = application:get_env(tcp_listeners),
+ [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners],
+ ok.
+
+boot_ssl() ->
+ case application:get_env(ssl_listeners) of
+ {ok, []} ->
+ ok;
+ {ok, SslListeners} ->
+ ok = rabbit_misc:start_applications([crypto, ssl]),
+ {ok, SslOpts} = application:get_env(ssl_options),
+ [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ ok
+ end.
+
start() ->
{ok,_} = supervisor:start_child(
rabbit_sup,
@@ -90,27 +125,30 @@ check_tcp_listener_address(NamePrefix, Host, Port) ->
if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
[Port]),
- throw({error, invalid_port, Port})
+ throw({error, {invalid_port, Port}})
end,
Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
{IPAddress, Name}.
start_tcp_listener(Host, Port) ->
- {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
+ start_listener(Host, Port, "TCP Listener",
+ {?MODULE, start_client, []}).
+
+start_ssl_listener(Host, Port, SslOpts) ->
+ start_listener(Host, Port, "SSL Listener",
+ {?MODULE, start_ssl_client, [SslOpts]}).
+
+start_listener(Host, Port, Label, OnConnect) ->
+ {IPAddress, Name} =
+ check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
{ok,_} = supervisor:start_child(
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
- [IPAddress, Port,
- [binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
- {exit_on_close, false}],
+ [IPAddress, Port, ?RABBIT_TCP_OPTS ,
{?MODULE, tcp_listener_started, []},
{?MODULE, tcp_listener_stopped, []},
- {?MODULE, start_client, []}]},
+ OnConnect, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}),
ok.
@@ -146,12 +184,32 @@ node_listeners(Node) ->
on_node_down(Node) ->
ok = mnesia:dirty_delete(rabbit_listener, Node).
-start_client(Sock) ->
+start_client(Sock, SockTransform) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- ok = gen_tcp:controlling_process(Sock, Child),
- Child ! {go, Sock},
+ ok = rabbit_net:controlling_process(Sock, Child),
+ Child ! {go, Sock, SockTransform},
Child.
+start_client(Sock) ->
+ start_client(Sock, fun (S) -> {ok, S} end).
+
+start_ssl_client(SslOpts, Sock) ->
+ start_client(
+ Sock,
+ fun (Sock1) ->
+ case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of
+ {ok, SslSock} ->
+ rabbit_log:info("upgraded TCP connection ~p to SSL~n",
+ [self()]),
+ {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}};
+ {error, Reason} ->
+ {error, {ssl_upgrade_error, Reason}};
+ {'EXIT', Reason} ->
+ {error, {ssl_upgrade_failure, Reason}}
+
+ end
+ end).
+
connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
rabbit_tcp_client_sup)].
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 71278bfb..4fcfab78 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -39,6 +39,17 @@
-define(BaseApps, [rabbit]).
%%----------------------------------------------------------------------------
+%% Specs
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
start() ->
%% Ensure Rabbit is loaded so we can access it's environment
@@ -49,6 +60,8 @@ start() ->
UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir),
RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin),
+ RootName = RabbitEBin ++ "/rabbit",
+
%% Unpack any .ez plugins
unpack_ez_plugins(PluginDir, UnpackedPluginDir),
@@ -60,37 +73,43 @@ start() ->
%% Build the entire set of dependencies - this will load the
%% applications along the way
AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
- {unknown_app, {App, Err}} ->
- io:format("ERROR: Failed to load application " ++
- "~s: ~p~n", [App, Err]),
- halt(1);
+ {failed_to_load_app, App, Err} ->
+ error("failed to load application ~s:~n~p", [App, Err]);
AppList ->
AppList
end,
AppVersions = [determine_version(App) || App <- AllApps],
- {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions),
+ {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
%% Build the overall release descriptor
- RDesc = {release,
- {"rabbit", RabbitVersion},
+ RDesc = {release,
+ {"rabbit", RabbitVersion},
{erts, erlang:system_info(version)},
AppVersions},
%% Write it out to ebin/rabbit.rel
- file:write_file(RabbitEBin ++ "/rabbit.rel",
- io_lib:format("~p.~n", [RDesc])),
+ file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
%% Compile the script
- case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of
- {ok, Module, Warnings} ->
+ ScriptFile = RootName ++ ".script",
+ case systools:make_script(RootName, [local, silent]) of
+ {ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
- %% hiding real issues.
+ %% hiding real issues. On Ubuntu, we also get warnings
+ %% about kernel/stdlib sources being out of date, which we
+ %% also ignore for the same reason.
WarningStr = Module:format_warning(
- [W || W <- Warnings,
- case W of
- {warning, {source_not_found, _}} -> false;
- _ -> true
+ [W || W <- Warnings,
+ case W of
+ {warning, {source_not_found, _}} -> false;
+ {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
+ when WApp == mnesia;
+ WApp == stdlib;
+ WApp == kernel;
+ WApp == sasl;
+ WApp == os_mon -> false;
+ _ -> true
end]),
case length(WarningStr) of
0 -> ok;
@@ -98,9 +117,19 @@ start() ->
end,
ok;
{error, Module, Error} ->
- io:format("Boot file generation failed: ~s~n",
- [Module:format_error(Error)]),
- halt(1)
+ error("generation of boot script file ~s failed:~n~s",
+ [ScriptFile, Module:format_error(Error)])
+ end,
+
+ case post_process_script(ScriptFile) of
+ ok -> ok;
+ {error, Reason} ->
+ error("post processing of boot script file ~s failed:~n~w",
+ [ScriptFile, Reason])
+ end,
+ case systools:script2boot(RootName) of
+ ok -> ok;
+ error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
halt(),
ok.
@@ -115,17 +144,17 @@ get_env(Key, Default) ->
end.
determine_version(App) ->
- application:load(App),
- {ok, Vsn} = application:get_key(App, vsn),
+ application:load(App),
+ {ok, Vsn} = application:get_key(App, vsn),
{App, Vsn}.
assert_dir(Dir) ->
case filelib:is_dir(Dir) of
true -> ok;
- false ->
- ok = filelib:ensure_dir(Dir),
- ok = file:make_dir(Dir)
+ false -> ok = filelib:ensure_dir(Dir),
+ ok = file:make_dir(Dir)
end.
+
delete_dir(Dir) ->
case filelib:is_dir(Dir) of
true ->
@@ -143,6 +172,7 @@ delete_dir(Dir) ->
false ->
ok
end.
+
is_symlink(Name) ->
case file:read_link(Name) of
{ok, _} -> true;
@@ -185,14 +215,40 @@ expand_dependencies(Current, [Next|Rest]) ->
expand_dependencies(Current, Rest);
false ->
case application:load(Next) of
- ok ->
+ ok ->
ok;
- {error, {already_loaded, _}} ->
+ {error, {already_loaded, _}} ->
ok;
- X ->
- throw({unknown_app, {Next, X}})
+ {error, Reason} ->
+ throw({failed_to_load_app, Next, Reason})
end,
{ok, Required} = application:get_key(Next, applications),
Unique = [A || A <- Required, not(sets:is_element(A, Current))],
expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
end.
+
+post_process_script(ScriptFile) ->
+ case file:consult(ScriptFile) of
+ {ok, [{script, Name, Entries}]} ->
+ NewEntries = lists:flatmap(fun process_entry/1, Entries),
+ case file:open(ScriptFile, [write]) of
+ {ok, Fd} ->
+ io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
+ [date(), time(), {script, Name, NewEntries}]),
+ file:close(Fd),
+ ok;
+ {error, OReason} ->
+ {error, {failed_to_open_script_file_for_writing, OReason}}
+ end;
+ {error, Reason} ->
+ {error, {failed_to_load_script, Reason}}
+ end.
+
+process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
+ [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry) ->
+ [Entry].
+
+error(Fmt, Args) ->
+ io:format("ERROR: " ++ Fmt ++ "~n", Args),
+ halt(1).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7be92812..e78d889d 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -49,7 +49,6 @@
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
--define(CHANNEL_CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
%---------------------------------------------------------------------------
@@ -59,7 +58,7 @@
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max]).
+ state, channels, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
%%
@@ -94,23 +93,19 @@
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing, *running*
-%% terminate_channel timeout -> remove 'closing' mark, *running*
+%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
%% receive frame -> ignore, *closing*
-%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
%% channel exit with hard error
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing
+%% -> log error, mark channel as closing
%% if last channel to exit then send connection.close_ok,
%% start terminate_connection timer, *closed*
%% else *closing*
@@ -123,7 +118,6 @@
%% *closed*
%% receive frame -> ignore, *closed*
%% terminate_connection timeout -> *terminate*
-%% terminate_channel timeout -> remove 'closing' mark, *closed*
%% handshake_timeout -> ignore, *closed*
%% heartbeat timeout -> *throw*
%% channel exit -> log error, *closed*
@@ -148,7 +142,8 @@ start_link() ->
init(Parent) ->
Deb = sys:debug_options([]),
receive
- {go, Sock} -> start_connection(Parent, Deb, Sock)
+ {go, Sock, SockTransform} ->
+ start_connection(Parent, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -198,34 +193,35 @@ teardown_profiling(Value) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-peername(Sock) ->
- try
- {Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
- AddressS = inet_parse:ntoa(Address),
- {AddressS, Port}
- catch
- Ex -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Ex]),
- rabbit_log:info("closing TCP connection ~p", [self()]),
- exit(normal)
+socket_op(Sock, Fun) ->
+ case Fun(Sock) of
+ {ok, Res} -> Res;
+ {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
+ [self(), Reason]),
+ rabbit_log:info("closing TCP connection ~p~n",
+ [self()]),
+ exit(normal)
end.
-start_connection(Parent, Deb, ClientSock) ->
+start_connection(Parent, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddressS, PeerPort} = peername(ClientSock),
+ {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
+ PeerAddressS = inet_parse:ntoa(PeerAddress),
+ rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
+ [self(), PeerAddressS, PeerPort]),
+ ClientSock = socket_op(Sock, SockTransform),
+ erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
+ handshake_timeout),
ProfilingValue = setup_profiling(),
try
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- handshake_timeout),
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
- vhost = none},
+ vhost = none,
+ client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init},
@@ -269,12 +265,10 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
if State#v1.connection_state =:= running ->
- send_exception(
- State, 0,
- {amqp, connection_forced,
- io_lib:format(
- "broker forced connection closure with reason '~w'",
- [Reason]), none});
+ send_exception(State, 0,
+ rabbit_misc:amqp_error(connection_forced,
+ "broker forced connection closure with reason '~w'",
+ [Reason], none));
true -> ok
end,
%% this is what we are expected to do according to
@@ -292,8 +286,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
- {terminate_channel, Channel, Ref1} ->
- mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -323,8 +315,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end.
switch_callback(OldState, NewCallback, Length) ->
- Ref = inet_op(fun () -> prim_inet:async_recv(
- OldState#v1.sock, Length, -1) end),
+ Ref = inet_op(fun () -> rabbit_net:async_recv(
+ OldState#v1.sock, Length, infinity) end),
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
@@ -341,32 +333,14 @@ close_connection(State = #v1{connection = #connection{
State#v1{connection_state = closed}.
close_channel(Channel, State) ->
- Ref = make_ref(),
- TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT,
- self(),
- {terminate_channel, Channel, Ref}),
- put({closing_channel, Channel}, {Ref, TRef}),
- State.
-
-terminate_channel(Channel, Ref, State) ->
- case get({closing_channel, Channel}) of
- undefined -> ok; %% got close_ok in the meantime
- {Ref, _} -> erase({closing_channel, Channel}),
- ok;
- {_Ref, _} -> ok %% got close_ok, and have new closing channel
- end,
+ put({channel, Channel}, closing),
State.
handle_channel_exit(Channel, Reason, State) ->
- %% We remove the channel from the inbound map only. That allows
- %% the channel to be re-opened, but also means the remaining
- %% cleanup, including possibly closing the connection, is deferred
- %% until we get the (normal) exit signal.
- erase({channel, Channel}),
handle_exception(State, Channel, Reason).
handle_dependent_exit(Pid, normal, State) ->
- channel_cleanup(Pid),
+ erase({chpid, Pid}),
maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
@@ -376,17 +350,10 @@ handle_dependent_exit(Pid, Reason, State) ->
channel_cleanup(Pid) ->
case get({chpid, Pid}) of
- undefined ->
- case get({closing_chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} ->
- erase({closing_chpid, Pid}),
- Channel
- end;
- {channel, Channel} ->
- erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+ undefined -> undefined;
+ {channel, Channel} -> erase({channel, Channel}),
+ erase({chpid, Pid}),
+ Channel
end.
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
@@ -451,7 +418,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
State;
handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
- error -> throw({unknown_frame, Type, Payload});
+ error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
trace -> State;
{method, MethodName, FieldsBin} ->
@@ -460,20 +427,34 @@ handle_frame(Type, 0, Payload, State) ->
end;
handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
- error -> throw({unknown_frame, Type, Payload});
+ error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
- ok = check_for_close(Channel, ChPid, AnalyzedFrame),
+ case AnalyzedFrame of
+ {method, 'channel.close', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
State;
+ closing ->
+ %% According to the spec, after sending a
+ %% channel.close we must ignore all frames except
+ %% channel.close_ok.
+ case AnalyzedFrame of
+ {method, 'channel.close_ok', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
+ State;
undefined ->
case State#v1.connection_state of
- running -> send_to_new_channel(
- Channel, AnalyzedFrame, State),
+ running -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
State;
Other -> throw({channel_frame_while_starting,
Channel, Other, AnalyzedFrame})
@@ -539,7 +520,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
end;
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> gen_tcp:send(
+ ok = inet_op(fun () -> rabbit_net:send(
Sock, <<"AMQP",1,1,
?PROTOCOL_VERSION_MAJOR,
?PROTOCOL_VERSION_MINOR>>) end),
@@ -567,19 +548,20 @@ handle_method0(MethodName, FieldsBin, State) ->
MethodName, FieldsBin),
State)
catch exit:Reason ->
- CompleteReason =
- case Reason of
- {amqp, Error, Explanation, none} ->
- {amqp, Error, Explanation, MethodName};
- OtherReason -> OtherReason
- end,
+ CompleteReason = case Reason of
+ #amqp_error{method = none} ->
+ Reason#amqp_error{method = MethodName};
+ OtherReason -> OtherReason
+ end,
case State#v1.connection_state of
running -> send_exception(State, 0, CompleteReason);
Other -> throw({channel0_error, Other, CompleteReason})
end
end.
+
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
- response = Response},
+ response = Response,
+ client_properties = ClientProperties},
State = #v1{connection_state = starting,
connection = Connection,
sock = Sock}) ->
@@ -591,7 +573,9 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
frame_max = 131072,
heartbeat = 0}),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}};
+ connection = Connection#connection{
+ user = User,
+ client_properties = ClientProperties}};
handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
frame_max = FrameMax,
heartbeat = ClientHeartbeat},
@@ -675,23 +659,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) ->
self();
i(address, #v1{sock = Sock}) ->
- {ok, {A, _}} = inet:sockname(Sock),
+ {ok, {A, _}} = rabbit_net:sockname(Sock),
A;
i(port, #v1{sock = Sock}) ->
- {ok, {_, P}} = inet:sockname(Sock),
+ {ok, {_, P}} = rabbit_net:sockname(Sock),
P;
i(peer_address, #v1{sock = Sock}) ->
- {ok, {A, _}} = inet:peername(Sock),
+ {ok, {A, _}} = rabbit_net:peername(Sock),
A;
i(peer_port, #v1{sock = Sock}) ->
- {ok, {_, P}} = inet:peername(Sock),
+ {ok, {_, P}} = rabbit_net:peername(Sock),
P;
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
- case inet:getstat(Sock, [SockStat]) of
+ case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{SockStat, StatVal}]} -> StatVal;
{error, einval} -> undefined;
{error, Error} -> throw({cannot_get_socket_stats, Error})
@@ -703,51 +687,33 @@ i(channels, #v1{}) ->
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
- none;
+ '';
i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
VHost;
i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
Timeout;
i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
FrameMax;
+i(client_properties, #v1{connection = #connection{
+ client_properties = ClientProperties}}) ->
+ ClientProperties;
i(Item, #v1{}) ->
throw({bad_argument, Item}).
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame, State) ->
- case get({closing_channel, Channel}) of
- undefined ->
- #v1{sock = Sock,
- connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
- {_, TRef} ->
- %% According to the spec, after sending a channel.close we
- %% must ignore all frames except channel.close_ok.
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- erlang:cancel_timer(TRef),
- erase({closing_channel, Channel}),
- ok;
- _Other -> ok
- end
- end.
-
-check_for_close(Channel, ChPid, {method, 'channel.close', _}) ->
- channel_cleanup(ChPid),
- put({closing_chpid, ChPid}, {channel, Channel}),
- ok;
-check_for_close(_Channel, _ChPid, _Frame) ->
- ok.
+ #v1{sock = Sock, connection = #connection{
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
+ ChPid = rabbit_framing_channel:start_link(
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
+ put({channel, Channel}, {chpid, ChPid}),
+ put({chpid, ChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
@@ -793,18 +759,27 @@ map_exception(Channel, Reason) ->
end,
{ShouldClose, CloseChannel, CloseMethod}.
-lookup_amqp_exception({amqp, {ShouldClose, Code, Text}, Expl, Method}) ->
- ExplBin = list_to_binary(Expl),
- CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
- SafeTextBin = if size(CompleteTextBin) > 255 ->
- <<CompleteTextBin:252/binary, "...">>;
- true ->
- CompleteTextBin
- end,
- {ShouldClose, Code, SafeTextBin, Method};
-lookup_amqp_exception({amqp, ErrorName, Expl, Method}) ->
- Details = rabbit_framing:lookup_amqp_exception(ErrorName),
- lookup_amqp_exception({amqp, Details, Expl, Method});
+%% FIXME: this clause can go when we move to AMQP spec >=8.1
+lookup_amqp_exception(#amqp_error{name = precondition_failed,
+ explanation = Expl,
+ method = Method}) ->
+ ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
+ {false, 406, ExplBin, Method};
+lookup_amqp_exception(#amqp_error{name = Name,
+ explanation = Expl,
+ method = Method}) ->
+ {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ ExplBin = amqp_exception_explanation(Text, Expl),
+ {ShouldClose, Code, ExplBin, Method};
lookup_amqp_exception(Other) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {true, ?INTERNAL_ERROR, <<"INTERNAL_ERROR">>, none}.
+ {ShouldClose, Code, Text} =
+ rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text, none}.
+
+amqp_exception_explanation(Text, Expl) ->
+ ExplBin = list_to_binary(Expl),
+ CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
+ if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
+ true -> CompleteTextBin
+ end.
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 730d7909..ef32544c 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_child/1, start_child/2]).
-export([init/1]).
@@ -42,5 +42,14 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_child(Mod) ->
+ start_child(Mod, []).
+
+start_child(Mod, Args) ->
+ {ok, _} = supervisor:start_child(?SERVER,
+ {Mod, {Mod, start_link, Args},
+ transient, 100, worker, [Mod]}),
+ ok.
+
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b4cd30bc..ba048184 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -193,6 +193,7 @@ test_unfold() ->
test_parsing() ->
passed = test_content_properties(),
+ passed = test_field_values(),
passed.
test_content_properties() ->
@@ -218,9 +219,12 @@ test_content_properties() ->
[{<<"one">>, signedint, 1},
{<<"two">>, signedint, 2}]}]}],
<<
- 16#8000:16, % flags
- % properties:
+ % property-flags
+ 16#8000:16,
+ % property-list:
+
+ % table
117:32, % table length in bytes
11,"a signedint", % name
@@ -249,6 +253,51 @@ test_content_properties() ->
V -> exit({got_success_but_expected_failure, V})
end.
+test_field_values() ->
+ %% FIXME this does not test inexact numbers (double and float) yet,
+ %% because they won't pass the equality assertions
+ test_content_prop_roundtrip(
+ [{table, [{<<"longstr">>, longstr, <<"Here is a long string">>},
+ {<<"signedint">>, signedint, 12345},
+ {<<"decimal">>, decimal, {3, 123456}},
+ {<<"timestamp">>, timestamp, 109876543209876},
+ {<<"table">>, table, [{<<"one">>, signedint, 54321},
+ {<<"two">>, longstr, <<"A long string">>}]},
+ {<<"byte">>, byte, 255},
+ {<<"long">>, long, 1234567890},
+ {<<"short">>, short, 655},
+ {<<"bool">>, bool, true},
+ {<<"binary">>, binary, <<"a binary string">>},
+ {<<"void">>, void, undefined},
+ {<<"array">>, array, [{signedint, 54321},
+ {longstr, <<"A long string">>}]}
+
+ ]}],
+ <<
+ % property-flags
+ 16#8000:16,
+ % table length in bytes
+ 228:32,
+
+ 7,"longstr", "S", 21:32, "Here is a long string", % = 34
+ 9,"signedint", "I", 12345:32/signed, % + 15 = 49
+ 7,"decimal", "D", 3, 123456:32, % + 14 = 63
+ 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
+ 5,"table", "F", 31:32, % length of table % + 11 = 93
+ 3,"one", "I", 54321:32, % + 9 = 102
+ 3,"two", "S", 13:32, "A long string",% + 22 = 124
+ 4,"byte", "b", 255:8, % + 7 = 131
+ 4,"long", "l", 1234567890:64, % + 14 = 145
+ 5,"short", "s", 655:16, % + 9 = 154
+ 4,"bool", "t", 1, % + 7 = 161
+ 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
+ 4,"void", "V", % + 6 = 194
+ 5,"array", "A", 23:32, % + 11 = 205
+ "I", 54321:32, % + 5 = 210
+ "S", 13:32, "A long string" % + 18 = 228
+ >>),
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).
@@ -495,7 +544,7 @@ test_cluster_management() ->
ok = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
- SecondaryNode = rabbit_misc:localnode(hare),
+ SecondaryNode = rabbit_misc:makenode("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
pang -> io:format("Skipping clustering tests with node ~p~n",
@@ -666,7 +715,10 @@ test_server_status() ->
{ok, _} = rabbit_amqqueue:delete(Q, false, false),
%% list connections
- [#listener{host = H, port = P} | _] = rabbit_networking:active_listeners(),
+ [#listener{host = H, port = P} | _] =
+ [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
+ N =:= node()],
+
{ok, C} = gen_tcp:connect(H, P, []),
timer:sleep(100),
ok = info_action(
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index e338ddfe..1679ce7c 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -169,7 +169,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
- fun () -> gen_tcp:send(Sock, Data) end).
+ fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
@@ -206,6 +206,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
ok.
port_cmd(Sock, Data) ->
- try erlang:port_command(Sock, Data)
+ try rabbit_net:port_command(Sock, Data)
catch error:Error -> exit({writer, send_failed, Error})
end.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index aa8b8ad5..bc742561 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -67,15 +67,20 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
- %% report
- {ok, {Address, Port}} = inet:sockname(LSock),
- {ok, {PeerAddress, PeerPort}} = inet:peername(Sock),
- error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [inet_parse:ntoa(Address), Port,
- inet_parse:ntoa(PeerAddress), PeerPort]),
-
- %% handle
- apply(M, F, A ++ [Sock]),
+ try
+ %% report
+ {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
+ error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
+ [inet_parse:ntoa(Address), Port,
+ inet_parse:ntoa(PeerAddress), PeerPort]),
+ %% handle
+ apply(M, F, A ++ [Sock])
+ catch {inet_error, Reason} ->
+ gen_tcp:close(Sock),
+ error_logger:error_msg("unable to accept TCP connection: ~p~n",
+ [Reason])
+ end,
%% accept more
case prim_inet:async_accept(LSock, -1) of
@@ -95,3 +100,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+%%--------------------------------------------------------------------
+
+inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index 92a47cf1..4a2e149b 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -33,28 +33,28 @@
-behaviour(gen_server).
--export([start_link/7]).
+-export([start_link/8]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {sock, on_startup, on_shutdown}).
+-record(state, {sock, on_startup, on_shutdown, label}).
%%--------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
- OnStartup, OnShutdown) ->
+ OnStartup, OnShutdown, Label) ->
gen_server:start_link(
?MODULE, {IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
- OnStartup, OnShutdown}, []).
+ OnStartup, OnShutdown, Label}, []).
%%--------------------------------------------------------------------
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
- {M,F,A} = OnStartup, OnShutdown}) ->
+ {M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
@@ -65,15 +65,16 @@ init({IPAddress, Port, SocketOpts,
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
- error_logger:info_msg("started TCP listener on ~s:~p~n",
- [inet_parse:ntoa(LIPAddress), LPort]),
+ error_logger:info_msg("started ~s on ~s:~p~n",
+ [Label, inet_parse:ntoa(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
- {ok, #state{sock=LSock,
- on_startup = OnStartup, on_shutdown = OnShutdown}};
+ {ok, #state{sock = LSock,
+ on_startup = OnStartup, on_shutdown = OnShutdown,
+ label = Label}};
{error, Reason} ->
error_logger:error_msg(
- "failed to start TCP listener on ~s:~p - ~p~n",
- [inet_parse:ntoa(IPAddress), Port, Reason]),
+ "failed to start ~s on ~s:~p - ~p~n",
+ [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
@@ -86,11 +87,11 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}}) ->
+terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) ->
{ok, {IPAddress, Port}} = inet:sockname(LSock),
gen_tcp:close(LSock),
- error_logger:info_msg("stopped TCP listener on ~s:~p~n",
- [inet_parse:ntoa(IPAddress), Port]),
+ error_logger:info_msg("stopped ~s on ~s:~p~n",
+ [Label, inet_parse:ntoa(IPAddress), Port]),
apply(M, F, A ++ [IPAddress, Port]).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 901a0da3..d6bbac08 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -33,23 +33,23 @@
-behaviour(supervisor).
--export([start_link/6, start_link/7]).
+-export([start_link/7, start_link/8]).
-export([init/1]).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback) ->
+ AcceptCallback, Label) ->
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, 1).
+ AcceptCallback, 1, Label).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount) ->
+ AcceptCallback, ConcurrentAcceptorCount, Label) ->
supervisor:start_link(
?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount}).
+ AcceptCallback, ConcurrentAcceptorCount, Label}).
init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount}) ->
+ AcceptCallback, ConcurrentAcceptorCount, Label}) ->
%% This is gross. The tcp_listener needs to know about the
%% tcp_acceptor_sup, and the only way I can think of accomplishing
%% that without jumping through hoops is to register the
@@ -62,5 +62,5 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
{tcp_listener, {tcp_listener, start_link,
[IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, Name,
- OnStartup, OnShutdown]},
+ OnStartup, OnShutdown, Label]},
transient, 100, worker, [tcp_listener]}]}}.
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
new file mode 100644
index 00000000..8be28f52
--- /dev/null
+++ b/src/vm_memory_monitor.erl
@@ -0,0 +1,325 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% In practice Erlang shouldn't be allowed to grow to more than a half
+%% of available memory. The pessimistic scenario is when the Erlang VM
+%% has a single process that's consuming all memory. In such a case,
+%% during garbage collection, Erlang tries to allocate a huge chunk of
+%% continuous memory, which can result in a crash or heavy swapping.
+%%
+%% This module tries to warn Rabbit before such situations occur, so
+%% that it has a higher chance to avoid running out of memory.
+%%
+%% This code depends on Erlang os_mon application.
+
+-module(vm_memory_monitor).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([update/0, get_total_memory/0,
+ get_check_interval/0, set_check_interval/1,
+ get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]).
+
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
+
+%% For an unknown OS, we assume that we have 1GB of memory. It'll be
+%% wrong. Scale by vm_memory_high_watermark in configuration to get a
+%% sensible value.
+-define(MEMORY_SIZE_FOR_UNKNOWN_OS, 1073741824).
+
+-record(state, {total_memory,
+ memory_limit,
+ timeout,
+ timer,
+ alarmed
+ }).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (float()) ->
+ ('ignore' | {'error', any()} | {'ok', pid()})).
+-spec(update/0 :: () -> 'ok').
+-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
+-spec(get_check_interval/0 :: () -> non_neg_integer()).
+-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
+-spec(get_vm_memory_high_watermark/0 :: () -> float()).
+-spec(set_vm_memory_high_watermark/1 :: (float()) -> 'ok').
+
+-endif.
+
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+update() ->
+ gen_server:cast(?SERVER, update).
+
+get_total_memory() ->
+ get_total_memory(os:type()).
+
+get_check_interval() ->
+ gen_server:call(?MODULE, get_check_interval).
+
+set_check_interval(Fraction) ->
+ gen_server:call(?MODULE, {set_check_interval, Fraction}).
+
+get_vm_memory_high_watermark() ->
+ gen_server:call(?MODULE, get_vm_memory_high_watermark).
+
+set_vm_memory_high_watermark(Fraction) ->
+ gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+
+init([MemFraction]) ->
+ TotalMemory =
+ case get_total_memory() of
+ unknown ->
+ error_logger:warning_msg(
+ "Unknown total memory size for your OS ~p. "
+ "Assuming memory size is ~pMB.~n",
+ [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]),
+ ?MEMORY_SIZE_FOR_UNKNOWN_OS;
+ M -> M
+ end,
+ MemLimit = get_mem_limit(MemFraction, TotalMemory),
+ error_logger:info_msg("Memory limit set to ~pMB.~n",
+ [trunc(MemLimit/1048576)]),
+ TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
+ State = #state { total_memory = TotalMemory,
+ memory_limit = MemLimit,
+ timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
+ timer = TRef,
+ alarmed = false},
+ {ok, internal_update(State)}.
+
+handle_call(get_vm_memory_high_watermark, _From, State) ->
+ {reply, State#state.memory_limit / State#state.total_memory, State};
+
+handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) ->
+ MemLimit = get_mem_limit(MemFraction, State#state.total_memory),
+ error_logger:info_msg("Memory alarm changed to ~p, ~p bytes.~n",
+ [MemFraction, MemLimit]),
+ {reply, ok, State#state{memory_limit = MemLimit}};
+
+handle_call(get_check_interval, _From, State) ->
+ {reply, State#state.timeout, State};
+
+handle_call({set_check_interval, Timeout}, _From, State) ->
+ {ok, cancel} = timer:cancel(State#state.timer),
+ {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+%% Server Internals
+%%----------------------------------------------------------------------------
+
+internal_update(State = #state { memory_limit = MemLimit,
+ alarmed = Alarmed}) ->
+ MemUsed = erlang:memory(total),
+ NewAlarmed = MemUsed > MemLimit,
+ case {Alarmed, NewAlarmed} of
+ {false, true} ->
+ emit_update_info(set, MemUsed, MemLimit),
+ alarm_handler:set_alarm({vm_memory_high_watermark, []});
+ {true, false} ->
+ emit_update_info(clear, MemUsed, MemLimit),
+ alarm_handler:clear_alarm(vm_memory_high_watermark);
+ _ ->
+ ok
+ end,
+ State #state {alarmed = NewAlarmed}.
+
+emit_update_info(State, MemUsed, MemLimit) ->
+ error_logger:info_msg(
+ "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
+ [State, MemUsed, MemLimit]).
+
+start_timer(Timeout) ->
+ {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
+ TRef.
+
+%% On a 32-bit machine, if you're using more than 2 gigs of RAM you're
+%% in big trouble anyway.
+get_vm_limit() ->
+ case erlang:system_info(wordsize) of
+ 4 -> 4294967296; %% 4 GB for 32 bits 2^32
+ 8 -> 281474976710656 %% 256 TB for 64 bits 2^48
+ %%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
+ end.
+
+get_mem_limit(MemFraction, TotalMemory) ->
+ lists:min([trunc(TotalMemory * MemFraction), get_vm_limit()]).
+
+%%----------------------------------------------------------------------------
+%% Internal Helpers
+%%----------------------------------------------------------------------------
+cmd(Command) ->
+ Exec = hd(string:tokens(Command, " ")),
+ case os:find_executable(Exec) of
+ false -> throw({command_not_found, Exec});
+ _ -> os:cmd(Command)
+ end.
+
+%% get_total_memory(OS) -> Total
+%% Windows and Freebsd code based on: memsup:get_memory_usage/1
+%% Original code was part of OTP and released under "Erlang Public License".
+
+get_total_memory({unix,darwin}) ->
+ File = cmd("/usr/bin/vm_stat"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_mach/1, Lines)),
+ [PageSize, Inactive, Active, Free, Wired] =
+ [dict:fetch(Key, Dict) ||
+ Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
+ 'Pages wired down']],
+ PageSize * (Inactive + Active + Free + Wired);
+
+get_total_memory({unix,freebsd}) ->
+ PageSize = freebsd_sysctl("vm.stats.vm.v_page_size"),
+ PageCount = freebsd_sysctl("vm.stats.vm.v_page_count"),
+ PageCount * PageSize;
+
+get_total_memory({win32,_OSname}) ->
+ [Result|_] = os_mon_sysinfo:get_mem_info(),
+ {ok, [_MemLoad, TotPhys, _AvailPhys,
+ _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} =
+ io_lib:fread("~d~d~d~d~d~d~d", Result),
+ TotPhys;
+
+get_total_memory({unix, linux}) ->
+ File = read_proc_file("/proc/meminfo"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_linux/1, Lines)),
+ dict:fetch('MemTotal', Dict);
+
+get_total_memory({unix, sunos}) ->
+ File = cmd("/usr/sbin/prtconf"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line_sunos/1, Lines)),
+ dict:fetch('Memory size', Dict);
+
+get_total_memory(_OsType) ->
+ unknown.
+
+%% A line looks like "Foo bar: 123456."
+parse_line_mach(Line) ->
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ case Name of
+ "Mach Virtual Memory Statistics" ->
+ ["(page", "size", "of", PageSize, "bytes)"] =
+ string:tokens(RHS, " "),
+ {page_size, list_to_integer(PageSize)};
+ _ ->
+ [Value | _Rest1] = string:tokens(RHS, " ."),
+ {list_to_atom(Name), list_to_integer(Value)}
+ end.
+
+%% A line looks like "FooBar: 123456 kB"
+parse_line_linux(Line) ->
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ [Value | UnitsRest] = string:tokens(RHS, " "),
+ Value1 = case UnitsRest of
+ [] -> list_to_integer(Value); %% no units
+ ["kB"] -> list_to_integer(Value) * 1024
+ end,
+ {list_to_atom(Name), Value1}.
+
+%% A line looks like "Memory size: 1024 Megabytes"
+parse_line_sunos(Line) ->
+ case string:tokens(Line, ":") of
+ [Name, RHS | _Rest] ->
+ [Value1 | UnitsRest] = string:tokens(RHS, " "),
+ Value2 = case UnitsRest of
+ ["Gigabytes"] ->
+ list_to_integer(Value1) * 1024 * 1024 * 1024;
+ ["Megabytes"] ->
+ list_to_integer(Value1) * 1024 * 1024;
+ ["Kilobytes"] ->
+ list_to_integer(Value1) * 1024;
+ _ ->
+ Value1 ++ UnitsRest %% no known units
+ end,
+ {list_to_atom(Name), Value2};
+ [Name] -> {list_to_atom(Name), none}
+ end.
+
+freebsd_sysctl(Def) ->
+ list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n").
+
+%% file:read_file does not work on files in /proc as it seems to get
+%% the size of the file first and then read that many bytes. But files
+%% in /proc always have length 0, we just have to read until we get
+%% eof.
+read_proc_file(File) ->
+ {ok, IoDevice} = file:open(File, [read, raw]),
+ Res = read_proc_file(IoDevice, []),
+ file:close(IoDevice),
+ lists:flatten(lists:reverse(Res)).
+
+-define(BUFFER_SIZE, 1024).
+read_proc_file(IoDevice, Acc) ->
+ case file:read(IoDevice, ?BUFFER_SIZE) of
+ {ok, Res} -> read_proc_file(IoDevice, [Res | Acc]);
+ eof -> Acc
+ end.