From 23084b56d781c5363bed732ee6651fef16d93946 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sat, 1 Jan 2011 13:21:42 +0000 Subject: refactoring in preperation for eliminating the framing_channel process --- src/rabbit_framing_channel.erl | 137 +++++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index cb53185f..9243ea16 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -41,7 +41,7 @@ start_link(Parent, ChannelPid, Protocol) -> {ok, proc_lib:spawn_link( - fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. + fun () -> mainloop(Parent, ChannelPid, {method, Protocol}) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -53,77 +53,78 @@ shutdown(Pid) -> %%-------------------------------------------------------------------- -read_frame(ChannelPid) -> +mainloop(Parent, ChannelPid, State) -> + Loop = fun (NewState) -> + ?MODULE:mainloop(Parent, ChannelPid, NewState) + end, receive - {frame, Frame} -> Frame; - terminate -> rabbit_channel:shutdown(ChannelPid), - read_frame(ChannelPid); - Msg -> exit({unexpected_message, Msg}) - end. - -mainloop(Parent, ChannelPid, Protocol) -> - case read_frame(ChannelPid) of - {method, MethodName, FieldsBin} -> - Method = Protocol:decode_method_fields(MethodName, FieldsBin), - case Protocol:method_has_content(MethodName) of - true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), - case collect_content(ChannelPid, ClassId, Protocol) of - {ok, Content} -> - rabbit_channel:do(ChannelPid, Method, Content), - ?MODULE:mainloop(Parent, ChannelPid, Protocol); - {error, Reason} -> - channel_exit(Parent, Reason, MethodName) - end; - false -> rabbit_channel:do(ChannelPid, Method), - ?MODULE:mainloop(Parent, ChannelPid, Protocol) + {frame, Frame} -> + case collect(Frame, State) of + {ok, NewState} -> + Loop(NewState); + {ok, Method, NewState} -> + rabbit_channel:do(ChannelPid, Method), + Loop(NewState); + {ok, Method, Content, NewState} -> + rabbit_channel:do(ChannelPid, Method, Content), + Loop(NewState); + {error, Reason} -> + Parent ! {channel_exit, self(), Reason} end; - _ -> - channel_exit(Parent, {unexpected_frame, - "expected method frame, " - "got non method frame instead", - []}, none) + terminate -> + rabbit_channel:shutdown(ChannelPid), + Loop(State); + Msg -> + exit({unexpected_message, Msg}) end. -collect_content(ChannelPid, ClassId, Protocol) -> - case read_frame(ChannelPid) of - {content_header, ClassId, 0, BodySize, PropertiesBin} -> - case collect_content_payload(ChannelPid, BodySize, []) of - {ok, Payload} -> {ok, #content{ - class_id = ClassId, - properties = none, - properties_bin = PropertiesBin, - protocol = Protocol, - payload_fragments_rev = Payload}}; - Error -> Error - end; - {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> - {error, {unexpected_frame, - "expected content header for class ~w, " +collect({method, MethodName, FieldsBin}, {method, Protocol}) -> + Method = Protocol:decode_method_fields(MethodName, FieldsBin), + case Protocol:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), + {ok, {content_header, Method, ClassId, Protocol}}; + false -> {ok, Method, {method, Protocol}} + end; +collect(_Frame, {method, _Protocol}) -> + unexpected_frame("expected method frame, " + "got non method frame instead", [], none); +collect({content_header, ClassId, 0, 0, PropertiesBin}, + {content_header, Method, ClassId, Protocol}) -> + Content = empty_content(ClassId, PropertiesBin, Protocol), + {ok, Method, Content, {method, Protocol}}; +collect({content_header, ClassId, 0, BodySize, PropertiesBin}, + {content_header, Method, ClassId, Protocol}) -> + Content = empty_content(ClassId, PropertiesBin, Protocol), + {ok, {content_body, Method, BodySize, Content, Protocol}}; +collect({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin}, + {content_header, Method, ClassId, _Protocol}) -> + unexpected_frame("expected content header for class ~w, " "got one for class ~w instead", - [ClassId, HeaderClassId]}}; - _ -> - {error, {unexpected_frame, - "expected content header for class ~w, " - "got non content header frame instead", - [ClassId]}} - end. + [ClassId, HeaderClassId], Method); +collect(_Frame, {content_header, Method, ClassId, _Protocol}) -> + unexpected_frame("expected content header for class ~w, " + "got non content header frame instead", [ClassId], Method); +collect({content_body, FragmentBin}, + {content_body, Method, RemainingSize, + Content = #content{payload_fragments_rev = Fragments}, Protocol}) -> + NewContent = Content#content{ + payload_fragments_rev = [FragmentBin | Fragments]}, + case RemainingSize - size(FragmentBin) of + 0 -> {ok, Method, NewContent, {method, Protocol}}; + Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}} + end; +collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> + unexpected_frame("expected content body, " + "got non content body frame instead", [], Method). -collect_content_payload(_ChannelPid, 0, Acc) -> - {ok, Acc}; -collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> - case read_frame(ChannelPid) of - {content_body, FragmentBin} -> - collect_content_payload(ChannelPid, - RemainingByteCount - size(FragmentBin), - [FragmentBin | Acc]); - _ -> - {error, {unexpected_frame, - "expected content body, " - "got non content body frame instead", - []}} - end. +empty_content(ClassId, PropertiesBin, Protocol) -> + #content{class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + protocol = Protocol, + payload_fragments_rev = []}. -channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) -> - Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params, - MethodName), - Parent ! {channel_exit, self(), Reason}. +unexpected_frame(Format, Params, Method) when is_atom(Method) -> + {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)}; +unexpected_frame(Format, Params, Method) -> + unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)). -- cgit v1.2.1