summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java478
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java56
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java17
7 files changed, 596 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
new file mode 100644
index 0000000000..ab3bc28d83
--- /dev/null
+++ b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ExistingSocketConnector extends BaseIoConnector
+{
+ /** @noinspection StaticNonFinalField */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+ private final int id = nextId++;
+ private final String threadName = "SocketConnector-" + id;
+ private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
+ private final Queue connectQueue = new Queue();
+ private final SocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+ private final Executor executor;
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+ private int workerTimeout = 60; // 1 min.
+ private Socket _openSocket = null;
+
+ /** Create a connector with a single processing thread using a NewThreadExecutor */
+ public ExistingSocketConnector()
+ {
+ this(1, new NewThreadExecutor());
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public ExistingSocketConnector(int processorCount, Executor executor)
+ {
+ if (processorCount < 1)
+ {
+ throw new IllegalArgumentException("Must have at least one processor");
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new SocketIoProcessor[processorCount];
+
+ for (int i = 0; i < processorCount; i++)
+ {
+ ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+ }
+ }
+
+ /**
+ * How many seconds to keep the connection thread alive between connection requests
+ *
+ * @return Number of seconds to keep connection thread alive
+ */
+ public int getWorkerTimeout()
+ {
+ return workerTimeout;
+ }
+
+ /**
+ * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
+ *
+ * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
+ */
+ public void setWorkerTimeout(int workerTimeout)
+ {
+ if (workerTimeout < 0)
+ {
+ throw new IllegalArgumentException("Must be >= 0");
+ }
+ this.workerTimeout = workerTimeout;
+ }
+
+ public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+ {
+ return connect(address, null, handler, config);
+ }
+
+ public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+ IoHandler handler, IoServiceConfig config)
+ {
+ /** Changes here from the Mina OpenSocketConnector.
+ * Ignoreing all address as they are not needed */
+
+ if (handler == null)
+ {
+ throw new NullPointerException("handler");
+ }
+
+
+ if (config == null)
+ {
+ config = getDefaultConfig();
+ }
+
+ if (_openSocket == null)
+ {
+ throw new IllegalArgumentException("Specifed Socket not active");
+ }
+
+ boolean success = false;
+
+ try
+ {
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ newSession(_openSocket, handler, config, future);
+ success = true;
+ return future;
+ }
+ catch (IOException e)
+ {
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ finally
+ {
+ if (!success && _openSocket != null)
+ {
+ try
+ {
+ _openSocket.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+
+ /**
+ * Sets the config this connector will use by default.
+ *
+ * @param defaultConfig the default config.
+ *
+ * @throws NullPointerException if the specified value is <code>null</code>.
+ */
+ public void setDefaultConfig(SocketConnectorConfig defaultConfig)
+ {
+ if (defaultConfig == null)
+ {
+ throw new NullPointerException("defaultConfig");
+ }
+ this.defaultConfig = defaultConfig;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if (worker == null)
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ executor.execute(new NamePreservingRunnable(worker));
+ }
+ }
+
+ private void registerNew()
+ {
+ if (connectQueue.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ ConnectionRequest req;
+ synchronized (connectQueue)
+ {
+ req = (ConnectionRequest) connectQueue.pop();
+ }
+
+ if (req == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = req.channel;
+ try
+ {
+ ch.register(selector, SelectionKey.OP_CONNECT, req);
+ }
+ catch (IOException e)
+ {
+ req.setException(e);
+ }
+ }
+ }
+
+ private void processSessions(Set keys)
+ {
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isConnectable())
+ {
+ continue;
+ }
+
+ SocketChannel ch = (SocketChannel) key.channel();
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ boolean success = false;
+ try
+ {
+ ch.finishConnect();
+ newSession(ch, entry.handler, entry.config, entry);
+ success = true;
+ }
+ catch (Throwable e)
+ {
+ entry.setException(e);
+ }
+ finally
+ {
+ key.cancel();
+ if (!success)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ keys.clear();
+ }
+
+ private void processTimedOutSessions(Set keys)
+ {
+ long currentTime = System.currentTimeMillis();
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ if (currentTime >= entry.deadline)
+ {
+ entry.setException(new ConnectException());
+ try
+ {
+ key.channel().close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ key.cancel();
+ }
+ }
+ }
+ }
+
+ private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ SocketSessionImpl session = new SocketSessionImpl(this,
+ nextProcessor(),
+ getListeners(),
+ config,
+ socket.getChannel(),
+ handler,
+ socket.getRemoteSocketAddress());
+
+ newSession(session, config, connectFuture);
+ }
+
+ private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+
+ {
+ SocketSessionImpl session = new SocketSessionImpl(this,
+ nextProcessor(),
+ getListeners(),
+ config,
+ ch,
+ handler,
+ ch.socket().getRemoteSocketAddress());
+
+ newSession(session, config, connectFuture);
+ }
+
+ private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getThreadModel().buildFilterChain(session.getFilterChain());
+ }
+ catch (Throwable e)
+ {
+ throw (IOException) new IOException("Failed to create a session.").initCause(e);
+ }
+ session.getIoProcessor().addNew(session);
+ connectFuture.setSession(session);
+ }
+
+ private SocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+ public void setOpenSocket(Socket openSocket)
+ {
+ _openSocket = openSocket;
+ }
+
+ private class Worker implements Runnable
+ {
+ private long lastActive = System.currentTimeMillis();
+
+ public void run()
+ {
+ Thread.currentThread().setName(ExistingSocketConnector.this.threadName);
+
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(1000);
+
+ registerNew();
+
+ if (nKeys > 0)
+ {
+ processSessions(selector.selectedKeys());
+ }
+
+ processTimedOutSessions(selector.keys());
+
+ if (selector.keys().isEmpty())
+ {
+ if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
+ {
+ synchronized (lock)
+ {
+ if (selector.keys().isEmpty() &&
+ connectQueue.isEmpty())
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ lastActive = System.currentTimeMillis();
+ }
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ }
+ }
+
+ private class ConnectionRequest extends DefaultConnectFuture
+ {
+ private final SocketChannel channel;
+ private final long deadline;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+
+ private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
+ {
+ this.channel = channel;
+ long timeout;
+ if (config instanceof IoConnectorConfig)
+ {
+ timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
+ }
+ else
+ {
+ timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
+ }
+ this.deadline = System.currentTimeMillis() + timeout;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index c04380ba8c..89ce4b2c72 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -58,7 +58,8 @@ public class AMQBrokerDetails implements BrokerDetails
{
//todo this list of valid transports should be enumerated somewhere
if ((!(transport.equalsIgnoreCase("vm") ||
- transport.equalsIgnoreCase("tcp"))))
+ transport.equalsIgnoreCase("tcp") ||
+ transport.equalsIgnoreCase("socket"))))
{
if (transport.equalsIgnoreCase("localhost"))
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 39b3b80e74..acbe495550 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -30,6 +30,8 @@ import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.transport.ITransportConnection;
+import org.apache.qpid.client.transport.SocketTransportConnection;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -62,6 +64,7 @@ import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import java.io.IOException;
import java.net.ConnectException;
+import java.net.Socket;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.*;
@@ -157,6 +160,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final long DEFAULT_TIMEOUT = 1000 * 30;
private ProtocolVersion _protocolVersion;
+ /** The active socket that is to be used as a value for connection */
+ private Socket _openSocket;
+
/**
* @param broker brokerdetails
* @param username username
@@ -173,7 +179,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), null);
+ + AMQBrokerDetails.checkTransport(broker) + "'"), null, null);
}
/**
@@ -192,7 +198,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
+ + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig, null);
}
public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
@@ -217,26 +223,38 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
+ "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
: (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig, null);
+ }
+
+ public AMQConnection(String connection, Socket socket) throws AMQException, URLSyntaxException
+ {
+ this(new AMQConnectionURL(connection), null, socket);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(connection), null);
+ this(new AMQConnectionURL(connection), null, null);
}
public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(connection), sslConfig);
+ this(new AMQConnectionURL(connection), sslConfig, null);
}
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
+ this(connectionURL, sslConfig, null);
+ }
+
+ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig, Socket socket) throws AMQException
+ {
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
}
+ _openSocket = socket;
+
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
@@ -395,9 +413,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
try
{
- TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
- // this blocks until the connection has been set up or when an error
- // has prevented the connection being set up
+
+ ITransportConnection connection = TransportConnection.getInstance(brokerDetail);
+
+ if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
+ {
+ if (_openSocket != null)
+ {
+ ((SocketTransportConnection) connection).setOpenSocket(_openSocket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket' transport:" + brokerDetail);
+ }
+
+ }
+
+ connection.connect(_protocolHandler, brokerDetail);
+ // this blocks until the connection has been set up or when an error
+ // has prevented the connection being set up
//_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
AMQState state = _protocolHandler.attainState(openOrClosedStates);
@@ -1292,6 +1327,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _sessions.get(channelId);
}
+ public void setOpenSocket(Socket socket)
+ {
+ _openSocket = socket;
+ }
+
public ProtocolVersion getProtocolVersion()
{
return _protocolVersion;
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 5482e48699..e9d6242e77 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
@@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.Socket;
public class SocketTransportConnection implements ITransportConnection
{
@@ -44,6 +46,8 @@ public class SocketTransportConnection implements ITransportConnection
private SocketConnectorFactory _socketConnectorFactory;
+ private Socket _openSocket;
+
static interface SocketConnectorFactory
{
IoConnector newSocketConnector();
@@ -54,6 +58,11 @@ public class SocketTransportConnection implements ITransportConnection
_socketConnectorFactory = socketConnectorFactory;
}
+ public void setOpenSocket(Socket openSocket)
+ {
+ _openSocket = openSocket;
+ }
+
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
@@ -83,8 +92,31 @@ public class SocketTransportConnection implements ITransportConnection
_logger.info("send-buffer-size = " + scfg.getSendBufferSize());
scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
_logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
- final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
- _logger.info("Attempting connection to " + address);
+
+ final InetSocketAddress address;
+
+ if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
+ {
+ address = null;
+
+ if (_openSocket != null)
+ {
+ _logger.info("Using existing Socket:" + _openSocket);
+ ((ExistingSocketConnector) ioConnector).setOpenSocket(_openSocket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket' transport:" + brokerDetail);
+ }
+ }
+ else
+ {
+ address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
+ _logger.info("Attempting connection to " + address);
+ }
+
+
ConnectFuture future = ioConnector.connect(address, protocolHandler);
// wait for connection to complete
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index c1116ca01e..94361fccfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.transport;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
@@ -54,6 +55,7 @@ public class TransportConnection
private static final int TCP = 0;
private static final int VM = 1;
+ private static final int SOCKET = 2;
private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
@@ -87,7 +89,15 @@ public class TransportConnection
switch (transport)
{
-
+ case SOCKET:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
+ break;
case TCP:
_instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
@@ -127,6 +137,11 @@ public class TransportConnection
private static int getTransport(String transport)
{
+ if (transport.equals(BrokerDetails.SOCKET))
+ {
+ return SOCKET;
+ }
+
if (transport.equals(BrokerDetails.TCP))
{
return TCP;
diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 603b0834a3..8b353a7264 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -34,6 +34,7 @@ public interface BrokerDetails
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
public static final int DEFAULT_PORT = 5672;
+ public static final String SOCKET = "socket";
public static final String TCP = "tcp";
public static final String VM = "vm";
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 6c872a0e10..978ce34d59 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -510,6 +510,23 @@ public class ConnectionURLTest extends TestCase
}
+ public void testSocketProtocol() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@id/test" + "?brokerlist='socket:///'";
+
+ try
+ {
+ AMQConnectionURL curl = new AMQConnectionURL(url);
+ assertNotNull(curl);
+ assertEquals(1, curl.getBrokerCount());
+ assertNotNull(curl.getBrokerDetails(0));
+ assertEquals("socket", curl.getBrokerDetails(0).getTransport());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
+ }
public static junit.framework.Test suite()
{