diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-09-30 14:16:35 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-09-30 14:16:35 +0000 |
| commit | 5285ab11ce4210d4a38a27833ba1aea028f483fa (patch) | |
| tree | 788dc05fb7229e9e3e968f717a1ac5360f31ac43 /qpid/java | |
| parent | 81445eee4f4a51d6472fd43d6d0fb0722fbeeca6 (diff) | |
| download | qpid-python-5285ab11ce4210d4a38a27833ba1aea028f483fa.tar.gz | |
QPID-5195 : Client does not properly support channel-max
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1527584 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
7 files changed, 113 insertions, 37 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index a97f4dc0bf..f46840e9ae 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -48,6 +48,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private String _topicPrefix; private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); + private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0); public ConnectionFactoryImpl(final String host, @@ -85,6 +86,18 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection final String remoteHost, final boolean ssl) { + this(host, port, username, password, clientId, remoteHost, ssl,0); + } + + public ConnectionFactoryImpl(final String host, + final int port, + final String username, + final String password, + final String clientId, + final String remoteHost, + final boolean ssl, + final int maxSessions) + { _host = host; _port = port; _username = username; @@ -92,6 +105,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection _clientId = clientId; _remoteHost = remoteHost; _ssl = ssl; + _maxSessions = maxSessions; } public ConnectionImpl createConnection() throws JMSException @@ -101,7 +115,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection public ConnectionImpl createConnection(final String username, final String password) throws JMSException { - ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions); connection.setQueuePrefix(_queuePrefix); connection.setTopicPrefix(_topicPrefix); connection.setUseBinaryMessageId(_useBinaryMessageId); @@ -158,6 +172,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection boolean binaryMessageId = true; boolean syncPublish = false; + int maxSessions = 0; if(userInfo != null) { @@ -194,6 +209,10 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection { syncPublish = Boolean.parseBoolean(keyValuePair[1]); } + else if(keyValuePair[0].equalsIgnoreCase("max-sessions")) + { + maxSessions = Integer.parseInt(keyValuePair[1]); + } } } @@ -203,7 +222,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection } ConnectionFactoryImpl connectionFactory = - new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl); + new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl, maxSessions); connectionFactory.setUseBinaryMessageId(binaryMessageId); connectionFactory.setSyncPublish(syncPublish); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 8823f0f148..80875fcfa7 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -60,6 +60,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private String _topicPrefix; private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); + private int _maxSessions; private static enum State { @@ -83,6 +84,11 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException { + this(host, port, username, password, clientId, remoteHost, ssl,0); + } + + public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException + { _host = host; _port = port; _username = username; @@ -90,6 +96,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect _clientId = clientId; _remoteHost = remoteHost; _ssl = ssl; + _maxSessions = maxSessions; } private void connect() throws JMSException @@ -106,7 +113,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect try { _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host, - _port, _username, _password, container, _remoteHost, _ssl); + _port, _username, _password, container, _remoteHost, _ssl, + _maxSessions - 1); _conn.setConnectionErrorTask(new ConnectionErrorTask()); // TODO - retrieve negotiated AMQP version _connectionMetaData = new ConnectionMetaDataImpl(1,0,0); diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java index 33af4729d9..4421c44a61 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java @@ -502,9 +502,11 @@ public abstract class Util { Container container = getContainerName() == null ? new Container() : new Container(getContainerName()); return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container, - _remoteHost == null ? getHost() : _remoteHost, _useSSL) + _remoteHost == null ? getHost() : _remoteHost, _useSSL, + 0) : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize, - container, _remoteHost == null ? getHost() : _remoteHost, _useSSL); + container, _remoteHost == null ? getHost() : _remoteHost, _useSSL, + 0); } public String getContainerName() diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index 350c6e690c..0d634c0f1a 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -108,7 +108,7 @@ public class Connection implements SocketExceptionHandler final Container container, final String remoteHostname) throws ConnectionException { - this(address,port,username,password,maxFrameSize,container,remoteHostname,false); + this(address,port,username,password,maxFrameSize,container,remoteHostname,false,-1); } public Connection(final String address, @@ -118,7 +118,7 @@ public class Connection implements SocketExceptionHandler final Container container, final boolean ssl) throws ConnectionException { - this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl); + this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl,-1); } public Connection(final String address, @@ -128,28 +128,32 @@ public class Connection implements SocketExceptionHandler final String remoteHost, final boolean ssl) throws ConnectionException { - this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl); + this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl,-1); } public Connection(final String address, - final int port, - final String username, - final String password, - final Container container, - final String remoteHost, - final boolean ssl) throws ConnectionException + final int port, + final String username, + final String password, + final Container container, + final String remoteHost, + final boolean ssl, + final int channelMax) throws ConnectionException { - this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl); + this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl, + channelMax); } public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize, - final Container container, - final String remoteHostname, boolean ssl) throws ConnectionException + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container, + final String remoteHostname, + boolean ssl, + int channelMax) throws ConnectionException { _address = address; @@ -176,6 +180,10 @@ public class Connection implements SocketExceptionHandler } }; _conn = new ConnectionEndpoint(container, principal, password); + if(channelMax >= 0) + { + _conn.setChannelMax((short)channelMax); + } _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); _conn.setRemoteAddress(s.getRemoteSocketAddress()); _conn.setRemoteHostname(remoteHostname); diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java index 4271ed3e76..569c1f129d 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java @@ -22,6 +22,11 @@ package org.apache.qpid.amqp_1_0.client; public class ConnectionException extends Exception { + protected ConnectionException(final String message) + { + super(message); + } + public ConnectionException(Throwable cause) { super(cause); diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index 626ea0f34d..21af6950d8 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -51,10 +51,14 @@ public class Session private TransactionController _sessionLocalTC; private Connection _connection; - public Session(final Connection connection, String name) + public Session(final Connection connection, String name) throws SessionCreationException { _connection = connection; _endpoint = connection.getEndpoint().createSession(name); + if(_endpoint == null) + { + throw new SessionCreationException("Cannot create session as all channels are in use"); + } _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); } @@ -385,4 +389,14 @@ public class Session { public void configureSource(final Source source); } + + private class SessionCreationException extends ConnectionException + { + + private SessionCreationException(final String message) + { + super(message); + } + + } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index b5520c24e5..cac4e88178 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -68,22 +68,22 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private final Container _container; private Principal _user; - private static final short DEFAULT_CHANNEL_MAX = 255; + private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue(); private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15); private ConnectionState _state = ConnectionState.UNOPENED; - private short _channelMax; + private short _channelMax = DEFAULT_CHANNEL_MAX; private int _maxFrameSize = 4096; private String _remoteContainerId; private SocketAddress _remoteAddress; // positioned by the *outgoing* channel - private SessionEndpoint[] _sendingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1]; + private SessionEndpoint[] _sendingSessions; // positioned by the *incoming* channel - private SessionEndpoint[] _receivingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1]; + private SessionEndpoint[] _receivingSessions; private boolean _closedForInput; private boolean _closedForOutput; @@ -165,7 +165,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour } if (_state == ConnectionState.UNOPENED) { - sendOpen(DEFAULT_CHANNEL_MAX, DEFAULT_MAX_FRAME); + sendOpen(_channelMax, DEFAULT_MAX_FRAME); _state = ConnectionState.AWAITING_OPEN; } } @@ -183,10 +183,10 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour public synchronized SessionEndpoint createSession(String name) { // todo assert connection state - SessionEndpoint endpoint = new SessionEndpoint(this); short channel = getFirstFreeChannel(); if (channel != -1) { + SessionEndpoint endpoint = new SessionEndpoint(this); _sendingSessions[channel] = endpoint; endpoint.setSendingChannel(channel); Begin begin = new Begin(); @@ -196,13 +196,14 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour begin.setHandleMax(_handleMax); send(channel, begin); + return endpoint; } else { - // todo error + // TODO - report error + return null; } - return endpoint; } @@ -235,7 +236,16 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { Open open = new Open(); - open.setChannelMax(UnsignedShort.valueOf(DEFAULT_CHANNEL_MAX)); + if(_receivingSessions == null) + { + _receivingSessions = new SessionEndpoint[channelMax+1]; + _sendingSessions = new SessionEndpoint[channelMax+1]; + } + if(channelMax < _channelMax) + { + _channelMax = channelMax; + } + open.setChannelMax(UnsignedShort.valueOf(channelMax)); open.setContainerId(_container.getId()); open.setMaxFrameSize(getDesiredMaxFrameSize()); open.setHostname(getRemoteHostname()); @@ -268,7 +278,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour short getFirstFreeChannel() { - for (int i = 0; i < _sendingSessions.length; i++) + for (int i = 0; i <= _channelMax; i++) { if (_sendingSessions[i] == null) { @@ -288,10 +298,16 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour public synchronized void receiveOpen(short channel, Open open) { - _channelMax = open.getChannelMax() == null ? DEFAULT_CHANNEL_MAX - : open.getChannelMax().shortValue() < DEFAULT_CHANNEL_MAX - ? DEFAULT_CHANNEL_MAX - : open.getChannelMax().shortValue(); + _channelMax = open.getChannelMax() == null ? _channelMax + : open.getChannelMax().shortValue() < _channelMax + ? open.getChannelMax().shortValue() + : _channelMax; + + if(_receivingSessions == null) + { + _receivingSessions = new SessionEndpoint[_channelMax+1]; + _sendingSessions = new SessionEndpoint[_channelMax+1]; + } UnsignedInteger remoteDesiredMaxFrameSize = open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize(); @@ -618,7 +634,6 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour } } - public void invalidHeaderReceived() { // TODO @@ -998,4 +1013,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { return _remoteError; } + + public void setChannelMax(final short channelMax) + { + _channelMax = channelMax; + } } |
