diff options
Diffstat (limited to 'qpid/java/client/src')
6 files changed, 84 insertions, 26 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 5b6a986997..fd21b376ac 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -173,8 +173,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates the sync publish options (persistent|all) //By default it's async publish private String _syncPublish = ""; - - // Indicates whether to use the old map message format or the + + // Indicates whether to use the old map message format or the // new amqp-0-10 encoded format. private boolean _useLegacyMapMessageFormat; @@ -261,7 +261,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { throw new IllegalArgumentException("Connection must be specified"); } - + // set this connection maxPrefetch if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { @@ -311,7 +311,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // use the default value set for all connections _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish); } - + if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null) { _useLegacyMapMessageFormat = Boolean.parseBoolean( @@ -322,16 +322,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // use the default value set for all connections _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT); } - + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); _logger.debug("AMQP version " + amqpVersion); - + _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); - if ("0-8".equals(amqpVersion)) + if ("0-8".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); - } + } else if ("0-9".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_0_9(this); @@ -418,6 +418,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } + verifyClientID(); if (_logger.isDebugEnabled()) { @@ -504,7 +505,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); - //Update our session to use this new protocol version + //Update our session to use this new protocol version _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); } @@ -1074,7 +1075,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _username = id; } - + public String getPassword() { return _password; @@ -1250,7 +1251,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { je.setLinkedException((Exception) cause); } - + je.initCause(cause); } @@ -1283,7 +1284,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Not a hard-error connection not closing: " + cause); } - + // deliver the exception if there is a listener if (_exceptionListener != null) { @@ -1293,7 +1294,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.error("Throwable Received but no listener set: " + cause); } - + // if we are closing the connection, close sessions first if (closer) { @@ -1351,17 +1352,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Returns connection url. + * Returns connection url. * @return connection url */ public ConnectionURL getConnectionURL() { return _connectionURL; } - + /** * Returns stringified connection url. This url is suitable only for display - * as {@link AMQConnectionURL#toString()} converts any password to asterisks. + * as {@link AMQConnectionURL#toString()} converts any password to asterisks. * @return connection url */ public String toURL() @@ -1477,9 +1478,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions.getNextChannelId(); } - + public boolean isUseLegacyMapMessageFormat() { return _useLegacyMapMessageFormat; } + + private void verifyClientID() throws AMQException + { + if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID)) + { + try + { + _delegate.verifyClientID(); + } + catch(JMSException e) + { + throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique",e); + } + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 9560bd5c7c..5acdaaa185 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -57,10 +57,12 @@ public interface AMQConnectionDelegate void closeConnection(long timeout) throws JMSException, AMQException; <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; - + int getMaxChannelID(); int getMinChannelID(); ProtocolVersion getProtocolVersion(); + + void verifyClientID() throws JMSException; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index d50c9e16fe..e1643f095d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -47,6 +47,7 @@ import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.ProtocolVersionException; +import org.apache.qpid.transport.SessionDetachCode; import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +87,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec /** * create a Session and start it if required. */ + public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow) + throws JMSException + { + return createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null); + } + + public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow, String name) throws JMSException { _conn.checkNotClosed(); @@ -101,7 +109,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec try { session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); + prefetchLow,name); _conn.registerSession(channelId, session); if (_conn._started) { @@ -449,12 +457,31 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec else { heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); - } + } return heartbeat; } - + protected org.apache.qpid.transport.Connection getQpidConnection() { return _qpidConnection; } + + public void verifyClientID() throws JMSException + { + int prefetch = (int)_conn.getMaxPrefetch(); + AMQSession_0_10 ssn = (AMQSession_0_10)createSession(false, 1,prefetch,prefetch,_conn.getClientID()); + org.apache.qpid.transport.Session ssn_0_10 = ssn.getQpidSession(); + try + { + ssn_0_10.awaitOpen(); + } + catch(Exception e) + { + if (ssn_0_10.getDetachCode() != null && + ssn_0_10.getDetachCode() == SessionDetachCode.SESSION_BUSY) + { + throw new JMSException("ClientID must be unique"); + } + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index a837975304..22d1936463 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -332,4 +332,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return ProtocolVersion.v8_0; } + + public void verifyClientID() throws JMSException + { + // NOOP + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1ea92c67f7..75d96d67af 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -159,13 +159,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, - int defaultPrefetchHighMark, int defaultPrefetchLowMark) + int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name) { super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); _qpidConnection = qpidConnection; - _qpidSession = _qpidConnection.createSession(1); + if (name == null) + { + _qpidSession = _qpidConnection.createSession(1); + } + else + { + _qpidSession = _qpidConnection.createSession(name,1); + } _qpidSession.setSessionListener(this); if (_transacted) { @@ -192,11 +199,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param qpidConnection The connection */ AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, - boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) + boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow, + String name) { this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), - defaultPrefetchHigh, defaultPrefetchLow); + defaultPrefetchHigh, defaultPrefetchLow,name); } private void addUnacked(int id) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 354b67cd35..6b9121811d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic { super(qpidConnection, con, channelId, false, // this is not a transacted session Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted - MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); + MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null); createSession(); _xaResource = new XAResourceImpl(this); } |
