summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-11 16:16:15 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-11 16:16:15 +0100
commita6a531f2147831521f5a1e2f4811de263a6a78fe (patch)
treecce8cec5d9b0d1337f05b20f6be70a2c5dab786f
parent5301419aade68db6925f44d10b50bb770fcbc207 (diff)
downloadrabbitmq-server-a6a531f2147831521f5a1e2f4811de263a6a78fe.tar.gz
Get the reader to link to the chan_sup instead of the framing_chan. It used to link to the framing chan, which meant that when it got the exit signal from the framing_chan, it could be sure that the channel and writer had already died. However, this is no longer the case - now the framing_chan is actually the last to start and first to exit in the chan_sup and so the reader needs to link to the chan_sup instead. This means the reader needs to track both the framing_chan and the chan_sup
-rw-r--r--src/rabbit_reader.erl63
1 files changed, 35 insertions, 28 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 69f6773f..313b7aaf 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -367,10 +367,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_channel_exit(Channel, Reason, State));
- {'EXIT', Pid, Reason} ->
- mainloop(Deb, handle_dependent_exit(Pid, Reason, State));
+ {channel_exit, ChannelOrFrPid, Reason} ->
+ mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
+ {'EXIT', ChSupPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -463,13 +463,13 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) ->
- {channel, Channel} = get({chpid, ChPid}),
+handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
+ {channel, Channel} = get({ch_fr_pid, ChFrPid}),
handle_exception(State, Channel, Reason);
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, Reason, State) ->
+handle_dependent_exit(ChSupPid, Reason, State) ->
case (case Reason of
normal -> controlled;
shutdown -> controlled;
@@ -477,30 +477,36 @@ handle_dependent_exit(Pid, Reason, State) ->
_ -> uncontrolled
end) of
controlled ->
- erase({chpid, Pid}),
+ case erase({ch_sup_pid, ChSupPid}) of
+ undefined -> ok;
+ {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
+ end,
maybe_close(State);
uncontrolled ->
- case channel_cleanup(Pid) of
+ case channel_cleanup(ChSupPid) of
undefined ->
- exit({abnormal_dependent_exit, Pid, Reason});
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
Channel ->
maybe_close(handle_exception(State, Channel, Reason))
end
end.
-channel_cleanup(Pid) ->
- case get({chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} -> erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+channel_cleanup(ChSupPid) ->
+ case get({ch_sup_pid, ChSupPid}) of
+ undefined -> undefined;
+ {{channel, Channel}, ChFr} -> erase({channel, Channel}),
+ erase(ChFr),
+ erase({ch_sup_pid, ChSupPid}),
+ Channel
end.
-all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
+all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
+ {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
terminate_channels() ->
NChannels =
- length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]),
+ length([rabbit_framing_channel:shutdown(ChFrPid)
+ || ChFrPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -518,10 +524,10 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'EXIT', Pid, Reason} ->
- case channel_cleanup(Pid) of
+ {'EXIT', ChSupPid, Reason} ->
+ case channel_cleanup(ChSupPid) of
undefined ->
- exit({abnormal_dependent_exit, Pid, Reason});
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
Channel ->
case Reason of
normal -> ok;
@@ -581,8 +587,8 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
- {chpid, ChPid} ->
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
+ {ch_fr_pid, ChFrPid} ->
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
@@ -888,14 +894,15 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
- {ok, _ChanSup, ChPid} =
+ {ok, ChSupPid, ChFrPid} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, [Protocol, Sock, Channel, FrameMax,
self(), Username, VHost, Collector]),
- link(ChPid),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
+ link(ChSupPid),
+ put({channel, Channel}, {ch_fr_pid, ChFrPid}),
+ put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
+ put({ch_fr_pid, ChFrPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",