diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-05 17:52:16 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-05 17:52:16 +0100 |
commit | be59b5f948efb66595238bd7e9e497510fdae452 (patch) | |
tree | 3c0452c5e076ad5398c88e081c6109455ec8870a | |
parent | 8af1cb807ba9ae63e6a67a81cb5f4bb71fb06ef0 (diff) | |
parent | 95b81581ff119a65be1b08f82429161c9d148d40 (diff) | |
download | rabbitmq-server-be59b5f948efb66595238bd7e9e497510fdae452.tar.gz |
Merge default into bug22889
-rw-r--r-- | .hgignore | 2 | ||||
-rw-r--r-- | Makefile | 22 | ||||
-rw-r--r-- | codegen.py | 17 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
-rw-r--r-- | include/rabbit.hrl | 5 | ||||
-rw-r--r-- | src/rabbit.erl | 4 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 11 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 39 | ||||
-rw-r--r-- | src/rabbit_binary_parser.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
-rw-r--r-- | src/rabbit_framing_channel.erl | 18 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 247 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 3 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 86 |
14 files changed, 249 insertions, 224 deletions
@@ -11,7 +11,7 @@ syntax: regexp ^dist/ ^include/rabbit_framing\.hrl$ ^include/rabbit_framing_spec\.hrl$ -^src/rabbit_framing\.erl$ +^src/rabbit_framing_amqp.*\.erl$ ^src/.*\_usage.erl$ ^rabbit\.plt$ ^basic.plt$ @@ -12,7 +12,7 @@ EBIN_DIR=ebin INCLUDE_DIR=include DOCS_DIR=docs INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS) WEB_URL=http://www.rabbitmq.com/ @@ -56,7 +56,8 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json +AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json +AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -99,14 +100,17 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ +$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) + $(PYTHON) codegen.py --allow-overwrite header $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) $@ -$(INCLUDE_DIR)/rabbit_framing_spec.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) - $(PYTHON) codegen.py spec $(AMQP_SPEC_JSON_FILES) $@ +$(INCLUDE_DIR)/rabbit_framing_spec.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) + $(PYTHON) codegen.py --allow-overwrite spec $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) $@ -$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ +$(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_9_1) $@ + +$(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_8) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ @@ -131,7 +135,7 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel - rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc + rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(SOURCE_DIR)/rabbit_framing_amqp_*.erl codegen.pyc rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -315,8 +315,13 @@ def genErl(spec): methods = spec.allMethods() printFileHeader() - print """-module(rabbit_framing). --include("rabbit_framing.hrl"). + module = "rabbit_framing_amqp_%d_%d" % (spec.major, spec.minor) + if spec.revision != 0: + module = "%s_%d" % (module, spec.revision) + if module == "rabbit_framing_amqp_8_0": + module = "rabbit_framing_amqp_0_8" + print "-module(%s)." % module + print """-include("rabbit_framing.hrl"). -export([lookup_method_name/1]). @@ -331,6 +336,7 @@ def genErl(spec): -export([encode_properties/1]). -export([lookup_amqp_exception/1]). -export([amqp_exception/1]). +-export([version/0]). bitvalue(true) -> 1; bitvalue(false) -> 0; @@ -350,6 +356,7 @@ bitvalue(undefined) -> 0. -spec(encode_properties/1 :: (amqp_method_record()) -> binary()). -spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}). -spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()). +-spec(version/0 :: () -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -endif. % use_specs """ for m in methods: genLookupMethodName(m) @@ -391,6 +398,10 @@ bitvalue(undefined) -> 0. for(c,v,cls) in spec.constants: genAmqpException(c,v,cls) print "amqp_exception(_Code) -> undefined." + version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision) + if version == '{8, 0, 0}': version = '{0, 8, 0}' + print "version() -> %s." % (version) + def genHrl(spec): def erlType(domain): return erlangTypeMap[spec.resolveDomain(domain)] @@ -410,8 +421,6 @@ def genHrl(spec): methods = spec.allMethods() printFileHeader() - print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) - print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) print "-define(PROTOCOL_PORT, %d)." % (spec.port) for (c,v,cls) in spec.constants: diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a2038cf0..9280b95c 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -862,6 +862,10 @@ <listitem><para>Number of channels using the connection.</para></listitem> </varlistentry> <varlistentry> + <term>protocol</term> + <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem> + </varlistentry> + <varlistentry> <term>user</term> <listitem><para>Username associated with the connection.</para></listitem> </varlistentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 06297c69..8c713a50 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -36,7 +36,8 @@ -record(vhost, {virtual_host, dummy}). --record(connection, {user, timeout_sec, frame_max, vhost, client_properties}). +-record(connection, {protocol, user, timeout_sec, frame_max, vhost, + client_properties}). -record(content, {class_id, @@ -172,6 +173,7 @@ #amqp_error{name :: atom(), explanation :: string(), method :: atom()}). +-type(protocol() :: atom()). -endif. @@ -179,6 +181,7 @@ -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). +-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8"). -define(ERTS_MINIMUM, "5.6.3"). -define(MAX_WAIT, 16#ffffffff). diff --git a/src/rabbit.erl b/src/rabbit.erl index 6cf6d7d5..7df39987 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -424,9 +424,9 @@ print_banner() -> "| ~s +---+ |~n" "| |~n" "+-------------------+~n" - "AMQP ~p-~p~n~s~n~s~n~n", + "~s~n~s~n~s~n~n", [Product, string:right([$v|Version], ProductLen), - ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, + ?PROTOCOL_VERSION, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, {"app descriptor", app_location()}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 4ab7a2a0..4a1d50df 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -80,7 +80,9 @@ delivery(Mandatory, Immediate, Txn, Message) -> sender = self(), message = Message}. build_content(Properties, BodyBin) -> - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), #content{class_id = ClassId, properties = Properties, properties_bin = none, @@ -90,8 +92,11 @@ from_content(Content) -> #content{class_id = ClassId, properties = Props, payload_fragments_rev = FragmentsRev} = - rabbit_binary_parser:ensure_content_decoded(Content), - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + rabbit_binary_parser:ensure_content_decoded(Content, + rabbit_framing_amqp_0_9_1), + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 81cf3cee..75cd643c 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -41,12 +41,12 @@ % See definition of check_empty_content_body_frame_size/0, an assertion called at startup. -define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8). --export([build_simple_method_frame/2, - build_simple_content_frames/3, +-export([build_simple_method_frame/3, + build_simple_content_frames/4, build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). --export([ensure_content_encoded/1, clear_encoded_content/1]). +-export([ensure_content_encoded/2, clear_encoded_content/1]). -import(lists). @@ -56,25 +56,26 @@ -type(frame() :: [binary()]). --spec(build_simple_method_frame/2 :: - (channel_number(), amqp_method_record()) -> frame()). --spec(build_simple_content_frames/3 :: - (channel_number(), content(), non_neg_integer()) -> [frame()]). +-spec(build_simple_method_frame/3 :: + (channel_number(), amqp_method_record(), protocol()) -> frame()). +-spec(build_simple_content_frames/4 :: + (channel_number(), content(), non_neg_integer(), protocol()) -> + [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). -spec(generate_table/1 :: (amqp_table()) -> binary()). -spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()). -spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). --spec(ensure_content_encoded/1 :: (content()) -> encoded_content()). +-spec(ensure_content_encoded/2 :: (content(), protocol()) -> encoded_content()). -spec(clear_encoded_content/1 :: (content()) -> unencoded_content()). -endif. %%---------------------------------------------------------------------------- -build_simple_method_frame(ChannelInt, MethodRecord) -> - MethodFields = rabbit_framing:encode_method_fields(MethodRecord), +build_simple_method_frame(ChannelInt, MethodRecord, Protocol) -> + MethodFields = Protocol:encode_method_fields(MethodRecord), MethodName = rabbit_misc:method_record_type(MethodRecord), - {ClassId, MethodId} = rabbit_framing:method_id(MethodName), + {ClassId, MethodId} = Protocol:method_id(MethodName), create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]). build_simple_content_frames(ChannelInt, @@ -82,18 +83,18 @@ build_simple_content_frames(ChannelInt, properties = ContentProperties, properties_bin = ContentPropertiesBin, payload_fragments_rev = PayloadFragmentsRev}, - FrameMax) -> + FrameMax, Protocol) -> {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), HeaderFrame = create_frame(2, ChannelInt, [<<ClassId:16, 0:16, BodySize:64>>, - maybe_encode_properties(ContentProperties, ContentPropertiesBin)]), + maybe_encode_properties(ContentProperties, ContentPropertiesBin, Protocol)]), [HeaderFrame | ContentFrames]. -maybe_encode_properties(_ContentProperties, ContentPropertiesBin) +maybe_encode_properties(_ContentProperties, ContentPropertiesBin, _Protocol) when is_binary(ContentPropertiesBin) -> ContentPropertiesBin; -maybe_encode_properties(ContentProperties, none) -> - rabbit_framing:encode_properties(ContentProperties). +maybe_encode_properties(ContentProperties, none, Protocol) -> + Protocol:encode_properties(ContentProperties). build_content_frames(FragsRev, FrameMax, ChannelInt) -> BodyPayloadMax = if FrameMax == 0 -> @@ -277,11 +278,11 @@ check_empty_content_body_frame_size() -> ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) end. -ensure_content_encoded(Content = #content{properties_bin = PropsBin}) +ensure_content_encoded(Content = #content{properties_bin = PropsBin}, _Protocol) when PropsBin =/= 'none' -> Content; -ensure_content_encoded(Content = #content{properties = Props}) -> - Content #content{properties_bin = rabbit_framing:encode_properties(Props)}. +ensure_content_encoded(Content = #content{properties = Props}, Protocol) -> + Content#content{properties_bin = Protocol:encode_properties(Props)}. clear_encoded_content(Content = #content{properties_bin = none}) -> Content; diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index e022a1fa..633be6f0 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -34,7 +34,7 @@ -include("rabbit.hrl"). -export([parse_table/1, parse_properties/2]). --export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([ensure_content_decoded/2, clear_decoded_content/1]). -import(lists). @@ -44,7 +44,7 @@ -spec(parse_table/1 :: (binary()) -> amqp_table()). -spec(parse_properties/2 :: ([amqp_property_type()], binary()) -> [any()]). --spec(ensure_content_decoded/1 :: (content()) -> decoded_content()). +-spec(ensure_content_decoded/2 :: (content(), protocol()) -> decoded_content()). -spec(clear_decoded_content/1 :: (content()) -> undecoded_content()). -endif. @@ -159,12 +159,12 @@ parse_property(bit, Rest) -> parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) -> {parse_table(Table), Rest}. -ensure_content_decoded(Content = #content{properties = Props}) +ensure_content_decoded(Content = #content{properties = Props}, _Protocol) when Props =/= 'none' -> Content; -ensure_content_decoded(Content = #content{properties_bin = PropBin}) +ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol) when is_binary(PropBin) -> - Content#content{properties = rabbit_framing:decode_properties( + Content#content{properties = Protocol:decode_properties( Content#content.class_id, PropBin)}. clear_decoded_content(Content = #content{properties = none}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 94a20fbd..213b6624 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -414,7 +414,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + DecodedContent = rabbit_binary_parser:ensure_content_decoded( + Content, rabbit_framing_amqp_0_9_1), IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -927,7 +928,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, content = Content}, WriterPid, Reason) -> {_Close, ReplyCode, ReplyText} = - rabbit_framing:lookup_amqp_exception(Reason), + rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index bc1a2a08..1fee6b56 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,20 +32,20 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/2, process/2, shutdown/1]). +-export([start_link/3, process/2, shutdown/1]). %% internal --export([mainloop/1]). +-export([mainloop/2]). %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs) -> +start_link(StartFun, StartArgs, Protocol) -> spawn_link( fun () -> %% we trap exits so that a normal termination of the %% channel or reader process terminates us too. process_flag(trap_exit, true), - mainloop(apply(StartFun, StartArgs)) + mainloop(apply(StartFun, StartArgs), Protocol) end). process(Pid, Frame) -> @@ -72,16 +72,16 @@ read_frame(ChannelPid) -> Msg -> exit({unexpected_message, Msg}) end. -mainloop(ChannelPid) -> +mainloop(ChannelPid, Protocol) -> {method, MethodName, FieldsBin} = read_frame(ChannelPid), - Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), - case rabbit_framing:method_has_content(MethodName) of - true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), + Method = Protocol:decode_method_fields(MethodName, FieldsBin), + case Protocol:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), rabbit_channel:do(ChannelPid, Method, collect_content(ChannelPid, ClassId)); false -> rabbit_channel:do(ChannelPid, Method) end, - ?MODULE:mainloop(ChannelPid). + ?MODULE:mainloop(ChannelPid, Protocol). collect_content(ChannelPid, ClassId) -> case read_frame(ChannelPid) of diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a54e0de9..10118071 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,7 +41,7 @@ -export([server_properties/0]). --export([analyze_frame/2]). +-export([analyze_frame/3]). -import(gen_tcp). -import(fprof). @@ -62,8 +62,8 @@ -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max, client_properties]). + recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels, + protocol, user, vhost, timeout, frame_max, client_properties]). %% connection lifecycle %% @@ -249,7 +249,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, vhost = none, - client_properties = none}, + client_properties = none, + protocol = unknown}, callback = uninitialized_callback, recv_ref = none, connection_state = pre_init, @@ -437,7 +438,9 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, - queue_collector = Collector}) -> + queue_collector = Collector, + connection = #connection{protocol = Protocol}, + sock = Sock}) -> case all_channels() of [] -> %% Spec says "Exclusive queues may only be accessed by the current @@ -445,16 +448,18 @@ maybe_close(State = #v1{connection_state = closing, %% This does not strictly imply synchrony, but in practice it seems %% to be what people assume. rabbit_reader_queue_collector:delete_all(Collector), - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), close_connection(State); _ -> State end; maybe_close(State) -> State. -handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) +handle_frame(Type, 0, Payload, + State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload) of + case analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -462,20 +467,20 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> State; -handle_frame(Type, 0, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, 0, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; - trace -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) end; -handle_frame(Type, Channel, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, Channel, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of @@ -516,17 +521,20 @@ handle_frame(Type, Channel, Payload, State) -> end end. -analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) -> - {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields}; -analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) -> +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body) -> +analyze_frame(?FRAME_BODY, Body, _Protocol) -> {content_body, Body}; -analyze_frame(?FRAME_TRACE, _Body) -> - trace; -analyze_frame(?FRAME_HEARTBEAT, <<>>) -> +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> heartbeat; -analyze_frame(_Type, _Body) -> +analyze_frame(_Type, _Body, _Protocol) -> error. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> @@ -543,54 +551,61 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat throw({bad_payload, PayloadAndMarker}) end; -handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, - State = #v1{sock = Sock, connection = Connection}) -> - case check_version({ProtocolMajor, ProtocolMinor}, - {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of - true -> - ok = send_on_channel0( - Sock, - #'connection.start'{ - version_major = ?PROTOCOL_VERSION_MAJOR, - version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT}, - connection_state = starting}, - frame_header, 7}; - false -> - throw({bad_version, ProtocolMajor, ProtocolMinor}) - end; +%% The two rules pertaining to version negotiation: +%% +%% * If the server cannot support the protocol specified in the +%% protocol header, it MUST respond with a valid protocol header and +%% then close the socket connection. +%% +%% * The server MUST provide a protocol version that is lower than or +%% equal to that requested by the client in the protocol header. +handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> + start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); + +handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> + start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); + +%% the 0-8 spec, confusingly, defines the version as 8-0 +handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> + start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); + +handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_version, A, B, C, D}); handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> rabbit_net:send( - Sock, <<"AMQP",1,1, - ?PROTOCOL_VERSION_MAJOR, - ?PROTOCOL_VERSION_MINOR>>) end), - throw({bad_header, Other}); + refuse_connection(Sock, {bad_header, Other}); handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). -%% the 0-8 spec, confusingly, defines the version as 8-0 -adjust_version({8,0}) -> {0,8}; -adjust_version(Version) -> Version. -check_version(ClientVersion, ServerVersion) -> - {ClientMajor, ClientMinor} = adjust_version(ClientVersion), - {ServerMajor, ServerMinor} = adjust_version(ServerVersion), - ClientMajor > ServerMajor - orelse - (ClientMajor == ServerMajor andalso - ClientMinor >= ServerMinor). +%% Offer a protocol version to the client. Connection.start only +%% includes a major and minor version number, Luckily 0-9 and 0-9-1 +%% are similar enough that clients will be happy with either. +start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, + Protocol, + State = #v1{sock = Sock, connection = Connection}) -> + Start = #'connection.start'{ version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = <<"PLAIN AMQPLAIN">>, + locales = <<"en_US">> }, + ok = send_on_channel0(Sock, Start, Protocol), + {State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, + connection_state = starting}, + frame_header, 7}. + +refuse_connection(Sock, Exception) -> + ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), + throw(Exception). %%-------------------------------------------------------------------------- -handle_method0(MethodName, FieldsBin, State) -> +handle_method0(MethodName, FieldsBin, + State = #v1{connection = #connection{protocol = Protocol}}) -> try - handle_method0(rabbit_framing:decode_method_fields( - MethodName, FieldsBin), + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch exit:Reason -> CompleteReason = case Reason of @@ -612,14 +627,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, State = #v1{connection_state = starting, - connection = Connection, + connection = Connection = + #connection{protocol = Protocol}, sock = Sock}) -> User = rabbit_access_control:check_login(Mechanism, Response), - ok = send_on_channel0( - Sock, - #'connection.tune'{channel_max = 0, + Tune = #'connection.tune'{channel_max = 0, frame_max = ?FRAME_MAX, - heartbeat = 0}), + heartbeat = 0}, + ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, @@ -645,46 +660,30 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, frame_max = FrameMax}} end; -handle_method0(#'connection.open'{virtual_host = VHostPath, - insist = Insist}, +handle_method0(#'connection.open'{virtual_host = VHostPath}, + State = #v1{connection_state = opening, connection = Connection = #connection{ - user = User}, + user = User, + protocol = Protocol}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - KnownHosts = format_listeners(rabbit_networking:active_listeners()), - Redirects = compute_redirects(Insist), - if Redirects == [] -> - ok = send_on_channel0( - Sock, - #'connection.open_ok'{known_hosts = KnownHosts}), - State#v1{connection_state = running, - connection = NewConnection}; - true -> - %% FIXME: 'host' is supposed to only contain one - %% address; but which one do we pick? This is - %% really a problem with the spec. - Host = format_listeners(Redirects), - rabbit_log:info("connection ~p redirecting to ~p~n", - [self(), Host]), - ok = send_on_channel0( - Sock, - #'connection.redirect'{host = Host, - known_hosts = KnownHosts}), - close_connection(State#v1{connection = NewConnection}) - end; + ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), + State#v1{connection_state = running, + connection = NewConnection}; handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}, sock = Sock}) when CS =:= closing; CS =:= closed -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = send_on_channel0(Sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -697,23 +696,8 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -send_on_channel0(Sock, Method) -> - ok = rabbit_writer:internal_send_command(Sock, 0, Method). - -format_listeners(Listeners) -> - list_to_binary( - rabbit_misc:intersperse( - $,, - [io_lib:format("~s:~w", [Host, Port]) || - #listener{host = Host, port = Port} <- Listeners])). - -compute_redirects(true) -> []; -compute_redirects(false) -> - Node = node(), - LNode = rabbit_load:pick(), - if Node == LNode -> []; - true -> rabbit_networking:node_listeners(LNode) - end. +send_on_channel0(Sock, Method, Protocol) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). %%-------------------------------------------------------------------------- @@ -747,6 +731,8 @@ i(state, #v1{connection_state = S}) -> S; i(channels, #v1{}) -> length(all_channels()); +i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> + Protocol:version(); i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> @@ -766,15 +752,18 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector}) -> - #v1{sock = Sock, connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + #v1{connection = #connection{ + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost, + protocol = Protocol}, + sock = Sock, + queue_collector = Collector}) -> + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), ChPid = rabbit_framing_channel:start_link( fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector]), + [Channel, self(), WriterPid, Username, VHost, Collector], + Protocol), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). @@ -790,25 +779,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> log_channel_error(CS, Channel, Reason), send_exception(State, Channel, Reason). -send_exception(State, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason), +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> + {ShouldClose, CloseChannel, CloseMethod} = + map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); false -> close_channel(Channel, State) end, ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod), + NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. -map_exception(Channel, Reason) -> +map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason), + lookup_amqp_exception(Reason, Protocol), ShouldClose = SuggestedClose or (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; - _ -> rabbit_framing:method_id(FailedMethod) + _ -> Protocol:method_id(FailedMethod) end, {CloseChannel, CloseMethod} = case ShouldClose of @@ -823,22 +814,16 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -%% FIXME: this clause can go when we move to AMQP spec >=8.1 -lookup_amqp_exception(#amqp_error{name = precondition_failed, - explanation = Expl, - method = Method}) -> - ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), - {false, 406, ExplBin, Method}; lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, - method = Method}) -> - {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), + method = Method}, + Protocol) -> + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), ExplBin = amqp_exception_explanation(Text, Expl), {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other) -> +lookup_amqp_exception(Other, Protocol) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = - rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. amqp_exception_explanation(Text, Expl) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 960d9a9c..53f73b9d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -361,7 +361,8 @@ test_content_framing(FrameMax, Fragments) -> 1, #content{class_id = 0, properties_bin = <<>>, payload_fragments_rev = Fragments}, - FrameMax), + FrameMax, + rabbit_framing_amqp_0_9_1), %% header is formatted correctly and the size is the total of the %% fragments <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 3d10dc12..6bdc1742 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,14 +33,14 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([start/4, start_link/4, shutdown/1, mainloop/1]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). --export([internal_send_command/3, internal_send_command/5]). +-export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). --record(wstate, {sock, channel, frame_max}). +-record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -48,8 +48,10 @@ -ifdef(use_specs). --spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(start/4 :: + (socket(), channel_number(), non_neg_integer(), protocol()) -> pid()). +-spec(start_link/4 :: + (socket(), channel_number(), non_neg_integer(), protocol()) -> pid()). -spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). -spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). -spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). @@ -57,25 +59,27 @@ (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok'). --spec(internal_send_command/3 :: - (socket(), channel_number(), amqp_method_record()) -> 'ok'). --spec(internal_send_command/5 :: +-spec(internal_send_command/4 :: + (socket(), channel_number(), amqp_method_record(), protocol()) -> 'ok'). +-spec(internal_send_command/6 :: (socket(), channel_number(), amqp_method_record(), - content(), non_neg_integer()) -> 'ok'). + content(), non_neg_integer(), protocol()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax) -> +start(Sock, Channel, FrameMax, Protocol) -> spawn(?MODULE, mainloop, [#wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}]). + frame_max = FrameMax, + protocol = Protocol}]). -start_link(Sock, Channel, FrameMax) -> +start_link(Sock, Channel, FrameMax, Protocol) -> spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}]). + frame_max = FrameMax, + protocol = Protocol}]). mainloop(State) -> receive @@ -85,35 +89,40 @@ mainloop(State) -> end. handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), State; handle_message({send_command, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), State; handle_message({send_command_and_signal_back, MethodRecord, Parent}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> @@ -153,30 +162,32 @@ shutdown(W) -> %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord) -> +assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord). + rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord, + Protocol). -assemble_frames(Channel, MethodRecord, Content, FrameMax) -> +assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), - true = rabbit_framing:method_has_content(MethodName), % assertion + true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord), + Channel, MethodRecord, Protocol), ContentFrames = rabbit_binary_generator:build_simple_content_frames( - Channel, Content, FrameMax), + Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). -internal_send_command(Sock, Channel, MethodRecord) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). +internal_send_command(Sock, Channel, MethodRecord, Protocol) -> + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). -internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -196,13 +207,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)), +internal_send_command_async(Sock, Channel, MethodRecord, Protocol) -> + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)), + Content, FrameMax, Protocol)), ok. port_cmd(Sock, Data) -> |