summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-03 12:43:13 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-03 12:43:13 +0000
commit5621b7e1dc52ed3c01a76580a02c1a860d25f738 (patch)
tree5a765f647c640d38fb3111037f86efda02b171f0
parent9e58ec677542efb8ba67c6d68ad863a2120ccdb3 (diff)
downloadrabbitmq-server-5621b7e1dc52ed3c01a76580a02c1a860d25f738.tar.gz
add specs and move frame analysis from reader to command assembler
The latter avoids a mutual dependency between the reader and command assembler.
-rw-r--r--src/rabbit_command_assembler.erl55
-rw-r--r--src/rabbit_reader.erl24
2 files changed, 57 insertions, 22 deletions
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index b3fb6561..ba3fbed5 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -30,12 +30,65 @@
%%
-module(rabbit_command_assembler).
+-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([init/1, process/2]).
+-export([analyze_frame/3, init/1, process/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY |
+ ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY |
+ ?FRAME_TRACE | ?FRAME_HEARTBEAT).
+-type(protocol() :: rabbit_framing:protocol()).
+-type(method() :: rabbit_framing:amqp_method_record()).
+-type(class_id() :: rabbit_framing:amqp_class_id()).
+-type(weight() :: non_neg_integer()).
+-type(body_size() :: non_neg_integer()).
+-type(content() :: rabbit_types:undecoded_content()).
+
+-type(frame() ::
+ {'method', rabbit_framing:amqp_method_name(), binary()} |
+ {'content_header', class_id(), weight(), body_size(), binary()} |
+ {'content_body', binary()}).
+
+-type(state() ::
+ {'method', protocol()} |
+ {'content_header', method(), class_id(), protocol()} |
+ {'content_body', method(), body_size(), class_id(), protocol()}).
+
+-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) ->
+ frame() | 'heartbeat' | 'error').
+
+-spec(init/1 :: (protocol()) -> {ok, state()}).
+-spec(process/2 :: (frame(), state()) ->
+ {ok, state()} |
+ {ok, method(), state()} |
+ {ok, method(), content(), state()} |
+ {error, rabbit_types:amqp_error()}).
+
+-endif.
%%--------------------------------------------------------------------
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
+ {content_header, ClassId, Weight, BodySize, Properties};
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
+ {content_body, Body};
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
+ heartbeat;
+analyze_frame(_Type, _Body, _Protocol) ->
+ error.
+
init(Protocol) -> {ok, {method, Protocol}}.
process({method, MethodName, FieldsBin}, {method, Protocol}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 91dd42dd..16faf586 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,8 +41,6 @@
-export([conserve_memory/2, server_properties/0]).
--export([analyze_frame/3]).
-
-export([emit_stats/1]).
-define(HANDSHAKE_TIMEOUT, 10).
@@ -528,7 +526,7 @@ handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -538,7 +536,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
State;
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
{method, MethodName, FieldsBin} ->
@@ -547,7 +545,7 @@ handle_frame(Type, 0, Payload,
end;
handle_frame(Type, Channel, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
@@ -600,22 +598,6 @@ handle_frame(Type, Channel, Payload,
end
end.
-analyze_frame(?FRAME_METHOD,
- <<ClassId:16, MethodId:16, MethodFields/binary>>,
- Protocol) ->
- MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
- {method, MethodName, MethodFields};
-analyze_frame(?FRAME_HEADER,
- <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
- _Protocol) ->
- {content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body, _Protocol) ->
- {content_body, Body};
-analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
- heartbeat;
-analyze_frame(_Type, _Body, _Protocol) ->
- error.
-
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},