summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-10 14:44:45 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-10 14:44:45 +0100
commitc026814be65cc3b9f9377eba4569f6c57f0b06c7 (patch)
tree9c7c2f77121d641798cc28b628ee7ca67a1ea01d
parent4984185a9429d417f6dbc103790ee65154613ad9 (diff)
parent144090860fefc72d73cbb1a97a6f0c6b2d1cf389 (diff)
downloadrabbitmq-server-c026814be65cc3b9f9377eba4569f6c57f0b06c7.tar.gz
merge
-rw-r--r--docs/rabbitmq-server.1.xml3
-rwxr-xr-xscripts/rabbitmq-server17
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_mirror_queue_slave.erl148
-rw-r--r--src/rabbit_net.erl33
-rw-r--r--src/rabbit_networking.erl12
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_tests.erl1
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/supervisor2.erl135
-rw-r--r--src/supervisor2_tests.erl70
11 files changed, 286 insertions, 167 deletions
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index ca63927c..32ae842c 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -109,7 +109,8 @@ Defaults to 5672.
<term>-detached</term>
<listitem>
<para>
- start the server process in the background
+ Start the server process in the background. Note that this will
+ cause the pid not to be written to the pid file.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmq-server -detached</screen>
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 34915b3d..e1686627 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -65,9 +65,20 @@ case "$(uname -s)" in
CYGWIN*) # we make no attempt to record the cygwin pid; rabbitmqctl wait
# will not be able to make sense of it anyway
;;
- *) mkdir -p $(dirname ${RABBITMQ_PID_FILE});
- echo $$ > ${RABBITMQ_PID_FILE}
- ;;
+ *) # When -detached is passed, we don't write the pid, since it'd be the
+ # wrong one
+ detached=""
+ for opt in "$@"; do
+ if [ "$opt" = "-detached" ]; then
+ detached="true"
+ fi
+ done
+ if [ $detached ]; then
+ echo "Warning: PID file not written; -detached was passed." 1>&2
+ else
+ mkdir -p $(dirname ${RABBITMQ_PID_FILE});
+ echo $$ > ${RABBITMQ_PID_FILE}
+ fi
esac
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a5f227bc..461b25eb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -311,8 +311,17 @@ with(Name, F, E) ->
case lookup(Name) of
{ok, Q = #amqqueue{slave_pids = []}} ->
rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
- {ok, Q} ->
- E1 = fun () -> timer:sleep(25), with(Name, F, E) end,
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ %% We check is_process_alive(QPid) in case we receive a
+ %% nodedown (for example) in F() that has nothing to do
+ %% with the QPid.
+ E1 = fun () ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> E();
+ false -> timer:sleep(25),
+ with(Name, F, E)
+ end
+ end,
rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
{error, not_found} ->
E()
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e4d78c45..964c3e24 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -19,17 +19,8 @@
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
%%
-%% We join the GM group before we add ourselves to the amqqueue
-%% record. As a result:
-%% 1. We can receive msgs from GM that correspond to messages we will
-%% never receive from publishers.
-%% 2. When we receive a message from publishers, we must receive a
-%% message from the GM group for it.
-%% 3. However, that instruction from the GM group can arrive either
-%% before or after the actual message. We need to be able to
-%% distinguish between GM instructions arriving early, and case (1)
-%% above.
-%%
+%% We receive messages from GM and from publishers, and the gm
+%% messages can arrive either before or after the 'actual' message.
%% All instructions from the GM group must be processed in the order
%% in which they're received.
@@ -89,28 +80,33 @@
synchronised
}).
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-info(QPid) ->
- gen_server2:call(QPid, info, infinity).
+info(QPid) -> gen_server2:call(QPid, info, infinity).
init(#amqqueue { name = QueueName } = Q) ->
+ %% We join the GM group before we add ourselves to the amqqueue
+ %% record. As a result:
+ %% 1. We can receive msgs from GM that correspond to messages we will
+ %% never receive from publishers.
+ %% 2. When we receive a message from publishers, we must receive a
+ %% message from the GM group for it.
+ %% 3. However, that instruction from the GM group can arrive either
+ %% before or after the actual message. We need to be able to
+ %% distinguish between GM instructions arriving early, and case (1)
+ %% above.
+ %%
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(fun() ->
- init_it(Self, Node,
- QueueName)
- end) of
+ case rabbit_misc:execute_mnesia_transaction(
+ fun() -> init_it(Self, Node, QueueName) end) of
{new, MPid} ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -153,24 +149,21 @@ init_it(Self, Node, QueueName) ->
[Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] ->
- MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid};
- [QPid] ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> duplicate_live_master;
- false -> {stale, QPid}
- end;
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> existing;
- false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid}
- end
+ [] -> MPids1 = MPids ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid};
+ [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid}
+ end
end.
handle_call({deliver, Delivery = #delivery { immediate = true }},
@@ -356,14 +349,10 @@ prioritise_info(Msg, _State) ->
%% GM
%% ---------------------------------------------------------------------------
-joined([SPid], _Members) ->
- SPid ! {joined, self()},
- ok.
+joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, []) ->
- ok;
-members_changed([SPid], _Births, Deaths) ->
- inform_deaths(SPid, Deaths).
+members_changed([_SPid], _Births, []) -> ok;
+members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
handle_msg([_SPid], _From, master_changed) ->
ok;
@@ -584,9 +573,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
confirm_messages(MsgIds, State #state {
backing_queue_state = BQS1 })),
case BQ:needs_timeout(BQS1) of
- false -> {stop_sync_timer(State1), hibernate};
- idle -> {stop_sync_timer(State1), 0 };
- timed -> {ensure_sync_timer(State1), 0 }
+ false -> {stop_sync_timer(State1), hibernate };
+ idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
+ timed -> {ensure_sync_timer(State1), 0 }
end.
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
@@ -680,26 +669,24 @@ maybe_enqueue_message(
%% msg_seq_no was at the time. We do now!
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { sender_queues = SQ1,
- msg_id_status = dict:erase(MsgId, MS) };
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
{ok, {published, ChPid}} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
- case needs_confirming(Delivery, State1) of
- never ->
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- eventually ->
- State1 #state {
- msg_id_status =
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 }
- end;
+ {MS1, SQ1} =
+ case needs_confirming(Delivery, State1) of
+ never -> {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)};
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ {dict:store(MsgId, MMS, MS), SQ};
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)}
+ end,
+ State1 #state { msg_id_status = MS1,
+ sender_queues = SQ1 };
{ok, discarded} ->
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
@@ -748,18 +735,17 @@ process_instruction(
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } },
_EnqueueOnPromotion}}, MQ2} ->
- %% We received the msg from the channel first. Thus we
- %% need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never ->
- {MQ2, PendingCh, MS};
- eventually ->
- {MQ2, PendingCh,
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- {MQ2, PendingCh, MS}
- end;
+ {MQ2, PendingCh,
+ %% We received the msg from the channel first. Thus
+ %% we need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never -> MS;
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ dict:store(MsgId, MMS , MS);
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ MS
+ end};
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index bedf5142..038154c3 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
- close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
+ close/1, fast_close/1, sockname/1, peername/1, peercert/1,
tune_buffer_size/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -59,7 +59,7 @@
-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
--spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
+-spec(fast_close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
@@ -77,6 +77,8 @@
%%---------------------------------------------------------------------------
+-define(SSL_CLOSE_TIMEOUT, 5000).
+
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
is_ssl(Sock) -> ?IS_SSL(Sock).
@@ -148,8 +150,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
-maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok;
-maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok.
+fast_close(Sock) when ?IS_SSL(Sock) ->
+ %% We cannot simply port_close the underlying tcp socket since the
+ %% TLS protocol is quite insistent that a proper closing handshake
+ %% should take place (see RFC 5245 s7.2.1). So we call ssl:close
+ %% instead, but that can block for a very long time, e.g. when
+ %% there is lots of pending output and there is tcp backpressure,
+ %% or the ssl_connection process has entered the the
+ %% workaround_transport_delivery_problems function during
+ %% termination, which, inexplicably, does a gen_tcp:recv(Socket,
+ %% 0), which may never return if the client doesn't send a FIN or
+ %% that gets swallowed by the network. Since there is no timeout
+ %% variant of ssl:close, we construct our own.
+ {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end),
+ erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}),
+ receive
+ {Pid, ssl_close_timeout} ->
+ erlang:demonitor(MRef, [flush]),
+ exit(Pid, kill);
+ {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end,
+ catch port_close(Sock#ssl_socket.tcp),
+ ok;
+fast_close(Sock) when is_port(Sock) ->
+ catch port_close(Sock), ok.
sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 94a5a2b7..2d0ded12 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -160,7 +160,19 @@ ssl_transform_fun(SslOpts) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
+ {error, timeout} ->
+ {error, {ssl_upgrade_error, timeout}};
{error, Reason} ->
+ %% We have no idea what state the ssl_connection
+ %% process is in - it could still be happily
+ %% going, it might be stuck, or it could be just
+ %% about to fail. There is little that our caller
+ %% can do but close the TCP socket, but this could
+ %% cause ssl alerts to get dropped (which is bad
+ %% form, according to the TLS spec). So we give
+ %% the ssl_connection a little bit of time to send
+ %% such alerts.
+ timer:sleep(?SSL_TIMEOUT * 1000),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 19dac70c..aef48b20 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -184,6 +184,8 @@ socket_op(Sock, Fun) ->
{ok, Res} -> Res;
{error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
[self(), Reason]),
+ %% NB: this is tcp socket, even in case of ssl
+ rabbit_net:fast_close(Sock),
exit(normal)
end.
@@ -236,15 +238,14 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end, "closing AMQP connection ~p (~s):~n~p~n",
[self(), ConnStr, Ex])
after
- %% The reader is the controlling process and hence its
- %% termination will close the socket. Furthermore,
- %% gen_tcp:close/1 waits for pending output to be sent, which
- %% results in unnecessary delays. However, to keep the
- %% file_handle_cache accounting as accurate as possible it
- %% would be good to close the socket immediately if we
- %% can. But we can only do this for non-ssl sockets.
- %%
- rabbit_net:maybe_fast_close(ClientSock),
+ %% We don't call gen_tcp:close/1 here since it waits for
+ %% pending output to be sent, which results in unnecessary
+ %% delays. We could just terminate - the reader is the
+ %% controlling process and hence its termination will close
+ %% the socket. However, to keep the file_handle_cache
+ %% accounting as accurate as possible we ought to close the
+ %% socket w/o delay before termination.
+ rabbit_net:fast_close(ClientSock),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ccac12c6..e1914ac2 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -32,6 +32,7 @@
-define(TIMEOUT, 5000).
all_tests() ->
+ ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 732c29b6..8966bcab 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -64,7 +64,7 @@
#basic_message{exchange_name :: rabbit_exchange:name(),
routing_keys :: [rabbit_router:routing_key()],
content :: content(),
- id :: msg_id(),
+ id :: msg_id(),
is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 3d3623d7..5af38573 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -255,10 +255,10 @@ behaviour_info(_Other) ->
%%% ---------------------------------------------------
start_link(Mod, Args) ->
gen_server:start_link(?MODULE, {self, Mod, Args}, []).
-
+
start_link(SupName, Mod, Args) ->
gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
-
+
%%% ---------------------------------------------------
%%% Interface functions.
%%% ---------------------------------------------------
@@ -298,9 +298,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
check_childspecs(X) -> {error, {badarg, X}}.
%%% ---------------------------------------------------
-%%%
+%%%
%%% Initialize the supervisor.
-%%%
+%%%
%%% ---------------------------------------------------
init({SupName, Mod, Args}) ->
process_flag(trap_exit, true),
@@ -319,7 +319,7 @@ init({SupName, Mod, Args}) ->
Error ->
{stop, {bad_return, {Mod, init, Error}}}
end.
-
+
init_children(State, StartSpec) ->
SupName = State#state.name,
case check_startspec(StartSpec) of
@@ -349,7 +349,7 @@ init_dynamic(_State, StartSpec) ->
%% Func: start_children/2
%% Args: Children = [#child] in start order
%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
-%% Purpose: Start all children. The new list contains #child's
+%% Purpose: Start all children. The new list contains #child's
%% with pids.
%% Returns: {ok, NChildren} | {error, NChildren}
%% NChildren = [#child] in termination order (reversed
@@ -381,7 +381,7 @@ do_start_child(SupName, Child) ->
NChild = Child#child{pid = Pid},
report_progress(NChild, SupName),
{ok, Pid, Extra};
- ignore ->
+ ignore ->
{ok, undefined};
{error, What} -> {error, What};
What -> {error, What}
@@ -400,12 +400,12 @@ do_start_child_i(M, F, A) ->
What ->
{error, What}
end.
-
+
%%% ---------------------------------------------------
-%%%
+%%%
%%% Callback functions.
-%%%
+%%%
%%% ---------------------------------------------------
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
#child{mfa = {M, F, A}} = hd(State#state.children),
@@ -414,11 +414,11 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
{ok, undefined} ->
{reply, {ok, undefined}, State};
{ok, Pid} ->
- NState = State#state{dynamics =
+ NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid}, NState};
{ok, Pid, Extra} ->
- NState = State#state{dynamics =
+ NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid, Extra}, NState};
What ->
@@ -497,7 +497,7 @@ handle_call(which_children, _From, State) ->
%%% Hopefully cause a function-clause as there is no API function
%%% that utilizes cast.
handle_cast(null, State) ->
- error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
+ error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
[]),
{noreply, State}.
@@ -527,7 +527,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
end;
handle_info(Msg, State) ->
- error_logger:error_msg("Supervisor received unexpected message: ~p~n",
+ error_logger:error_msg("Supervisor received unexpected message: ~p~n",
[Msg]),
{noreply, State}.
%%
@@ -577,13 +577,13 @@ check_flags({Strategy, MaxIntensity, Period}) ->
check_flags(What) ->
{bad_flags, What}.
-update_childspec(State, StartSpec) when ?is_simple(State) ->
- case check_startspec(StartSpec) of
- {ok, [Child]} ->
- {ok, State#state{children = [Child]}};
- Error ->
- {error, Error}
- end;
+update_childspec(State, StartSpec) when ?is_simple(State) ->
+ case check_startspec(StartSpec) of
+ {ok, [Child]} ->
+ {ok, State#state{children = [Child]}};
+ Error ->
+ {error, Error}
+ end;
update_childspec(State, StartSpec) ->
case check_startspec(StartSpec) of
@@ -604,7 +604,7 @@ update_childspec1([Child|OldC], Children, KeepOld) ->
end;
update_childspec1([], Children, KeepOld) ->
% Return them in (keeped) reverse start order.
- lists:reverse(Children ++ KeepOld).
+ lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
@@ -618,7 +618,7 @@ update_chsp(OldCh, Children) ->
NewC ->
{ok, NewC}
end.
-
+
%%% ---------------------------------------------------
%%% Start a new child.
%%% ---------------------------------------------------
@@ -630,12 +630,12 @@ handle_start_child(Child, State) ->
{ok, Pid} ->
Children = State#state.children,
{{ok, Pid},
- State#state{children =
+ State#state{children =
[Child#child{pid = Pid}|Children]}};
{ok, Pid, Extra} ->
Children = State#state.children,
{{ok, Pid, Extra},
- State#state{children =
+ State#state{children =
[Child#child{pid = Pid}|Children]}};
{error, What} ->
{{error, {What, Child}}, State}
@@ -816,29 +816,32 @@ terminate_simple_children(Child, Dynamics, SupName) ->
{Replies, Timedout} =
lists:foldl(
fun (_Pid, {Replies, Timedout}) ->
- {Reply, Timedout1} =
+ {Pid1, Reason1, Timedout1} =
receive
TimeoutMsg ->
Remaining = Pids -- [P || {P, _} <- Replies],
[exit(P, kill) || P <- Remaining],
- receive {'DOWN', _MRef, process, Pid, Reason} ->
- {{error, Reason}, true}
+ receive
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ {Pid, Reason, true}
end;
{'DOWN', _MRef, process, Pid, Reason} ->
- {child_res(Child, Reason, Timedout), Timedout};
- {'EXIT', Pid, Reason} ->
- receive {'DOWN', _MRef, process, Pid, _} ->
- {{error, Reason}, Timedout}
- end
+ {Pid, Reason, Timedout}
end,
- {[{Pid, Reply} | Replies], Timedout1}
+ {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies],
+ Timedout1}
end, {[], false}, Pids),
timeout_stop(Child, TRef, TimeoutMsg, Timedout),
ReportError = shutdown_error_reporter(SupName),
- [case Reply of
- {_Pid, ok} -> ok;
- {Pid, {error, R}} -> ReportError(R, Child#child{pid = Pid})
- end || Reply <- Replies],
+ Report = fun(_, ok) -> ok;
+ (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid})
+ end,
+ [receive
+ {'EXIT', Pid, Reason} ->
+ Report(Pid, child_res(Child, Reason, Timedout))
+ after
+ 0 -> Report(Pid, Reply)
+ end || {Pid, Reply} <- Replies],
ok.
child_exit_reason(#child{shutdown = brutal_kill}) -> kill;
@@ -863,7 +866,7 @@ timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) ->
after
0 -> ok
end;
-timeout_stop(#child{}, ok, _Msg, _Timedout) ->
+timeout_stop(#child{}, _TRef, _Msg, _Timedout) ->
ok.
do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
@@ -885,17 +888,17 @@ do_terminate(Child, _SupName) ->
Child.
%%-----------------------------------------------------------------
-%% Shutdowns a child. We must check the EXIT value
+%% Shutdowns a child. We must check the EXIT value
%% of the child, because it might have died with another reason than
-%% the wanted. In that case we want to report the error. We put a
-%% monitor on the child an check for the 'DOWN' message instead of
-%% checking for the 'EXIT' message, because if we check the 'EXIT'
-%% message a "naughty" child, who does unlink(Sup), could hang the
-%% supervisor.
+%% the wanted. In that case we want to report the error. We put a
+%% monitor on the child an check for the 'DOWN' message instead of
+%% checking for the 'EXIT' message, because if we check the 'EXIT'
+%% message a "naughty" child, who does unlink(Sup), could hang the
+%% supervisor.
%% Returns: ok | {error, OtherReason} (this should be reported)
%%-----------------------------------------------------------------
shutdown(Pid, brutal_kill) ->
-
+
case monitor_child(Pid) of
ok ->
exit(Pid, kill),
@@ -905,16 +908,16 @@ shutdown(Pid, brutal_kill) ->
{'DOWN', _MRef, process, Pid, OtherReason} ->
{error, OtherReason}
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end;
shutdown(Pid, Time) ->
-
+
case monitor_child(Pid) of
ok ->
exit(Pid, shutdown), %% Try to shutdown gracefully
- receive
+ receive
{'DOWN', _MRef, process, Pid, shutdown} ->
ok;
{'DOWN', _MRef, process, Pid, OtherReason} ->
@@ -926,14 +929,14 @@ shutdown(Pid, Time) ->
{error, OtherReason}
end
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end.
%% Help function to shutdown/2 switches from link to monitor approach
monitor_child(Pid) ->
-
- %% Do the monitor operation first so that if the child dies
+
+ %% Do the monitor operation first so that if the child dies
%% before the monitoring is done causing a 'DOWN'-message with
%% reason noproc, we will get the real reason in the 'EXIT'-message
%% unless a naughty child has already done unlink...
@@ -943,22 +946,22 @@ monitor_child(Pid) ->
receive
%% If the child dies before the unlik we must empty
%% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
- {'EXIT', Pid, Reason} ->
- receive
+ {'EXIT', Pid, Reason} ->
+ receive
{'DOWN', _, process, Pid, _} ->
{error, Reason}
end
- after 0 ->
+ after 0 ->
%% If a naughty child did unlink and the child dies before
- %% monitor the result will be that shutdown/2 receives a
+ %% monitor the result will be that shutdown/2 receives a
%% 'DOWN'-message with reason noproc.
%% If the child should die after the unlink there
%% will be a 'DOWN'-message with a correct reason
- %% that will be handled in shutdown/2.
- ok
+ %% that will be handled in shutdown/2.
+ ok
end.
-
-
+
+
%%-----------------------------------------------------------------
%% Child/State manipulating functions.
%%-----------------------------------------------------------------
@@ -1012,7 +1015,7 @@ remove_child(Child, State) ->
%% Args: SupName = {local, atom()} | {global, atom()} | self
%% Type = {Strategy, MaxIntensity, Period}
%% Strategy = one_for_one | one_for_all | simple_one_for_one |
-%% rest_for_one
+%% rest_for_one
%% MaxIntensity = integer()
%% Period = integer()
%% Mod :== atom()
@@ -1107,10 +1110,10 @@ validChildType(supervisor) -> true;
validChildType(worker) -> true;
validChildType(What) -> throw({invalid_child_type, What}).
-validName(_Name) -> true.
+validName(_Name) -> true.
-validFunc({M, F, A}) when is_atom(M),
- is_atom(F),
+validFunc({M, F, A}) when is_atom(M),
+ is_atom(F),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
@@ -1128,7 +1131,7 @@ validDelay(Delay) when is_number(Delay),
Delay >= 0 -> true;
validDelay(What) -> throw({invalid_delay, What}).
-validShutdown(Shutdown, _)
+validShutdown(Shutdown, _)
when is_integer(Shutdown), Shutdown > 0 -> true;
validShutdown(infinity, supervisor) -> true;
validShutdown(brutal_kill, _) -> true;
@@ -1154,7 +1157,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}).
%%% Returns: {ok, State'} | {terminate, State'}
%%% ------------------------------------------------------
-add_restart(State) ->
+add_restart(State) ->
I = State#state.intensity,
P = State#state.period,
R = State#state.restarts,
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
new file mode 100644
index 00000000..e42ded7b
--- /dev/null
+++ b/src/supervisor2_tests.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(supervisor2_tests).
+-behaviour(supervisor2).
+
+-export([test_all/0, start_link/0]).
+-export([init/1]).
+
+test_all() ->
+ ok = check_shutdown(stop, 200, 200, 2000),
+ ok = check_shutdown(ignored, 1, 2, 2000).
+
+check_shutdown(SigStop, Iterations, ChildCount, SupTimeout) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, [SupTimeout]),
+ Res = lists:foldl(
+ fun (I, ok) ->
+ TestSupPid = erlang:whereis(?MODULE),
+ ChildPids =
+ [begin
+ {ok, ChildPid} =
+ supervisor2:start_child(TestSupPid, []),
+ ChildPid
+ end || _ <- lists:seq(1, ChildCount)],
+ MRef = erlang:monitor(process, TestSupPid),
+ [P ! SigStop || P <- ChildPids],
+ ok = supervisor2:terminate_child(Sup, test_sup),
+ {ok, _} = supervisor2:restart_child(Sup, test_sup),
+ receive
+ {'DOWN', MRef, process, TestSupPid, shutdown} ->
+ ok;
+ {'DOWN', MRef, process, TestSupPid, Reason} ->
+ {error, {I, Reason}}
+ end;
+ (_, R) ->
+ R
+ end, ok, lists:seq(1, Iterations)),
+ unlink(Sup),
+ exit(Sup, shutdown),
+ Res.
+
+start_link() ->
+ Pid = spawn_link(fun () ->
+ process_flag(trap_exit, true),
+ receive stop -> ok end
+ end),
+ {ok, Pid}.
+
+init([Timeout]) ->
+ {ok, {{one_for_one, 0, 1},
+ [{test_sup, {supervisor2, start_link,
+ [{local, ?MODULE}, ?MODULE, []]},
+ transient, Timeout, supervisor, [?MODULE]}]}};
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{test_worker, {?MODULE, start_link, []},
+ temporary, 1000, worker, [?MODULE]}]}}.