summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl154
1 files changed, 56 insertions, 98 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 69dbc008..5cc98992 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -49,7 +49,6 @@
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
--define(CHANNEL_CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
%---------------------------------------------------------------------------
@@ -94,23 +93,19 @@
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing, *running*
-%% terminate_channel timeout -> remove 'closing' mark, *running*
+%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
%% receive frame -> ignore, *closing*
-%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
%% channel exit with hard error
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing
+%% -> log error, mark channel as closing
%% if last channel to exit then send connection.close_ok,
%% start terminate_connection timer, *closed*
%% else *closing*
@@ -123,7 +118,6 @@
%% *closed*
%% receive frame -> ignore, *closed*
%% terminate_connection timeout -> *terminate*
-%% terminate_channel timeout -> remove 'closing' mark, *closed*
%% handshake_timeout -> ignore, *closed*
%% heartbeat timeout -> *throw*
%% channel exit -> log error, *closed*
@@ -269,12 +263,10 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
if State#v1.connection_state =:= running ->
- send_exception(
- State, 0,
- {amqp, connection_forced,
- io_lib:format(
- "broker forced connection closure with reason '~w'",
- [Reason]), none});
+ send_exception(State, 0,
+ rabbit_misc:amqp_error(connection_forced,
+ "broker forced connection closure with reason '~w'",
+ [Reason], none));
true -> ok
end,
%% this is what we are expected to do according to
@@ -292,8 +284,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
- {terminate_channel, Channel, Ref1} ->
- mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -341,32 +331,14 @@ close_connection(State = #v1{connection = #connection{
State#v1{connection_state = closed}.
close_channel(Channel, State) ->
- Ref = make_ref(),
- TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT,
- self(),
- {terminate_channel, Channel, Ref}),
- put({closing_channel, Channel}, {Ref, TRef}),
- State.
-
-terminate_channel(Channel, Ref, State) ->
- case get({closing_channel, Channel}) of
- undefined -> ok; %% got close_ok in the meantime
- {Ref, _} -> erase({closing_channel, Channel}),
- ok;
- {_Ref, _} -> ok %% got close_ok, and have new closing channel
- end,
+ put({channel, Channel}, closing),
State.
handle_channel_exit(Channel, Reason, State) ->
- %% We remove the channel from the inbound map only. That allows
- %% the channel to be re-opened, but also means the remaining
- %% cleanup, including possibly closing the connection, is deferred
- %% until we get the (normal) exit signal.
- erase({channel, Channel}),
handle_exception(State, Channel, Reason).
handle_dependent_exit(Pid, normal, State) ->
- channel_cleanup(Pid),
+ erase({chpid, Pid}),
maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
@@ -376,17 +348,10 @@ handle_dependent_exit(Pid, Reason, State) ->
channel_cleanup(Pid) ->
case get({chpid, Pid}) of
- undefined ->
- case get({closing_chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} ->
- erase({closing_chpid, Pid}),
- Channel
- end;
- {channel, Channel} ->
- erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+ undefined -> undefined;
+ {channel, Channel} -> erase({channel, Channel}),
+ erase({chpid, Pid}),
+ Channel
end.
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
@@ -451,7 +416,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
State;
handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
- error -> throw({unknown_frame, Type, Payload});
+ error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
trace -> State;
{method, MethodName, FieldsBin} ->
@@ -460,20 +425,34 @@ handle_frame(Type, 0, Payload, State) ->
end;
handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
- error -> throw({unknown_frame, Type, Payload});
+ error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
- ok = check_for_close(Channel, ChPid, AnalyzedFrame),
+ case AnalyzedFrame of
+ {method, 'channel.close', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
State;
+ closing ->
+ %% According to the spec, after sending a
+ %% channel.close we must ignore all frames except
+ %% channel.close_ok.
+ case AnalyzedFrame of
+ {method, 'channel.close_ok', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
+ State;
undefined ->
case State#v1.connection_state of
- running -> send_to_new_channel(
- Channel, AnalyzedFrame, State),
+ running -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
State;
Other -> throw({channel_frame_while_starting,
Channel, Other, AnalyzedFrame})
@@ -567,17 +546,17 @@ handle_method0(MethodName, FieldsBin, State) ->
MethodName, FieldsBin),
State)
catch exit:Reason ->
- CompleteReason =
- case Reason of
- {amqp, Error, Explanation, none} ->
- {amqp, Error, Explanation, MethodName};
- OtherReason -> OtherReason
- end,
+ CompleteReason = case Reason of
+ #amqp_error{method = none} ->
+ Reason#amqp_error{method = MethodName};
+ OtherReason -> OtherReason
+ end,
case State#v1.connection_state of
running -> send_exception(State, 0, CompleteReason);
Other -> throw({channel0_error, Other, CompleteReason})
end
end.
+
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response},
State = #v1{connection_state = starting,
@@ -703,7 +682,7 @@ i(channels, #v1{}) ->
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
- none;
+ '';
i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
VHost;
i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
@@ -716,38 +695,17 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame, State) ->
- case get({closing_channel, Channel}) of
- undefined ->
- #v1{sock = Sock,
- connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
- {_, TRef} ->
- %% According to the spec, after sending a channel.close we
- %% must ignore all frames except channel.close_ok.
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- erlang:cancel_timer(TRef),
- erase({closing_channel, Channel}),
- ok;
- _Other -> ok
- end
- end.
-
-check_for_close(Channel, ChPid, {method, 'channel.close', _}) ->
- channel_cleanup(ChPid),
- put({closing_chpid, ChPid}, {channel, Channel}),
- ok;
-check_for_close(_Channel, _ChPid, _Frame) ->
- ok.
+ #v1{sock = Sock, connection = #connection{
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
+ ChPid = rabbit_framing_channel:start_link(
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
+ put({channel, Channel}, {chpid, ChPid}),
+ put({chpid, ChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
@@ -793,18 +751,18 @@ map_exception(Channel, Reason) ->
end,
{ShouldClose, CloseChannel, CloseMethod}.
-lookup_amqp_exception({amqp, {ShouldClose, Code, Text}, Expl, Method}) ->
+lookup_amqp_exception(
+ #amqp_error{name = Name, explanation = Expl, method = Method}) ->
+ {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
ExplBin = list_to_binary(Expl),
CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
SafeTextBin = if size(CompleteTextBin) > 255 ->
<<CompleteTextBin:252/binary, "...">>;
- true ->
- CompleteTextBin
+ true -> CompleteTextBin
end,
{ShouldClose, Code, SafeTextBin, Method};
-lookup_amqp_exception({amqp, ErrorName, Expl, Method}) ->
- Details = rabbit_framing:lookup_amqp_exception(ErrorName),
- lookup_amqp_exception({amqp, Details, Expl, Method});
lookup_amqp_exception(Other) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {true, ?INTERNAL_ERROR, <<"INTERNAL_ERROR">>, none}.
+ {ShouldClose, Code, Text} =
+ rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text, none}.