diff options
authorMatthias Radestock <>2011-01-13 10:33:15 +0000
committerMatthias Radestock <>2011-01-13 10:33:15 +0000
commit461056061992ba614d71108f74c78cb010cc7d77 (patch)
parent2fec5acb77374d44eda0bcf0022c6b7919880459 (diff)
parent7304d31d8481b450f7742b05c484fa44842afbc1 (diff)
merge bug23684 into default
6 files changed, 210 insertions, 208 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2764a591..7b5f096b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -480,8 +480,8 @@ confirm(MsgSeqNos, QPid, State) ->
queues_for_msg = QFM0}}) ->
case gb_sets:is_element(MsgSeqNo, UC0) of
false -> {DMs, State0};
- true -> {ok, Qs} = dict:find(MsgSeqNo, QFM0),
- Qs1 = sets:del_element(QPid, Qs),
+ true -> Qs1 = sets:del_element(
+ QPid, dict:fetch(MsgSeqNo, QFM0)),
case sets:size(Qs1) of
0 -> {[MsgSeqNo | DMs],
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a36253a0..9f50176d 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -50,7 +50,7 @@
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_types:user(), rabbit_types:vhost(), pid()}).
--spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
[Channel, ReaderPid, WriterPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
- {ok, FramingChannelPid} =
- supervisor2:start_child(
- SupPid,
- {framing_channel, {rabbit_framing_channel, start_link,
- [ReaderPid, ChannelPid, Protocol]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
- {ok, SupPid, FramingChannelPid}.
+ {ok, AState} = rabbit_command_assembler:init(Protocol),
+ {ok, SupPid, {ChannelPid, AState}}.
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 21c39780..fd99af56 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,7 +43,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), pid()}).
+ {'ok', pid(), {pid(), any()}}).
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
new file mode 100644
index 00000000..f8d3260e
--- /dev/null
+++ b/src/rabbit_command_assembler.erl
@@ -0,0 +1,148 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%% The Original Code is RabbitMQ.
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%% All Rights Reserved.
+%% Contributor(s): ______________________________________.
+-export([analyze_frame/3, init/1, process/2]).
+-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY |
+-type(protocol() :: rabbit_framing:protocol()).
+-type(method() :: rabbit_framing:amqp_method_record()).
+-type(class_id() :: rabbit_framing:amqp_class_id()).
+-type(weight() :: non_neg_integer()).
+-type(body_size() :: non_neg_integer()).
+-type(content() :: rabbit_types:undecoded_content()).
+-type(frame() ::
+ {'method', rabbit_framing:amqp_method_name(), binary()} |
+ {'content_header', class_id(), weight(), body_size(), binary()} |
+ {'content_body', binary()}).
+-type(state() ::
+ {'method', protocol()} |
+ {'content_header', method(), class_id(), protocol()} |
+ {'content_body', method(), body_size(), class_id(), protocol()}).
+-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) ->
+ frame() | 'heartbeat' | 'error').
+-spec(init/1 :: (protocol()) -> {ok, state()}).
+-spec(process/2 :: (frame(), state()) ->
+ {ok, state()} |
+ {ok, method(), state()} |
+ {ok, method(), content(), state()} |
+ {error, rabbit_types:amqp_error()}).
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
+ {content_header, ClassId, Weight, BodySize, Properties};
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
+ {content_body, Body};
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
+ heartbeat;
+analyze_frame(_Type, _Body, _Protocol) ->
+ error.
+init(Protocol) -> {ok, {method, Protocol}}.
+process({method, MethodName, FieldsBin}, {method, Protocol}) ->
+ try
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ {ok, {content_header, Method, ClassId, Protocol}};
+ false -> {ok, Method, {method, Protocol}}
+ end
+ catch exit:#amqp_error{} = Reason -> {error, Reason}
+ end;
+process(_Frame, {method, _Protocol}) ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead", [], none);
+process({content_header, ClassId, 0, 0, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, Method, Content, {method, Protocol}};
+process({content_header, ClassId, 0, BodySize, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, {content_body, Method, BodySize, Content, Protocol}};
+process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin},
+ {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId], Method);
+process(_Frame, {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead", [ClassId], Method);
+process({content_body, FragmentBin},
+ {content_body, Method, RemainingSize,
+ Content = #content{payload_fragments_rev = Fragments}, Protocol}) ->
+ NewContent = Content#content{
+ payload_fragments_rev = [FragmentBin | Fragments]},
+ case RemainingSize - size(FragmentBin) of
+ 0 -> {ok, Method, NewContent, {method, Protocol}};
+ Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}}
+ end;
+process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
+ unexpected_frame("expected content body, "
+ "got non content body frame instead", [], Method).
+empty_content(ClassId, PropertiesBin, Protocol) ->
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = []}.
+unexpected_frame(Format, Params, Method) when is_atom(Method) ->
+ {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)};
+unexpected_frame(Format, Params, Method) ->
+ unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
deleted file mode 100644
index cb53185f..00000000
--- a/src/rabbit_framing_channel.erl
+++ /dev/null
@@ -1,129 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%% The Original Code is RabbitMQ.
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%% All Rights Reserved.
-%% Contributor(s): ______________________________________.
--export([start_link/3, process/2, shutdown/1]).
-%% internal
-start_link(Parent, ChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(
- fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
-process(Pid, Frame) ->
- Pid ! {frame, Frame},
- ok.
-shutdown(Pid) ->
- Pid ! terminate,
- ok.
-read_frame(ChannelPid) ->
- receive
- {frame, Frame} -> Frame;
- terminate -> rabbit_channel:shutdown(ChannelPid),
- read_frame(ChannelPid);
- Msg -> exit({unexpected_message, Msg})
- end.
-mainloop(Parent, ChannelPid, Protocol) ->
- case read_frame(ChannelPid) of
- {method, MethodName, FieldsBin} ->
- Method = Protocol:decode_method_fields(MethodName, FieldsBin),
- case Protocol:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
- case collect_content(ChannelPid, ClassId, Protocol) of
- {ok, Content} ->
- rabbit_channel:do(ChannelPid, Method, Content),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol);
- {error, Reason} ->
- channel_exit(Parent, Reason, MethodName)
- end;
- false -> rabbit_channel:do(ChannelPid, Method),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol)
- end;
- _ ->
- channel_exit(Parent, {unexpected_frame,
- "expected method frame, "
- "got non method frame instead",
- []}, none)
- end.
-collect_content(ChannelPid, ClassId, Protocol) ->
- case read_frame(ChannelPid) of
- {content_header, ClassId, 0, BodySize, PropertiesBin} ->
- case collect_content_payload(ChannelPid, BodySize, []) of
- {ok, Payload} -> {ok, #content{
- class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- protocol = Protocol,
- payload_fragments_rev = Payload}};
- Error -> Error
- end;
- {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]}};
- _ ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId]}}
- end.
-collect_content_payload(_ChannelPid, 0, Acc) ->
- {ok, Acc};
-collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
- case read_frame(ChannelPid) of
- {content_body, FragmentBin} ->
- collect_content_payload(ChannelPid,
- RemainingByteCount - size(FragmentBin),
- [FragmentBin | Acc]);
- _ ->
- {error, {unexpected_frame,
- "expected content body, "
- "got non content body frame instead",
- []}}
- end.
-channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) ->
- Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params,
- MethodName),
- Parent ! {channel_exit, self(), Reason}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e9f34a0f..576c2f36 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,7 +41,7 @@
-export([conserve_memory/2, server_properties/0]).
+-export([process_channel_frame/5]). %% used by erlang-client
@@ -349,12 +349,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
- {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
- {channel_exit, ChannelOrFrPid, Reason} ->
- mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
+ {channel_exit, Channel, Reason} ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
terminate_connection ->
handshake_timeout ->
@@ -445,45 +445,32 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
-handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
- {channel, Channel} = get({ch_fr_pid, ChFrPid}),
- handle_exception(State, Channel, Reason);
-handle_channel_exit(Channel, Reason, State) ->
- handle_exception(State, Channel, Reason).
-handle_dependent_exit(ChSupPid, Reason, State) ->
+handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- case erase({ch_sup_pid, ChSupPid}) of
- undefined -> ok;
- {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
- end,
+ erase({ch_pid, ChPid}),
uncontrolled ->
- case channel_cleanup(ChSupPid) of
- undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
- Channel ->
- maybe_close(handle_exception(State, Channel, Reason))
+ case channel_cleanup(ChPid) of
+ undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
+ Channel -> maybe_close(
+ handle_exception(State, Channel, Reason))
-channel_cleanup(ChSupPid) ->
- case get({ch_sup_pid, ChSupPid}) of
- undefined -> undefined;
- {{channel, Channel}, ChFr} -> erase({channel, Channel}),
- erase(ChFr),
- erase({ch_sup_pid, ChSupPid}),
- Channel
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ Channel -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ Channel
-all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
- {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
terminate_channels() ->
NChannels =
- length([rabbit_framing_channel:shutdown(ChFrPid)
- || ChFrPid <- all_channels()]),
+ length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -501,10 +488,10 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- case channel_cleanup(ChSupPid) of
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ case channel_cleanup(ChPid) of
undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
+ exit({abnormal_dependent_exit, ChPid, Reason});
Channel ->
case termination_kind(Reason) of
controlled ->
@@ -543,7 +530,7 @@ handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -553,7 +540,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
{method, MethodName, FieldsBin} ->
@@ -562,19 +549,23 @@ handle_frame(Type, 0, Payload,
handle_frame(Type, Channel, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ch_fr_pid, ChFrPid} ->
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
+ {ChPid, FramingState} ->
+ NewAState = process_channel_frame(
+ AnalyzedFrame, self(),
+ Channel, ChPid, FramingState),
+ put({channel, Channel}, {ChPid, NewAState}),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
{method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking andalso
+ case (State#v1.connection_state =:= blocking
+ andalso
Protocol:method_has_content(MethodName)) of
true -> State#v1{connection_state = blocked};
false -> State
@@ -603,9 +594,8 @@ handle_frame(Type, Channel, Payload,
undefined ->
case ?IS_RUNNING(State) of
- true -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
+ true -> send_to_new_channel(
+ Channel, AnalyzedFrame, State);
false -> throw({channel_frame_while_starting,
Channel, State#v1.connection_state,
@@ -613,22 +603,6 @@ handle_frame(Type, Channel, Payload,
- <<ClassId:16, MethodId:16, MethodFields/binary>>,
- Protocol) ->
- MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
- {method, MethodName, MethodFields};
- <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
- _Protocol) ->
- {content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body, _Protocol) ->
- {content_body, Body};
-analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
- heartbeat;
-analyze_frame(_Type, _Body, _Protocol) ->
- error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -804,7 +778,7 @@ handle_method0(#''{virtual_host = VHostPath},
fun() -> internal_emit_stats(State1) end),
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
State = #v1{connection_state = CS,
@@ -981,15 +955,29 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = User,
vhost = VHost}} = State,
- {ok, ChSupPid, ChFrPid} =
+ {ok, _ChSupPid, {ChPid, AState}} =
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
self(), User, VHost, Collector}),
- erlang:monitor(process, ChSupPid),
- put({channel, Channel}, {ch_fr_pid, ChFrPid}),
- put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
- put({ch_fr_pid, ChFrPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
+ erlang:monitor(process, ChPid),
+ NewAState = process_channel_frame(AnalyzedFrame, self(),
+ Channel, ChPid, AState),
+ put({channel, Channel}, {ChPid, NewAState}),
+ put({ch_pid, ChPid}, Channel),
+ State.
+process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> NewAState;
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ NewAState;
+ {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
+ Method, Content),
+ NewAState;
+ {error, Reason} -> ErrPid ! {channel_exit, Channel,
+ Reason},
+ AState
+ end.
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",