diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-03 12:43:13 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-03 12:43:13 +0000 |
commit | 5621b7e1dc52ed3c01a76580a02c1a860d25f738 (patch) | |
tree | 5a765f647c640d38fb3111037f86efda02b171f0 | |
parent | 9e58ec677542efb8ba67c6d68ad863a2120ccdb3 (diff) | |
download | rabbitmq-server-5621b7e1dc52ed3c01a76580a02c1a860d25f738.tar.gz |
add specs and move frame analysis from reader to command assembler
The latter avoids a mutual dependency between the reader and command
assembler.
-rw-r--r-- | src/rabbit_command_assembler.erl | 55 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 24 |
2 files changed, 57 insertions, 22 deletions
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl index b3fb6561..ba3fbed5 100644 --- a/src/rabbit_command_assembler.erl +++ b/src/rabbit_command_assembler.erl @@ -30,12 +30,65 @@ %% -module(rabbit_command_assembler). +-include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([init/1, process/2]). +-export([analyze_frame/3, init/1, process/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY | + ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY | + ?FRAME_TRACE | ?FRAME_HEARTBEAT). +-type(protocol() :: rabbit_framing:protocol()). +-type(method() :: rabbit_framing:amqp_method_record()). +-type(class_id() :: rabbit_framing:amqp_class_id()). +-type(weight() :: non_neg_integer()). +-type(body_size() :: non_neg_integer()). +-type(content() :: rabbit_types:undecoded_content()). + +-type(frame() :: + {'method', rabbit_framing:amqp_method_name(), binary()} | + {'content_header', class_id(), weight(), body_size(), binary()} | + {'content_body', binary()}). + +-type(state() :: + {'method', protocol()} | + {'content_header', method(), class_id(), protocol()} | + {'content_body', method(), body_size(), class_id(), protocol()}). + +-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) -> + frame() | 'heartbeat' | 'error'). + +-spec(init/1 :: (protocol()) -> {ok, state()}). +-spec(process/2 :: (frame(), state()) -> + {ok, state()} | + {ok, method(), state()} | + {ok, method(), content(), state()} | + {error, rabbit_types:amqp_error()}). + +-endif. %%-------------------------------------------------------------------- +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> + {content_header, ClassId, Weight, BodySize, Properties}; +analyze_frame(?FRAME_BODY, Body, _Protocol) -> + {content_body, Body}; +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> + heartbeat; +analyze_frame(_Type, _Body, _Protocol) -> + error. + init(Protocol) -> {ok, {method, Protocol}}. process({method, MethodName, FieldsBin}, {method, Protocol}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 91dd42dd..16faf586 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,8 +41,6 @@ -export([conserve_memory/2, server_properties/0]). --export([analyze_frame/3]). - -export([emit_stats/1]). -define(HANDSHAKE_TIMEOUT, 10). @@ -528,7 +526,7 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -538,7 +536,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) State; handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; {method, MethodName, FieldsBin} -> @@ -547,7 +545,7 @@ handle_frame(Type, 0, Payload, end; handle_frame(Type, Channel, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> @@ -600,22 +598,6 @@ handle_frame(Type, Channel, Payload, end end. -analyze_frame(?FRAME_METHOD, - <<ClassId:16, MethodId:16, MethodFields/binary>>, - Protocol) -> - MethodName = Protocol:lookup_method_name({ClassId, MethodId}), - {method, MethodName, MethodFields}; -analyze_frame(?FRAME_HEADER, - <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, - _Protocol) -> - {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body, _Protocol) -> - {content_body, Body}; -analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> - heartbeat; -analyze_frame(_Type, _Body, _Protocol) -> - error. - handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, |