diff options
author | Ben Hood <0x6e6562@gmail.com> | 2009-01-15 12:34:31 +0000 |
---|---|---|
committer | Ben Hood <0x6e6562@gmail.com> | 2009-01-15 12:34:31 +0000 |
commit | 1470f8f387d16f88885a55d518249f302527ff5e (patch) | |
tree | 326c9b702ec23c8a4142b7bbf8d4407f32bca6f4 | |
parent | 6d7792714815e494cb63aec8ee86894e12f5e4d7 (diff) | |
parent | b784026d3621844cf62f3cfdb69dc528184a72c4 (diff) | |
download | rabbitmq-server-1470f8f387d16f88885a55d518249f302527ff5e.tar.gz |
Merged default into 18557
-rw-r--r-- | Makefile | 21 | ||||
-rw-r--r-- | ebin/rabbit.app | 58 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 20 | ||||
-rw-r--r-- | generate_app | 10 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/Makefile | 23 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/init.d | 4 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 44 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/init.d | 1 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/postinst | 4 | ||||
-rw-r--r-- | src/gen_server2.erl | 854 | ||||
-rw-r--r-- | src/rabbit.erl | 32 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 22 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
-rw-r--r-- | src/rabbit_control.erl | 22 | ||||
-rw-r--r-- | src/rabbit_error_logger_file_h.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 6 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 32 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 2 | ||||
-rw-r--r-- | src/rabbit_sasl_report_file_h.erl | 2 |
21 files changed, 1021 insertions, 163 deletions
@@ -7,7 +7,8 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -39,15 +40,21 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e #all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl +$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app + escript generate_app $(EBIN_DIR) < $< > $@ + +$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl erlc $(ERLC_OPTS) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $< + +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam + erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< +# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) > $@ + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) > $@ + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' @@ -57,7 +64,7 @@ dialyze: $(TARGETS) clean: cleandb rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz @@ -123,7 +130,7 @@ srcdist: distclean cp BUILD.in $(TARGET_SRC_DIR)/BUILD elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ >> $(TARGET_SRC_DIR)/BUILD - sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app + sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile $(TARGET_SRC_DIR) diff --git a/ebin/rabbit.app b/ebin/rabbit.app deleted file mode 100644 index e377a33a..00000000 --- a/ebin/rabbit.app +++ /dev/null @@ -1,58 +0,0 @@ -{application, rabbit, %% -*- erlang -*- - [{description, "RabbitMQ"}, - {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, - {modules, [buffering_proxy, - rabbit_access_control, - rabbit_alarm, - rabbit_amqqueue, - rabbit_amqqueue_process, - rabbit_amqqueue_sup, - rabbit_binary_generator, - rabbit_binary_parser, - rabbit_channel, - rabbit_control, - rabbit, - rabbit_error_logger, - rabbit_error_logger_file_h, - rabbit_exchange, - rabbit_framing_channel, - rabbit_framing, - rabbit_heartbeat, - rabbit_limiter, - rabbit_load, - rabbit_log, - rabbit_memsup_linux, - rabbit_misc, - rabbit_mnesia, - rabbit_multi, - rabbit_networking, - rabbit_node_monitor, - rabbit_persister, - rabbit_reader, - rabbit_router, - rabbit_sasl_report_file_h, - rabbit_sup, - rabbit_tests, - rabbit_tracer, - rabbit_writer, - tcp_acceptor, - tcp_acceptor_sup, - tcp_client_sup, - tcp_listener, - tcp_listener_sup]}, - {registered, [rabbit_amqqueue_sup, - rabbit_log, - rabbit_node_monitor, - rabbit_persister, - rabbit_router, - rabbit_sup, - rabbit_tcp_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, - {mod, {rabbit, []}}, - {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, - {extra_startup_steps, []}, - {default_user, <<"guest">>}, - {default_pass, <<"guest">>}, - {default_vhost, <<"/">>}, - {memory_alarms, false}]}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in new file mode 100644 index 00000000..e2f36c0f --- /dev/null +++ b/ebin/rabbit_app.in @@ -0,0 +1,20 @@ +{application, rabbit, %% -*- erlang -*- + [{description, "RabbitMQ"}, + {id, "RabbitMQ"}, + {vsn, "%%VERSION%%"}, + {modules, []}, + {registered, [rabbit_amqqueue_sup, + rabbit_log, + rabbit_node_monitor, + rabbit_persister, + rabbit_router, + rabbit_sup, + rabbit_tcp_client_sup]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {mod, {rabbit, []}}, + {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {extra_startup_steps, []}, + {default_user, <<"guest">>}, + {default_pass, <<"guest">>}, + {default_vhost, <<"/">>}, + {memory_alarms, auto}]}]}. diff --git a/generate_app b/generate_app new file mode 100644 index 00000000..62301292 --- /dev/null +++ b/generate_app @@ -0,0 +1,10 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +main([BeamDir]) -> + Modules = [list_to_atom(filename:basename(F, ".beam")) || + F <- filelib:wildcard("*.beam", BeamDir)], + {ok, {application, Application, Properties}} = io:read(''), + NewProperties = lists:keyreplace(modules, 1, Properties, + {modules, Modules}), + io:format("~p.", [{application, Application, NewProperties}]). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c05f14a7..cf3a93df 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -6,21 +6,38 @@ TOP_DIR=$(shell pwd) #only checks build-dependencies using rpms, not debs DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1' +ifndef RPM_OS +RPM_OS=fedora +endif + +ifeq "x$(RPM_OS)" "xsuse" +REQUIRES=/sbin/chkconfig /sbin/service +OS_DEFINES=--define '_initrddir /etc/init.d' +RELEASE_OS=.suse +else +REQUIRES=chkconfig initscripts +OS_DEFINES=--define '_initrddir /etc/rc.d/init.d' +RELEASE_OS= +endif + rpms: clean server prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp cp $(TOP_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS - sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec + sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \ + SPECS/rabbitmq-server.spec cp init.d SOURCES/rabbitmq-server.init cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target i386 - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --define '_arch x86_64' \ + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ + --target i386 + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ + --define '_libdir /usr/lib64' --define '_arch x86_64' \ --define '_defaultdocdir /usr/share/doc' --target x86_64 clean: diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index 27f150f9..a006a5a7 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -16,7 +16,6 @@ # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin DAEMON_NAME=rabbitmq-multi DAEMON=/usr/lib/rabbitmq/bin/$DAEMON_NAME NAME=rabbitmq-server @@ -29,9 +28,6 @@ LOCK_FILE=/var/lock/subsys/$NAME test -x $DAEMON || exit 0 -# source function library -. /etc/rc.d/init.d/functions - # Include rabbitmq defaults if available if [ -f /etc/default/rabbitmq ] ; then . /etc/default/rabbitmq diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 13cfb037..241afd71 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,6 +1,6 @@ Name: rabbitmq-server Version: %%VERSION%% -Release: 1 +Release: 1%%RELEASE_OS%% License: MPLv1.1 Group: Development/Libraries Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz @@ -17,24 +17,18 @@ Requires: erlang, logrotate Packager: Hubert Plociniczak <hubert@lshift.net> BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server -Requires(post): chkconfig -Requires(pre): chkconfig initscripts +Requires(post): %%REQUIRES%% +Requires(pre): %%REQUIRES%% %description RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%ifarch x86_64 - %define _defaultlibdir /usr/lib64 -%else - %define _defaultlibdir /usr/lib -%endif - -%define _erllibdir %{_defaultlibdir}/erlang/lib -%define _rabbitbindir %{_defaultlibdir}/rabbitmq/bin +%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} +%define _rabbit_libdir %{_libdir}/rabbitmq -%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version} +%define _maindir %{buildroot}%{_rabbit_erllibdir} %pre if [ $1 -gt 1 ]; then @@ -53,25 +47,21 @@ make rm -rf %{buildroot} make install TARGET_DIR=%{_maindir} \ - SBIN_DIR=%{buildroot}%{_rabbitbindir} \ + SBIN_DIR=%{buildroot}%{_rabbit_libdir}/bin \ MAN_DIR=%{buildroot}%{_mandir} mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia mkdir -p %{buildroot}/var/log/rabbitmq -mkdir -p %{buildroot}/etc/rc.d/init.d/ +mkdir -p %{buildroot}%{_initrddir} #Copy all necessary lib files etc. -install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server -chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server -%ifarch x86_64 - sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}/etc/rc.d/init.d/rabbitmq-server -%endif +install -m 0755 %SOURCE1 %{buildroot}%{_initrddir}/rabbitmq-server +chmod 0755 %{buildroot}%{_initrddir}/rabbitmq-server +sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_initrddir}/rabbitmq-server mkdir -p %{buildroot}%{_sbindir} install -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmqctl -%ifarch x86_64 - sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}%{_sbindir}/rabbitmqctl -%endif +sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_sbindir}/rabbitmqctl mkdir -p %{buildroot}/etc/logrotate.d install -m 0644 %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server @@ -81,8 +71,10 @@ rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL #Build the list of files rm -f %{_builddir}/filelist.%{name}.rpm echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm -(cd %{buildroot}; find . ! -regex '\./etc.*' \ - -type f | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) +(cd %{buildroot}; \ + find . -type f ! -regex '\./etc.*' \ + ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ + | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) %post # create rabbitmq group @@ -116,7 +108,9 @@ fi %defattr(-,root,root,-) %dir /var/lib/rabbitmq %dir /var/log/rabbitmq -/etc/rc.d/init.d/rabbitmq-server +%{_rabbit_erllibdir} +%{_rabbit_libdir} +%{_initrddir}/rabbitmq-server %config(noreplace) /etc/logrotate.d/rabbitmq-server %doc LICENSE LICENSE-MPL-RabbitMQ INSTALL diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d index ace474c5..70dd0adf 100644 --- a/packaging/debs/Debian/debian/init.d +++ b/packaging/debs/Debian/debian/init.d @@ -9,7 +9,6 @@ # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin DAEMON=/usr/lib/rabbitmq/bin/rabbitmq-multi NAME=rabbitmq-server DESC=rabbitmq-server diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 495b8331..05fb179c 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -25,8 +25,8 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then - adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq --no-create-home rabbitmq - usermod -c "RabbitMQ messaging server" rabbitmq + adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ + --no-create-home --gecos "RabbitMQ messaging server" rabbitmq fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq diff --git a/src/gen_server2.erl b/src/gen_server2.erl new file mode 100644 index 00000000..11bb66d7 --- /dev/null +++ b/src/gen_server2.erl @@ -0,0 +1,854 @@ +%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% distribution, with the following modifications: +%% +%% 1) the module name is gen_server2 +%% +%% 2) more efficient handling of selective receives in callbacks +%% gen_server2 processes drain their message queue into an internal +%% buffer before invoking any callback module functions. Messages are +%% dequeued from the buffer for processing. Thus the effective message +%% queue of a gen_server2 process is the concatenation of the internal +%% buffer and the real message queue. +%% As a result of the draining, any selective receive invoked inside a +%% callback is less likely to have to scan a large message queue. +%% +%% 3) gen_server2:cast is guaranteed to be order-preserving +%% The original code could reorder messages when communicating with a +%% process on a remote node that was not currently connected. +%% +%% All modifications are (C) 2009 LShift Ltd. + +%% ``The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved via the world wide web at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. +%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings +%% AB. All Rights Reserved.'' +%% +%% $Id$ +%% +-module(gen_server2). + +%%% --------------------------------------------------- +%%% +%%% The idea behind THIS server is that the user module +%%% provides (different) functions to handle different +%%% kind of inputs. +%%% If the Parent process terminates the Module:terminate/2 +%%% function is called. +%%% +%%% The user module should export: +%%% +%%% init(Args) +%%% ==> {ok, State} +%%% {ok, State, Timeout} +%%% ignore +%%% {stop, Reason} +%%% +%%% handle_call(Msg, {From, Tag}, State) +%%% +%%% ==> {reply, Reply, State} +%%% {reply, Reply, State, Timeout} +%%% {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, Reply, State} +%%% Reason = normal | shutdown | Term terminate(State) is called +%%% +%%% handle_cast(Msg, State) +%%% +%%% ==> {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term terminate(State) is called +%%% +%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... +%%% +%%% ==> {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% terminate(Reason, State) Let the user module clean up +%%% always called when server terminates +%%% +%%% ==> ok +%%% +%%% +%%% The work flow (of the server) can be described as follows: +%%% +%%% User module Generic +%%% ----------- ------- +%%% start -----> start +%%% init <----- . +%%% +%%% loop +%%% handle_call <----- . +%%% -----> reply +%%% +%%% handle_cast <----- . +%%% +%%% handle_info <----- . +%%% +%%% terminate <----- . +%%% +%%% -----> reply +%%% +%%% +%%% --------------------------------------------------- + +%% API +-export([start/3, start/4, + start_link/3, start_link/4, + call/2, call/3, + cast/2, reply/2, + abcast/2, abcast/3, + multi_call/2, multi_call/3, multi_call/4, + enter_loop/3, enter_loop/4, enter_loop/5]). + +-export([behaviour_info/1]). + +%% System exports +-export([system_continue/3, + system_terminate/4, + system_code_change/4, + format_status/2]). + +%% Internal exports +-export([init_it/6, print_event/3]). + +-import(error_logger, [format/2]). + +%%%========================================================================= +%%% API +%%%========================================================================= + +behaviour_info(callbacks) -> + [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, + {terminate,2},{code_change,3}]; +behaviour_info(_Other) -> + undefined. + +%%% ----------------------------------------------------------------- +%%% Starts a generic server. +%%% start(Mod, Args, Options) +%%% start(Name, Mod, Args, Options) +%%% start_link(Mod, Args, Options) +%%% start_link(Name, Mod, Args, Options) where: +%%% Name ::= {local, atom()} | {global, atom()} +%%% Mod ::= atom(), callback module implementing the 'real' server +%%% Args ::= term(), init arguments (to Mod:init/1) +%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] +%%% Flag ::= trace | log | {logfile, File} | statistics | debug +%%% (debug == log && statistics) +%%% Returns: {ok, Pid} | +%%% {error, {already_started, Pid}} | +%%% {error, Reason} +%%% ----------------------------------------------------------------- +start(Mod, Args, Options) -> + gen:start(?MODULE, nolink, Mod, Args, Options). + +start(Name, Mod, Args, Options) -> + gen:start(?MODULE, nolink, Name, Mod, Args, Options). + +start_link(Mod, Args, Options) -> + gen:start(?MODULE, link, Mod, Args, Options). + +start_link(Name, Mod, Args, Options) -> + gen:start(?MODULE, link, Name, Mod, Args, Options). + + +%% ----------------------------------------------------------------- +%% Make a call to a generic server. +%% If the server is located at another node, that node will +%% be monitored. +%% If the client is trapping exits and is linked server termination +%% is handled here (? Shall we do that here (or rely on timeouts) ?). +%% ----------------------------------------------------------------- +call(Name, Request) -> + case catch gen:call(Name, '$gen_call', Request) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request]}}) + end. + +call(Name, Request, Timeout) -> + case catch gen:call(Name, '$gen_call', Request, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) + end. + +%% ----------------------------------------------------------------- +%% Make a cast to a generic server. +%% ----------------------------------------------------------------- +cast({global,Name}, Request) -> + catch global:send(Name, cast_msg(Request)), + ok; +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_atom(Dest) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_pid(Dest) -> + do_cast(Dest, Request). + +do_cast(Dest, Request) -> + do_send(Dest, cast_msg(Request)), + ok. + +cast_msg(Request) -> {'$gen_cast',Request}. + +%% ----------------------------------------------------------------- +%% Send a reply to the client. +%% ----------------------------------------------------------------- +reply({To, Tag}, Reply) -> + catch To ! {Tag, Reply}. + +%% ----------------------------------------------------------------- +%% Asyncronous broadcast, returns nothing, it's just send'n prey +%%----------------------------------------------------------------- +abcast(Name, Request) when is_atom(Name) -> + do_abcast([node() | nodes()], Name, cast_msg(Request)). + +abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> + do_abcast(Nodes, Name, cast_msg(Request)). + +do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) -> + do_send({Name,Node},Msg), + do_abcast(Nodes, Name, Msg); +do_abcast([], _,_) -> abcast. + +%%% ----------------------------------------------------------------- +%%% Make a call to servers at several nodes. +%%% Returns: {[Replies],[BadNodes]} +%%% A Timeout can be given +%%% +%%% A middleman process is used in case late answers arrives after +%%% the timeout. If they would be allowed to glog the callers message +%%% queue, it would probably become confused. Late answers will +%%% now arrive to the terminated middleman and so be discarded. +%%% ----------------------------------------------------------------- +multi_call(Name, Req) + when is_atom(Name) -> + do_multi_call([node() | nodes()], Name, Req, infinity). + +multi_call(Nodes, Name, Req) + when is_list(Nodes), is_atom(Name) -> + do_multi_call(Nodes, Name, Req, infinity). + +multi_call(Nodes, Name, Req, infinity) -> + do_multi_call(Nodes, Name, Req, infinity); +multi_call(Nodes, Name, Req, Timeout) + when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> + do_multi_call(Nodes, Name, Req, Timeout). + + +%%----------------------------------------------------------------- +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive +%% loop and become a gen_server process. +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the +%% process, including registering a name for it. +%%----------------------------------------------------------------- +enter_loop(Mod, Options, State) -> + enter_loop(Mod, Options, State, self(), infinity). + +enter_loop(Mod, Options, State, ServerName = {_, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity); + +enter_loop(Mod, Options, State, Timeout) -> + enter_loop(Mod, Options, State, self(), Timeout). + +enter_loop(Mod, Options, State, ServerName, Timeout) -> + Name = get_proc_name(ServerName), + Parent = get_parent(), + Debug = debug_options(Name, Options), + Queue = queue:new(), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + +%%%======================================================================== +%%% Gen-callback functions +%%%======================================================================== + +%%% --------------------------------------------------- +%%% Initiate the new process. +%%% Register the name using the Rfunc function +%%% Calls the Mod:init/Args function. +%%% Finally an acknowledge is sent to Parent and the main +%%% loop is entered. +%%% --------------------------------------------------- +init_it(Starter, self, Name, Mod, Args, Options) -> + init_it(Starter, self(), Name, Mod, Args, Options); +init_it(Starter, Parent, Name, Mod, Args, Options) -> + Debug = debug_options(Name, Options), + Queue = queue:new(), + case catch Mod:init(Args) of + {ok, State} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, infinity, Queue, Debug); + {ok, State, Timeout} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + {stop, Reason} -> + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + ignore -> + proc_lib:init_ack(Starter, ignore), + exit(normal); + {'EXIT', Reason} -> + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + Else -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error) + end. + +%%%======================================================================== +%%% Internal functions +%%%======================================================================== +%%% --------------------------------------------------- +%%% The MAIN loop. +%%% --------------------------------------------------- +loop(Parent, Name, State, Mod, Time, Queue, Debug) -> + receive + Input -> loop(Parent, Name, State, Mod, + Time, queue:in(Input, Queue), Debug) + after 0 -> + case queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, Msg); + {empty, Queue1} -> + receive + Input -> + loop(Parent, Name, State, Mod, + Time, queue:in(Input, Queue1), Debug) + after Time -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, timeout) + end + end + end. + +process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> + case Msg of + {system, From, Req} -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, Queue]); + {'EXIT', Parent, Reason} -> + terminate(Reason, Name, Msg, Mod, State, Debug); + _Msg when Debug =:= [] -> + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + _Msg -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Name, {in, Msg}), + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + end. + +%%% --------------------------------------------------- +%%% Send/recive functions +%%% --------------------------------------------------- +do_send(Dest, Msg) -> + catch erlang:send(Dest, Msg). + +do_multi_call(Nodes, Name, Req, infinity) -> + Tag = make_ref(), + Monitors = send_nodes(Nodes, Name, Tag, Req), + rec_nodes(Tag, Monitors, Name, undefined); +do_multi_call(Nodes, Name, Req, Timeout) -> + Tag = make_ref(), + Caller = self(), + Receiver = + spawn( + fun() -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), + Mref = erlang:monitor(process, Receiver), + Receiver ! {self(),Tag}, + receive + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) + end. + +send_nodes(Nodes, Name, Tag, Req) -> + send_nodes(Nodes, Name, Tag, Req, []). + +send_nodes([Node|Tail], Name, Tag, Req, Monitors) + when is_atom(Node) -> + Monitor = start_monitor(Node, Name), + %% Handle non-existing names in rec_nodes. + catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req}, + send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]); +send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> + %% Skip non-atom Node + send_nodes(Tail, Name, Tag, Req, Monitors); +send_nodes([], _Name, _Tag, _Req, Monitors) -> + Monitors. + +%% Against old nodes: +%% If no reply has been delivered within 2 secs. (per node) check that +%% the server really exists and wait for ever for the answer. +%% +%% Against contemporary nodes: +%% Wait for reply, server 'DOWN', or timeout from TimerId. + +rec_nodes(Tag, Nodes, Name, TimerId) -> + rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). + +rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + unmonitor(R), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> + %% R6 node + receive + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + after Time -> + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end + end; +rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> + case catch erlang:cancel_timer(TimerId) of + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok + end, + {Replies, Badnodes}. + +%% Collect all replies that already have arrived +rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + after 0 -> + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> + %% R6 node + receive + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + after 0 -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> + {Replies, Badnodes}. + + +%%% --------------------------------------------------- +%%% Monitor functions +%%% --------------------------------------------------- + +start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> + if node() =:= nonode@nohost, Node =/= nonode@nohost -> + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; + true -> + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end + end. + +%% Cancels a monitor started with Ref=erlang:monitor(_, _). +unmonitor(Ref) when is_reference(Ref) -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, _, _, _} -> + true + after 0 -> + true + end. + +%%% --------------------------------------------------- +%%% Message handling functions +%%% --------------------------------------------------- + +dispatch({'$gen_cast', Msg}, Mod, State) -> + Mod:handle_cast(Msg, State); +dispatch(Info, Mod, State) -> + Mod:handle_info(Info, State). + +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue) -> + case catch Mod:handle_call(Msg, From, State) of + {reply, Reply, NState} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {reply, Reply, NState, Time1} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Name, Msg, Mod, NState, [])), + reply(From, Reply), + exit(R); + Other -> handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue) + end; +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue) -> + Reply = (catch dispatch(Msg, Mod, State)), + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue, Debug) -> + case catch Mod:handle_call(Msg, From, State) of + {reply, Reply, NState} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {reply, Reply, NState, Time1} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {noreply, NState} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {noreply, NState, Time1} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), + reply(Name, From, Reply, NState, Debug), + exit(R); + Other -> + handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue, Debug) + end; +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue, Debug) -> + Reply = (catch dispatch(Msg, Mod, State)), + handle_common_reply(Reply, + Parent, Name, Msg, Mod, State, Queue, Debug). + +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> + case Reply of + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {stop, Reason, NState} -> + terminate(Reason, Name, Msg, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, Msg, Mod, State, []); + _ -> + terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + end. + +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> + case Reply of + {noreply, NState} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {noreply, NState, Time1} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {stop, Reason, NState} -> + terminate(Reason, Name, Msg, Mod, NState, Debug); + {'EXIT', What} -> + terminate(What, Name, Msg, Mod, State, Debug); + _ -> + terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) + end. + +reply(Name, {To, Tag}, Reply, State, Debug) -> + reply({To, Tag}, Reply), + sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {out, Reply, To, State} ). + + +%%----------------------------------------------------------------- +%% Callback functions for system messages handling. +%%----------------------------------------------------------------- +system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> + loop(Parent, Name, State, Mod, Time, Queue, Debug). + +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> + terminate(Reason, Name, [], Mod, State, Debug). + +system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> + case catch Mod:code_change(OldVsn, State, Extra) of + {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; + Else -> Else + end. + +%%----------------------------------------------------------------- +%% Format debug messages. Print them as the call-back module sees +%% them, not as the real erlang messages. Use trace for that. +%%----------------------------------------------------------------- +print_event(Dev, {in, Msg}, Name) -> + case Msg of + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~p got call ~p from ~w~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~p got cast ~p~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) + end; +print_event(Dev, {out, Msg, To, State}, Name) -> + io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", + [Name, Msg, To, State]); +print_event(Dev, {noreply, State}, Name) -> + io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); +print_event(Dev, Event, Name) -> + io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]). + + +%%% --------------------------------------------------- +%%% Terminate the server. +%%% --------------------------------------------------- + +terminate(Reason, Name, Msg, Mod, State, Debug) -> + case catch Mod:terminate(Reason, State) of + {'EXIT', R} -> + error_info(R, Name, Msg, State, Debug), + exit(R); + _ -> + case Reason of + normal -> + exit(normal); + shutdown -> + exit(shutdown); + _ -> + error_info(Reason, Name, Msg, State, Debug), + exit(Reason) + end + end. + +error_info(_Reason, application_controller, _Msg, _State, _Debug) -> + %% OTP-5811 Don't send an error report if it's the system process + %% application_controller which is terminating - let init take care + %% of it instead + ok; +error_info(Reason, Name, Msg, State, Debug) -> + Reason1 = + case Reason of + {undef,[{M,F,A}|MFAs]} -> + case code:is_loaded(M) of + false -> + {'module could not be loaded',[{M,F,A}|MFAs]}; + _ -> + case erlang:function_exported(M, F, length(A)) of + true -> + Reason; + false -> + {'function not exported',[{M,F,A}|MFAs]} + end + end; + _ -> + Reason + end, + format("** Generic server ~p terminating \n" + "** Last message in was ~p~n" + "** When Server state == ~p~n" + "** Reason for termination == ~n** ~p~n", + [Name, Msg, State, Reason1]), + sys:print_log(Debug), + ok. + +%%% --------------------------------------------------- +%%% Misc. functions. +%%% --------------------------------------------------- + +opt(Op, [{Op, Value}|_]) -> + {ok, Value}; +opt(Op, [_|Options]) -> + opt(Op, Options); +opt(_, []) -> + false. + +debug_options(Name, Opts) -> + case opt(debug, Opts) of + {ok, Options} -> dbg_options(Name, Options); + _ -> dbg_options(Name, []) + end. + +dbg_options(Name, []) -> + Opts = + case init:get_argument(generic_debug) of + error -> + []; + _ -> + [log, statistics] + end, + dbg_opts(Name, Opts); +dbg_options(Name, Opts) -> + dbg_opts(Name, Opts). + +dbg_opts(Name, Opts) -> + case catch sys:debug_options(Opts) of + {'EXIT',_} -> + format("~p: ignoring erroneous debug options - ~p~n", + [Name, Opts]), + []; + Dbg -> + Dbg + end. + +get_proc_name(Pid) when is_pid(Pid) -> + Pid; +get_proc_name({local, Name}) -> + case process_info(self(), registered_name) of + {registered_name, Name} -> + Name; + {registered_name, _Name} -> + exit(process_not_registered); + [] -> + exit(process_not_registered) + end; +get_proc_name({global, Name}) -> + case global:safe_whereis_name(Name) of + undefined -> + exit(process_not_registered_globally); + Pid when Pid =:= self() -> + Name; + _Pid -> + exit(process_not_registered_globally) + end. + +get_parent() -> + case get('$ancestors') of + [Parent | _] when is_pid(Parent)-> + Parent; + [Parent | _] when is_atom(Parent)-> + name_to_pid(Parent); + _ -> + exit(process_was_not_started_by_proc_lib) + end. + +name_to_pid(Name) -> + case whereis(Name) of + undefined -> + case global:safe_whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +%%----------------------------------------------------------------- +%% Status information +%%----------------------------------------------------------------- +format_status(Opt, StatusData) -> + [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = + StatusData, + NameTag = if is_pid(Name) -> + pid_to_list(Name); + is_atom(Name) -> + Name + end, + Header = lists:concat(["Status for generic server ", NameTag]), + Log = sys:get_debug(log, Debug, []), + Specfic = + case erlang:function_exported(Mod, format_status, 2) of + true -> + case catch Mod:format_status(Opt, [PDict, State]) of + {'EXIT', _} -> [{data, [{"State", State}]}]; + Else -> Else + end; + _ -> + [{data, [{"State", State}]}] + end, + [{header, Header}, + {data, [{"Status", SysState}, + {"Parent", Parent}, + {"Logged events", Log}, + {"Queued messages", queue:to_list(Queue)}]} | + Specfic]. diff --git a/src/rabbit.erl b/src/rabbit.erl index 41064c77..30b8c394 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -75,14 +75,14 @@ start() -> try ok = ensure_working_log_handlers(), ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = start_applications(?APPS) + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) end. stop() -> - ok = stop_applications(?APPS). + ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> spawn(fun () -> @@ -109,34 +109,6 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- -manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> - Iterate(fun (App, Acc) -> - case Do(App) of - ok -> [App | Acc]; - {error, {SkipError, _}} -> Acc; - {error, Reason} -> - lists:foreach(Undo, Acc), - throw({error, {ErrorTag, App, Reason}}) - end - end, [], Apps), - ok. - -start_applications(Apps) -> - manage_applications(fun lists:foldl/3, - fun application:start/1, - fun application:stop/1, - already_started, - cannot_start_application, - Apps). - -stop_applications(Apps) -> - manage_applications(fun lists:foldr/3, - fun application:stop/1, - fun application:start/1, - not_started, - cannot_stop_application, - Apps). - start(normal, []) -> {ok, SupPid} = rabbit_sup:start_link(), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 7bbed8b7..875624ba 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -40,6 +40,9 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). +%% OSes on which we know memory alarms to be trustworthy +-define(SUPPORTED_OS, [{unix, linux}]). + -record(alarms, {alertees, system_memory_high_watermark = false}). %%---------------------------------------------------------------------------- @@ -47,18 +50,23 @@ -ifdef(use_specs). -type(mfa_tuple() :: {atom(), atom(), list()}). --spec(start/1 :: (bool()) -> 'ok'). +-spec(start/1 :: (bool() | 'auto') -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). - + -endif. %%---------------------------------------------------------------------------- start(MemoryAlarms) -> - ok = alarm_handler:add_alarm_handler(?MODULE, [MemoryAlarms]), + EnableAlarms = case MemoryAlarms of + true -> true; + false -> false; + auto -> lists:member(os:type(), ?SUPPORTED_OS) + end, + ok = alarm_handler:add_alarm_handler(?MODULE, [EnableAlarms]), case whereis(memsup) of - undefined -> if MemoryAlarms -> ok = start_memsup(), + undefined -> if EnableAlarms -> ok = start_memsup(), ok = adjust_memsup_interval(); true -> ok end; @@ -93,7 +101,7 @@ handle_call({register, Pid, HighMemMFA}, end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), {ok, ok, State#alarms{alertees = NewAlertees}}; - + handle_call(_Request, State) -> {ok, not_understood, State}. @@ -127,7 +135,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of + Mod = case os:type() of %% memsup doesn't take account of buffers or cache when %% considering "free" memory - therefore on Linux we can %% get memory alarms very easily without any pressure @@ -135,7 +143,7 @@ start_memsup() -> %% our own simple memory monitor. %% {unix, linux} -> rabbit_memsup_linux; - + %% Start memsup programmatically rather than via the %% rabbitmq-server script. This is not quite the right %% thing to do as os_mon checks to see if memsup is diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5199fb87..586e05ae 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -502,7 +502,8 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; -i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid; +i(pid, _) -> + self(); i(messages_ready, #q{message_buffer = MessageBuffer}) -> queue:len(MessageBuffer); i(messages_unacknowledged, _) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 304275c4..513d3050 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -112,16 +112,14 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> - try - case handle_method(Method, Content, State) of - {reply, Reply, NewState} -> - ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; - {noreply, NewState} -> - NewState; - stop -> - exit(normal) - end + try handle_method(Method, Content, State) of + {reply, Reply, NewState} -> + ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), + NewState; + {noreply, NewState} -> + NewState; + stop -> + exit(normal) catch exit:{amqp, Error, Explanation, none} -> terminate({amqp, Error, Explanation, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index ecc285a5..cbc11b40 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -57,7 +57,7 @@ start() -> true -> ok; false -> io:format("...done.~n") end, - init:stop(); + halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> error("invalid command '~s'", [lists:flatten( @@ -138,7 +138,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, +arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -148,7 +148,7 @@ auto_delete, arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing exchange name, routing key, queue name and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [pid, address, port, +<ConnectionInfoItem> must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address and peer_port. @@ -242,7 +242,8 @@ action(list_vhost_users, Node, Args = [_VHostPath], Inform) -> action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), {VHostArg, RemainingArgs} = parse_vhost_flag(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), + ArgAtoms = list_replace(node, pid, + default_if_empty(RemainingArgs, [name, messages])), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); @@ -267,7 +268,8 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port]), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, peer_port])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms). @@ -308,9 +310,10 @@ format_info_item(Items, Key) -> case Info of {_, #resource{name = Name}} -> url_encode(Name); - {Key, IpAddress} when Key =:= address; Key =:= peer_address - andalso is_tuple(IpAddress) -> - inet_parse:ntoa(IpAddress); + _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> + inet_parse:ntoa(Value); + _ when is_pid(Value) -> + atom_to_list(node(Value)); _ when is_binary(Value) -> url_encode(Value); _ -> @@ -357,3 +360,6 @@ url_encode_char([], Acc) -> d2h(N) when N<10 -> N+$0; d2h(N) -> N+$a-10. +list_replace(Find, Replace, List) -> + [case X of Find -> Replace; _ -> X end || X <- List]. + diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 9a9220b5..183b6984 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -46,7 +46,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " ++ + rabbit_log:error("Failed to append contents of " "log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index c8069e08..925c335c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -268,8 +268,10 @@ route_internal(#exchange{name = Name}, RoutingKey) -> lookup_qpids(Queues) -> sets:fold( fun(Key, Acc) -> - [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}), - [QPid | Acc] + case mnesia:dirty_read({amqqueue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end end, [], sets:from_list(Queues)). %% TODO: Should all of the route and binding management not be diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 973e163b..85db50d7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,6 +50,7 @@ -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). +-export([start_applications/1, stop_applications/1]). -import(mnesia). -import(lists). @@ -108,6 +109,8 @@ -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'true'). +-spec(start_applications/1 :: ([atom()]) -> 'ok'). +-spec(stop_applications/1 :: ([atom()]) -> 'ok'). -endif. @@ -398,3 +401,32 @@ format_stderr(Fmt, Args) -> Port = open_port({fd, 0, 2}, [out]), port_command(Port, io_lib:format(Fmt, Args)), port_close(Port). + +manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> + Iterate(fun (App, Acc) -> + case Do(App) of + ok -> [App | Acc]; + {error, {SkipError, _}} -> Acc; + {error, Reason} -> + lists:foreach(Undo, Acc), + throw({error, {ErrorTag, App, Reason}}) + end + end, [], Apps), + ok. + +start_applications(Apps) -> + manage_applications(fun lists:foldl/3, + fun application:start/1, + fun application:stop/1, + already_started, + cannot_start_application, + Apps). + +stop_applications(Apps) -> + manage_applications(fun lists:foldr/3, + fun application:stop/1, + fun application:start/1, + not_started, + cannot_stop_application, + Apps). + diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d19c37cb..eebb38fa 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -243,8 +243,8 @@ init_db(ClusterNodes) -> %% NB: we cannot use rabbit_log here since %% it may not have been started yet error_logger:warning_msg( - "schema integrity check failed: ~p~n" ++ - "moving database to backup location " ++ + "schema integrity check failed: ~p~n" + "moving database to backup location " "and recreating schema from scratch~n", [Reason]), ok = move_db(), diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 7f6eaa8e..5e8edd53 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -50,7 +50,7 @@ start() -> case catch action(Command, Args, RpcTimeout) of ok -> io:format("done.~n"), - init:stop(); + halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> error("invalid command '~s'", [lists:flatten( diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 9e4c9c8a..2a365ce1 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -47,7 +47,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " ++ + rabbit_log:error("Failed to append contents of " "sasl log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, |