diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-01 19:20:06 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-01 19:20:06 +0000 |
commit | 10a1a37e5fb34909f730c5603b80062a55e83bf1 (patch) | |
tree | 42bc1340eef5968c9143fae518244a32fad9a200 | |
parent | 23084b56d781c5363bed732ee6651fef16d93946 (diff) | |
download | rabbitmq-server-10a1a37e5fb34909f730c5603b80062a55e83bf1.tar.gz |
replace rabbit_framing_channel with tiny state machine
...which we store in the reader's process dict
This simplifies the interactions between the channel and the reader.
Note that the reader now monitors the channel processes rather than
their sups. This makes more sense since it interacts with the former
but never the latter.
-rw-r--r-- | src/rabbit_channel_sup.erl | 11 | ||||
-rw-r--r-- | src/rabbit_channel_sup_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit_framing_channel.erl | 46 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 110 |
4 files changed, 66 insertions, 103 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 02199a65..83e91f6c 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -50,7 +50,7 @@ rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_access_control:username(), rabbit_types:vhost(), pid()}). --spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). -endif. @@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, [Channel, ReaderPid, WriterPid, Username, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), - {ok, FramingChannelPid} = - supervisor2:start_child( - SupPid, - {framing_channel, {rabbit_framing_channel, start_link, - [ReaderPid, ChannelPid, Protocol]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), - {ok, SupPid, FramingChannelPid}. + {ok, FramingState} = rabbit_framing_channel:init(Protocol), + {ok, SupPid, {ChannelPid, FramingState}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 21c39780..fd99af56 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,7 +43,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), pid()}). + {'ok', pid(), {pid(), any()}}). -endif. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 9243ea16..57089fc2 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,51 +32,11 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/3, process/2, shutdown/1]). - -%% internal --export([mainloop/3]). - -%%-------------------------------------------------------------------- - -start_link(Parent, ChannelPid, Protocol) -> - {ok, proc_lib:spawn_link( - fun () -> mainloop(Parent, ChannelPid, {method, Protocol}) end)}. - -process(Pid, Frame) -> - Pid ! {frame, Frame}, - ok. - -shutdown(Pid) -> - Pid ! terminate, - ok. +-export([init/1, collect/2]). %%-------------------------------------------------------------------- -mainloop(Parent, ChannelPid, State) -> - Loop = fun (NewState) -> - ?MODULE:mainloop(Parent, ChannelPid, NewState) - end, - receive - {frame, Frame} -> - case collect(Frame, State) of - {ok, NewState} -> - Loop(NewState); - {ok, Method, NewState} -> - rabbit_channel:do(ChannelPid, Method), - Loop(NewState); - {ok, Method, Content, NewState} -> - rabbit_channel:do(ChannelPid, Method, Content), - Loop(NewState); - {error, Reason} -> - Parent ! {channel_exit, self(), Reason} - end; - terminate -> - rabbit_channel:shutdown(ChannelPid), - Loop(State); - Msg -> - exit({unexpected_message, Msg}) - end. +init(Protocol) -> {ok, {method, Protocol}}. collect({method, MethodName, FieldsBin}, {method, Protocol}) -> Method = Protocol:decode_method_fields(MethodName, FieldsBin), @@ -117,6 +77,8 @@ collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> unexpected_frame("expected content body, " "got non content body frame instead", [], Method). +%%-------------------------------------------------------------------- + empty_content(ClassId, PropertiesBin, Protocol) -> #content{class_id = ClassId, properties = none, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 92a2f4d7..9b304018 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -347,12 +347,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> + {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, ChannelOrFrPid, Reason} -> - mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); - {'DOWN', _MRef, process, ChSupPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); + {channel_exit, Channel, Reason} -> + mainloop(Deb, handle_exception(State, Channel, Reason)); + {'DOWN', _MRef, process, ChPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -443,45 +443,32 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> - {channel, Channel} = get({ch_fr_pid, ChFrPid}), - handle_exception(State, Channel, Reason); -handle_channel_exit(Channel, Reason, State) -> - handle_exception(State, Channel, Reason). - -handle_dependent_exit(ChSupPid, Reason, State) -> +handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - case erase({ch_sup_pid, ChSupPid}) of - undefined -> ok; - {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) - end, + erase({ch_pid, ChPid}), maybe_close(State); uncontrolled -> - case channel_cleanup(ChSupPid) of - undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); - Channel -> - maybe_close(handle_exception(State, Channel, Reason)) + case channel_cleanup(ChPid) of + undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); + Channel -> maybe_close( + handle_exception(State, Channel, Reason)) end end. -channel_cleanup(ChSupPid) -> - case get({ch_sup_pid, ChSupPid}) of - undefined -> undefined; - {{channel, Channel}, ChFr} -> erase({channel, Channel}), - erase(ChFr), - erase({ch_sup_pid, ChSupPid}), - Channel +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + Channel -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + Channel end. -all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, - {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. terminate_channels() -> NChannels = - length([rabbit_framing_channel:shutdown(ChFrPid) - || ChFrPid <- all_channels()]), + length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -499,10 +486,10 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'DOWN', _MRef, process, ChSupPid, Reason} -> - case channel_cleanup(ChSupPid) of + {'DOWN', _MRef, process, ChPid, Reason} -> + case channel_cleanup(ChPid) of undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); + exit({abnormal_dependent_exit, ChPid, Reason}); Channel -> case termination_kind(Reason) of controlled -> @@ -565,20 +552,23 @@ handle_frame(Type, Channel, Payload, heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ch_fr_pid, ChFrPid} -> - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), + {ChPid, FramingState} -> + State1 = process_channel_frame( + AnalyzedFrame, Channel, ChPid, FramingState, + State), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), - State; + State1; {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking andalso + case (State#v1.connection_state =:= blocking + andalso Protocol:method_has_content(MethodName)) of - true -> State#v1{connection_state = blocked}; - false -> State + true -> State1#v1{connection_state = blocked}; + false -> State1 end; _ -> - State + State1 end; closing -> %% According to the spec, after sending a @@ -601,9 +591,8 @@ handle_frame(Type, Channel, Payload, State; undefined -> case ?IS_RUNNING(State) of - true -> ok = send_to_new_channel( - Channel, AnalyzedFrame, State), - State; + true -> send_to_new_channel( + Channel, AnalyzedFrame, State); false -> throw({channel_frame_while_starting, Channel, State#v1.connection_state, AnalyzedFrame}) @@ -809,7 +798,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> - lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), + lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, @@ -971,15 +960,32 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, ChSupPid, ChFrPid} = + {ok, _ChSupPid, {ChPid, ChFrSt}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector}), - erlang:monitor(process, ChSupPid), - put({channel, Channel}, {ch_fr_pid, ChFrPid}), - put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), - put({ch_fr_pid, ChFrPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). + erlang:monitor(process, ChPid), + put({channel, Channel}, {ChPid, ChFrSt}), + put({ch_pid, ChPid}, Channel), + process_channel_frame(AnalyzedFrame, Channel, ChPid, ChFrSt, State). + +process_channel_frame(Frame, Channel, ChPid, ChFrSt, State) -> + UpdateFramingState = fun (NewChFrSt) -> + put({channel, Channel}, {ChPid, NewChFrSt}), + State + end, + case rabbit_framing_channel:collect(Frame, ChFrSt) of + {ok, NewChFrSt} -> + UpdateFramingState(NewChFrSt); + {ok, Method, NewChFrSt} -> + rabbit_channel:do(ChPid, Method), + UpdateFramingState(NewChFrSt); + {ok, Method, Content, NewChFrSt} -> + rabbit_channel:do(ChPid, Method, Content), + UpdateFramingState(NewChFrSt); + {error, Reason} -> + handle_exception(State, Channel, Reason) + end. log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", |