summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-13 11:32:10 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-13 11:32:10 +0000
commit998dc60408788458ffe10d22c5714b9406561a90 (patch)
tree90d5260a70703e521335805346835a6ab3359e1b
parent68c21a665828b20372376fd427c647bed2ddd884 (diff)
parent7304d31d8481b450f7742b05c484fa44842afbc1 (diff)
downloadrabbitmq-server-bug23678.tar.gz
merge from defaultbug23678
-rw-r--r--src/rabbit_amqqueue_process.erl34
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_channel_sup.erl11
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_command_assembler.erl148
-rw-r--r--src/rabbit_framing_channel.erl129
-rw-r--r--src/rabbit_msg_store.erl52
-rw-r--r--src/rabbit_reader.erl124
-rw-r--r--src/rabbit_variable_queue.erl32
9 files changed, 275 insertions, 261 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fde54346..38b83117 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -785,18 +785,19 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -856,10 +857,11 @@ handle_call({deliver_immediately, Delivery},
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
reply(Delivered, State1);
-handle_call({deliver, Delivery}, _From, State) ->
- %% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- reply(Delivered, NewState);
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode. Reply asap.
+ gen_server2:reply(From, true),
+ {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
+ noreply(NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
NewState = commit_transaction(Txn, From, ChPid, State),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 930e48e6..a2a22f8a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -489,8 +489,8 @@ confirm(MsgSeqNos, QPid, State) ->
queues_for_msg = QFM0}}) ->
case gb_sets:is_element(MsgSeqNo, UC0) of
false -> {DMs, State0};
- true -> {ok, Qs} = dict:find(MsgSeqNo, QFM0),
- Qs1 = sets:del_element(QPid, Qs),
+ true -> Qs1 = sets:del_element(
+ QPid, dict:fetch(MsgSeqNo, QFM0)),
case sets:size(Qs1) of
0 -> {[MsgSeqNo | DMs],
State0#ch{
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a36253a0..9f50176d 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -50,7 +50,7 @@
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_types:user(), rabbit_types:vhost(), pid()}).
--spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
-endif.
@@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
[Channel, ReaderPid, WriterPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
- {ok, FramingChannelPid} =
- supervisor2:start_child(
- SupPid,
- {framing_channel, {rabbit_framing_channel, start_link,
- [ReaderPid, ChannelPid, Protocol]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
- {ok, SupPid, FramingChannelPid}.
+ {ok, AState} = rabbit_command_assembler:init(Protocol),
+ {ok, SupPid, {ChannelPid, AState}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 21c39780..fd99af56 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,7 +43,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), pid()}).
+ {'ok', pid(), {pid(), any()}}).
-endif.
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
new file mode 100644
index 00000000..f8d3260e
--- /dev/null
+++ b/src/rabbit_command_assembler.erl
@@ -0,0 +1,148 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_command_assembler).
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([analyze_frame/3, init/1, process/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY |
+ ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY |
+ ?FRAME_TRACE | ?FRAME_HEARTBEAT).
+-type(protocol() :: rabbit_framing:protocol()).
+-type(method() :: rabbit_framing:amqp_method_record()).
+-type(class_id() :: rabbit_framing:amqp_class_id()).
+-type(weight() :: non_neg_integer()).
+-type(body_size() :: non_neg_integer()).
+-type(content() :: rabbit_types:undecoded_content()).
+
+-type(frame() ::
+ {'method', rabbit_framing:amqp_method_name(), binary()} |
+ {'content_header', class_id(), weight(), body_size(), binary()} |
+ {'content_body', binary()}).
+
+-type(state() ::
+ {'method', protocol()} |
+ {'content_header', method(), class_id(), protocol()} |
+ {'content_body', method(), body_size(), class_id(), protocol()}).
+
+-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) ->
+ frame() | 'heartbeat' | 'error').
+
+-spec(init/1 :: (protocol()) -> {ok, state()}).
+-spec(process/2 :: (frame(), state()) ->
+ {ok, state()} |
+ {ok, method(), state()} |
+ {ok, method(), content(), state()} |
+ {error, rabbit_types:amqp_error()}).
+
+-endif.
+
+%%--------------------------------------------------------------------
+
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
+ {content_header, ClassId, Weight, BodySize, Properties};
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
+ {content_body, Body};
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
+ heartbeat;
+analyze_frame(_Type, _Body, _Protocol) ->
+ error.
+
+init(Protocol) -> {ok, {method, Protocol}}.
+
+process({method, MethodName, FieldsBin}, {method, Protocol}) ->
+ try
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ {ok, {content_header, Method, ClassId, Protocol}};
+ false -> {ok, Method, {method, Protocol}}
+ end
+ catch exit:#amqp_error{} = Reason -> {error, Reason}
+ end;
+process(_Frame, {method, _Protocol}) ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead", [], none);
+process({content_header, ClassId, 0, 0, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, Method, Content, {method, Protocol}};
+process({content_header, ClassId, 0, BodySize, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, {content_body, Method, BodySize, Content, Protocol}};
+process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin},
+ {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId], Method);
+process(_Frame, {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead", [ClassId], Method);
+process({content_body, FragmentBin},
+ {content_body, Method, RemainingSize,
+ Content = #content{payload_fragments_rev = Fragments}, Protocol}) ->
+ NewContent = Content#content{
+ payload_fragments_rev = [FragmentBin | Fragments]},
+ case RemainingSize - size(FragmentBin) of
+ 0 -> {ok, Method, NewContent, {method, Protocol}};
+ Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}}
+ end;
+process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
+ unexpected_frame("expected content body, "
+ "got non content body frame instead", [], Method).
+
+%%--------------------------------------------------------------------
+
+empty_content(ClassId, PropertiesBin, Protocol) ->
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = []}.
+
+unexpected_frame(Format, Params, Method) when is_atom(Method) ->
+ {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)};
+unexpected_frame(Format, Params, Method) ->
+ unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
deleted file mode 100644
index cb53185f..00000000
--- a/src/rabbit_framing_channel.erl
+++ /dev/null
@@ -1,129 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_framing_channel).
--include("rabbit.hrl").
-
--export([start_link/3, process/2, shutdown/1]).
-
-%% internal
--export([mainloop/3]).
-
-%%--------------------------------------------------------------------
-
-start_link(Parent, ChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(
- fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
-
-process(Pid, Frame) ->
- Pid ! {frame, Frame},
- ok.
-
-shutdown(Pid) ->
- Pid ! terminate,
- ok.
-
-%%--------------------------------------------------------------------
-
-read_frame(ChannelPid) ->
- receive
- {frame, Frame} -> Frame;
- terminate -> rabbit_channel:shutdown(ChannelPid),
- read_frame(ChannelPid);
- Msg -> exit({unexpected_message, Msg})
- end.
-
-mainloop(Parent, ChannelPid, Protocol) ->
- case read_frame(ChannelPid) of
- {method, MethodName, FieldsBin} ->
- Method = Protocol:decode_method_fields(MethodName, FieldsBin),
- case Protocol:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
- case collect_content(ChannelPid, ClassId, Protocol) of
- {ok, Content} ->
- rabbit_channel:do(ChannelPid, Method, Content),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol);
- {error, Reason} ->
- channel_exit(Parent, Reason, MethodName)
- end;
- false -> rabbit_channel:do(ChannelPid, Method),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol)
- end;
- _ ->
- channel_exit(Parent, {unexpected_frame,
- "expected method frame, "
- "got non method frame instead",
- []}, none)
- end.
-
-collect_content(ChannelPid, ClassId, Protocol) ->
- case read_frame(ChannelPid) of
- {content_header, ClassId, 0, BodySize, PropertiesBin} ->
- case collect_content_payload(ChannelPid, BodySize, []) of
- {ok, Payload} -> {ok, #content{
- class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- protocol = Protocol,
- payload_fragments_rev = Payload}};
- Error -> Error
- end;
- {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]}};
- _ ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId]}}
- end.
-
-collect_content_payload(_ChannelPid, 0, Acc) ->
- {ok, Acc};
-collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
- case read_frame(ChannelPid) of
- {content_body, FragmentBin} ->
- collect_content_payload(ChannelPid,
- RemainingByteCount - size(FragmentBin),
- [FragmentBin | Acc]);
- _ ->
- {error, {unexpected_frame,
- "expected content body, "
- "got non content body frame instead",
- []}}
- end.
-
-channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) ->
- Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params,
- MethodName),
- Parent ! {channel_exit, self(), Reason}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index deb62eb2..f8b41ed3 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -761,24 +761,17 @@ handle_cast({client_delete, CRef},
handle_cast({write, CRef, Guid},
State = #msstate { file_summary_ets = FileSummaryEts,
- current_file = CurFile,
cur_file_cache_ets = CurFileCacheEts,
client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- CTG1 = case dict:find(CRef, CODC) of
- {ok, _} -> dict:update(CRef, fun(Guids) ->
- gb_sets:add(Guid, Guids)
- end,
- gb_sets:singleton(Guid), CTG);
- error -> CTG
- end,
+ CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC),
State1 = State #msstate { cref_to_guids = CTG1 },
- case should_mask_action(CRef, Guid, State1) of
+ case should_mask_action(CRef, Guid, State) of
{true, _Location} ->
- noreply(State1);
+ noreply(State);
{false, not_found} ->
write_message(Guid, Msg, State1);
{Mask, #msg_location { ref_count = 0, file = File,
@@ -792,23 +785,17 @@ handle_cast({write, CRef, Guid},
%% message, but as it is being GC'd currently,
%% we'll have to write a new copy, which will then
%% be younger, so ignore this write.
- noreply(State1);
+ noreply(State);
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(Guid, 1, State),
- noreply(adjust_valid_total_size(File, TotalSize, State))
+ State2 = client_confirm_if_on_disk(CRef, Guid, File, State),
+ noreply(adjust_valid_total_size(File, TotalSize, State2))
end;
{_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State1),
- CTG2 = case {dict:find(CRef, CODC), File} of
- {{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid),
- written),
- CTG;
- _ -> CTG1
- end,
- noreply(State1 #msstate { cref_to_guids = CTG2 })
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
end;
handle_cast({remove, CRef, Guids}, State) ->
@@ -1166,6 +1153,29 @@ client_confirm(CRef, Guids, ActionTaken,
error -> State
end.
+add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) ->
+ case dict:find(CRef, CODC) of
+ {ok, _} -> dict:update(CRef,
+ fun (Guids) -> gb_sets:add(Guid, Guids) end,
+ gb_sets:singleton(Guid), CTG);
+ error -> CTG
+ end.
+
+client_confirm_if_on_disk(CRef, Guid, File,
+ State = #msstate { client_ondisk_callback = CODC,
+ current_file = CurFile,
+ cref_to_guids = CTG }) ->
+ CTG1 =
+ case File of
+ CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC);
+ _ -> case dict:find(CRef, CODC) of
+ {ok, Fun} -> Fun(gb_sets:singleton(Guid), written);
+ _ -> ok
+ end,
+ CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 }.
+
%% Detect whether the Guid is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e9f34a0f..576c2f36 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,7 +41,7 @@
-export([conserve_memory/2, server_properties/0]).
--export([analyze_frame/3]).
+-export([process_channel_frame/5]). %% used by erlang-client
-export([emit_stats/1]).
@@ -349,12 +349,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, ChannelOrFrPid, Reason} ->
- mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
+ {channel_exit, Channel, Reason} ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -445,45 +445,32 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
- {channel, Channel} = get({ch_fr_pid, ChFrPid}),
- handle_exception(State, Channel, Reason);
-handle_channel_exit(Channel, Reason, State) ->
- handle_exception(State, Channel, Reason).
-
-handle_dependent_exit(ChSupPid, Reason, State) ->
+handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- case erase({ch_sup_pid, ChSupPid}) of
- undefined -> ok;
- {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
- end,
+ erase({ch_pid, ChPid}),
maybe_close(State);
uncontrolled ->
- case channel_cleanup(ChSupPid) of
- undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
- Channel ->
- maybe_close(handle_exception(State, Channel, Reason))
+ case channel_cleanup(ChPid) of
+ undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
+ Channel -> maybe_close(
+ handle_exception(State, Channel, Reason))
end
end.
-channel_cleanup(ChSupPid) ->
- case get({ch_sup_pid, ChSupPid}) of
- undefined -> undefined;
- {{channel, Channel}, ChFr} -> erase({channel, Channel}),
- erase(ChFr),
- erase({ch_sup_pid, ChSupPid}),
- Channel
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ Channel -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ Channel
end.
-all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
- {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
terminate_channels() ->
NChannels =
- length([rabbit_framing_channel:shutdown(ChFrPid)
- || ChFrPid <- all_channels()]),
+ length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -501,10 +488,10 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- case channel_cleanup(ChSupPid) of
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ case channel_cleanup(ChPid) of
undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
+ exit({abnormal_dependent_exit, ChPid, Reason});
Channel ->
case termination_kind(Reason) of
controlled ->
@@ -543,7 +530,7 @@ handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -553,7 +540,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
State;
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
{method, MethodName, FieldsBin} ->
@@ -562,19 +549,23 @@ handle_frame(Type, 0, Payload,
end;
handle_frame(Type, Channel, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ch_fr_pid, ChFrPid} ->
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
+ {ChPid, FramingState} ->
+ NewAState = process_channel_frame(
+ AnalyzedFrame, self(),
+ Channel, ChPid, FramingState),
+ put({channel, Channel}, {ChPid, NewAState}),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
State;
{method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking andalso
+ case (State#v1.connection_state =:= blocking
+ andalso
Protocol:method_has_content(MethodName)) of
true -> State#v1{connection_state = blocked};
false -> State
@@ -603,9 +594,8 @@ handle_frame(Type, Channel, Payload,
State;
undefined ->
case ?IS_RUNNING(State) of
- true -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
+ true -> send_to_new_channel(
+ Channel, AnalyzedFrame, State);
false -> throw({channel_frame_while_starting,
Channel, State#v1.connection_state,
AnalyzedFrame})
@@ -613,22 +603,6 @@ handle_frame(Type, Channel, Payload,
end
end.
-analyze_frame(?FRAME_METHOD,
- <<ClassId:16, MethodId:16, MethodFields/binary>>,
- Protocol) ->
- MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
- {method, MethodName, MethodFields};
-analyze_frame(?FRAME_HEADER,
- <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
- _Protocol) ->
- {content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body, _Protocol) ->
- {content_body, Body};
-analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
- heartbeat;
-analyze_frame(_Type, _Body, _Protocol) ->
- error.
-
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -804,7 +778,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
@@ -981,15 +955,29 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = User,
vhost = VHost}} = State,
- {ok, ChSupPid, ChFrPid} =
+ {ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
self(), User, VHost, Collector}),
- erlang:monitor(process, ChSupPid),
- put({channel, Channel}, {ch_fr_pid, ChFrPid}),
- put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
- put({ch_fr_pid, ChFrPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
+ erlang:monitor(process, ChPid),
+ NewAState = process_channel_frame(AnalyzedFrame, self(),
+ Channel, ChPid, AState),
+ put({channel, Channel}, {ChPid, NewAState}),
+ put({ch_pid, ChPid}, Channel),
+ State.
+
+process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> NewAState;
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ NewAState;
+ {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
+ Method, Content),
+ NewAState;
+ {error, Reason} -> ErrPid ! {channel_exit, Channel,
+ Reason},
+ AState
+ end.
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b24c5ca2..665cac96 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -710,22 +710,22 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- a(reduce_memory_use(
- ack(fun msg_store_release/3,
- fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
- true, false, State1),
- State2;
- ({IsPersistent, Guid, MsgProps}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
- true, true, State2),
- State3
- end,
- AckTags, State))).
+ a(reduce_memory_use(
+ ack(fun msg_store_release/3,
+ fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
+ {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ true, false, State1),
+ State2;
+ ({IsPersistent, Guid, MsgProps}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
+ true, true, State2),
+ State3
+ end,
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.