diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-02-26 11:48:45 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-02-26 11:48:45 +0100 |
commit | 9f0bec8def6c43c13fc654262dd0ed671b924982 (patch) | |
tree | fea1bede766aab8dff19c17120bfe65cde6f209e | |
parent | 7b1f7692c71e1fffc24a5652d9ebc47f72ad2d67 (diff) | |
download | rabbitmq-server-git-9f0bec8def6c43c13fc654262dd0ed671b924982.tar.gz |
Start constants at 1 for stream plugin
5 files changed, 208 insertions, 208 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index dba0499caa..3667e1f14b 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -63,24 +63,24 @@ used to make the difference between a request (0) and a response (1). Example fo |=== |Response|Code -|OK|0 -|Stream does not exist|1 -|Subscription ID already exists|2 -|Subscription ID does not exist|3 -|Stream already exists|4 -|Stream not available|5 -|SASL mechanism not supported|6 -|Authentication failure|7 -|SASL error|8 -|SASL challenge|9 -|SASL authentication failure loopback|10 -|Virtual host access failure|11 -|Unknown frame|12 -|Frame too large|13 -|Internal error|14 -|Access refused|15 -|Precondition failed|16 -|Publisher does not exist|17 +|OK|1 +|Stream does not exist|2 +|Subscription ID already exists|3 +|Subscription ID does not exist|4 +|Stream already exists|5 +|Stream not available|6 +|SASL mechanism not supported|7 +|Authentication failure|8 +|SASL error|9 +|SASL challenge|10 +|SASL authentication failure loopback|11 +|Virtual host access failure|12 +|Unknown frame|13 +|Frame too large|14 +|Internal error|15 +|Access refused|16 +|Precondition failed|17 +|Publisher does not exist|18 |=== @@ -92,127 +92,127 @@ used to make the difference between a request (0) and a response (1). Example fo |<<declarepublisher>> |Client -|0 +|1 |Yes |<<publish>> |Client -|1 +|2 |No |<<publishconfirm>> |Server -|2 +|3 |No |<<publisherror>> |Server -|3 +|4 |No |<<querypublishersequence>> |Client -|4 +|5 |Yes |<<deletepublisher>> |Client -|5 +|6 |Yes |<<subscribe>> |Client -|6 +|7 |Yes |<<deliver>> |Server -|7 +|8 |No |<<credit>> |Client -|8 +|9 |No |<<commitoffset>> |Client -|9 +|10 |No |<<queryoffset>> |Client -|10 +|11 |Yes |<<unsubscribe>> |Client -|11 +|12 |Yes |<<create>> |Client -|12 +|13 |Yes |<<delete>> |Client -|13 +|14 |Yes |<<metadata>> |Client -|14 +|15 |Yes |<<metadataupdate>> |Server -|15 +|16 |No |<<peerproperties>> |Client -|16 +|17 |Yes |<<saslhandshake>> |Client -|17 +|18 |Yes |<<saslauthenticate>> |Client -|18 +|19 |Yes |<<tune>> |Server -|19 +|20 |Yes |<<open>> |Server -|20 +|21 |Yes |<<close>> |Client & Server -|21 +|22 |Yes |<<heartbeat>> |Client & Server -|22 +|23 |No |<<route>> (experimental) |Client -|23 +|24 |Yes |<<partitions>> (experimental) |Client -|24 +|25 |Yes |=== @@ -220,7 +220,7 @@ used to make the difference between a request (0) and a response (1). Example fo ``` DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream - Key => int16 // 0 + Key => int16 // 1 Version => int16 CorrelationId => int32 PublisherId => uint8 @@ -228,7 +228,7 @@ DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherRefer Stream => string DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId - Key => int16 // 18 + Key => int16 // 1 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -238,7 +238,7 @@ DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId ``` Publish => Key Version Stream PublishedMessages - Key => int16 // 1 + Key => int16 // 2 Version => int16 PublisherId => uint8 PublishedMessages => [PublishedMessage] @@ -251,7 +251,7 @@ Publish => Key Version Stream PublishedMessages ``` PublishConfirm => Key Version PublishingIds - Key => int16 // 2 + Key => int16 // 3 Version => int16 PublisherId => uint8 PublishingIds => [int64] // to correlate with the messages sent @@ -261,7 +261,7 @@ PublishConfirm => Key Version PublishingIds ``` PublishError => Key Version [PublishingError] - Key => int16 // 3 + Key => int16 // 4 Version => int16 PublisherId => uint8 PublishingError => PublishingId Code @@ -273,14 +273,14 @@ PublishError => Key Version [PublishingError] ``` QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream - Key => int16 // 4 + Key => int16 // 5 Version => int16 CorrelationId => int32 PublisherReference => string // max 256 characters Stream => string QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence - Key => int16 // 4 + Key => int16 // 5 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -291,13 +291,13 @@ QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence ``` DeletePublisherRequest => Key Version CorrelationId PublisherId - Key => int16 // 5 + Key => int16 // 6 Version => int16 CorrelationId => int32 PublisherId => uint8 DeletePublisherResponse => Key Version CorrelationId ResponseCode - Key => int16 // 5 + Key => int16 // 6 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -307,13 +307,13 @@ DeletePublisherResponse => Key Version CorrelationId ResponseCode ``` Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit - Key => int16 // 6 + Key => int16 // 7 Version => int16 CorrelationId => int32 // correlation id to correlate the response SubscriptionId => uint8 // client-supplied id to identify the subscription Stream => string // the name of the stream OffsetSpecification => OffsetType Offset - OffsetType => int16 // 0 (first), 1 (last), 2 (next), 3 (offset), 4 (timestamp) + OffsetType => int16 // 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp) Offset => uint64 (for offset) | int64 (for timestamp) Credit => int16 ``` @@ -322,7 +322,7 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification ``` Deliver => Key Version SubscriptionId OsirisChunk - Key => int16 // 7 + Key => int16 // 8 Version => int32 SubscriptionId => uint8 OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages @@ -345,13 +345,13 @@ for details on the structure of messages. ``` Credit => Key Version SubscriptionId Credit - Key => int16 // 8 + Key => int16 // 9 Version => int16 SubscriptionId => int8 Credit => int16 // the number of chunks that can be sent CreditResponse => Key Version ResponseCode SubscriptionId - Key => int16 // 8 + Key => int16 // 9 Version => int16 ResponseCode => int16 SubscriptionId => int8 @@ -363,7 +363,7 @@ NB: the server sent a response only in case of problem, e.g. crediting an unknow ``` CommitOffset => Key Version Reference Stream Offset - Key => int16 // 9 + Key => int16 // 10 Version => int16 CorrelationId => int32 // not used yet Reference => string // max 256 characters @@ -375,14 +375,14 @@ CommitOffset => Key Version Reference Stream Offset ``` QueryOffsetRequest => Key Version CorrelationId Reference Stream - Key => int16 // 10 + Key => int16 // 11 Version => int16 CorrelationId => int32 Reference => string // max 256 characters Stream => string QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset - Key => int16 // 10 + Key => int16 // 11 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -393,7 +393,7 @@ QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset ``` Unsubscribe => Key Version CorrelationId SubscriptionId - Key => int16 // 11 + Key => int16 // 12 Version => int16 CorrelationId => int32 SubscriptionId => int8 @@ -403,7 +403,7 @@ Unsubscribe => Key Version CorrelationId SubscriptionId ``` Create => Key Version CorrelationId Stream Arguments - Key => int16 // 12 + Key => int16 // 13 Version => int16 CorrelationId => int32 Stream => string @@ -417,7 +417,7 @@ Create => Key Version CorrelationId Stream Arguments ``` Delete => Key Version CorrelationId Stream - Key => int16 // 13 + Key => int16 // 14 Version => int16 CorrelationId => int32 Stream => string @@ -427,13 +427,13 @@ Delete => Key Version CorrelationId Stream ``` MetadataQuery => Key Version CorrelationId [Stream] - Key => int16 // 14 + Key => int16 // 15 Version => int16 CorrelationId => int32 Stream => string MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata] - Key => int16 // 14 + Key => int16 // 15 Version => int16 CorrelationId => int32 Broker => Reference Host Port @@ -451,7 +451,7 @@ MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata] ``` MetadataUpdate => Key Version MetadataInfo - Key => int16 // 15 + Key => int16 // 16 Version => int16 MetadataInfo => Code Stream Code => int16 // code to identify the information @@ -462,7 +462,7 @@ MetadataUpdate => Key Version MetadataInfo ``` PeerPropertiesRequest => Key Version PeerProperties - Key => int16 // 16 + Key => int16 // 17 Version => int16 CorrelationId => int32 PeerProperties => [PeerProperty] @@ -471,7 +471,7 @@ PeerPropertiesRequest => Key Version PeerProperties Value => string PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties - Key => int16 // 16 + Key => int16 // 17 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -485,12 +485,12 @@ PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties ``` SaslHandshakeRequest => Key Version CorrelationId Mechanism - Key => int16 // 17 + Key => int16 // 18 Version => int16 CorrelationId => int32 SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism] - Key => int16 // 17 + Key => int16 // 18 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -501,14 +501,14 @@ SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism] ``` SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData - Key => int16 // 18 + Key => int16 // 19 Version => int16 CorrelationId => int32 Mechanism => string SaslOpaqueData => bytes SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData - Key => int16 // 18 + Key => int16 // 19 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -519,7 +519,7 @@ SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueDat ``` TuneRequest => Key Version FrameMax Heartbeat - Key => int16 // 19 + Key => int16 // 20 Version => int16 FrameMax => int32 // in bytes, 0 means no limit Heartbeat => int32 // in seconds, 0 means no heartbeat @@ -531,13 +531,13 @@ TuneResponse => TuneRequest ``` OpenRequest => Key Version CorrelationId VirtualHost - Key => int16 // 20 + Key => int16 // 21 Version => int16 CorrelationId => int32 VirtualHost => string OpenResponse => Key Version CorrelationId ResponseCode - Key => int16 // 20 + Key => int16 // 21 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -547,14 +547,14 @@ OpenResponse => Key Version CorrelationId ResponseCode ``` CloseRequest => Key Version CorrelationId ClosingCode ClosingReason - Key => int16 // 21 + Key => int16 // 22 Version => int16 CorrelationId => int32 ClosingCode => int16 ClosingReason => string CloseResponse => Key Version CorrelationId ResponseCode - Key => int16 // 21 + Key => int16 // 22 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -564,7 +564,7 @@ CloseResponse => Key Version CorrelationId ResponseCode ``` Heartbeat => Key Version - Key => int16 // 22 + Key => int16 // 23 Version => int16 ``` @@ -574,32 +574,32 @@ _Experimental_ ``` RouteQuery => Key Version CorrelationId RoutingKey SuperStream - Key => int16 // 23 + Key => int16 // 24 Version => int16 CorrelationId => int32 RoutingKey => string SuperStream => string RouteResponse => Key Version CorrelationId Stream - Key => int16 // 14 + Key => int16 // 24 Version => int16 CorrelationId => int32 Stream => string ``` -=== Partitions (experimental) +=== Partitions _Experimental_ ``` PartitionsQuery => Key Version CorrelationId SuperStream - Key => int16 // 24 + Key => int16 // 25 Version => int16 CorrelationId => int32 SuperStream => string PartitionsResponse => Key Version CorrelationId [Stream] - Key => int16 // 23 + Key => int16 // 25 Version => int16 CorrelationId => int32 Stream => string diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 802b2b823f..0503104c40 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -1,58 +1,58 @@ --define(COMMAND_DECLARE_PUBLISHER, 0). --define(COMMAND_PUBLISH, 1). --define(COMMAND_PUBLISH_CONFIRM, 2). --define(COMMAND_PUBLISH_ERROR, 3). --define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 4). --define(COMMAND_DELETE_PUBLISHER, 5). --define(COMMAND_SUBSCRIBE, 6). --define(COMMAND_DELIVER, 7). --define(COMMAND_CREDIT, 8). --define(COMMAND_COMMIT_OFFSET, 9). --define(COMMAND_QUERY_OFFSET, 10). --define(COMMAND_UNSUBSCRIBE, 11). --define(COMMAND_CREATE_STREAM, 12). --define(COMMAND_DELETE_STREAM, 13). --define(COMMAND_METADATA, 14). --define(COMMAND_METADATA_UPDATE, 15). --define(COMMAND_PEER_PROPERTIES, 16). --define(COMMAND_SASL_HANDSHAKE, 17). --define(COMMAND_SASL_AUTHENTICATE, 18). --define(COMMAND_TUNE, 19). --define(COMMAND_OPEN, 20). --define(COMMAND_CLOSE, 21). --define(COMMAND_HEARTBEAT, 22). --define(COMMAND_ROUTE, 23). --define(COMMAND_PARTITIONS, 24). +-define(COMMAND_DECLARE_PUBLISHER, 1). +-define(COMMAND_PUBLISH, 2). +-define(COMMAND_PUBLISH_CONFIRM, 3). +-define(COMMAND_PUBLISH_ERROR, 4). +-define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 5). +-define(COMMAND_DELETE_PUBLISHER, 6). +-define(COMMAND_SUBSCRIBE, 7). +-define(COMMAND_DELIVER, 8). +-define(COMMAND_CREDIT, 9). +-define(COMMAND_COMMIT_OFFSET, 10). +-define(COMMAND_QUERY_OFFSET, 11). +-define(COMMAND_UNSUBSCRIBE, 12). +-define(COMMAND_CREATE_STREAM, 13). +-define(COMMAND_DELETE_STREAM, 14). +-define(COMMAND_METADATA, 15). +-define(COMMAND_METADATA_UPDATE, 16). +-define(COMMAND_PEER_PROPERTIES, 17). +-define(COMMAND_SASL_HANDSHAKE, 18). +-define(COMMAND_SASL_AUTHENTICATE, 19). +-define(COMMAND_TUNE, 20). +-define(COMMAND_OPEN, 21). +-define(COMMAND_CLOSE, 22). +-define(COMMAND_HEARTBEAT, 23). +-define(COMMAND_ROUTE, 24). +-define(COMMAND_PARTITIONS, 25). -define(REQUEST, 0). -define(RESPONSE, 1). --define(VERSION_0, 0). +-define(VERSION_1, 1). --define(RESPONSE_CODE_OK, 0). --define(RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 1). --define(RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS, 2). --define(RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, 3). --define(RESPONSE_CODE_STREAM_ALREADY_EXISTS, 4). --define(RESPONSE_CODE_STREAM_NOT_AVAILABLE, 5). --define(RESPONSE_SASL_MECHANISM_NOT_SUPPORTED, 6). --define(RESPONSE_AUTHENTICATION_FAILURE, 7). --define(RESPONSE_SASL_ERROR, 8). --define(RESPONSE_SASL_CHALLENGE, 9). --define(RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK, 10). --define(RESPONSE_VHOST_ACCESS_FAILURE, 11). --define(RESPONSE_CODE_UNKNOWN_FRAME, 12). --define(RESPONSE_CODE_FRAME_TOO_LARGE, 13). --define(RESPONSE_CODE_INTERNAL_ERROR, 14). --define(RESPONSE_CODE_ACCESS_REFUSED, 15). --define(RESPONSE_CODE_PRECONDITION_FAILED, 16). --define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 17). +-define(RESPONSE_CODE_OK, 1). +-define(RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 2). +-define(RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS, 3). +-define(RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, 4). +-define(RESPONSE_CODE_STREAM_ALREADY_EXISTS, 5). +-define(RESPONSE_CODE_STREAM_NOT_AVAILABLE, 6). +-define(RESPONSE_SASL_MECHANISM_NOT_SUPPORTED, 7). +-define(RESPONSE_AUTHENTICATION_FAILURE, 8). +-define(RESPONSE_SASL_ERROR, 9). +-define(RESPONSE_SASL_CHALLENGE, 10). +-define(RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK, 11). +-define(RESPONSE_VHOST_ACCESS_FAILURE, 12). +-define(RESPONSE_CODE_UNKNOWN_FRAME, 13). +-define(RESPONSE_CODE_FRAME_TOO_LARGE, 14). +-define(RESPONSE_CODE_INTERNAL_ERROR, 15). +-define(RESPONSE_CODE_ACCESS_REFUSED, 16). +-define(RESPONSE_CODE_PRECONDITION_FAILED, 17). +-define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 18). --define(OFFSET_TYPE_FIRST, 0). --define(OFFSET_TYPE_LAST, 1). --define(OFFSET_TYPE_NEXT, 2). --define(OFFSET_TYPE_OFFSET, 3). --define(OFFSET_TYPE_TIMESTAMP, 4). +-define(OFFSET_TYPE_FIRST, 1). +-define(OFFSET_TYPE_LAST, 2). +-define(OFFSET_TYPE_NEXT, 3). +-define(OFFSET_TYPE_OFFSET, 4). +-define(OFFSET_TYPE_TIMESTAMP, 5). -define(DEFAULT_INITIAL_CREDITS, 50000). -define(DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING, 12500). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 013d57bec0..4d0e3b93e9 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -296,7 +296,7 @@ listen_loop_pre_auth(Transport, TuneFrame = <<?REQUEST:1, ?COMMAND_TUNE:15, - ?VERSION_0:16, + ?VERSION_1:16, FrameMax:32, Heartbeat:32>>, frame(Transport, Connection1, TuneFrame), @@ -451,7 +451,7 @@ listen_loop_post_auth(Transport, [<<FrameSize:32, ?REQUEST:1, ?COMMAND_METADATA_UPDATE:15, - ?VERSION_0:16, + ?VERSION_1:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, Stream/binary>>]), @@ -487,7 +487,7 @@ listen_loop_post_auth(Transport, [<<FrameSize:32, ?REQUEST:1, ?COMMAND_PUBLISH_CONFIRM:15, - ?VERSION_0:16>>, + ?VERSION_1:16>>, <<CurrentPublisherId:8>>, <<Count:32>>, PublishingIds]), @@ -507,7 +507,7 @@ listen_loop_post_auth(Transport, [<<FrameSize:32, ?REQUEST:1, ?COMMAND_PUBLISH_CONFIRM:15, - ?VERSION_0:16>>, + ?VERSION_1:16>>, <<LastPublisherId:8>>, <<LastCount:32>>, LastPublishingIds]), @@ -555,7 +555,7 @@ listen_loop_post_auth(Transport, [<<FrameSize:32, ?REQUEST:1, ?COMMAND_PUBLISH_CONFIRM:15, - ?VERSION_0:16>>, + ?VERSION_1:16>>, <<PublisherId:8>>, <<PublishingIdCount:32>>, PubIds]), @@ -642,7 +642,7 @@ listen_loop_post_auth(Transport, State1, Configuration); heartbeat_send -> - Frame = <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>, + Frame = <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, case catch frame(Transport, Connection, Frame) of ok -> listen_loop_post_auth(Transport, @@ -796,7 +796,7 @@ handle_inbound_data(Transport, CloseFrame = <<?REQUEST:1, ?COMMAND_CLOSE:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_FRAME_TOO_LARGE:16, CloseReasonLength:16, @@ -851,7 +851,7 @@ handle_frame_pre_auth(Transport, State, <<?REQUEST:1, ?COMMAND_PEER_PROPERTIES:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ClientPropertiesCount:32, ClientPropertiesFrame/binary>>, @@ -902,7 +902,7 @@ handle_frame_pre_auth(Transport, Frame = <<?RESPONSE:1, ?COMMAND_PEER_PROPERTIES:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ?RESPONSE_CODE_OK:16, ServerPropertiesCount:32, @@ -919,7 +919,7 @@ handle_frame_pre_auth(Transport, State, <<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32>>, Rest) -> Mechanisms = rabbit_stream_utils:auth_mechanisms(S), @@ -933,7 +933,7 @@ handle_frame_pre_auth(Transport, Frame = <<?RESPONSE:1, ?COMMAND_SASL_HANDSHAKE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ?RESPONSE_CODE_OK:16, MechanismsCount:32, @@ -950,7 +950,7 @@ handle_frame_pre_auth(Transport, State, <<?REQUEST:1, ?COMMAND_SASL_AUTHENTICATE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, MechanismLength:16, Mechanism:MechanismLength/binary, @@ -1047,7 +1047,7 @@ handle_frame_pre_auth(Transport, Frame = <<?RESPONSE:1, ?COMMAND_SASL_AUTHENTICATE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, FrameFragment/binary>>, frame(Transport, C1, Frame), @@ -1056,7 +1056,7 @@ handle_frame_pre_auth(Transport, Frame = <<?RESPONSE:1, ?COMMAND_SASL_AUTHENTICATE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>, frame(Transport, Connection0, Frame), @@ -1072,7 +1072,7 @@ handle_frame_pre_auth(_Transport, State, <<?RESPONSE:1, ?COMMAND_TUNE:15, - ?VERSION_0:16, + ?VERSION_1:16, FrameMax:32, Heartbeat:32>>, Rest) -> @@ -1107,7 +1107,7 @@ handle_frame_pre_auth(Transport, State, <<?REQUEST:1, ?COMMAND_OPEN:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, VirtualHostLength:16, VirtualHost:VirtualHostLength/binary>>, @@ -1121,7 +1121,7 @@ handle_frame_pre_auth(Transport, #{}), F = <<?RESPONSE:1, ?COMMAND_OPEN:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>, %% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2) @@ -1132,7 +1132,7 @@ handle_frame_pre_auth(Transport, exit:_ -> Fr = <<?RESPONSE:1, ?COMMAND_OPEN:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ?RESPONSE_VHOST_ACCESS_FAILURE:16>>, {Connection#stream_connection{connection_step = failure}, Fr} @@ -1144,7 +1144,7 @@ handle_frame_pre_auth(Transport, handle_frame_pre_auth(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>, + <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, Rest) -> rabbit_log:info("Received heartbeat frame pre auth~n"), {Connection, State, Rest}; @@ -1194,7 +1194,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_DECLARE_PUBLISHER:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, PublisherId:8, ReferenceSize:16, @@ -1284,7 +1284,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_PUBLISH:15, - ?VERSION_0:16, + ?VERSION_1:16, PublisherId:8/unsigned, MessageCount:32, Messages/binary>>, @@ -1323,7 +1323,7 @@ handle_frame_post_auth(Transport, [<<FrameSize:32, ?REQUEST:1, ?COMMAND_PUBLISH_ERROR:15, - ?VERSION_0:16, + ?VERSION_1:16, PublisherId:8, MessageCount:32, Details/binary>>]), @@ -1340,7 +1340,7 @@ handle_frame_post_auth(Transport, [<<FrameSize:32, ?REQUEST:1, ?COMMAND_PUBLISH_ERROR:15, - ?VERSION_0:16, + ?VERSION_1:16, PublisherId:8, MessageCount:32, Details/binary>>]), @@ -1354,7 +1354,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ReferenceSize:16, Reference:ReferenceSize/binary, @@ -1392,7 +1392,7 @@ handle_frame_post_auth(Transport, [<<FrameSize:32, ?RESPONSE:1, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15, - ?VERSION_0:16>>, + ?VERSION_1:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>, <<Sequence:64>>]), @@ -1404,7 +1404,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_DELETE_PUBLISHER:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, PublisherId:8>>, Rest) -> @@ -1448,7 +1448,7 @@ handle_frame_post_auth(Transport, #stream_connection_state{consumers = Consumers} = State, <<?REQUEST:1, ?COMMAND_SUBSCRIBE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, @@ -1605,7 +1605,7 @@ handle_frame_post_auth(Transport, #stream_connection_state{consumers = Consumers} = State, <<?REQUEST:1, ?COMMAND_CREDIT:15, - ?VERSION_0:16, + ?VERSION_1:16, SubscriptionId:8/unsigned, Credit:16/signed>>, Rest) -> @@ -1631,7 +1631,7 @@ handle_frame_post_auth(Transport, Frame = <<?RESPONSE:1, ?COMMAND_CREDIT:15, - ?VERSION_0:16, + ?VERSION_1:16, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16, SubscriptionId:8>>, FrameSize = byte_size(Frame), @@ -1645,7 +1645,7 @@ handle_frame_post_auth(_Transport, State, <<?REQUEST:1, ?COMMAND_COMMIT_OFFSET:15, - ?VERSION_0:16, + ?VERSION_1:16, _CorrelationId:32, ReferenceSize:16, Reference:ReferenceSize/binary, @@ -1685,7 +1685,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_QUERY_OFFSET:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ReferenceSize:16, Reference:ReferenceSize/binary, @@ -1722,7 +1722,7 @@ handle_frame_post_auth(Transport, [<<FrameSize:32, ?RESPONSE:1, ?COMMAND_QUERY_OFFSET:15, - ?VERSION_0:16>>, + ?VERSION_1:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]), @@ -1734,7 +1734,7 @@ handle_frame_post_auth(Transport, #stream_connection_state{consumers = Consumers} = State, <<?REQUEST:1, ?COMMAND_UNSUBSCRIBE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, SubscriptionId:8/unsigned>>, Rest) -> @@ -1787,7 +1787,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_CREATE_STREAM:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary, @@ -1871,7 +1871,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_DELETE_STREAM:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, @@ -1903,7 +1903,7 @@ handle_frame_post_auth(Transport, [<<FrameSize:32, ?REQUEST:1, ?COMMAND_METADATA_UPDATE:15, - ?VERSION_0:16, + ?VERSION_1:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, Stream/binary>>]), @@ -1935,7 +1935,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_METADATA:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, StreamCount:32, BinaryStreams/binary>>, @@ -2061,7 +2061,7 @@ handle_frame_post_auth(Transport, Frame = <<?RESPONSE:1, ?COMMAND_METADATA:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, BrokersBin/binary, MetadataBin/binary>>, @@ -2074,7 +2074,7 @@ handle_frame_post_auth(Transport, Connection, State, <<?COMMAND_ROUTE:16, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, RoutingKeySize:16, RoutingKey:RoutingKeySize/binary, @@ -2096,7 +2096,7 @@ handle_frame_post_auth(Transport, Frame = <<?COMMAND_ROUTE:16, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ResponseCode:16, StreamBin/binary>>, @@ -2109,7 +2109,7 @@ handle_frame_post_auth(Transport, Connection, State, <<?COMMAND_PARTITIONS:16, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, SuperStreamSize:16, SuperStream:SuperStreamSize/binary>>, @@ -2133,7 +2133,7 @@ handle_frame_post_auth(Transport, Frame = <<?COMMAND_PARTITIONS:16, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ResponseCode:16, PartitionsBin/binary>>, @@ -2145,7 +2145,7 @@ handle_frame_post_auth(Transport, State, <<?REQUEST:1, ?COMMAND_CLOSE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ClosingCode:16, ClosingReasonLength:16, @@ -2156,7 +2156,7 @@ handle_frame_post_auth(Transport, Frame = <<?RESPONSE:1, ?COMMAND_CLOSE:15, - ?VERSION_0:16, + ?VERSION_1:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>, frame(Transport, Connection, Frame), @@ -2165,7 +2165,7 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>, + <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, Rest) -> rabbit_log:info("Received heartbeat frame post auth~n"), {Connection, State, Rest}; @@ -2177,7 +2177,7 @@ handle_frame_post_auth(Transport, Connection, State, Frame, Rest) -> CloseFrame = <<?REQUEST:1, ?COMMAND_CLOSE:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_UNKNOWN_FRAME:16, CloseReasonLength:16, @@ -2215,7 +2215,7 @@ handle_frame_post_close(_Transport, State, <<?RESPONSE:1, ?COMMAND_CLOSE:15, - ?VERSION_0:16, + ?VERSION_1:16, _CorrelationId:32, _ResponseCode:16>>, Rest) -> @@ -2225,7 +2225,7 @@ handle_frame_post_close(_Transport, handle_frame_post_close(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>, + <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, Rest) -> rabbit_log:info("Received heartbeat frame post close~n"), {Connection, State, Rest}; @@ -2429,7 +2429,7 @@ response(Transport, [<<?RESPONSE_FRAME_SIZE:32, ?RESPONSE:1, CommandId:15, - ?VERSION_0:16>>, + ?VERSION_1:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>]). subscription_exists(StreamSubscriptions, SubscriptionId) -> @@ -2450,7 +2450,7 @@ send_file_callback(Transport, <<FrameSize:32, ?REQUEST:1, ?COMMAND_DELIVER:15, - ?VERSION_0:16, + ?VERSION_1:16, SubscriptionId:8/unsigned>>, Transport:send(S, FrameBeginning), atomics:add(Counter, 1, Size), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index a3aa1c3ba6..176fcf71fc 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -185,7 +185,7 @@ test_peer_properties(S) -> PeerPropertiesFrame = <<?REQUEST:1, ?COMMAND_PEER_PROPERTIES:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, 0:32>>, PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame), @@ -195,7 +195,7 @@ test_peer_properties(S) -> <<_Size:32, ?RESPONSE:1, ?COMMAND_PEER_PROPERTIES:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16, _Rest/binary>>} = @@ -203,7 +203,7 @@ test_peer_properties(S) -> test_authenticate(S) -> SaslHandshakeFrame = - <<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_0:16, 1:32>>, + <<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_1:16, 1:32>>, SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame), gen_tcp:send(S, <<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>), @@ -216,7 +216,7 @@ test_authenticate(S) -> <<31:32, ?RESPONSE:1, ?COMMAND_SASL_HANDSHAKE:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16, 2:32, @@ -228,7 +228,7 @@ test_authenticate(S) -> <<31:32, ?RESPONSE:1, ?COMMAND_SASL_HANDSHAKE:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16, 2:32, @@ -250,7 +250,7 @@ test_authenticate(S) -> SaslAuthenticateFrame = <<?REQUEST:1, ?COMMAND_SASL_AUTHENTICATE:15, - ?VERSION_0:16, + ?VERSION_1:16, 2:32, 5:16, Plain/binary, @@ -266,7 +266,7 @@ test_authenticate(S) -> <<10:32, ?RESPONSE:1, ?COMMAND_SASL_AUTHENTICATE:15, - ?VERSION_0:16, + ?VERSION_1:16, 2:32, ?RESPONSE_CODE_OK:16, RestTune/binary>>} = @@ -276,7 +276,7 @@ test_authenticate(S) -> <<12:32, ?REQUEST:1, ?COMMAND_TUNE:15, - ?VERSION_0:16, + ?VERSION_1:16, ?DEFAULT_FRAME_MAX:32, ?DEFAULT_HEARTBEAT:32>>, case RestTune of @@ -289,7 +289,7 @@ test_authenticate(S) -> TuneFrame = <<?RESPONSE:1, ?COMMAND_TUNE:15, - ?VERSION_0:16, + ?VERSION_1:16, ?DEFAULT_FRAME_MAX:32, 0:32>>, TuneFrameSize = byte_size(TuneFrame), @@ -300,7 +300,7 @@ test_authenticate(S) -> OpenFrame = <<?REQUEST:1, ?COMMAND_OPEN:15, - ?VERSION_0:16, + ?VERSION_1:16, 3:32, VirtualHostLength:16, VirtualHost/binary>>, @@ -310,7 +310,7 @@ test_authenticate(S) -> <<10:32, ?RESPONSE:1, ?COMMAND_OPEN:15, - ?VERSION_0:16, + ?VERSION_1:16, 3:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). @@ -320,7 +320,7 @@ test_create_stream(S, Stream) -> CreateStreamFrame = <<?REQUEST:1, ?COMMAND_CREATE_STREAM:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, StreamSize:16, Stream:StreamSize/binary, @@ -331,7 +331,7 @@ test_create_stream(S, Stream) -> <<_Size:32, ?RESPONSE:1, ?COMMAND_CREATE_STREAM:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). @@ -341,7 +341,7 @@ test_delete_stream(S, Stream) -> DeleteStreamFrame = <<?REQUEST:1, ?COMMAND_DELETE_STREAM:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>, @@ -352,7 +352,7 @@ test_delete_stream(S, Stream) -> <<ResponseFrameSize:32, ?RESPONSE:1, ?COMMAND_DELETE_STREAM:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000). @@ -362,7 +362,7 @@ test_declare_publisher(S, PublisherId, Stream) -> DeclarePublisherFrame = <<?REQUEST:1, ?COMMAND_DECLARE_PUBLISHER:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, PublisherId:8, 0:16, %% empty publisher reference @@ -375,7 +375,7 @@ test_declare_publisher(S, PublisherId, Stream) -> <<_Size:32, ?RESPONSE:1, ?COMMAND_DECLARE_PUBLISHER:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = @@ -387,7 +387,7 @@ test_publish_confirm(S, PublisherId, Body) -> PublishFrame = <<?REQUEST:1, ?COMMAND_PUBLISH:15, - ?VERSION_0:16, + ?VERSION_1:16, PublisherId:8, 1:32, 1:64, @@ -399,7 +399,7 @@ test_publish_confirm(S, PublisherId, Body) -> <<_Size:32, ?REQUEST:1, ?COMMAND_PUBLISH_CONFIRM:15, - ?VERSION_0:16, + ?VERSION_1:16, PublisherId:8, 1:32, 1:64>>} = @@ -410,7 +410,7 @@ test_subscribe(S, SubscriptionId, Stream) -> SubscribeFrame = <<?REQUEST:1, ?COMMAND_SUBSCRIBE:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, SubscriptionId:8, StreamSize:16, @@ -425,7 +425,7 @@ test_subscribe(S, SubscriptionId, Stream) -> <<_Size:32, ?RESPONSE:1, ?COMMAND_SUBSCRIBE:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = @@ -438,7 +438,7 @@ test_deliver(S, Rest, SubscriptionId, Body) -> <<58:32, ?REQUEST:1, ?COMMAND_DELIVER:15, - ?VERSION_0:16, + ?VERSION_1:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, @@ -463,7 +463,7 @@ test_metadata_update_stream_deleted(S, Stream) -> <<FrameSize:32, ?REQUEST:1, ?COMMAND_METADATA_UPDATE:15, - ?VERSION_0:16, + ?VERSION_1:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, Stream/binary>>} = @@ -475,7 +475,7 @@ test_close(S) -> CloseFrame = <<?REQUEST:1, ?COMMAND_CLOSE:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16, CloseReasonSize:16, @@ -486,7 +486,7 @@ test_close(S) -> <<10:32, ?RESPONSE:1, ?COMMAND_CLOSE:15, - ?VERSION_0:16, + ?VERSION_1:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 495440b35c..5d5b1b5b0c 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -41,7 +41,7 @@ public class TestUtils { } static int streamPortNode2() { - String port = System.getProperty("node2.stream.port", "5556"); + String port = System.getProperty("node2.stream.port", "5552"); return Integer.valueOf(port); } |