summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-05 17:52:16 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-05 17:52:16 +0100
commitbe59b5f948efb66595238bd7e9e497510fdae452 (patch)
tree3c0452c5e076ad5398c88e081c6109455ec8870a
parent8af1cb807ba9ae63e6a67a81cb5f4bb71fb06ef0 (diff)
parent95b81581ff119a65be1b08f82429161c9d148d40 (diff)
downloadrabbitmq-server-be59b5f948efb66595238bd7e9e497510fdae452.tar.gz
Merge default into bug22889
-rw-r--r--.hgignore2
-rw-r--r--Makefile22
-rw-r--r--codegen.py17
-rw-r--r--docs/rabbitmqctl.1.xml4
-rw-r--r--include/rabbit.hrl5
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_basic.erl11
-rw-r--r--src/rabbit_binary_generator.erl39
-rw-r--r--src/rabbit_binary_parser.erl10
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_framing_channel.erl18
-rw-r--r--src/rabbit_reader.erl247
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_writer.erl86
14 files changed, 249 insertions, 224 deletions
diff --git a/.hgignore b/.hgignore
index 7b796b66..03b60914 100644
--- a/.hgignore
+++ b/.hgignore
@@ -11,7 +11,7 @@ syntax: regexp
^dist/
^include/rabbit_framing\.hrl$
^include/rabbit_framing_spec\.hrl$
-^src/rabbit_framing\.erl$
+^src/rabbit_framing_amqp.*\.erl$
^src/.*\_usage.erl$
^rabbit\.plt$
^basic.plt$
diff --git a/Makefile b/Makefile
index 0f1d0e85..88e13512 100644
--- a/Makefile
+++ b/Makefile
@@ -12,7 +12,7 @@ EBIN_DIR=ebin
INCLUDE_DIR=include
DOCS_DIR=docs
INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl
-SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
+SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS)
WEB_URL=http://www.rabbitmq.com/
@@ -56,7 +56,8 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json
+AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json
+AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
@@ -99,14 +100,17 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
+$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8)
+ $(PYTHON) codegen.py --allow-overwrite header $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) $@
-$(INCLUDE_DIR)/rabbit_framing_spec.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
- $(PYTHON) codegen.py spec $(AMQP_SPEC_JSON_FILES) $@
+$(INCLUDE_DIR)/rabbit_framing_spec.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8)
+ $(PYTHON) codegen.py --allow-overwrite spec $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) $@
-$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
+$(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_9_1) $@
+
+$(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_8)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
@@ -131,7 +135,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
- rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
+ rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(SOURCE_DIR)/rabbit_framing_amqp_*.erl codegen.pyc
rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL)
rm -f $(RABBIT_PLT)
rm -f $(DEPS_FILE)
diff --git a/codegen.py b/codegen.py
index 0d6d9d56..d8981a48 100644
--- a/codegen.py
+++ b/codegen.py
@@ -315,8 +315,13 @@ def genErl(spec):
methods = spec.allMethods()
printFileHeader()
- print """-module(rabbit_framing).
--include("rabbit_framing.hrl").
+ module = "rabbit_framing_amqp_%d_%d" % (spec.major, spec.minor)
+ if spec.revision != 0:
+ module = "%s_%d" % (module, spec.revision)
+ if module == "rabbit_framing_amqp_8_0":
+ module = "rabbit_framing_amqp_0_8"
+ print "-module(%s)." % module
+ print """-include("rabbit_framing.hrl").
-export([lookup_method_name/1]).
@@ -331,6 +336,7 @@ def genErl(spec):
-export([encode_properties/1]).
-export([lookup_amqp_exception/1]).
-export([amqp_exception/1]).
+-export([version/0]).
bitvalue(true) -> 1;
bitvalue(false) -> 0;
@@ -350,6 +356,7 @@ bitvalue(undefined) -> 0.
-spec(encode_properties/1 :: (amqp_method_record()) -> binary()).
-spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}).
-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()).
+-spec(version/0 :: () -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-endif. % use_specs
"""
for m in methods: genLookupMethodName(m)
@@ -391,6 +398,10 @@ bitvalue(undefined) -> 0.
for(c,v,cls) in spec.constants: genAmqpException(c,v,cls)
print "amqp_exception(_Code) -> undefined."
+ version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision)
+ if version == '{8, 0, 0}': version = '{0, 8, 0}'
+ print "version() -> %s." % (version)
+
def genHrl(spec):
def erlType(domain):
return erlangTypeMap[spec.resolveDomain(domain)]
@@ -410,8 +421,6 @@ def genHrl(spec):
methods = spec.allMethods()
printFileHeader()
- print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
- print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
for (c,v,cls) in spec.constants:
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a2038cf0..9280b95c 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -862,6 +862,10 @@
<listitem><para>Number of channels using the connection.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>protocol</term>
+ <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>user</term>
<listitem><para>Username associated with the connection.</para></listitem>
</varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 06297c69..8c713a50 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -36,7 +36,8 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {user, timeout_sec, frame_max, vhost, client_properties}).
+-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
+ client_properties}).
-record(content,
{class_id,
@@ -172,6 +173,7 @@
#amqp_error{name :: atom(),
explanation :: string(),
method :: atom()}).
+-type(protocol() :: atom()).
-endif.
@@ -179,6 +181,7 @@
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
-define(MAX_WAIT, 16#ffffffff).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 6cf6d7d5..7df39987 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -424,9 +424,9 @@ print_banner() ->
"| ~s +---+ |~n"
"| |~n"
"+-------------------+~n"
- "AMQP ~p-~p~n~s~n~s~n~n",
+ "~s~n~s~n~s~n~n",
[Product, string:right([$v|Version], ProductLen),
- ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
+ ?PROTOCOL_VERSION,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
{"app descriptor", app_location()},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 4ab7a2a0..4a1d50df 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -80,7 +80,9 @@ delivery(Mandatory, Immediate, Txn, Message) ->
sender = self(), message = Message}.
build_content(Properties, BodyBin) ->
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
#content{class_id = ClassId,
properties = Properties,
properties_bin = none,
@@ -90,8 +92,11 @@ from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
payload_fragments_rev = FragmentsRev} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ rabbit_binary_parser:ensure_content_decoded(Content,
+ rabbit_framing_amqp_0_9_1),
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 81cf3cee..75cd643c 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -41,12 +41,12 @@
% See definition of check_empty_content_body_frame_size/0, an assertion called at startup.
-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
--export([build_simple_method_frame/2,
- build_simple_content_frames/3,
+-export([build_simple_method_frame/3,
+ build_simple_content_frames/4,
build_heartbeat_frame/0]).
-export([generate_table/1, encode_properties/2]).
-export([check_empty_content_body_frame_size/0]).
--export([ensure_content_encoded/1, clear_encoded_content/1]).
+-export([ensure_content_encoded/2, clear_encoded_content/1]).
-import(lists).
@@ -56,25 +56,26 @@
-type(frame() :: [binary()]).
--spec(build_simple_method_frame/2 ::
- (channel_number(), amqp_method_record()) -> frame()).
--spec(build_simple_content_frames/3 ::
- (channel_number(), content(), non_neg_integer()) -> [frame()]).
+-spec(build_simple_method_frame/3 ::
+ (channel_number(), amqp_method_record(), protocol()) -> frame()).
+-spec(build_simple_content_frames/4 ::
+ (channel_number(), content(), non_neg_integer(), protocol()) ->
+ [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
-spec(generate_table/1 :: (amqp_table()) -> binary()).
-spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
--spec(ensure_content_encoded/1 :: (content()) -> encoded_content()).
+-spec(ensure_content_encoded/2 :: (content(), protocol()) -> encoded_content()).
-spec(clear_encoded_content/1 :: (content()) -> unencoded_content()).
-endif.
%%----------------------------------------------------------------------------
-build_simple_method_frame(ChannelInt, MethodRecord) ->
- MethodFields = rabbit_framing:encode_method_fields(MethodRecord),
+build_simple_method_frame(ChannelInt, MethodRecord, Protocol) ->
+ MethodFields = Protocol:encode_method_fields(MethodRecord),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- {ClassId, MethodId} = rabbit_framing:method_id(MethodName),
+ {ClassId, MethodId} = Protocol:method_id(MethodName),
create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]).
build_simple_content_frames(ChannelInt,
@@ -82,18 +83,18 @@ build_simple_content_frames(ChannelInt,
properties = ContentProperties,
properties_bin = ContentPropertiesBin,
payload_fragments_rev = PayloadFragmentsRev},
- FrameMax) ->
+ FrameMax, Protocol) ->
{BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
HeaderFrame = create_frame(2, ChannelInt,
[<<ClassId:16, 0:16, BodySize:64>>,
- maybe_encode_properties(ContentProperties, ContentPropertiesBin)]),
+ maybe_encode_properties(ContentProperties, ContentPropertiesBin, Protocol)]),
[HeaderFrame | ContentFrames].
-maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
+maybe_encode_properties(_ContentProperties, ContentPropertiesBin, _Protocol)
when is_binary(ContentPropertiesBin) ->
ContentPropertiesBin;
-maybe_encode_properties(ContentProperties, none) ->
- rabbit_framing:encode_properties(ContentProperties).
+maybe_encode_properties(ContentProperties, none, Protocol) ->
+ Protocol:encode_properties(ContentProperties).
build_content_frames(FragsRev, FrameMax, ChannelInt) ->
BodyPayloadMax = if FrameMax == 0 ->
@@ -277,11 +278,11 @@ check_empty_content_body_frame_size() ->
ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
end.
-ensure_content_encoded(Content = #content{properties_bin = PropsBin})
+ensure_content_encoded(Content = #content{properties_bin = PropsBin}, _Protocol)
when PropsBin =/= 'none' ->
Content;
-ensure_content_encoded(Content = #content{properties = Props}) ->
- Content #content{properties_bin = rabbit_framing:encode_properties(Props)}.
+ensure_content_encoded(Content = #content{properties = Props}, Protocol) ->
+ Content#content{properties_bin = Protocol:encode_properties(Props)}.
clear_encoded_content(Content = #content{properties_bin = none}) ->
Content;
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index e022a1fa..633be6f0 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-export([parse_table/1, parse_properties/2]).
--export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([ensure_content_decoded/2, clear_decoded_content/1]).
-import(lists).
@@ -44,7 +44,7 @@
-spec(parse_table/1 :: (binary()) -> amqp_table()).
-spec(parse_properties/2 :: ([amqp_property_type()], binary()) -> [any()]).
--spec(ensure_content_decoded/1 :: (content()) -> decoded_content()).
+-spec(ensure_content_decoded/2 :: (content(), protocol()) -> decoded_content()).
-spec(clear_decoded_content/1 :: (content()) -> undecoded_content()).
-endif.
@@ -159,12 +159,12 @@ parse_property(bit, Rest) ->
parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) ->
{parse_table(Table), Rest}.
-ensure_content_decoded(Content = #content{properties = Props})
+ensure_content_decoded(Content = #content{properties = Props}, _Protocol)
when Props =/= 'none' ->
Content;
-ensure_content_decoded(Content = #content{properties_bin = PropBin})
+ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol)
when is_binary(PropBin) ->
- Content#content{properties = rabbit_framing:decode_properties(
+ Content#content{properties = Protocol:decode_properties(
Content#content.class_id, PropBin)}.
clear_decoded_content(Content = #content{properties = none}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 94a20fbd..213b6624 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -414,7 +414,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ DecodedContent = rabbit_binary_parser:ensure_content_decoded(
+ Content, rabbit_framing_amqp_0_9_1),
IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -927,7 +928,7 @@ basic_return(#basic_message{exchange_name = ExchangeName,
content = Content},
WriterPid, Reason) ->
{_Close, ReplyCode, ReplyText} =
- rabbit_framing:lookup_amqp_exception(Reason),
+ rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index bc1a2a08..1fee6b56 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,20 +32,20 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/2, process/2, shutdown/1]).
+-export([start_link/3, process/2, shutdown/1]).
%% internal
--export([mainloop/1]).
+-export([mainloop/2]).
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs) ->
+start_link(StartFun, StartArgs, Protocol) ->
spawn_link(
fun () ->
%% we trap exits so that a normal termination of the
%% channel or reader process terminates us too.
process_flag(trap_exit, true),
- mainloop(apply(StartFun, StartArgs))
+ mainloop(apply(StartFun, StartArgs), Protocol)
end).
process(Pid, Frame) ->
@@ -72,16 +72,16 @@ read_frame(ChannelPid) ->
Msg -> exit({unexpected_message, Msg})
end.
-mainloop(ChannelPid) ->
+mainloop(ChannelPid, Protocol) ->
{method, MethodName, FieldsBin} = read_frame(ChannelPid),
- Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
- case rabbit_framing:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
rabbit_channel:do(ChannelPid, Method,
collect_content(ChannelPid, ClassId));
false -> rabbit_channel:do(ChannelPid, Method)
end,
- ?MODULE:mainloop(ChannelPid).
+ ?MODULE:mainloop(ChannelPid, Protocol).
collect_content(ChannelPid, ClassId) ->
case read_frame(ChannelPid) of
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a54e0de9..10118071 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,7 +41,7 @@
-export([server_properties/0]).
--export([analyze_frame/2]).
+-export([analyze_frame/3]).
-import(gen_tcp).
-import(fprof).
@@ -62,8 +62,8 @@
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max, client_properties]).
+ recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels,
+ protocol, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
%%
@@ -249,7 +249,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
- client_properties = none},
+ client_properties = none,
+ protocol = unknown},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init,
@@ -437,7 +438,9 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector}) ->
+ queue_collector = Collector,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
case all_channels() of
[] ->
%% Spec says "Exclusive queues may only be accessed by the current
@@ -445,16 +448,18 @@ maybe_close(State = #v1{connection_state = closing,
%% This does not strictly imply synchrony, but in practice it seems
%% to be what people assume.
rabbit_reader_queue_collector:delete_all(Collector),
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
close_connection(State);
_ -> State
end;
maybe_close(State) ->
State.
-handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
+handle_frame(Type, 0, Payload,
+ State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload) of
+ case analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -462,20 +467,20 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
State;
-handle_frame(Type, 0, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
end;
-handle_frame(Type, Channel, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, Channel, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -516,17 +521,20 @@ handle_frame(Type, Channel, Payload, State) ->
end
end.
-analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) ->
- {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields};
-analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) ->
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
{content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body) ->
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
-analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
heartbeat;
-analyze_frame(_Type, _Body) ->
+analyze_frame(_Type, _Body, _Protocol) ->
error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
@@ -543,54 +551,61 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
- case check_version({ProtocolMajor, ProtocolMinor},
- {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
- true ->
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ?PROTOCOL_VERSION_MAJOR,
- version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
- connection_state = starting},
- frame_header, 7};
- false ->
- throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
+
+handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
+
+%% the 0-8 spec, confusingly, defines the version as 8-0
+handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, A, B, C, D});
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
- throw({bad_header, Other});
+ refuse_connection(Sock, {bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
-%% the 0-8 spec, confusingly, defines the version as 8-0
-adjust_version({8,0}) -> {0,8};
-adjust_version(Version) -> Version.
-check_version(ClientVersion, ServerVersion) ->
- {ClientMajor, ClientMinor} = adjust_version(ClientVersion),
- {ServerMajor, ServerMinor} = adjust_version(ServerVersion),
- ClientMajor > ServerMajor
- orelse
- (ClientMajor == ServerMajor andalso
- ClientMinor >= ServerMinor).
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
+ Protocol,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ Start = #'connection.start'{ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> },
+ ok = send_on_channel0(Sock, Start, Protocol),
+ {State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7}.
+
+refuse_connection(Sock, Exception) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+ throw(Exception).
%%--------------------------------------------------------------------------
-handle_method0(MethodName, FieldsBin, State) ->
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
try
- handle_method0(rabbit_framing:decode_method_fields(
- MethodName, FieldsBin),
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:Reason ->
CompleteReason = case Reason of
@@ -612,14 +627,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State = #v1{connection_state = starting,
- connection = Connection,
+ connection = Connection =
+ #connection{protocol = Protocol},
sock = Sock}) ->
User = rabbit_access_control:check_login(Mechanism, Response),
- ok = send_on_channel0(
- Sock,
- #'connection.tune'{channel_max = 0,
+ Tune = #'connection.tune'{channel_max = 0,
frame_max = ?FRAME_MAX,
- heartbeat = 0}),
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
@@ -645,46 +660,30 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
frame_max = FrameMax}}
end;
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#'connection.open'{virtual_host = VHostPath},
+
State = #v1{connection_state = opening,
connection = Connection = #connection{
- user = User},
+ user = User,
+ protocol = Protocol},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol},
sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(Sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
@@ -697,23 +696,8 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-send_on_channel0(Sock, Method) ->
- ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
+send_on_channel0(Sock, Method, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
%%--------------------------------------------------------------------------
@@ -747,6 +731,8 @@ i(state, #v1{connection_state = S}) ->
S;
i(channels, #v1{}) ->
length(all_channels());
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
@@ -766,15 +752,18 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame,
- State = #v1{queue_collector = Collector}) ->
- #v1{sock = Sock, connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
+ #v1{connection = #connection{
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost,
+ protocol = Protocol},
+ sock = Sock,
+ queue_collector = Collector}) ->
+ WriterPid = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
ChPid = rabbit_framing_channel:start_link(
fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector]),
+ [Channel, self(), WriterPid, Username, VHost, Collector],
+ Protocol),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
@@ -790,25 +779,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
log_channel_error(CS, Channel, Reason),
send_exception(State, Channel, Reason).
-send_exception(State, Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason),
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {ShouldClose, CloseChannel, CloseMethod} =
+ map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
true -> terminate_channels(),
close_connection(State);
false -> close_channel(Channel, State)
end,
ok = rabbit_writer:internal_send_command(
- NewState#v1.sock, CloseChannel, CloseMethod),
+ NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
NewState.
-map_exception(Channel, Reason) ->
+map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
- lookup_amqp_exception(Reason),
+ lookup_amqp_exception(Reason, Protocol),
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ _ -> Protocol:method_id(FailedMethod)
end,
{CloseChannel, CloseMethod} =
case ShouldClose of
@@ -823,22 +814,16 @@ map_exception(Channel, Reason) ->
end,
{ShouldClose, CloseChannel, CloseMethod}.
-%% FIXME: this clause can go when we move to AMQP spec >=8.1
-lookup_amqp_exception(#amqp_error{name = precondition_failed,
- explanation = Expl,
- method = Method}) ->
- ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
- {false, 406, ExplBin, Method};
lookup_amqp_exception(#amqp_error{name = Name,
explanation = Expl,
- method = Method}) ->
- {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ method = Method},
+ Protocol) ->
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name),
ExplBin = amqp_exception_explanation(Text, Expl),
{ShouldClose, Code, ExplBin, Method};
-lookup_amqp_exception(Other) ->
+lookup_amqp_exception(Other, Protocol) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} =
- rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
{ShouldClose, Code, Text, none}.
amqp_exception_explanation(Text, Expl) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 960d9a9c..53f73b9d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -361,7 +361,8 @@ test_content_framing(FrameMax, Fragments) ->
1,
#content{class_id = 0, properties_bin = <<>>,
payload_fragments_rev = Fragments},
- FrameMax),
+ FrameMax,
+ rabbit_framing_amqp_0_9_1),
%% header is formatted correctly and the size is the total of the
%% fragments
<<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 3d10dc12..6bdc1742 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,14 +33,14 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/3, start_link/3, shutdown/1, mainloop/1]).
+-export([start/4, start_link/4, shutdown/1, mainloop/1]).
-export([send_command/2, send_command/3, send_command_and_signal_back/3,
send_command_and_signal_back/4, send_command_and_notify/5]).
--export([internal_send_command/3, internal_send_command/5]).
+-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
--record(wstate, {sock, channel, frame_max}).
+-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).
@@ -48,8 +48,10 @@
-ifdef(use_specs).
--spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
+-spec(start/4 ::
+ (socket(), channel_number(), non_neg_integer(), protocol()) -> pid()).
+-spec(start_link/4 ::
+ (socket(), channel_number(), non_neg_integer(), protocol()) -> pid()).
-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok').
-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok').
-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
@@ -57,25 +59,27 @@
(pid(), amqp_method(), content(), pid()) -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok').
--spec(internal_send_command/3 ::
- (socket(), channel_number(), amqp_method_record()) -> 'ok').
--spec(internal_send_command/5 ::
+-spec(internal_send_command/4 ::
+ (socket(), channel_number(), amqp_method_record(), protocol()) -> 'ok').
+-spec(internal_send_command/6 ::
(socket(), channel_number(), amqp_method_record(),
- content(), non_neg_integer()) -> 'ok').
+ content(), non_neg_integer(), protocol()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax) ->
+start(Sock, Channel, FrameMax, Protocol) ->
spawn(?MODULE, mainloop, [#wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}]).
+ frame_max = FrameMax,
+ protocol = Protocol}]).
-start_link(Sock, Channel, FrameMax) ->
+start_link(Sock, Channel, FrameMax, Protocol) ->
spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}]).
+ frame_max = FrameMax,
+ protocol = Protocol}]).
mainloop(State) ->
receive
@@ -85,35 +89,40 @@ mainloop(State) ->
end.
handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ State = #wstate{sock = Sock, channel = Channel,
+ protocol = Protocol}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
State;
handle_message({send_command, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
State;
handle_message({send_command_and_signal_back, MethodRecord, Parent},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ State = #wstate{sock = Sock, channel = Channel,
+ protocol = Protocol}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
Parent ! rabbit_writer_send_command_signal,
State;
handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
Parent ! rabbit_writer_send_command_signal,
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
@@ -153,30 +162,32 @@ shutdown(W) ->
%---------------------------------------------------------------------------
-assemble_frames(Channel, MethodRecord) ->
+assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord).
+ rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord,
+ Protocol).
-assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
+assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- true = rabbit_framing:method_has_content(MethodName), % assertion
+ true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
- Channel, MethodRecord),
+ Channel, MethodRecord, Protocol),
ContentFrames = rabbit_binary_generator:build_simple_content_frames(
- Channel, Content, FrameMax),
+ Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
-internal_send_command(Sock, Channel, MethodRecord) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
+internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)).
-internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)).
+ Content, FrameMax, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -196,13 +207,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)),
+internal_send_command_async(Sock, Channel, MethodRecord, Protocol) ->
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)),
+ Content, FrameMax, Protocol)),
ok.
port_cmd(Sock, Data) ->