summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java17
5 files changed, 54 insertions, 63 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
index 0979c9c6b8..d7eb138523 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
@@ -23,6 +23,7 @@ package org.apache.qpid.example.transport;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.url.URLSyntaxException;
@@ -36,6 +37,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
+import java.util.UUID;
/**
* This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as
@@ -66,9 +68,14 @@ public class ExistingSocketConnectorDemo implements ConnectionListener
MessageProducer _producer;
Session _session;
+ String Socket1_ID = UUID.randomUUID().toString();
+ String Socket2_ID = UUID.randomUUID().toString();
+
+
/** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */
- public static final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket:///'";
+ public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'";
+
public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException
{
@@ -76,7 +83,10 @@ public class ExistingSocketConnectorDemo implements ConnectionListener
Socket socket = SocketChannel.open().socket();
socket.connect(new InetSocketAddress("localhost", 5672));
- _connection = new AMQConnection(CONNECTION, socket);
+ TransportConnection.registerOpenSocket(Socket1_ID, socket);
+
+
+ _connection = new AMQConnection(CONNECTION);
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -130,7 +140,7 @@ public class ExistingSocketConnectorDemo implements ConnectionListener
socket.connect(new InetSocketAddress("localhost", 5673));
// This is the new method to pass in an open socket for the connection to use.
- ((AMQConnection) _connection).setOpenSocket(socket);
+ TransportConnection.registerOpenSocket(Socket2_ID, socket);
}
catch (IOException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 89ce4b2c72..572ea48f85 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -157,7 +157,10 @@ public class AMQBrokerDetails implements BrokerDetails
}
else
{
- setPort(port);
+ if (!_transport.equalsIgnoreCase(SOCKET))
+ {
+ setPort(port);
+ }
}
String queryString = connection.getQuery();
@@ -264,13 +267,16 @@ public class AMQBrokerDetails implements BrokerDetails
sb.append(_transport);
sb.append("://");
- if (!(_transport.equalsIgnoreCase("vm")))
+ if (!(_transport.equalsIgnoreCase(VM)))
{
sb.append(_host);
}
- sb.append(':');
- sb.append(_port);
+ if (!(_transport.equalsIgnoreCase(SOCKET)))
+ {
+ sb.append(':');
+ sb.append(_port);
+ }
sb.append(printOptionsURL());
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index acbe495550..c9928a084e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -30,8 +30,6 @@ import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.transport.ITransportConnection;
-import org.apache.qpid.client.transport.SocketTransportConnection;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -64,7 +62,6 @@ import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import java.io.IOException;
import java.net.ConnectException;
-import java.net.Socket;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.*;
@@ -160,8 +157,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final long DEFAULT_TIMEOUT = 1000 * 30;
private ProtocolVersion _protocolVersion;
- /** The active socket that is to be used as a value for connection */
- private Socket _openSocket;
/**
* @param broker brokerdetails
@@ -179,7 +174,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), null, null);
+ + AMQBrokerDetails.checkTransport(broker) + "'"), null);
}
/**
@@ -198,7 +193,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig, null);
+ + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
}
public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
@@ -223,38 +218,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
+ "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
: (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig, null);
- }
-
- public AMQConnection(String connection, Socket socket) throws AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(connection), null, socket);
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(connection), null, null);
+ this(new AMQConnectionURL(connection), null);
}
public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(connection), sslConfig, null);
+ this(new AMQConnectionURL(connection), sslConfig);
}
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
- this(connectionURL, sslConfig, null);
- }
-
- public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig, Socket socket) throws AMQException
- {
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
}
- _openSocket = socket;
-
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
@@ -414,23 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
- ITransportConnection connection = TransportConnection.getInstance(brokerDetail);
-
- if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
- {
- if (_openSocket != null)
- {
- ((SocketTransportConnection) connection).setOpenSocket(_openSocket);
- }
- else
- {
- throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket' transport:" + brokerDetail);
- }
-
- }
-
- connection.connect(_protocolHandler, brokerDetail);
+ TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
@@ -1327,11 +1294,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _sessions.get(channelId);
}
- public void setOpenSocket(Socket socket)
- {
- _openSocket = socket;
- }
-
public ProtocolVersion getProtocolVersion()
{
return _protocolVersion;
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index e9d6242e77..b2f7ae8395 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class SocketTransportConnection implements ITransportConnection
{
@@ -46,8 +48,6 @@ public class SocketTransportConnection implements ITransportConnection
private SocketConnectorFactory _socketConnectorFactory;
- private Socket _openSocket;
-
static interface SocketConnectorFactory
{
IoConnector newSocketConnector();
@@ -58,11 +58,6 @@ public class SocketTransportConnection implements ITransportConnection
_socketConnectorFactory = socketConnectorFactory;
}
- public void setOpenSocket(Socket openSocket)
- {
- _openSocket = openSocket;
- }
-
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
@@ -99,15 +94,18 @@ public class SocketTransportConnection implements ITransportConnection
{
address = null;
- if (_openSocket != null)
+ Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
+
+ if (socket != null)
{
- _logger.info("Using existing Socket:" + _openSocket);
- ((ExistingSocketConnector) ioConnector).setOpenSocket(_openSocket);
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
}
else
{
throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket' transport:" + brokerDetail);
+ "with 'socket://<SocketID>' transport:" + brokerDetail);
}
}
else
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 94361fccfc..0ea04e5bc3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.net.Socket;
+
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
@@ -61,6 +64,18 @@ public class TransportConnection
private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+ private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
+
+ public static void registerOpenSocket(String socketID, Socket openSocket)
+ {
+ _openSocketRegister.put(socketID, openSocket);
+ }
+
+ public static Socket removeOpenSocket(String socketID)
+ {
+ return _openSocketRegister.remove(socketID);
+ }
+
public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -305,7 +320,7 @@ public class TransportConnection
synchronized (_inVmPipeAddress)
{
_inVmPipeAddress.clear();
- }
+ }
_acceptor = null;
_currentInstance = -1;
_currentVMPort = -1;