summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-01 13:21:42 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-01 13:21:42 +0000
commit23084b56d781c5363bed732ee6651fef16d93946 (patch)
treee0fd8ce93a37597c0c5ab83ee9989236ccc9e420
parent02396f8d21a641b4640657e1203b7c2a343a1c73 (diff)
downloadrabbitmq-server-23084b56d781c5363bed732ee6651fef16d93946.tar.gz
refactoring in preperation for eliminating the framing_channel process
-rw-r--r--src/rabbit_framing_channel.erl137
1 files changed, 69 insertions, 68 deletions
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index cb53185f..9243ea16 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -41,7 +41,7 @@
start_link(Parent, ChannelPid, Protocol) ->
{ok, proc_lib:spawn_link(
- fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
+ fun () -> mainloop(Parent, ChannelPid, {method, Protocol}) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -53,77 +53,78 @@ shutdown(Pid) ->
%%--------------------------------------------------------------------
-read_frame(ChannelPid) ->
+mainloop(Parent, ChannelPid, State) ->
+ Loop = fun (NewState) ->
+ ?MODULE:mainloop(Parent, ChannelPid, NewState)
+ end,
receive
- {frame, Frame} -> Frame;
- terminate -> rabbit_channel:shutdown(ChannelPid),
- read_frame(ChannelPid);
- Msg -> exit({unexpected_message, Msg})
- end.
-
-mainloop(Parent, ChannelPid, Protocol) ->
- case read_frame(ChannelPid) of
- {method, MethodName, FieldsBin} ->
- Method = Protocol:decode_method_fields(MethodName, FieldsBin),
- case Protocol:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
- case collect_content(ChannelPid, ClassId, Protocol) of
- {ok, Content} ->
- rabbit_channel:do(ChannelPid, Method, Content),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol);
- {error, Reason} ->
- channel_exit(Parent, Reason, MethodName)
- end;
- false -> rabbit_channel:do(ChannelPid, Method),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol)
+ {frame, Frame} ->
+ case collect(Frame, State) of
+ {ok, NewState} ->
+ Loop(NewState);
+ {ok, Method, NewState} ->
+ rabbit_channel:do(ChannelPid, Method),
+ Loop(NewState);
+ {ok, Method, Content, NewState} ->
+ rabbit_channel:do(ChannelPid, Method, Content),
+ Loop(NewState);
+ {error, Reason} ->
+ Parent ! {channel_exit, self(), Reason}
end;
- _ ->
- channel_exit(Parent, {unexpected_frame,
- "expected method frame, "
- "got non method frame instead",
- []}, none)
+ terminate ->
+ rabbit_channel:shutdown(ChannelPid),
+ Loop(State);
+ Msg ->
+ exit({unexpected_message, Msg})
end.
-collect_content(ChannelPid, ClassId, Protocol) ->
- case read_frame(ChannelPid) of
- {content_header, ClassId, 0, BodySize, PropertiesBin} ->
- case collect_content_payload(ChannelPid, BodySize, []) of
- {ok, Payload} -> {ok, #content{
- class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- protocol = Protocol,
- payload_fragments_rev = Payload}};
- Error -> Error
- end;
- {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
+collect({method, MethodName, FieldsBin}, {method, Protocol}) ->
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ {ok, {content_header, Method, ClassId, Protocol}};
+ false -> {ok, Method, {method, Protocol}}
+ end;
+collect(_Frame, {method, _Protocol}) ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead", [], none);
+collect({content_header, ClassId, 0, 0, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, Method, Content, {method, Protocol}};
+collect({content_header, ClassId, 0, BodySize, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, {content_body, Method, BodySize, Content, Protocol}};
+collect({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin},
+ {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
"got one for class ~w instead",
- [ClassId, HeaderClassId]}};
- _ ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId]}}
- end.
+ [ClassId, HeaderClassId], Method);
+collect(_Frame, {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead", [ClassId], Method);
+collect({content_body, FragmentBin},
+ {content_body, Method, RemainingSize,
+ Content = #content{payload_fragments_rev = Fragments}, Protocol}) ->
+ NewContent = Content#content{
+ payload_fragments_rev = [FragmentBin | Fragments]},
+ case RemainingSize - size(FragmentBin) of
+ 0 -> {ok, Method, NewContent, {method, Protocol}};
+ Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}}
+ end;
+collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
+ unexpected_frame("expected content body, "
+ "got non content body frame instead", [], Method).
-collect_content_payload(_ChannelPid, 0, Acc) ->
- {ok, Acc};
-collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
- case read_frame(ChannelPid) of
- {content_body, FragmentBin} ->
- collect_content_payload(ChannelPid,
- RemainingByteCount - size(FragmentBin),
- [FragmentBin | Acc]);
- _ ->
- {error, {unexpected_frame,
- "expected content body, "
- "got non content body frame instead",
- []}}
- end.
+empty_content(ClassId, PropertiesBin, Protocol) ->
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = []}.
-channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) ->
- Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params,
- MethodName),
- Parent ! {channel_exit, self(), Reason}.
+unexpected_frame(Format, Params, Method) when is_atom(Method) ->
+ {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)};
+unexpected_frame(Format, Params, Method) ->
+ unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)).