summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-01 19:20:06 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-01 19:20:06 +0000
commit10a1a37e5fb34909f730c5603b80062a55e83bf1 (patch)
tree42bc1340eef5968c9143fae518244a32fad9a200
parent23084b56d781c5363bed732ee6651fef16d93946 (diff)
downloadrabbitmq-server-10a1a37e5fb34909f730c5603b80062a55e83bf1.tar.gz
replace rabbit_framing_channel with tiny state machine
...which we store in the reader's process dict This simplifies the interactions between the channel and the reader. Note that the reader now monitors the channel processes rather than their sups. This makes more sense since it interacts with the former but never the latter.
-rw-r--r--src/rabbit_channel_sup.erl11
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_framing_channel.erl46
-rw-r--r--src/rabbit_reader.erl110
4 files changed, 66 insertions, 103 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 02199a65..83e91f6c 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -50,7 +50,7 @@
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
--spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
-endif.
@@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
[Channel, ReaderPid, WriterPid, Username, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
- {ok, FramingChannelPid} =
- supervisor2:start_child(
- SupPid,
- {framing_channel, {rabbit_framing_channel, start_link,
- [ReaderPid, ChannelPid, Protocol]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
- {ok, SupPid, FramingChannelPid}.
+ {ok, FramingState} = rabbit_framing_channel:init(Protocol),
+ {ok, SupPid, {ChannelPid, FramingState}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 21c39780..fd99af56 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,7 +43,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), pid()}).
+ {'ok', pid(), {pid(), any()}}).
-endif.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 9243ea16..57089fc2 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,51 +32,11 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/3, process/2, shutdown/1]).
-
-%% internal
--export([mainloop/3]).
-
-%%--------------------------------------------------------------------
-
-start_link(Parent, ChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(
- fun () -> mainloop(Parent, ChannelPid, {method, Protocol}) end)}.
-
-process(Pid, Frame) ->
- Pid ! {frame, Frame},
- ok.
-
-shutdown(Pid) ->
- Pid ! terminate,
- ok.
+-export([init/1, collect/2]).
%%--------------------------------------------------------------------
-mainloop(Parent, ChannelPid, State) ->
- Loop = fun (NewState) ->
- ?MODULE:mainloop(Parent, ChannelPid, NewState)
- end,
- receive
- {frame, Frame} ->
- case collect(Frame, State) of
- {ok, NewState} ->
- Loop(NewState);
- {ok, Method, NewState} ->
- rabbit_channel:do(ChannelPid, Method),
- Loop(NewState);
- {ok, Method, Content, NewState} ->
- rabbit_channel:do(ChannelPid, Method, Content),
- Loop(NewState);
- {error, Reason} ->
- Parent ! {channel_exit, self(), Reason}
- end;
- terminate ->
- rabbit_channel:shutdown(ChannelPid),
- Loop(State);
- Msg ->
- exit({unexpected_message, Msg})
- end.
+init(Protocol) -> {ok, {method, Protocol}}.
collect({method, MethodName, FieldsBin}, {method, Protocol}) ->
Method = Protocol:decode_method_fields(MethodName, FieldsBin),
@@ -117,6 +77,8 @@ collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
unexpected_frame("expected content body, "
"got non content body frame instead", [], Method).
+%%--------------------------------------------------------------------
+
empty_content(ClassId, PropertiesBin, Protocol) ->
#content{class_id = ClassId,
properties = none,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 92a2f4d7..9b304018 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -347,12 +347,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, ChannelOrFrPid, Reason} ->
- mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
+ {channel_exit, Channel, Reason} ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -443,45 +443,32 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
- {channel, Channel} = get({ch_fr_pid, ChFrPid}),
- handle_exception(State, Channel, Reason);
-handle_channel_exit(Channel, Reason, State) ->
- handle_exception(State, Channel, Reason).
-
-handle_dependent_exit(ChSupPid, Reason, State) ->
+handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- case erase({ch_sup_pid, ChSupPid}) of
- undefined -> ok;
- {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
- end,
+ erase({ch_pid, ChPid}),
maybe_close(State);
uncontrolled ->
- case channel_cleanup(ChSupPid) of
- undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
- Channel ->
- maybe_close(handle_exception(State, Channel, Reason))
+ case channel_cleanup(ChPid) of
+ undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
+ Channel -> maybe_close(
+ handle_exception(State, Channel, Reason))
end
end.
-channel_cleanup(ChSupPid) ->
- case get({ch_sup_pid, ChSupPid}) of
- undefined -> undefined;
- {{channel, Channel}, ChFr} -> erase({channel, Channel}),
- erase(ChFr),
- erase({ch_sup_pid, ChSupPid}),
- Channel
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ Channel -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ Channel
end.
-all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
- {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
terminate_channels() ->
NChannels =
- length([rabbit_framing_channel:shutdown(ChFrPid)
- || ChFrPid <- all_channels()]),
+ length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -499,10 +486,10 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- case channel_cleanup(ChSupPid) of
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ case channel_cleanup(ChPid) of
undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
+ exit({abnormal_dependent_exit, ChPid, Reason});
Channel ->
case termination_kind(Reason) of
controlled ->
@@ -565,20 +552,23 @@ handle_frame(Type, Channel, Payload,
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ch_fr_pid, ChFrPid} ->
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
+ {ChPid, FramingState} ->
+ State1 = process_channel_frame(
+ AnalyzedFrame, Channel, ChPid, FramingState,
+ State),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
- State;
+ State1;
{method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking andalso
+ case (State#v1.connection_state =:= blocking
+ andalso
Protocol:method_has_content(MethodName)) of
- true -> State#v1{connection_state = blocked};
- false -> State
+ true -> State1#v1{connection_state = blocked};
+ false -> State1
end;
_ ->
- State
+ State1
end;
closing ->
%% According to the spec, after sending a
@@ -601,9 +591,8 @@ handle_frame(Type, Channel, Payload,
State;
undefined ->
case ?IS_RUNNING(State) of
- true -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
+ true -> send_to_new_channel(
+ Channel, AnalyzedFrame, State);
false -> throw({channel_frame_while_starting,
Channel, State#v1.connection_state,
AnalyzedFrame})
@@ -809,7 +798,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
@@ -971,15 +960,32 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
- {ok, ChSupPid, ChFrPid} =
+ {ok, _ChSupPid, {ChPid, ChFrSt}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
self(), Username, VHost, Collector}),
- erlang:monitor(process, ChSupPid),
- put({channel, Channel}, {ch_fr_pid, ChFrPid}),
- put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
- put({ch_fr_pid, ChFrPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
+ erlang:monitor(process, ChPid),
+ put({channel, Channel}, {ChPid, ChFrSt}),
+ put({ch_pid, ChPid}, Channel),
+ process_channel_frame(AnalyzedFrame, Channel, ChPid, ChFrSt, State).
+
+process_channel_frame(Frame, Channel, ChPid, ChFrSt, State) ->
+ UpdateFramingState = fun (NewChFrSt) ->
+ put({channel, Channel}, {ChPid, NewChFrSt}),
+ State
+ end,
+ case rabbit_framing_channel:collect(Frame, ChFrSt) of
+ {ok, NewChFrSt} ->
+ UpdateFramingState(NewChFrSt);
+ {ok, Method, NewChFrSt} ->
+ rabbit_channel:do(ChPid, Method),
+ UpdateFramingState(NewChFrSt);
+ {ok, Method, Content, NewChFrSt} ->
+ rabbit_channel:do(ChPid, Method, Content),
+ UpdateFramingState(NewChFrSt);
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
+ end.
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",