summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-02-26 11:48:45 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-02-26 11:48:45 +0100
commit9f0bec8def6c43c13fc654262dd0ed671b924982 (patch)
treefea1bede766aab8dff19c17120bfe65cde6f209e
parent7b1f7692c71e1fffc24a5652d9ebc47f72ad2d67 (diff)
downloadrabbitmq-server-git-9f0bec8def6c43c13fc654262dd0ed671b924982.tar.gz
Start constants at 1 for stream plugin
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc166
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl98
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl100
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl50
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java2
5 files changed, 208 insertions, 208 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index dba0499caa..3667e1f14b 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -63,24 +63,24 @@ used to make the difference between a request (0) and a response (1). Example fo
|===
|Response|Code
-|OK|0
-|Stream does not exist|1
-|Subscription ID already exists|2
-|Subscription ID does not exist|3
-|Stream already exists|4
-|Stream not available|5
-|SASL mechanism not supported|6
-|Authentication failure|7
-|SASL error|8
-|SASL challenge|9
-|SASL authentication failure loopback|10
-|Virtual host access failure|11
-|Unknown frame|12
-|Frame too large|13
-|Internal error|14
-|Access refused|15
-|Precondition failed|16
-|Publisher does not exist|17
+|OK|1
+|Stream does not exist|2
+|Subscription ID already exists|3
+|Subscription ID does not exist|4
+|Stream already exists|5
+|Stream not available|6
+|SASL mechanism not supported|7
+|Authentication failure|8
+|SASL error|9
+|SASL challenge|10
+|SASL authentication failure loopback|11
+|Virtual host access failure|12
+|Unknown frame|13
+|Frame too large|14
+|Internal error|15
+|Access refused|16
+|Precondition failed|17
+|Publisher does not exist|18
|===
@@ -92,127 +92,127 @@ used to make the difference between a request (0) and a response (1). Example fo
|<<declarepublisher>>
|Client
-|0
+|1
|Yes
|<<publish>>
|Client
-|1
+|2
|No
|<<publishconfirm>>
|Server
-|2
+|3
|No
|<<publisherror>>
|Server
-|3
+|4
|No
|<<querypublishersequence>>
|Client
-|4
+|5
|Yes
|<<deletepublisher>>
|Client
-|5
+|6
|Yes
|<<subscribe>>
|Client
-|6
+|7
|Yes
|<<deliver>>
|Server
-|7
+|8
|No
|<<credit>>
|Client
-|8
+|9
|No
|<<commitoffset>>
|Client
-|9
+|10
|No
|<<queryoffset>>
|Client
-|10
+|11
|Yes
|<<unsubscribe>>
|Client
-|11
+|12
|Yes
|<<create>>
|Client
-|12
+|13
|Yes
|<<delete>>
|Client
-|13
+|14
|Yes
|<<metadata>>
|Client
-|14
+|15
|Yes
|<<metadataupdate>>
|Server
-|15
+|16
|No
|<<peerproperties>>
|Client
-|16
+|17
|Yes
|<<saslhandshake>>
|Client
-|17
+|18
|Yes
|<<saslauthenticate>>
|Client
-|18
+|19
|Yes
|<<tune>>
|Server
-|19
+|20
|Yes
|<<open>>
|Server
-|20
+|21
|Yes
|<<close>>
|Client & Server
-|21
+|22
|Yes
|<<heartbeat>>
|Client & Server
-|22
+|23
|No
|<<route>> (experimental)
|Client
-|23
+|24
|Yes
|<<partitions>> (experimental)
|Client
-|24
+|25
|Yes
|===
@@ -220,7 +220,7 @@ used to make the difference between a request (0) and a response (1). Example fo
```
DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
- Key => int16 // 0
+ Key => int16 // 1
Version => int16
CorrelationId => int32
PublisherId => uint8
@@ -228,7 +228,7 @@ DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherRefer
Stream => string
DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
- Key => int16 // 18
+ Key => int16 // 1
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -238,7 +238,7 @@ DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
```
Publish => Key Version Stream PublishedMessages
- Key => int16 // 1
+ Key => int16 // 2
Version => int16
PublisherId => uint8
PublishedMessages => [PublishedMessage]
@@ -251,7 +251,7 @@ Publish => Key Version Stream PublishedMessages
```
PublishConfirm => Key Version PublishingIds
- Key => int16 // 2
+ Key => int16 // 3
Version => int16
PublisherId => uint8
PublishingIds => [int64] // to correlate with the messages sent
@@ -261,7 +261,7 @@ PublishConfirm => Key Version PublishingIds
```
PublishError => Key Version [PublishingError]
- Key => int16 // 3
+ Key => int16 // 4
Version => int16
PublisherId => uint8
PublishingError => PublishingId Code
@@ -273,14 +273,14 @@ PublishError => Key Version [PublishingError]
```
QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
- Key => int16 // 4
+ Key => int16 // 5
Version => int16
CorrelationId => int32
PublisherReference => string // max 256 characters
Stream => string
QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
- Key => int16 // 4
+ Key => int16 // 5
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -291,13 +291,13 @@ QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
```
DeletePublisherRequest => Key Version CorrelationId PublisherId
- Key => int16 // 5
+ Key => int16 // 6
Version => int16
CorrelationId => int32
PublisherId => uint8
DeletePublisherResponse => Key Version CorrelationId ResponseCode
- Key => int16 // 5
+ Key => int16 // 6
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -307,13 +307,13 @@ DeletePublisherResponse => Key Version CorrelationId ResponseCode
```
Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit
- Key => int16 // 6
+ Key => int16 // 7
Version => int16
CorrelationId => int32 // correlation id to correlate the response
SubscriptionId => uint8 // client-supplied id to identify the subscription
Stream => string // the name of the stream
OffsetSpecification => OffsetType Offset
- OffsetType => int16 // 0 (first), 1 (last), 2 (next), 3 (offset), 4 (timestamp)
+ OffsetType => int16 // 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
Offset => uint64 (for offset) | int64 (for timestamp)
Credit => int16
```
@@ -322,7 +322,7 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification
```
Deliver => Key Version SubscriptionId OsirisChunk
- Key => int16 // 7
+ Key => int16 // 8
Version => int32
SubscriptionId => uint8
OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
@@ -345,13 +345,13 @@ for details on the structure of messages.
```
Credit => Key Version SubscriptionId Credit
- Key => int16 // 8
+ Key => int16 // 9
Version => int16
SubscriptionId => int8
Credit => int16 // the number of chunks that can be sent
CreditResponse => Key Version ResponseCode SubscriptionId
- Key => int16 // 8
+ Key => int16 // 9
Version => int16
ResponseCode => int16
SubscriptionId => int8
@@ -363,7 +363,7 @@ NB: the server sent a response only in case of problem, e.g. crediting an unknow
```
CommitOffset => Key Version Reference Stream Offset
- Key => int16 // 9
+ Key => int16 // 10
Version => int16
CorrelationId => int32 // not used yet
Reference => string // max 256 characters
@@ -375,14 +375,14 @@ CommitOffset => Key Version Reference Stream Offset
```
QueryOffsetRequest => Key Version CorrelationId Reference Stream
- Key => int16 // 10
+ Key => int16 // 11
Version => int16
CorrelationId => int32
Reference => string // max 256 characters
Stream => string
QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
- Key => int16 // 10
+ Key => int16 // 11
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -393,7 +393,7 @@ QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
```
Unsubscribe => Key Version CorrelationId SubscriptionId
- Key => int16 // 11
+ Key => int16 // 12
Version => int16
CorrelationId => int32
SubscriptionId => int8
@@ -403,7 +403,7 @@ Unsubscribe => Key Version CorrelationId SubscriptionId
```
Create => Key Version CorrelationId Stream Arguments
- Key => int16 // 12
+ Key => int16 // 13
Version => int16
CorrelationId => int32
Stream => string
@@ -417,7 +417,7 @@ Create => Key Version CorrelationId Stream Arguments
```
Delete => Key Version CorrelationId Stream
- Key => int16 // 13
+ Key => int16 // 14
Version => int16
CorrelationId => int32
Stream => string
@@ -427,13 +427,13 @@ Delete => Key Version CorrelationId Stream
```
MetadataQuery => Key Version CorrelationId [Stream]
- Key => int16 // 14
+ Key => int16 // 15
Version => int16
CorrelationId => int32
Stream => string
MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
- Key => int16 // 14
+ Key => int16 // 15
Version => int16
CorrelationId => int32
Broker => Reference Host Port
@@ -451,7 +451,7 @@ MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
```
MetadataUpdate => Key Version MetadataInfo
- Key => int16 // 15
+ Key => int16 // 16
Version => int16
MetadataInfo => Code Stream
Code => int16 // code to identify the information
@@ -462,7 +462,7 @@ MetadataUpdate => Key Version MetadataInfo
```
PeerPropertiesRequest => Key Version PeerProperties
- Key => int16 // 16
+ Key => int16 // 17
Version => int16
CorrelationId => int32
PeerProperties => [PeerProperty]
@@ -471,7 +471,7 @@ PeerPropertiesRequest => Key Version PeerProperties
Value => string
PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
- Key => int16 // 16
+ Key => int16 // 17
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -485,12 +485,12 @@ PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
```
SaslHandshakeRequest => Key Version CorrelationId Mechanism
- Key => int16 // 17
+ Key => int16 // 18
Version => int16
CorrelationId => int32
SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism]
- Key => int16 // 17
+ Key => int16 // 18
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -501,14 +501,14 @@ SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism]
```
SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
- Key => int16 // 18
+ Key => int16 // 19
Version => int16
CorrelationId => int32
Mechanism => string
SaslOpaqueData => bytes
SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
- Key => int16 // 18
+ Key => int16 // 19
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -519,7 +519,7 @@ SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueDat
```
TuneRequest => Key Version FrameMax Heartbeat
- Key => int16 // 19
+ Key => int16 // 20
Version => int16
FrameMax => int32 // in bytes, 0 means no limit
Heartbeat => int32 // in seconds, 0 means no heartbeat
@@ -531,13 +531,13 @@ TuneResponse => TuneRequest
```
OpenRequest => Key Version CorrelationId VirtualHost
- Key => int16 // 20
+ Key => int16 // 21
Version => int16
CorrelationId => int32
VirtualHost => string
OpenResponse => Key Version CorrelationId ResponseCode
- Key => int16 // 20
+ Key => int16 // 21
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -547,14 +547,14 @@ OpenResponse => Key Version CorrelationId ResponseCode
```
CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
- Key => int16 // 21
+ Key => int16 // 22
Version => int16
CorrelationId => int32
ClosingCode => int16
ClosingReason => string
CloseResponse => Key Version CorrelationId ResponseCode
- Key => int16 // 21
+ Key => int16 // 22
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -564,7 +564,7 @@ CloseResponse => Key Version CorrelationId ResponseCode
```
Heartbeat => Key Version
- Key => int16 // 22
+ Key => int16 // 23
Version => int16
```
@@ -574,32 +574,32 @@ _Experimental_
```
RouteQuery => Key Version CorrelationId RoutingKey SuperStream
- Key => int16 // 23
+ Key => int16 // 24
Version => int16
CorrelationId => int32
RoutingKey => string
SuperStream => string
RouteResponse => Key Version CorrelationId Stream
- Key => int16 // 14
+ Key => int16 // 24
Version => int16
CorrelationId => int32
Stream => string
```
-=== Partitions (experimental)
+=== Partitions
_Experimental_
```
PartitionsQuery => Key Version CorrelationId SuperStream
- Key => int16 // 24
+ Key => int16 // 25
Version => int16
CorrelationId => int32
SuperStream => string
PartitionsResponse => Key Version CorrelationId [Stream]
- Key => int16 // 23
+ Key => int16 // 25
Version => int16
CorrelationId => int32
Stream => string
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 802b2b823f..0503104c40 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -1,58 +1,58 @@
--define(COMMAND_DECLARE_PUBLISHER, 0).
--define(COMMAND_PUBLISH, 1).
--define(COMMAND_PUBLISH_CONFIRM, 2).
--define(COMMAND_PUBLISH_ERROR, 3).
--define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 4).
--define(COMMAND_DELETE_PUBLISHER, 5).
--define(COMMAND_SUBSCRIBE, 6).
--define(COMMAND_DELIVER, 7).
--define(COMMAND_CREDIT, 8).
--define(COMMAND_COMMIT_OFFSET, 9).
--define(COMMAND_QUERY_OFFSET, 10).
--define(COMMAND_UNSUBSCRIBE, 11).
--define(COMMAND_CREATE_STREAM, 12).
--define(COMMAND_DELETE_STREAM, 13).
--define(COMMAND_METADATA, 14).
--define(COMMAND_METADATA_UPDATE, 15).
--define(COMMAND_PEER_PROPERTIES, 16).
--define(COMMAND_SASL_HANDSHAKE, 17).
--define(COMMAND_SASL_AUTHENTICATE, 18).
--define(COMMAND_TUNE, 19).
--define(COMMAND_OPEN, 20).
--define(COMMAND_CLOSE, 21).
--define(COMMAND_HEARTBEAT, 22).
--define(COMMAND_ROUTE, 23).
--define(COMMAND_PARTITIONS, 24).
+-define(COMMAND_DECLARE_PUBLISHER, 1).
+-define(COMMAND_PUBLISH, 2).
+-define(COMMAND_PUBLISH_CONFIRM, 3).
+-define(COMMAND_PUBLISH_ERROR, 4).
+-define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 5).
+-define(COMMAND_DELETE_PUBLISHER, 6).
+-define(COMMAND_SUBSCRIBE, 7).
+-define(COMMAND_DELIVER, 8).
+-define(COMMAND_CREDIT, 9).
+-define(COMMAND_COMMIT_OFFSET, 10).
+-define(COMMAND_QUERY_OFFSET, 11).
+-define(COMMAND_UNSUBSCRIBE, 12).
+-define(COMMAND_CREATE_STREAM, 13).
+-define(COMMAND_DELETE_STREAM, 14).
+-define(COMMAND_METADATA, 15).
+-define(COMMAND_METADATA_UPDATE, 16).
+-define(COMMAND_PEER_PROPERTIES, 17).
+-define(COMMAND_SASL_HANDSHAKE, 18).
+-define(COMMAND_SASL_AUTHENTICATE, 19).
+-define(COMMAND_TUNE, 20).
+-define(COMMAND_OPEN, 21).
+-define(COMMAND_CLOSE, 22).
+-define(COMMAND_HEARTBEAT, 23).
+-define(COMMAND_ROUTE, 24).
+-define(COMMAND_PARTITIONS, 25).
-define(REQUEST, 0).
-define(RESPONSE, 1).
--define(VERSION_0, 0).
+-define(VERSION_1, 1).
--define(RESPONSE_CODE_OK, 0).
--define(RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 1).
--define(RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS, 2).
--define(RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, 3).
--define(RESPONSE_CODE_STREAM_ALREADY_EXISTS, 4).
--define(RESPONSE_CODE_STREAM_NOT_AVAILABLE, 5).
--define(RESPONSE_SASL_MECHANISM_NOT_SUPPORTED, 6).
--define(RESPONSE_AUTHENTICATION_FAILURE, 7).
--define(RESPONSE_SASL_ERROR, 8).
--define(RESPONSE_SASL_CHALLENGE, 9).
--define(RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK, 10).
--define(RESPONSE_VHOST_ACCESS_FAILURE, 11).
--define(RESPONSE_CODE_UNKNOWN_FRAME, 12).
--define(RESPONSE_CODE_FRAME_TOO_LARGE, 13).
--define(RESPONSE_CODE_INTERNAL_ERROR, 14).
--define(RESPONSE_CODE_ACCESS_REFUSED, 15).
--define(RESPONSE_CODE_PRECONDITION_FAILED, 16).
--define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 17).
+-define(RESPONSE_CODE_OK, 1).
+-define(RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 2).
+-define(RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS, 3).
+-define(RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, 4).
+-define(RESPONSE_CODE_STREAM_ALREADY_EXISTS, 5).
+-define(RESPONSE_CODE_STREAM_NOT_AVAILABLE, 6).
+-define(RESPONSE_SASL_MECHANISM_NOT_SUPPORTED, 7).
+-define(RESPONSE_AUTHENTICATION_FAILURE, 8).
+-define(RESPONSE_SASL_ERROR, 9).
+-define(RESPONSE_SASL_CHALLENGE, 10).
+-define(RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK, 11).
+-define(RESPONSE_VHOST_ACCESS_FAILURE, 12).
+-define(RESPONSE_CODE_UNKNOWN_FRAME, 13).
+-define(RESPONSE_CODE_FRAME_TOO_LARGE, 14).
+-define(RESPONSE_CODE_INTERNAL_ERROR, 15).
+-define(RESPONSE_CODE_ACCESS_REFUSED, 16).
+-define(RESPONSE_CODE_PRECONDITION_FAILED, 17).
+-define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 18).
--define(OFFSET_TYPE_FIRST, 0).
--define(OFFSET_TYPE_LAST, 1).
--define(OFFSET_TYPE_NEXT, 2).
--define(OFFSET_TYPE_OFFSET, 3).
--define(OFFSET_TYPE_TIMESTAMP, 4).
+-define(OFFSET_TYPE_FIRST, 1).
+-define(OFFSET_TYPE_LAST, 2).
+-define(OFFSET_TYPE_NEXT, 3).
+-define(OFFSET_TYPE_OFFSET, 4).
+-define(OFFSET_TYPE_TIMESTAMP, 5).
-define(DEFAULT_INITIAL_CREDITS, 50000).
-define(DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING, 12500).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 013d57bec0..4d0e3b93e9 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -296,7 +296,7 @@ listen_loop_pre_auth(Transport,
TuneFrame =
<<?REQUEST:1,
?COMMAND_TUNE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
FrameMax:32,
Heartbeat:32>>,
frame(Transport, Connection1, TuneFrame),
@@ -451,7 +451,7 @@ listen_loop_post_auth(Transport,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_METADATA_UPDATE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
Stream/binary>>]),
@@ -487,7 +487,7 @@ listen_loop_post_auth(Transport,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_PUBLISH_CONFIRM:15,
- ?VERSION_0:16>>,
+ ?VERSION_1:16>>,
<<CurrentPublisherId:8>>,
<<Count:32>>,
PublishingIds]),
@@ -507,7 +507,7 @@ listen_loop_post_auth(Transport,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_PUBLISH_CONFIRM:15,
- ?VERSION_0:16>>,
+ ?VERSION_1:16>>,
<<LastPublisherId:8>>,
<<LastCount:32>>,
LastPublishingIds]),
@@ -555,7 +555,7 @@ listen_loop_post_auth(Transport,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_PUBLISH_CONFIRM:15,
- ?VERSION_0:16>>,
+ ?VERSION_1:16>>,
<<PublisherId:8>>,
<<PublishingIdCount:32>>,
PubIds]),
@@ -642,7 +642,7 @@ listen_loop_post_auth(Transport,
State1,
Configuration);
heartbeat_send ->
- Frame = <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
+ Frame = <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
case catch frame(Transport, Connection, Frame) of
ok ->
listen_loop_post_auth(Transport,
@@ -796,7 +796,7 @@ handle_inbound_data(Transport,
CloseFrame =
<<?REQUEST:1,
?COMMAND_CLOSE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_FRAME_TOO_LARGE:16,
CloseReasonLength:16,
@@ -851,7 +851,7 @@ handle_frame_pre_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_PEER_PROPERTIES:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
ClientPropertiesCount:32,
ClientPropertiesFrame/binary>>,
@@ -902,7 +902,7 @@ handle_frame_pre_auth(Transport,
Frame =
<<?RESPONSE:1,
?COMMAND_PEER_PROPERTIES:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16,
ServerPropertiesCount:32,
@@ -919,7 +919,7 @@ handle_frame_pre_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_SASL_HANDSHAKE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32>>,
Rest) ->
Mechanisms = rabbit_stream_utils:auth_mechanisms(S),
@@ -933,7 +933,7 @@ handle_frame_pre_auth(Transport,
Frame =
<<?RESPONSE:1,
?COMMAND_SASL_HANDSHAKE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16,
MechanismsCount:32,
@@ -950,7 +950,7 @@ handle_frame_pre_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_SASL_AUTHENTICATE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
MechanismLength:16,
Mechanism:MechanismLength/binary,
@@ -1047,7 +1047,7 @@ handle_frame_pre_auth(Transport,
Frame =
<<?RESPONSE:1,
?COMMAND_SASL_AUTHENTICATE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
FrameFragment/binary>>,
frame(Transport, C1, Frame),
@@ -1056,7 +1056,7 @@ handle_frame_pre_auth(Transport,
Frame =
<<?RESPONSE:1,
?COMMAND_SASL_AUTHENTICATE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>,
frame(Transport, Connection0, Frame),
@@ -1072,7 +1072,7 @@ handle_frame_pre_auth(_Transport,
State,
<<?RESPONSE:1,
?COMMAND_TUNE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
FrameMax:32,
Heartbeat:32>>,
Rest) ->
@@ -1107,7 +1107,7 @@ handle_frame_pre_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_OPEN:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
VirtualHostLength:16,
VirtualHost:VirtualHostLength/binary>>,
@@ -1121,7 +1121,7 @@ handle_frame_pre_auth(Transport,
#{}),
F = <<?RESPONSE:1,
?COMMAND_OPEN:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16>>,
%% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2)
@@ -1132,7 +1132,7 @@ handle_frame_pre_auth(Transport,
exit:_ ->
Fr = <<?RESPONSE:1,
?COMMAND_OPEN:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
?RESPONSE_VHOST_ACCESS_FAILURE:16>>,
{Connection#stream_connection{connection_step = failure}, Fr}
@@ -1144,7 +1144,7 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
+ <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame pre auth~n"),
{Connection, State, Rest};
@@ -1194,7 +1194,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_DECLARE_PUBLISHER:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
PublisherId:8,
ReferenceSize:16,
@@ -1284,7 +1284,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_PUBLISH:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
PublisherId:8/unsigned,
MessageCount:32,
Messages/binary>>,
@@ -1323,7 +1323,7 @@ handle_frame_post_auth(Transport,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_PUBLISH_ERROR:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
PublisherId:8,
MessageCount:32,
Details/binary>>]),
@@ -1340,7 +1340,7 @@ handle_frame_post_auth(Transport,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_PUBLISH_ERROR:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
PublisherId:8,
MessageCount:32,
Details/binary>>]),
@@ -1354,7 +1354,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_QUERY_PUBLISHER_SEQUENCE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
ReferenceSize:16,
Reference:ReferenceSize/binary,
@@ -1392,7 +1392,7 @@ handle_frame_post_auth(Transport,
[<<FrameSize:32,
?RESPONSE:1,
?COMMAND_QUERY_PUBLISHER_SEQUENCE:15,
- ?VERSION_0:16>>,
+ ?VERSION_1:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
<<Sequence:64>>]),
@@ -1404,7 +1404,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_DELETE_PUBLISHER:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
PublisherId:8>>,
Rest) ->
@@ -1448,7 +1448,7 @@ handle_frame_post_auth(Transport,
#stream_connection_state{consumers = Consumers} = State,
<<?REQUEST:1,
?COMMAND_SUBSCRIBE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
SubscriptionId:8/unsigned,
StreamSize:16,
@@ -1605,7 +1605,7 @@ handle_frame_post_auth(Transport,
#stream_connection_state{consumers = Consumers} = State,
<<?REQUEST:1,
?COMMAND_CREDIT:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
SubscriptionId:8/unsigned,
Credit:16/signed>>,
Rest) ->
@@ -1631,7 +1631,7 @@ handle_frame_post_auth(Transport,
Frame =
<<?RESPONSE:1,
?COMMAND_CREDIT:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16,
SubscriptionId:8>>,
FrameSize = byte_size(Frame),
@@ -1645,7 +1645,7 @@ handle_frame_post_auth(_Transport,
State,
<<?REQUEST:1,
?COMMAND_COMMIT_OFFSET:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
_CorrelationId:32,
ReferenceSize:16,
Reference:ReferenceSize/binary,
@@ -1685,7 +1685,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_QUERY_OFFSET:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
ReferenceSize:16,
Reference:ReferenceSize/binary,
@@ -1722,7 +1722,7 @@ handle_frame_post_auth(Transport,
[<<FrameSize:32,
?RESPONSE:1,
?COMMAND_QUERY_OFFSET:15,
- ?VERSION_0:16>>,
+ ?VERSION_1:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
<<Offset:64>>]),
@@ -1734,7 +1734,7 @@ handle_frame_post_auth(Transport,
#stream_connection_state{consumers = Consumers} = State,
<<?REQUEST:1,
?COMMAND_UNSUBSCRIBE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
SubscriptionId:8/unsigned>>,
Rest) ->
@@ -1787,7 +1787,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_CREATE_STREAM:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
StreamSize:16,
Stream:StreamSize/binary,
@@ -1871,7 +1871,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_DELETE_STREAM:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
StreamSize:16,
Stream:StreamSize/binary>>,
@@ -1903,7 +1903,7 @@ handle_frame_post_auth(Transport,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_METADATA_UPDATE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
Stream/binary>>]),
@@ -1935,7 +1935,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_METADATA:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
StreamCount:32,
BinaryStreams/binary>>,
@@ -2061,7 +2061,7 @@ handle_frame_post_auth(Transport,
Frame =
<<?RESPONSE:1,
?COMMAND_METADATA:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
BrokersBin/binary,
MetadataBin/binary>>,
@@ -2074,7 +2074,7 @@ handle_frame_post_auth(Transport,
Connection,
State,
<<?COMMAND_ROUTE:16,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
RoutingKeySize:16,
RoutingKey:RoutingKeySize/binary,
@@ -2096,7 +2096,7 @@ handle_frame_post_auth(Transport,
Frame =
<<?COMMAND_ROUTE:16,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
ResponseCode:16,
StreamBin/binary>>,
@@ -2109,7 +2109,7 @@ handle_frame_post_auth(Transport,
Connection,
State,
<<?COMMAND_PARTITIONS:16,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
SuperStreamSize:16,
SuperStream:SuperStreamSize/binary>>,
@@ -2133,7 +2133,7 @@ handle_frame_post_auth(Transport,
Frame =
<<?COMMAND_PARTITIONS:16,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
ResponseCode:16,
PartitionsBin/binary>>,
@@ -2145,7 +2145,7 @@ handle_frame_post_auth(Transport,
State,
<<?REQUEST:1,
?COMMAND_CLOSE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
ClosingCode:16,
ClosingReasonLength:16,
@@ -2156,7 +2156,7 @@ handle_frame_post_auth(Transport,
Frame =
<<?RESPONSE:1,
?COMMAND_CLOSE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16>>,
frame(Transport, Connection, Frame),
@@ -2165,7 +2165,7 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
+ <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame post auth~n"),
{Connection, State, Rest};
@@ -2177,7 +2177,7 @@ handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
CloseFrame =
<<?REQUEST:1,
?COMMAND_CLOSE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_UNKNOWN_FRAME:16,
CloseReasonLength:16,
@@ -2215,7 +2215,7 @@ handle_frame_post_close(_Transport,
State,
<<?RESPONSE:1,
?COMMAND_CLOSE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
_CorrelationId:32,
_ResponseCode:16>>,
Rest) ->
@@ -2225,7 +2225,7 @@ handle_frame_post_close(_Transport,
handle_frame_post_close(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
+ <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame post close~n"),
{Connection, State, Rest};
@@ -2429,7 +2429,7 @@ response(Transport,
[<<?RESPONSE_FRAME_SIZE:32,
?RESPONSE:1,
CommandId:15,
- ?VERSION_0:16>>,
+ ?VERSION_1:16>>,
<<CorrelationId:32>>, <<ResponseCode:16>>]).
subscription_exists(StreamSubscriptions, SubscriptionId) ->
@@ -2450,7 +2450,7 @@ send_file_callback(Transport,
<<FrameSize:32,
?REQUEST:1,
?COMMAND_DELIVER:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
SubscriptionId:8/unsigned>>,
Transport:send(S, FrameBeginning),
atomics:add(Counter, 1, Size),
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index a3aa1c3ba6..176fcf71fc 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -185,7 +185,7 @@ test_peer_properties(S) ->
PeerPropertiesFrame =
<<?REQUEST:1,
?COMMAND_PEER_PROPERTIES:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
0:32>>,
PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame),
@@ -195,7 +195,7 @@ test_peer_properties(S) ->
<<_Size:32,
?RESPONSE:1,
?COMMAND_PEER_PROPERTIES:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16,
_Rest/binary>>} =
@@ -203,7 +203,7 @@ test_peer_properties(S) ->
test_authenticate(S) ->
SaslHandshakeFrame =
- <<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_0:16, 1:32>>,
+ <<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_1:16, 1:32>>,
SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame),
gen_tcp:send(S,
<<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>),
@@ -216,7 +216,7 @@ test_authenticate(S) ->
<<31:32,
?RESPONSE:1,
?COMMAND_SASL_HANDSHAKE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16,
2:32,
@@ -228,7 +228,7 @@ test_authenticate(S) ->
<<31:32,
?RESPONSE:1,
?COMMAND_SASL_HANDSHAKE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16,
2:32,
@@ -250,7 +250,7 @@ test_authenticate(S) ->
SaslAuthenticateFrame =
<<?REQUEST:1,
?COMMAND_SASL_AUTHENTICATE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
2:32,
5:16,
Plain/binary,
@@ -266,7 +266,7 @@ test_authenticate(S) ->
<<10:32,
?RESPONSE:1,
?COMMAND_SASL_AUTHENTICATE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
2:32,
?RESPONSE_CODE_OK:16,
RestTune/binary>>} =
@@ -276,7 +276,7 @@ test_authenticate(S) ->
<<12:32,
?REQUEST:1,
?COMMAND_TUNE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
?DEFAULT_FRAME_MAX:32,
?DEFAULT_HEARTBEAT:32>>,
case RestTune of
@@ -289,7 +289,7 @@ test_authenticate(S) ->
TuneFrame =
<<?RESPONSE:1,
?COMMAND_TUNE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
?DEFAULT_FRAME_MAX:32,
0:32>>,
TuneFrameSize = byte_size(TuneFrame),
@@ -300,7 +300,7 @@ test_authenticate(S) ->
OpenFrame =
<<?REQUEST:1,
?COMMAND_OPEN:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
3:32,
VirtualHostLength:16,
VirtualHost/binary>>,
@@ -310,7 +310,7 @@ test_authenticate(S) ->
<<10:32,
?RESPONSE:1,
?COMMAND_OPEN:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
3:32,
?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
@@ -320,7 +320,7 @@ test_create_stream(S, Stream) ->
CreateStreamFrame =
<<?REQUEST:1,
?COMMAND_CREATE_STREAM:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
StreamSize:16,
Stream:StreamSize/binary,
@@ -331,7 +331,7 @@ test_create_stream(S, Stream) ->
<<_Size:32,
?RESPONSE:1,
?COMMAND_CREATE_STREAM:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
@@ -341,7 +341,7 @@ test_delete_stream(S, Stream) ->
DeleteStreamFrame =
<<?REQUEST:1,
?COMMAND_DELETE_STREAM:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
StreamSize:16,
Stream:StreamSize/binary>>,
@@ -352,7 +352,7 @@ test_delete_stream(S, Stream) ->
<<ResponseFrameSize:32,
?RESPONSE:1,
?COMMAND_DELETE_STREAM:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 4 + 10, 5000).
@@ -362,7 +362,7 @@ test_declare_publisher(S, PublisherId, Stream) ->
DeclarePublisherFrame =
<<?REQUEST:1,
?COMMAND_DECLARE_PUBLISHER:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
PublisherId:8,
0:16, %% empty publisher reference
@@ -375,7 +375,7 @@ test_declare_publisher(S, PublisherId, Stream) ->
<<_Size:32,
?RESPONSE:1,
?COMMAND_DECLARE_PUBLISHER:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16,
Rest/binary>>} =
@@ -387,7 +387,7 @@ test_publish_confirm(S, PublisherId, Body) ->
PublishFrame =
<<?REQUEST:1,
?COMMAND_PUBLISH:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
PublisherId:8,
1:32,
1:64,
@@ -399,7 +399,7 @@ test_publish_confirm(S, PublisherId, Body) ->
<<_Size:32,
?REQUEST:1,
?COMMAND_PUBLISH_CONFIRM:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
PublisherId:8,
1:32,
1:64>>} =
@@ -410,7 +410,7 @@ test_subscribe(S, SubscriptionId, Stream) ->
SubscribeFrame =
<<?REQUEST:1,
?COMMAND_SUBSCRIBE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
SubscriptionId:8,
StreamSize:16,
@@ -425,7 +425,7 @@ test_subscribe(S, SubscriptionId, Stream) ->
<<_Size:32,
?RESPONSE:1,
?COMMAND_SUBSCRIBE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16,
Rest/binary>>} =
@@ -438,7 +438,7 @@ test_deliver(S, Rest, SubscriptionId, Body) ->
<<58:32,
?REQUEST:1,
?COMMAND_DELIVER:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
SubscriptionId:8,
5:4/unsigned,
0:4/unsigned,
@@ -463,7 +463,7 @@ test_metadata_update_stream_deleted(S, Stream) ->
<<FrameSize:32,
?REQUEST:1,
?COMMAND_METADATA_UPDATE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
Stream/binary>>} =
@@ -475,7 +475,7 @@ test_close(S) ->
CloseFrame =
<<?REQUEST:1,
?COMMAND_CLOSE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16,
CloseReasonSize:16,
@@ -486,7 +486,7 @@ test_close(S) ->
<<10:32,
?RESPONSE:1,
?COMMAND_CLOSE:15,
- ?VERSION_0:16,
+ ?VERSION_1:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
index 495440b35c..5d5b1b5b0c 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
@@ -41,7 +41,7 @@ public class TestUtils {
}
static int streamPortNode2() {
- String port = System.getProperty("node2.stream.port", "5556");
+ String port = System.getProperty("node2.stream.port", "5552");
return Integer.valueOf(port);
}