diff options
Diffstat (limited to 'java/client')
5 files changed, 54 insertions, 63 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java index 0979c9c6b8..d7eb138523 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java @@ -23,6 +23,7 @@ package org.apache.qpid.example.transport; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.url.URLSyntaxException; @@ -36,6 +37,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.util.UUID; /** * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as @@ -66,9 +68,14 @@ public class ExistingSocketConnectorDemo implements ConnectionListener MessageProducer _producer; Session _session; + String Socket1_ID = UUID.randomUUID().toString(); + String Socket2_ID = UUID.randomUUID().toString(); + + /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */ - public static final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket:///'"; + public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'"; + public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException { @@ -76,7 +83,10 @@ public class ExistingSocketConnectorDemo implements ConnectionListener Socket socket = SocketChannel.open().socket(); socket.connect(new InetSocketAddress("localhost", 5672)); - _connection = new AMQConnection(CONNECTION, socket); + TransportConnection.registerOpenSocket(Socket1_ID, socket); + + + _connection = new AMQConnection(CONNECTION); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -130,7 +140,7 @@ public class ExistingSocketConnectorDemo implements ConnectionListener socket.connect(new InetSocketAddress("localhost", 5673)); // This is the new method to pass in an open socket for the connection to use. - ((AMQConnection) _connection).setOpenSocket(socket); + TransportConnection.registerOpenSocket(Socket2_ID, socket); } catch (IOException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 89ce4b2c72..572ea48f85 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -157,7 +157,10 @@ public class AMQBrokerDetails implements BrokerDetails } else { - setPort(port); + if (!_transport.equalsIgnoreCase(SOCKET)) + { + setPort(port); + } } String queryString = connection.getQuery(); @@ -264,13 +267,16 @@ public class AMQBrokerDetails implements BrokerDetails sb.append(_transport); sb.append("://"); - if (!(_transport.equalsIgnoreCase("vm"))) + if (!(_transport.equalsIgnoreCase(VM))) { sb.append(_host); } - sb.append(':'); - sb.append(_port); + if (!(_transport.equalsIgnoreCase(SOCKET))) + { + sb.append(':'); + sb.append(_port); + } sb.append(printOptionsURL()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index acbe495550..c9928a084e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -30,8 +30,6 @@ import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.transport.ITransportConnection; -import org.apache.qpid.client.transport.SocketTransportConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -64,7 +62,6 @@ import javax.naming.Referenceable; import javax.naming.StringRefAddr; import java.io.IOException; import java.net.ConnectException; -import java.net.Socket; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.*; @@ -160,8 +157,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final long DEFAULT_TIMEOUT = 1000 * 30; private ProtocolVersion _protocolVersion; - /** The active socket that is to be used as a value for connection */ - private Socket _openSocket; /** * @param broker brokerdetails @@ -179,7 +174,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" - + AMQBrokerDetails.checkTransport(broker) + "'"), null, null); + + AMQBrokerDetails.checkTransport(broker) + "'"), null); } /** @@ -198,7 +193,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" - + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig, null); + + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) @@ -223,38 +218,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port - + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig, null); - } - - public AMQConnection(String connection, Socket socket) throws AMQException, URLSyntaxException - { - this(new AMQConnectionURL(connection), null, socket); + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection), null, null); + this(new AMQConnectionURL(connection), null); } public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection), sslConfig, null); + this(new AMQConnectionURL(connection), sslConfig); } public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { - this(connectionURL, sslConfig, null); - } - - public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig, Socket socket) throws AMQException - { if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); } - _openSocket = socket; - _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -414,23 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { - ITransportConnection connection = TransportConnection.getInstance(brokerDetail); - - if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) - { - if (_openSocket != null) - { - ((SocketTransportConnection) connection).setOpenSocket(_openSocket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket' transport:" + brokerDetail); - } - - } - - connection.connect(_protocolHandler, brokerDetail); + TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); // this blocks until the connection has been set up or when an error // has prevented the connection being set up @@ -1327,11 +1294,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _sessions.get(channelId); } - public void setOpenSocket(Socket socket) - { - _openSocket = socket; - } - public ProtocolVersion getProtocolVersion() { return _protocolVersion; diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index e9d6242e77..b2f7ae8395 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class SocketTransportConnection implements ITransportConnection { @@ -46,8 +48,6 @@ public class SocketTransportConnection implements ITransportConnection private SocketConnectorFactory _socketConnectorFactory; - private Socket _openSocket; - static interface SocketConnectorFactory { IoConnector newSocketConnector(); @@ -58,11 +58,6 @@ public class SocketTransportConnection implements ITransportConnection _socketConnectorFactory = socketConnectorFactory; } - public void setOpenSocket(Socket openSocket) - { - _openSocket = openSocket; - } - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); @@ -99,15 +94,18 @@ public class SocketTransportConnection implements ITransportConnection { address = null; - if (_openSocket != null) + Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); + + if (socket != null) { - _logger.info("Using existing Socket:" + _openSocket); - ((ExistingSocketConnector) ioConnector).setOpenSocket(_openSocket); + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); } else { throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket' transport:" + brokerDetail); + "with 'socket://<SocketID>' transport:" + brokerDetail); } } else diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 94361fccfc..0ea04e5bc3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.net.Socket; + /** * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying @@ -61,6 +64,18 @@ public class TransportConnection private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; + private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); + + public static void registerOpenSocket(String socketID, Socket openSocket) + { + _openSocketRegister.put(socketID, openSocket); + } + + public static Socket removeOpenSocket(String socketID) + { + return _openSocketRegister.remove(socketID); + } + public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -305,7 +320,7 @@ public class TransportConnection synchronized (_inVmPipeAddress) { _inVmPipeAddress.clear(); - } + } _acceptor = null; _currentInstance = -1; _currentVMPort = -1; |
