diff options
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r-- | src/rabbit_reader.erl | 32 |
1 files changed, 24 insertions, 8 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3f8d7cac..ef8038e7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. info(Pid) -> - gen_server:call(Pid, info). + gen_server:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server:call(Pid, {info, Items}) of + case gen_server:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -173,7 +173,8 @@ setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of once -> - rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"), + rabbit_log:info("Enabling profiling for this connection, " + "and disabling for subsequent.~n"), rabbit_misc:set_config(profiling_enabled, false), fprof:trace(start); true -> @@ -230,8 +231,12 @@ start_connection(Parent, Deb, ClientSock) -> connection_state = pre_init}, handshake, 8)) catch - Ex -> rabbit_log:error("error on TCP connection ~p from ~s:~p~n~p~n", - [self(), PeerAddressS, PeerPort, Ex]) + Ex -> (if Ex == connection_closed_abruptly -> + fun rabbit_log:warning/2; + true -> + fun rabbit_log:error/2 + end)("exception on TCP connection ~p from ~s:~p~n~p~n", + [self(), PeerAddressS, PeerPort, Ex]) after rabbit_log:info("closing TCP connection ~p from ~s:~p~n", [self(), PeerAddressS, PeerPort]), @@ -283,6 +288,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit(Reason); {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> throw(E); + {channel_exit, Channel, Reason} -> + mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -350,6 +357,14 @@ terminate_channel(Channel, Ref, State) -> end, State. +handle_channel_exit(Channel, Reason, State) -> + %% We remove the channel from the inbound map only. That allows + %% the channel to be re-opened, but also means the remaining + %% cleanup, including possibly closing the connection, is deferred + %% until we get the (normal) exit signal. + erase({channel, Channel}), + handle_exception(State, Channel, Reason). + handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), maybe_close(State); @@ -404,7 +419,8 @@ wait_for_channel_termination(N, TimerRef) -> normal -> ok; _ -> rabbit_log:error( - "connection ~p, channel ~p - error while terminating:~n~p~n", + "connection ~p, channel ~p - " + "error while terminating:~n~p~n", [self(), Channel, Reason]) end, wait_for_channel_termination(N-1, TimerRef) @@ -709,8 +725,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/4, - [self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); |