summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl32
1 files changed, 24 insertions, 8 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3f8d7cac..ef8038e7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
info(Pid) ->
- gen_server:call(Pid, info).
+ gen_server:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server:call(Pid, {info, Items}) of
+ case gen_server:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -173,7 +173,8 @@ setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
once ->
- rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"),
+ rabbit_log:info("Enabling profiling for this connection, "
+ "and disabling for subsequent.~n"),
rabbit_misc:set_config(profiling_enabled, false),
fprof:trace(start);
true ->
@@ -230,8 +231,12 @@ start_connection(Parent, Deb, ClientSock) ->
connection_state = pre_init},
handshake, 8))
catch
- Ex -> rabbit_log:error("error on TCP connection ~p from ~s:~p~n~p~n",
- [self(), PeerAddressS, PeerPort, Ex])
+ Ex -> (if Ex == connection_closed_abruptly ->
+ fun rabbit_log:warning/2;
+ true ->
+ fun rabbit_log:error/2
+ end)("exception on TCP connection ~p from ~s:~p~n~p~n",
+ [self(), PeerAddressS, PeerPort, Ex])
after
rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
@@ -283,6 +288,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
throw(E);
+ {channel_exit, Channel, Reason} ->
+ mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
{terminate_channel, Channel, Ref1} ->
@@ -350,6 +357,14 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
+handle_channel_exit(Channel, Reason, State) ->
+ %% We remove the channel from the inbound map only. That allows
+ %% the channel to be re-opened, but also means the remaining
+ %% cleanup, including possibly closing the connection, is deferred
+ %% until we get the (normal) exit signal.
+ erase({channel, Channel}),
+ handle_exception(State, Channel, Reason).
+
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
maybe_close(State);
@@ -404,7 +419,8 @@ wait_for_channel_termination(N, TimerRef) ->
normal -> ok;
_ ->
rabbit_log:error(
- "connection ~p, channel ~p - error while terminating:~n~p~n",
+ "connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
[self(), Channel, Reason])
end,
wait_for_channel_termination(N-1, TimerRef)
@@ -709,8 +725,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/4,
- [self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);