summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2009-01-15 12:34:31 +0000
committerBen Hood <0x6e6562@gmail.com>2009-01-15 12:34:31 +0000
commit1470f8f387d16f88885a55d518249f302527ff5e (patch)
tree326c9b702ec23c8a4142b7bbf8d4407f32bca6f4
parent6d7792714815e494cb63aec8ee86894e12f5e4d7 (diff)
parentb784026d3621844cf62f3cfdb69dc528184a72c4 (diff)
downloadrabbitmq-server-1470f8f387d16f88885a55d518249f302527ff5e.tar.gz
Merged default into 18557
-rw-r--r--Makefile21
-rw-r--r--ebin/rabbit.app58
-rw-r--r--ebin/rabbit_app.in20
-rw-r--r--generate_app10
-rw-r--r--packaging/RPMS/Fedora/Makefile23
-rw-r--r--packaging/RPMS/Fedora/init.d4
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec44
-rw-r--r--packaging/debs/Debian/debian/init.d1
-rw-r--r--packaging/debs/Debian/debian/postinst4
-rw-r--r--src/gen_server2.erl854
-rw-r--r--src/rabbit.erl32
-rw-r--r--src/rabbit_alarm.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_control.erl22
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_exchange.erl6
-rw-r--r--src/rabbit_misc.erl32
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_multi.erl2
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
21 files changed, 1021 insertions, 163 deletions
diff --git a/Makefile b/Makefile
index b441fcab..962b6f4b 100644
--- a/Makefile
+++ b/Makefile
@@ -7,7 +7,8 @@ SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
-TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
+BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
+TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
@@ -39,15 +40,21 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
#all: $(EBIN_DIR)/rabbit.boot
all: $(TARGETS)
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl
+$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
+ escript generate_app $(EBIN_DIR) < $< > $@
+
+$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
-# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $<
+
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
+ erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
+# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) > $@
+ $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) > $@
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
$(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS)
erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().'
@@ -57,7 +64,7 @@ dialyze: $(TARGETS)
clean: cleandb
rm -f $(EBIN_DIR)/*.beam
- rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
+ rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f docs/*.[0-9].gz
@@ -123,7 +130,7 @@ srcdist: distclean
cp BUILD.in $(TARGET_SRC_DIR)/BUILD
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
>> $(TARGET_SRC_DIR)/BUILD
- sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app
+ sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp codegen.py Makefile $(TARGET_SRC_DIR)
diff --git a/ebin/rabbit.app b/ebin/rabbit.app
deleted file mode 100644
index e377a33a..00000000
--- a/ebin/rabbit.app
+++ /dev/null
@@ -1,58 +0,0 @@
-{application, rabbit, %% -*- erlang -*-
- [{description, "RabbitMQ"},
- {id, "RabbitMQ"},
- {vsn, "%%VERSION%%"},
- {modules, [buffering_proxy,
- rabbit_access_control,
- rabbit_alarm,
- rabbit_amqqueue,
- rabbit_amqqueue_process,
- rabbit_amqqueue_sup,
- rabbit_binary_generator,
- rabbit_binary_parser,
- rabbit_channel,
- rabbit_control,
- rabbit,
- rabbit_error_logger,
- rabbit_error_logger_file_h,
- rabbit_exchange,
- rabbit_framing_channel,
- rabbit_framing,
- rabbit_heartbeat,
- rabbit_limiter,
- rabbit_load,
- rabbit_log,
- rabbit_memsup_linux,
- rabbit_misc,
- rabbit_mnesia,
- rabbit_multi,
- rabbit_networking,
- rabbit_node_monitor,
- rabbit_persister,
- rabbit_reader,
- rabbit_router,
- rabbit_sasl_report_file_h,
- rabbit_sup,
- rabbit_tests,
- rabbit_tracer,
- rabbit_writer,
- tcp_acceptor,
- tcp_acceptor_sup,
- tcp_client_sup,
- tcp_listener,
- tcp_listener_sup]},
- {registered, [rabbit_amqqueue_sup,
- rabbit_log,
- rabbit_node_monitor,
- rabbit_persister,
- rabbit_router,
- rabbit_sup,
- rabbit_tcp_client_sup]},
- {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
- {mod, {rabbit, []}},
- {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
- {extra_startup_steps, []},
- {default_user, <<"guest">>},
- {default_pass, <<"guest">>},
- {default_vhost, <<"/">>},
- {memory_alarms, false}]}]}.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
new file mode 100644
index 00000000..e2f36c0f
--- /dev/null
+++ b/ebin/rabbit_app.in
@@ -0,0 +1,20 @@
+{application, rabbit, %% -*- erlang -*-
+ [{description, "RabbitMQ"},
+ {id, "RabbitMQ"},
+ {vsn, "%%VERSION%%"},
+ {modules, []},
+ {registered, [rabbit_amqqueue_sup,
+ rabbit_log,
+ rabbit_node_monitor,
+ rabbit_persister,
+ rabbit_router,
+ rabbit_sup,
+ rabbit_tcp_client_sup]},
+ {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+ {mod, {rabbit, []}},
+ {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
+ {extra_startup_steps, []},
+ {default_user, <<"guest">>},
+ {default_pass, <<"guest">>},
+ {default_vhost, <<"/">>},
+ {memory_alarms, auto}]}]}.
diff --git a/generate_app b/generate_app
new file mode 100644
index 00000000..62301292
--- /dev/null
+++ b/generate_app
@@ -0,0 +1,10 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+main([BeamDir]) ->
+ Modules = [list_to_atom(filename:basename(F, ".beam")) ||
+ F <- filelib:wildcard("*.beam", BeamDir)],
+ {ok, {application, Application, Properties}} = io:read(''),
+ NewProperties = lists:keyreplace(modules, 1, Properties,
+ {modules, Modules}),
+ io:format("~p.", [{application, Application, NewProperties}]).
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index c05f14a7..cf3a93df 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -6,21 +6,38 @@ TOP_DIR=$(shell pwd)
#only checks build-dependencies using rpms, not debs
DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1'
+ifndef RPM_OS
+RPM_OS=fedora
+endif
+
+ifeq "x$(RPM_OS)" "xsuse"
+REQUIRES=/sbin/chkconfig /sbin/service
+OS_DEFINES=--define '_initrddir /etc/init.d'
+RELEASE_OS=.suse
+else
+REQUIRES=chkconfig initscripts
+OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
+RELEASE_OS=
+endif
+
rpms: clean server
prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
cp $(TOP_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
- sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec
+ sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \
+ SPECS/rabbitmq-server.spec
cp init.d SOURCES/rabbitmq-server.init
cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
- rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target i386
- rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --define '_arch x86_64' \
+ rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \
+ --target i386
+ rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \
+ --define '_libdir /usr/lib64' --define '_arch x86_64' \
--define '_defaultdocdir /usr/share/doc' --target x86_64
clean:
diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d
index 27f150f9..a006a5a7 100644
--- a/packaging/RPMS/Fedora/init.d
+++ b/packaging/RPMS/Fedora/init.d
@@ -16,7 +16,6 @@
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
-PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
DAEMON_NAME=rabbitmq-multi
DAEMON=/usr/lib/rabbitmq/bin/$DAEMON_NAME
NAME=rabbitmq-server
@@ -29,9 +28,6 @@ LOCK_FILE=/var/lock/subsys/$NAME
test -x $DAEMON || exit 0
-# source function library
-. /etc/rc.d/init.d/functions
-
# Include rabbitmq defaults if available
if [ -f /etc/default/rabbitmq ] ; then
. /etc/default/rabbitmq
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 13cfb037..241afd71 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -1,6 +1,6 @@
Name: rabbitmq-server
Version: %%VERSION%%
-Release: 1
+Release: 1%%RELEASE_OS%%
License: MPLv1.1
Group: Development/Libraries
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
@@ -17,24 +17,18 @@ Requires: erlang, logrotate
Packager: Hubert Plociniczak <hubert@lshift.net>
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
-Requires(post): chkconfig
-Requires(pre): chkconfig initscripts
+Requires(post): %%REQUIRES%%
+Requires(pre): %%REQUIRES%%
%description
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
-%ifarch x86_64
- %define _defaultlibdir /usr/lib64
-%else
- %define _defaultlibdir /usr/lib
-%endif
-
-%define _erllibdir %{_defaultlibdir}/erlang/lib
-%define _rabbitbindir %{_defaultlibdir}/rabbitmq/bin
+%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version}
+%define _rabbit_libdir %{_libdir}/rabbitmq
-%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version}
+%define _maindir %{buildroot}%{_rabbit_erllibdir}
%pre
if [ $1 -gt 1 ]; then
@@ -53,25 +47,21 @@ make
rm -rf %{buildroot}
make install TARGET_DIR=%{_maindir} \
- SBIN_DIR=%{buildroot}%{_rabbitbindir} \
+ SBIN_DIR=%{buildroot}%{_rabbit_libdir}/bin \
MAN_DIR=%{buildroot}%{_mandir}
mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia
mkdir -p %{buildroot}/var/log/rabbitmq
-mkdir -p %{buildroot}/etc/rc.d/init.d/
+mkdir -p %{buildroot}%{_initrddir}
#Copy all necessary lib files etc.
-install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server
-chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server
-%ifarch x86_64
- sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}/etc/rc.d/init.d/rabbitmq-server
-%endif
+install -m 0755 %SOURCE1 %{buildroot}%{_initrddir}/rabbitmq-server
+chmod 0755 %{buildroot}%{_initrddir}/rabbitmq-server
+sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_initrddir}/rabbitmq-server
mkdir -p %{buildroot}%{_sbindir}
install -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmqctl
-%ifarch x86_64
- sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}%{_sbindir}/rabbitmqctl
-%endif
+sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_sbindir}/rabbitmqctl
mkdir -p %{buildroot}/etc/logrotate.d
install -m 0644 %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server
@@ -81,8 +71,10 @@ rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL
#Build the list of files
rm -f %{_builddir}/filelist.%{name}.rpm
echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm
-(cd %{buildroot}; find . ! -regex '\./etc.*' \
- -type f | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm)
+(cd %{buildroot}; \
+ find . -type f ! -regex '\./etc.*' \
+ ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \
+ | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm)
%post
# create rabbitmq group
@@ -116,7 +108,9 @@ fi
%defattr(-,root,root,-)
%dir /var/lib/rabbitmq
%dir /var/log/rabbitmq
-/etc/rc.d/init.d/rabbitmq-server
+%{_rabbit_erllibdir}
+%{_rabbit_libdir}
+%{_initrddir}/rabbitmq-server
%config(noreplace) /etc/logrotate.d/rabbitmq-server
%doc LICENSE LICENSE-MPL-RabbitMQ INSTALL
diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d
index ace474c5..70dd0adf 100644
--- a/packaging/debs/Debian/debian/init.d
+++ b/packaging/debs/Debian/debian/init.d
@@ -9,7 +9,6 @@
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
-PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
DAEMON=/usr/lib/rabbitmq/bin/rabbitmq-multi
NAME=rabbitmq-server
DESC=rabbitmq-server
diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst
index 495b8331..05fb179c 100644
--- a/packaging/debs/Debian/debian/postinst
+++ b/packaging/debs/Debian/debian/postinst
@@ -25,8 +25,8 @@ fi
# create rabbitmq user
if ! getent passwd rabbitmq >/dev/null; then
- adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq --no-create-home rabbitmq
- usermod -c "RabbitMQ messaging server" rabbitmq
+ adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \
+ --no-create-home --gecos "RabbitMQ messaging server" rabbitmq
fi
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
new file mode 100644
index 00000000..11bb66d7
--- /dev/null
+++ b/src/gen_server2.erl
@@ -0,0 +1,854 @@
+%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) the module name is gen_server2
+%%
+%% 2) more efficient handling of selective receives in callbacks
+%% gen_server2 processes drain their message queue into an internal
+%% buffer before invoking any callback module functions. Messages are
+%% dequeued from the buffer for processing. Thus the effective message
+%% queue of a gen_server2 process is the concatenation of the internal
+%% buffer and the real message queue.
+%% As a result of the draining, any selective receive invoked inside a
+%% callback is less likely to have to scan a large message queue.
+%%
+%% 3) gen_server2:cast is guaranteed to be order-preserving
+%% The original code could reorder messages when communicating with a
+%% process on a remote node that was not currently connected.
+%%
+%% All modifications are (C) 2009 LShift Ltd.
+
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%%
+%% $Id$
+%%
+-module(gen_server2).
+
+%%% ---------------------------------------------------
+%%%
+%%% The idea behind THIS server is that the user module
+%%% provides (different) functions to handle different
+%%% kind of inputs.
+%%% If the Parent process terminates the Module:terminate/2
+%%% function is called.
+%%%
+%%% The user module should export:
+%%%
+%%% init(Args)
+%%% ==> {ok, State}
+%%% {ok, State, Timeout}
+%%% ignore
+%%% {stop, Reason}
+%%%
+%%% handle_call(Msg, {From, Tag}, State)
+%%%
+%%% ==> {reply, Reply, State}
+%%% {reply, Reply, State, Timeout}
+%%% {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, Reply, State}
+%%% Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%% handle_cast(Msg, State)
+%%%
+%%% ==> {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
+%%%
+%%% ==> {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% terminate(Reason, State) Let the user module clean up
+%%% always called when server terminates
+%%%
+%%% ==> ok
+%%%
+%%%
+%%% The work flow (of the server) can be described as follows:
+%%%
+%%% User module Generic
+%%% ----------- -------
+%%% start -----> start
+%%% init <----- .
+%%%
+%%% loop
+%%% handle_call <----- .
+%%% -----> reply
+%%%
+%%% handle_cast <----- .
+%%%
+%%% handle_info <----- .
+%%%
+%%% terminate <----- .
+%%%
+%%% -----> reply
+%%%
+%%%
+%%% ---------------------------------------------------
+
+%% API
+-export([start/3, start/4,
+ start_link/3, start_link/4,
+ call/2, call/3,
+ cast/2, reply/2,
+ abcast/2, abcast/3,
+ multi_call/2, multi_call/3, multi_call/4,
+ enter_loop/3, enter_loop/4, enter_loop/5]).
+
+-export([behaviour_info/1]).
+
+%% System exports
+-export([system_continue/3,
+ system_terminate/4,
+ system_code_change/4,
+ format_status/2]).
+
+%% Internal exports
+-export([init_it/6, print_event/3]).
+
+-import(error_logger, [format/2]).
+
+%%%=========================================================================
+%%% API
+%%%=========================================================================
+
+behaviour_info(callbacks) ->
+ [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
+ {terminate,2},{code_change,3}];
+behaviour_info(_Other) ->
+ undefined.
+
+%%% -----------------------------------------------------------------
+%%% Starts a generic server.
+%%% start(Mod, Args, Options)
+%%% start(Name, Mod, Args, Options)
+%%% start_link(Mod, Args, Options)
+%%% start_link(Name, Mod, Args, Options) where:
+%%% Name ::= {local, atom()} | {global, atom()}
+%%% Mod ::= atom(), callback module implementing the 'real' server
+%%% Args ::= term(), init arguments (to Mod:init/1)
+%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
+%%% Flag ::= trace | log | {logfile, File} | statistics | debug
+%%% (debug == log && statistics)
+%%% Returns: {ok, Pid} |
+%%% {error, {already_started, Pid}} |
+%%% {error, Reason}
+%%% -----------------------------------------------------------------
+start(Mod, Args, Options) ->
+ gen:start(?MODULE, nolink, Mod, Args, Options).
+
+start(Name, Mod, Args, Options) ->
+ gen:start(?MODULE, nolink, Name, Mod, Args, Options).
+
+start_link(Mod, Args, Options) ->
+ gen:start(?MODULE, link, Mod, Args, Options).
+
+start_link(Name, Mod, Args, Options) ->
+ gen:start(?MODULE, link, Name, Mod, Args, Options).
+
+
+%% -----------------------------------------------------------------
+%% Make a call to a generic server.
+%% If the server is located at another node, that node will
+%% be monitored.
+%% If the client is trapping exits and is linked server termination
+%% is handled here (? Shall we do that here (or rely on timeouts) ?).
+%% -----------------------------------------------------------------
+call(Name, Request) ->
+ case catch gen:call(Name, '$gen_call', Request) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request]}})
+ end.
+
+call(Name, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_call', Request, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
+ end.
+
+%% -----------------------------------------------------------------
+%% Make a cast to a generic server.
+%% -----------------------------------------------------------------
+cast({global,Name}, Request) ->
+ catch global:send(Name, cast_msg(Request)),
+ ok;
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Request);
+cast(Dest, Request) when is_atom(Dest) ->
+ do_cast(Dest, Request);
+cast(Dest, Request) when is_pid(Dest) ->
+ do_cast(Dest, Request).
+
+do_cast(Dest, Request) ->
+ do_send(Dest, cast_msg(Request)),
+ ok.
+
+cast_msg(Request) -> {'$gen_cast',Request}.
+
+%% -----------------------------------------------------------------
+%% Send a reply to the client.
+%% -----------------------------------------------------------------
+reply({To, Tag}, Reply) ->
+ catch To ! {Tag, Reply}.
+
+%% -----------------------------------------------------------------
+%% Asyncronous broadcast, returns nothing, it's just send'n prey
+%%-----------------------------------------------------------------
+abcast(Name, Request) when is_atom(Name) ->
+ do_abcast([node() | nodes()], Name, cast_msg(Request)).
+
+abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
+ do_abcast(Nodes, Name, cast_msg(Request)).
+
+do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
+ do_send({Name,Node},Msg),
+ do_abcast(Nodes, Name, Msg);
+do_abcast([], _,_) -> abcast.
+
+%%% -----------------------------------------------------------------
+%%% Make a call to servers at several nodes.
+%%% Returns: {[Replies],[BadNodes]}
+%%% A Timeout can be given
+%%%
+%%% A middleman process is used in case late answers arrives after
+%%% the timeout. If they would be allowed to glog the callers message
+%%% queue, it would probably become confused. Late answers will
+%%% now arrive to the terminated middleman and so be discarded.
+%%% -----------------------------------------------------------------
+multi_call(Name, Req)
+ when is_atom(Name) ->
+ do_multi_call([node() | nodes()], Name, Req, infinity).
+
+multi_call(Nodes, Name, Req)
+ when is_list(Nodes), is_atom(Name) ->
+ do_multi_call(Nodes, Name, Req, infinity).
+
+multi_call(Nodes, Name, Req, infinity) ->
+ do_multi_call(Nodes, Name, Req, infinity);
+multi_call(Nodes, Name, Req, Timeout)
+ when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
+ do_multi_call(Nodes, Name, Req, Timeout).
+
+
+%%-----------------------------------------------------------------
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_
+%%
+%% Description: Makes an existing process into a gen_server.
+%% The calling process will enter the gen_server receive
+%% loop and become a gen_server process.
+%% The process *must* have been started using one of the
+%% start functions in proc_lib, see proc_lib(3).
+%% The user is responsible for any initialization of the
+%% process, including registering a name for it.
+%%-----------------------------------------------------------------
+enter_loop(Mod, Options, State) ->
+ enter_loop(Mod, Options, State, self(), infinity).
+
+enter_loop(Mod, Options, State, ServerName = {_, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity);
+
+enter_loop(Mod, Options, State, Timeout) ->
+ enter_loop(Mod, Options, State, self(), Timeout).
+
+enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ Name = get_proc_name(ServerName),
+ Parent = get_parent(),
+ Debug = debug_options(Name, Options),
+ Queue = queue:new(),
+ loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
+
+%%%========================================================================
+%%% Gen-callback functions
+%%%========================================================================
+
+%%% ---------------------------------------------------
+%%% Initiate the new process.
+%%% Register the name using the Rfunc function
+%%% Calls the Mod:init/Args function.
+%%% Finally an acknowledge is sent to Parent and the main
+%%% loop is entered.
+%%% ---------------------------------------------------
+init_it(Starter, self, Name, Mod, Args, Options) ->
+ init_it(Starter, self(), Name, Mod, Args, Options);
+init_it(Starter, Parent, Name, Mod, Args, Options) ->
+ Debug = debug_options(Name, Options),
+ Queue = queue:new(),
+ case catch Mod:init(Args) of
+ {ok, State} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, infinity, Queue, Debug);
+ {ok, State, Timeout} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
+ {stop, Reason} ->
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ ignore ->
+ proc_lib:init_ack(Starter, ignore),
+ exit(normal);
+ {'EXIT', Reason} ->
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ Else ->
+ Error = {bad_return_value, Else},
+ proc_lib:init_ack(Starter, {error, Error}),
+ exit(Error)
+ end.
+
+%%%========================================================================
+%%% Internal functions
+%%%========================================================================
+%%% ---------------------------------------------------
+%%% The MAIN loop.
+%%% ---------------------------------------------------
+loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+ receive
+ Input -> loop(Parent, Name, State, Mod,
+ Time, queue:in(Input, Queue), Debug)
+ after 0 ->
+ case queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, Msg);
+ {empty, Queue1} ->
+ receive
+ Input ->
+ loop(Parent, Name, State, Mod,
+ Time, queue:in(Input, Queue1), Debug)
+ after Time ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, timeout)
+ end
+ end
+ end.
+
+process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
+ case Msg of
+ {system, From, Req} ->
+ sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
+ [Name, State, Mod, Time, Queue]);
+ {'EXIT', Parent, Reason} ->
+ terminate(Reason, Name, Msg, Mod, State, Debug);
+ _Msg when Debug =:= [] ->
+ handle_msg(Msg, Parent, Name, State, Mod, Time, Queue);
+ _Msg ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+ Name, {in, Msg}),
+ handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1)
+ end.
+
+%%% ---------------------------------------------------
+%%% Send/recive functions
+%%% ---------------------------------------------------
+do_send(Dest, Msg) ->
+ catch erlang:send(Dest, Msg).
+
+do_multi_call(Nodes, Name, Req, infinity) ->
+ Tag = make_ref(),
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ rec_nodes(Tag, Monitors, Name, undefined);
+do_multi_call(Nodes, Name, Req, Timeout) ->
+ Tag = make_ref(),
+ Caller = self(),
+ Receiver =
+ spawn(
+ fun() ->
+ %% Middleman process. Should be unsensitive to regular
+ %% exit signals. The sychronization is needed in case
+ %% the receiver would exit before the caller started
+ %% the monitor.
+ process_flag(trap_exit, true),
+ Mref = erlang:monitor(process, Caller),
+ receive
+ {Caller,Tag} ->
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ TimerId = erlang:start_timer(Timeout, self(), ok),
+ Result = rec_nodes(Tag, Monitors, Name, TimerId),
+ exit({self(),Tag,Result});
+ {'DOWN',Mref,_,_,_} ->
+ %% Caller died before sending us the go-ahead.
+ %% Give up silently.
+ exit(normal)
+ end
+ end),
+ Mref = erlang:monitor(process, Receiver),
+ Receiver ! {self(),Tag},
+ receive
+ {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+ Result;
+ {'DOWN',Mref,_,_,Reason} ->
+ %% The middleman code failed. Or someone did
+ %% exit(_, kill) on the middleman process => Reason==killed
+ exit(Reason)
+ end.
+
+send_nodes(Nodes, Name, Tag, Req) ->
+ send_nodes(Nodes, Name, Tag, Req, []).
+
+send_nodes([Node|Tail], Name, Tag, Req, Monitors)
+ when is_atom(Node) ->
+ Monitor = start_monitor(Node, Name),
+ %% Handle non-existing names in rec_nodes.
+ catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
+ send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
+send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
+ %% Skip non-atom Node
+ send_nodes(Tail, Name, Tag, Req, Monitors);
+send_nodes([], _Name, _Tag, _Req, Monitors) ->
+ Monitors.
+
+%% Against old nodes:
+%% If no reply has been delivered within 2 secs. (per node) check that
+%% the server really exists and wait for ever for the answer.
+%%
+%% Against contemporary nodes:
+%% Wait for reply, server 'DOWN', or timeout from TimerId.
+
+rec_nodes(Tag, Nodes, Name, TimerId) ->
+ rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
+
+rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], Time, TimerId);
+ {timeout, TimerId, _} ->
+ unmonitor(R),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
+ %% R6 node
+ receive
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], 2000, TimerId);
+ {timeout, TimerId, _} ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+ after Time ->
+ case rpc:call(N, erlang, whereis, [Name]) of
+ Pid when is_pid(Pid) -> % It exists try again.
+ rec_nodes(Tag, [N|Tail], Name, Badnodes,
+ Replies, infinity, TimerId);
+ _ -> % badnode
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes],
+ Replies, 2000, TimerId)
+ end
+ end;
+rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
+ case catch erlang:cancel_timer(TimerId) of
+ false -> % It has already sent it's message
+ receive
+ {timeout, TimerId, _} -> ok
+ after 0 ->
+ ok
+ end;
+ _ -> % Timer was cancelled, or TimerId was 'undefined'
+ ok
+ end,
+ {Replies, Badnodes}.
+
+%% Collect all replies that already have arrived
+rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ after 0 ->
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
+ %% R6 node
+ receive
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ after 0 ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
+ {Replies, Badnodes}.
+
+
+%%% ---------------------------------------------------
+%%% Monitor functions
+%%% ---------------------------------------------------
+
+start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
+ if node() =:= nonode@nohost, Node =/= nonode@nohost ->
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+ {Node, Ref};
+ true ->
+ case catch erlang:monitor(process, {Name, Node}) of
+ {'EXIT', _} ->
+ %% Remote node is R6
+ monitor_node(Node, true),
+ Node;
+ Ref when is_reference(Ref) ->
+ {Node, Ref}
+ end
+ end.
+
+%% Cancels a monitor started with Ref=erlang:monitor(_, _).
+unmonitor(Ref) when is_reference(Ref) ->
+ erlang:demonitor(Ref),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ true
+ after 0 ->
+ true
+ end.
+
+%%% ---------------------------------------------------
+%%% Message handling functions
+%%% ---------------------------------------------------
+
+dispatch({'$gen_cast', Msg}, Mod, State) ->
+ Mod:handle_cast(Msg, State);
+dispatch(Info, Mod, State) ->
+ Mod:handle_info(Info, State).
+
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, _Time, Queue) ->
+ case catch Mod:handle_call(Msg, From, State) of
+ {reply, Reply, NState} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {reply, Reply, NState, Time1} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Name, Msg, Mod, NState, [])),
+ reply(From, Reply),
+ exit(R);
+ Other -> handle_common_reply(Other,
+ Parent, Name, Msg, Mod, State, Queue)
+ end;
+handle_msg(Msg,
+ Parent, Name, State, Mod, _Time, Queue) ->
+ Reply = (catch dispatch(Msg, Mod, State)),
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
+
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ case catch Mod:handle_call(Msg, From, State) of
+ {reply, Reply, NState} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {reply, Reply, NState, Time1} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {noreply, NState} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {noreply, NState, Time1} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
+ reply(Name, From, Reply, NState, Debug),
+ exit(R);
+ Other ->
+ handle_common_reply(Other,
+ Parent, Name, Msg, Mod, State, Queue, Debug)
+ end;
+handle_msg(Msg,
+ Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Reply = (catch dispatch(Msg, Mod, State)),
+ handle_common_reply(Reply,
+ Parent, Name, Msg, Mod, State, Queue, Debug).
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
+ case Reply of
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, Msg, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, Msg, Mod, State, []);
+ _ ->
+ terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
+ end.
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
+ case Reply of
+ {noreply, NState} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {noreply, NState, Time1} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, Msg, Mod, NState, Debug);
+ {'EXIT', What} ->
+ terminate(What, Name, Msg, Mod, State, Debug);
+ _ ->
+ terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+ end.
+
+reply(Name, {To, Tag}, Reply, State, Debug) ->
+ reply({To, Tag}, Reply),
+ sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {out, Reply, To, State} ).
+
+
+%%-----------------------------------------------------------------
+%% Callback functions for system messages handling.
+%%-----------------------------------------------------------------
+system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, Queue, Debug).
+
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
+ terminate(Reason, Name, [], Mod, State, Debug).
+
+system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
+ case catch Mod:code_change(OldVsn, State, Extra) of
+ {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
+ Else -> Else
+ end.
+
+%%-----------------------------------------------------------------
+%% Format debug messages. Print them as the call-back module sees
+%% them, not as the real erlang messages. Use trace for that.
+%%-----------------------------------------------------------------
+print_event(Dev, {in, Msg}, Name) ->
+ case Msg of
+ {'$gen_call', {From, _Tag}, Call} ->
+ io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+ [Name, Call, From]);
+ {'$gen_cast', Cast} ->
+ io:format(Dev, "*DBG* ~p got cast ~p~n",
+ [Name, Cast]);
+ _ ->
+ io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+ end;
+print_event(Dev, {out, Msg, To, State}, Name) ->
+ io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
+ [Name, Msg, To, State]);
+print_event(Dev, {noreply, State}, Name) ->
+ io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
+print_event(Dev, Event, Name) ->
+ io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
+
+
+%%% ---------------------------------------------------
+%%% Terminate the server.
+%%% ---------------------------------------------------
+
+terminate(Reason, Name, Msg, Mod, State, Debug) ->
+ case catch Mod:terminate(Reason, State) of
+ {'EXIT', R} ->
+ error_info(R, Name, Msg, State, Debug),
+ exit(R);
+ _ ->
+ case Reason of
+ normal ->
+ exit(normal);
+ shutdown ->
+ exit(shutdown);
+ _ ->
+ error_info(Reason, Name, Msg, State, Debug),
+ exit(Reason)
+ end
+ end.
+
+error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
+ %% OTP-5811 Don't send an error report if it's the system process
+ %% application_controller which is terminating - let init take care
+ %% of it instead
+ ok;
+error_info(Reason, Name, Msg, State, Debug) ->
+ Reason1 =
+ case Reason of
+ {undef,[{M,F,A}|MFAs]} ->
+ case code:is_loaded(M) of
+ false ->
+ {'module could not be loaded',[{M,F,A}|MFAs]};
+ _ ->
+ case erlang:function_exported(M, F, length(A)) of
+ true ->
+ Reason;
+ false ->
+ {'function not exported',[{M,F,A}|MFAs]}
+ end
+ end;
+ _ ->
+ Reason
+ end,
+ format("** Generic server ~p terminating \n"
+ "** Last message in was ~p~n"
+ "** When Server state == ~p~n"
+ "** Reason for termination == ~n** ~p~n",
+ [Name, Msg, State, Reason1]),
+ sys:print_log(Debug),
+ ok.
+
+%%% ---------------------------------------------------
+%%% Misc. functions.
+%%% ---------------------------------------------------
+
+opt(Op, [{Op, Value}|_]) ->
+ {ok, Value};
+opt(Op, [_|Options]) ->
+ opt(Op, Options);
+opt(_, []) ->
+ false.
+
+debug_options(Name, Opts) ->
+ case opt(debug, Opts) of
+ {ok, Options} -> dbg_options(Name, Options);
+ _ -> dbg_options(Name, [])
+ end.
+
+dbg_options(Name, []) ->
+ Opts =
+ case init:get_argument(generic_debug) of
+ error ->
+ [];
+ _ ->
+ [log, statistics]
+ end,
+ dbg_opts(Name, Opts);
+dbg_options(Name, Opts) ->
+ dbg_opts(Name, Opts).
+
+dbg_opts(Name, Opts) ->
+ case catch sys:debug_options(Opts) of
+ {'EXIT',_} ->
+ format("~p: ignoring erroneous debug options - ~p~n",
+ [Name, Opts]),
+ [];
+ Dbg ->
+ Dbg
+ end.
+
+get_proc_name(Pid) when is_pid(Pid) ->
+ Pid;
+get_proc_name({local, Name}) ->
+ case process_info(self(), registered_name) of
+ {registered_name, Name} ->
+ Name;
+ {registered_name, _Name} ->
+ exit(process_not_registered);
+ [] ->
+ exit(process_not_registered)
+ end;
+get_proc_name({global, Name}) ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(process_not_registered_globally);
+ Pid when Pid =:= self() ->
+ Name;
+ _Pid ->
+ exit(process_not_registered_globally)
+ end.
+
+get_parent() ->
+ case get('$ancestors') of
+ [Parent | _] when is_pid(Parent)->
+ Parent;
+ [Parent | _] when is_atom(Parent)->
+ name_to_pid(Parent);
+ _ ->
+ exit(process_was_not_started_by_proc_lib)
+ end.
+
+name_to_pid(Name) ->
+ case whereis(Name) of
+ undefined ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(could_not_find_registerd_name);
+ Pid ->
+ Pid
+ end;
+ Pid ->
+ Pid
+ end.
+
+%%-----------------------------------------------------------------
+%% Status information
+%%-----------------------------------------------------------------
+format_status(Opt, StatusData) ->
+ [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] =
+ StatusData,
+ NameTag = if is_pid(Name) ->
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
+ Header = lists:concat(["Status for generic server ", NameTag]),
+ Log = sys:get_debug(log, Debug, []),
+ Specfic =
+ case erlang:function_exported(Mod, format_status, 2) of
+ true ->
+ case catch Mod:format_status(Opt, [PDict, State]) of
+ {'EXIT', _} -> [{data, [{"State", State}]}];
+ Else -> Else
+ end;
+ _ ->
+ [{data, [{"State", State}]}]
+ end,
+ [{header, Header},
+ {data, [{"Status", SysState},
+ {"Parent", Parent},
+ {"Logged events", Log},
+ {"Queued messages", queue:to_list(Queue)}]} |
+ Specfic].
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 41064c77..30b8c394 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -75,14 +75,14 @@ start() ->
try
ok = ensure_working_log_handlers(),
ok = rabbit_mnesia:ensure_mnesia_dir(),
- ok = start_applications(?APPS)
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
- ok = stop_applications(?APPS).
+ ok = rabbit_misc:stop_applications(?APPS).
stop_and_halt() ->
spawn(fun () ->
@@ -109,34 +109,6 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
-manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
- Iterate(fun (App, Acc) ->
- case Do(App) of
- ok -> [App | Acc];
- {error, {SkipError, _}} -> Acc;
- {error, Reason} ->
- lists:foreach(Undo, Acc),
- throw({error, {ErrorTag, App, Reason}})
- end
- end, [], Apps),
- ok.
-
-start_applications(Apps) ->
- manage_applications(fun lists:foldl/3,
- fun application:start/1,
- fun application:stop/1,
- already_started,
- cannot_start_application,
- Apps).
-
-stop_applications(Apps) ->
- manage_applications(fun lists:foldr/3,
- fun application:stop/1,
- fun application:start/1,
- not_started,
- cannot_stop_application,
- Apps).
-
start(normal, []) ->
{ok, SupPid} = rabbit_sup:start_link(),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 7bbed8b7..875624ba 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -40,6 +40,9 @@
-define(MEMSUP_CHECK_INTERVAL, 1000).
+%% OSes on which we know memory alarms to be trustworthy
+-define(SUPPORTED_OS, [{unix, linux}]).
+
-record(alarms, {alertees, system_memory_high_watermark = false}).
%%----------------------------------------------------------------------------
@@ -47,18 +50,23 @@
-ifdef(use_specs).
-type(mfa_tuple() :: {atom(), atom(), list()}).
--spec(start/1 :: (bool()) -> 'ok').
+-spec(start/1 :: (bool() | 'auto') -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
-
+
-endif.
%%----------------------------------------------------------------------------
start(MemoryAlarms) ->
- ok = alarm_handler:add_alarm_handler(?MODULE, [MemoryAlarms]),
+ EnableAlarms = case MemoryAlarms of
+ true -> true;
+ false -> false;
+ auto -> lists:member(os:type(), ?SUPPORTED_OS)
+ end,
+ ok = alarm_handler:add_alarm_handler(?MODULE, [EnableAlarms]),
case whereis(memsup) of
- undefined -> if MemoryAlarms -> ok = start_memsup(),
+ undefined -> if EnableAlarms -> ok = start_memsup(),
ok = adjust_memsup_interval();
true -> ok
end;
@@ -93,7 +101,7 @@ handle_call({register, Pid, HighMemMFA},
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
{ok, ok, State#alarms{alertees = NewAlertees}};
-
+
handle_call(_Request, State) ->
{ok, not_understood, State}.
@@ -127,7 +135,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
start_memsup() ->
- Mod = case os:type() of
+ Mod = case os:type() of
%% memsup doesn't take account of buffers or cache when
%% considering "free" memory - therefore on Linux we can
%% get memory alarms very easily without any pressure
@@ -135,7 +143,7 @@ start_memsup() ->
%% our own simple memory monitor.
%%
{unix, linux} -> rabbit_memsup_linux;
-
+
%% Start memsup programmatically rather than via the
%% rabbitmq-server script. This is not quite the right
%% thing to do as os_mon checks to see if memsup is
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5199fb87..586e05ae 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -502,7 +502,8 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
-i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid;
+i(pid, _) ->
+ self();
i(messages_ready, #q{message_buffer = MessageBuffer}) ->
queue:len(MessageBuffer);
i(messages_unacknowledged, _) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 304275c4..513d3050 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -112,16 +112,14 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
- try
- case handle_method(Method, Content, State) of
- {reply, Reply, NewState} ->
- ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- NewState;
- {noreply, NewState} ->
- NewState;
- stop ->
- exit(normal)
- end
+ try handle_method(Method, Content, State) of
+ {reply, Reply, NewState} ->
+ ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
+ NewState;
+ {noreply, NewState} ->
+ NewState;
+ stop ->
+ exit(normal)
catch
exit:{amqp, Error, Explanation, none} ->
terminate({amqp, Error, Explanation,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index ecc285a5..cbc11b40 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -57,7 +57,7 @@ start() ->
true -> ok;
false -> io:format("...done.~n")
end,
- init:stop();
+ halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
error("invalid command '~s'",
[lists:flatten(
@@ -138,7 +138,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional
virtual host parameter for which to display results. The default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
-arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted,
+arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
messages, acks_uncommitted, consumers, transactions, memory]. The default is
to display name and (number of) messages.
@@ -148,7 +148,7 @@ auto_delete, arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
exchange name, routing key, queue name and arguments, in that order.
-<ConnectionInfoItem> must be a member of the list [pid, address, port,
+<ConnectionInfoItem> must be a member of the list [node, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
user, peer_address and peer_port.
@@ -242,7 +242,8 @@ action(list_vhost_users, Node, Args = [_VHostPath], Inform) ->
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
{VHostArg, RemainingArgs} = parse_vhost_flag(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
+ ArgAtoms = list_replace(node, pid,
+ default_if_empty(RemainingArgs, [name, messages])),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
@@ -267,7 +268,8 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port]),
+ ArgAtoms = list_replace(node, pid,
+ default_if_empty(Args, [user, peer_address, peer_port])),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms).
@@ -308,9 +310,10 @@ format_info_item(Items, Key) ->
case Info of
{_, #resource{name = Name}} ->
url_encode(Name);
- {Key, IpAddress} when Key =:= address; Key =:= peer_address
- andalso is_tuple(IpAddress) ->
- inet_parse:ntoa(IpAddress);
+ _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) ->
+ inet_parse:ntoa(Value);
+ _ when is_pid(Value) ->
+ atom_to_list(node(Value));
_ when is_binary(Value) ->
url_encode(Value);
_ ->
@@ -357,3 +360,6 @@ url_encode_char([], Acc) ->
d2h(N) when N<10 -> N+$0;
d2h(N) -> N+$a-10.
+list_replace(Find, Replace, List) ->
+ [case X of Find -> Replace; _ -> X end || X <- List].
+
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 9a9220b5..183b6984 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -46,7 +46,7 @@ init({{File, Suffix}, []}) ->
case rabbit_misc:append_file(File, Suffix) of
ok -> ok;
{error, Error} ->
- rabbit_log:error("Failed to append contents of " ++
+ rabbit_log:error("Failed to append contents of "
"log file '~s' to '~s':~n~p~n",
[File, [File, Suffix], Error])
end,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index c8069e08..925c335c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -268,8 +268,10 @@ route_internal(#exchange{name = Name}, RoutingKey) ->
lookup_qpids(Queues) ->
sets:fold(
fun(Key, Acc) ->
- [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}),
- [QPid | Acc]
+ case mnesia:dirty_read({amqqueue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
end, [], sets:from_list(Queues)).
%% TODO: Should all of the route and binding management not be
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 973e163b..85db50d7 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -50,6 +50,7 @@
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
+-export([start_applications/1, stop_applications/1]).
-import(mnesia).
-import(lists).
@@ -108,6 +109,8 @@
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'true').
+-spec(start_applications/1 :: ([atom()]) -> 'ok').
+-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-endif.
@@ -398,3 +401,32 @@ format_stderr(Fmt, Args) ->
Port = open_port({fd, 0, 2}, [out]),
port_command(Port, io_lib:format(Fmt, Args)),
port_close(Port).
+
+manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
+ Iterate(fun (App, Acc) ->
+ case Do(App) of
+ ok -> [App | Acc];
+ {error, {SkipError, _}} -> Acc;
+ {error, Reason} ->
+ lists:foreach(Undo, Acc),
+ throw({error, {ErrorTag, App, Reason}})
+ end
+ end, [], Apps),
+ ok.
+
+start_applications(Apps) ->
+ manage_applications(fun lists:foldl/3,
+ fun application:start/1,
+ fun application:stop/1,
+ already_started,
+ cannot_start_application,
+ Apps).
+
+stop_applications(Apps) ->
+ manage_applications(fun lists:foldr/3,
+ fun application:stop/1,
+ fun application:start/1,
+ not_started,
+ cannot_stop_application,
+ Apps).
+
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d19c37cb..eebb38fa 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -243,8 +243,8 @@ init_db(ClusterNodes) ->
%% NB: we cannot use rabbit_log here since
%% it may not have been started yet
error_logger:warning_msg(
- "schema integrity check failed: ~p~n" ++
- "moving database to backup location " ++
+ "schema integrity check failed: ~p~n"
+ "moving database to backup location "
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 7f6eaa8e..5e8edd53 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -50,7 +50,7 @@ start() ->
case catch action(Command, Args, RpcTimeout) of
ok ->
io:format("done.~n"),
- init:stop();
+ halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
error("invalid command '~s'",
[lists:flatten(
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 9e4c9c8a..2a365ce1 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -47,7 +47,7 @@ init({{File, Suffix}, []}) ->
case rabbit_misc:append_file(File, Suffix) of
ok -> ok;
{error, Error} ->
- rabbit_log:error("Failed to append contents of " ++
+ rabbit_log:error("Failed to append contents of "
"sasl log file '~s' to '~s':~n~p~n",
[File, [File, Suffix], Error])
end,