From 0314cbe225dce796e09ae9abbd450323808fe493 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 18 Aug 2009 16:16:10 +0000 Subject: QPID-2024: Add ProtocolEngine and NetworkDriver interfaces and a NetworkDriver implementation that uses MINA. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@805477 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/transport/QpidThreadExecutor.java | 22 ---------------------- .../qpid/client/transport/TransportConnection.java | 1 + 2 files changed, 1 insertion(+), 22 deletions(-) delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java deleted file mode 100644 index 3de410fb69..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.apache.qpid.client.transport; - -import org.apache.qpid.thread.Threading; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; - -public class QpidThreadExecutor implements Executor -{ - @Override - public void execute(Runnable command) - { - try - { - Threading.getThreadFactory().createThread(command).start(); - } - catch(Exception e) - { - throw new RuntimeException("Error creating a thread using Qpid thread factory",e); - } - } - -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 0bacda04ff..32cc8c4cb5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -31,6 +31,7 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.thread.QpidThreadExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -- cgit v1.2.1 From a7be8fc7337b5cc093f593cc1becb9fe7b4dc0fb Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 1 Sep 2009 16:27:52 +0000 Subject: QPID-2025: Add a AMQProtocolEngine from the de-MINAfied AMQMinaProtocolSession. Remove various now-unused classes and update references. Add tests for AMQDecoder. Net -1500 lines, +25% performance on transient messaging. Nice. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@810110 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/protocol/AMQProtocolHandler.java | 2 +- .../apache/qpid/client/transport/TransportConnection.java | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 2389c9e2da..e3a1a82dc4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -191,7 +191,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Protocol session created for session " + System.identityHashCode(session)); _failoverHandler = new FailoverHandler(this, session); - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); + final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession)); if (Boolean.getBoolean("amqj.shared_read_write_pool")) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 32cc8c4cb5..4ff24c3607 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -31,7 +31,10 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +65,7 @@ public class TransportConnection private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); - private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; + private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; private static Map _openSocketRegister = new ConcurrentHashMap(); @@ -190,8 +193,6 @@ public class TransportConnection _acceptor = new VmPipeAcceptor(); IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(ReadWriteThreadModel.getInstance()); } synchronized (_inVmPipeAddress) { @@ -276,7 +277,10 @@ public class TransportConnection { Class[] cnstr = {Integer.class}; Object[] params = {port}; - provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + + provider = new MINANetworkDriver(); + ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); } -- cgit v1.2.1 From c1ebe66bfab328c5192a35c21ea290b5c45f40f5 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 9 Sep 2009 13:05:43 +0000 Subject: Merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@812936 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/AMQAuthenticationException.java | 5 -- .../java/org/apache/qpid/client/AMQSession.java | 51 +++++++----- .../org/apache/qpid/client/AMQSession_0_8.java | 24 ++++-- .../client/handler/ChannelCloseMethodHandler.java | 91 ++++++++++++---------- .../handler/ConnectionOpenOkMethodHandler.java | 1 - .../handler/ConnectionTuneMethodHandler.java | 1 - .../protocol/AMQIoTransportProtocolSession.java | 2 +- .../qpid/client/protocol/AMQProtocolHandler.java | 54 ++++++------- .../qpid/client/protocol/AMQProtocolSession.java | 4 +- .../apache/qpid/client/state/AMQStateManager.java | 19 ++++- .../client/util/FlowControllingBlockingQueue.java | 18 ++++- .../org/apache/qpid/client/MockAMQConnection.java | 2 +- .../client/protocol/AMQProtocolHandlerTest.java | 11 +-- 13 files changed, 159 insertions(+), 124 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index 05ac3dca9e..6bae0166d1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -39,9 +39,4 @@ public class AMQAuthenticationException extends AMQException { super(error, msg, cause); } - public boolean isHardError() - { - return true; - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 118be75705..2e3e417c95 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -65,6 +65,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -205,9 +206,9 @@ public abstract class AMQSession @@ -86,7 +87,7 @@ public class AMQStateManager implements AMQMethodListener return _currentState; } - public void changeState(AMQState newState) throws AMQException + public void changeState(AMQState newState) { _logger.debug("State changing to " + newState + " from old state " + _currentState); @@ -136,6 +137,22 @@ public class AMQStateManager implements AMQMethodListener */ public void error(Exception error) { + if (error instanceof AMQException) + { + // AMQException should be being notified before closing the + // ProtocolSession. Which will change the State to CLOSED. + // if we have a hard error. + if (((AMQException)error).isHardError()) + { + changeState(AMQState.CONNECTION_CLOSING); + } + } + else + { + // Be on the safe side here and mark the connection closed + changeState(AMQState.CONNECTION_CLOSED); + } + if (_waiters.size() == 0) { _logger.error("No Waiters for error saving as last error:" + error.getMessage()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index bddbc329ab..ee7fc533a3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -22,10 +22,11 @@ package org.apache.qpid.client.util; import java.util.Iterator; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class FlowControllingBlockingQueue { + private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final Queue _queue = new ConcurrentLinkedQueue(); @@ -46,6 +49,8 @@ public class FlowControllingBlockingQueue /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + + private boolean disableFlowControl; public boolean isEmpty() { @@ -69,6 +74,10 @@ public class FlowControllingBlockingQueue _flowControlHighThreshold = highThreshold; _flowControlLowThreshold = lowThreshold; _listener = listener; + if (highThreshold == 0) + { + disableFlowControl = true; + } } public Object take() throws InterruptedException @@ -84,7 +93,7 @@ public class FlowControllingBlockingQueue } } } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { @@ -93,6 +102,7 @@ public class FlowControllingBlockingQueue _listener.underThreshold(_count); } } + } return o; @@ -106,7 +116,7 @@ public class FlowControllingBlockingQueue notifyAll(); } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index ce79080e97..da44822ec3 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -85,7 +85,7 @@ public class MockAMQConnection extends AMQConnection } @Override - public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException { _connected = true; _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 10ec220d9e..fc7f8148f0 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -200,15 +200,8 @@ public class AMQProtocolHandlerTest extends TestCase _handler.getStateManager().error(trigger); _logger.info("Setting state to be CONNECTION_CLOSED."); - try - { - _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - } - catch (AMQException e) - { - _logger.error("Unable to change the state to closed.", e); - fail("Unable to change the state to closed due to :"+e.getMessage()); - } + + _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); _logger.info("Firing exception"); _handler.propagateExceptionToFrameListeners(trigger); -- cgit v1.2.1 From 9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 16 Sep 2009 10:06:55 +0000 Subject: QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine. Allow an existing SocketConnector to be passed into a MINANetworkDriver, for use with the ExistingSocket bit of TransportConnection. Move the ExistingSocket stuff to one place, use MINANetworkDriver in TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession. Move fireAsynchEvent to Job Add getLocalAddress to AMQProtocolEngine Move TestNetworkDriver to common Use correct class for logger in AMQProtocolEngine Check the exception is thrown properly in SimpleACLTest, make it a little less prone to obscure race conditions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/AMQConnectionDelegate_8_0.java | 2 +- .../qpid/client/failover/FailoverHandler.java | 10 +- .../qpid/client/protocol/AMQProtocolHandler.java | 363 +++++++++++---------- .../qpid/client/protocol/AMQProtocolSession.java | 110 +------ .../transport/SocketTransportConnection.java | 62 ++-- .../qpid/client/transport/TransportConnection.java | 19 +- .../transport/VmPipeTransportConnection.java | 11 +- .../client/protocol/AMQProtocolHandlerTest.java | 5 +- 8 files changed, 250 insertions(+), 332 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index a0b69b5493..9876393d4c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -97,7 +97,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn.getProtocolHandler().createIoTransportSession(brokerDetail); } - + _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 927f660932..8223cd5394 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client.failover; -import org.apache.mina.common.IoSession; - import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; @@ -81,9 +79,6 @@ public class FailoverHandler implements Runnable /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class); - /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */ - private final IoSession _session; - /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */ private AMQProtocolHandler _amqProtocolHandler; @@ -99,10 +94,9 @@ public class FailoverHandler implements Runnable * @param amqProtocolHandler The protocol handler that spans the failover. * @param session The MINA session, for the failing connection. */ - public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) + public FailoverHandler(AMQProtocolHandler amqProtocolHandler) { _amqProtocolHandler = amqProtocolHandler; - _session = session; } /** @@ -221,7 +215,7 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setFailoverState(FailoverState.FAILED); /*try {*/ - _amqProtocolHandler.exceptionCaught(_session, e); + _amqProtocolHandler.exception(e); /*} catch (Exception ex) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index ab3ff8ecb0..c7e2493025 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,10 +20,6 @@ */ package org.apache.qpid.client.protocol; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; @@ -48,16 +44,25 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.pool.Event; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.PoolingFilter; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -120,7 +125,7 @@ import java.util.concurrent.CountDownLatch; * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ -public class AMQProtocolHandler extends IoHandlerAdapter +public class AMQProtocolHandler implements ProtocolEngine { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); @@ -137,7 +142,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter private volatile AMQProtocolSession _protocolSession; /** Holds the state of the protocol session. */ - private AMQStateManager _stateManager = new AMQStateManager(); + private AMQStateManager _stateManager; /** Holds the method listeners, */ private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); @@ -166,7 +171,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Object to lock on when changing the latch */ private Object _failoverLatchChange = new Object(); - + private AMQCodecFactory _codecFactory; + private Job _readJob; + private Job _writeJob; + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + private NetworkDriver _networkDriver; + + private long _writtenBytes; + private long _readBytes; + /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -175,86 +188,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQProtocolHandler(AMQConnection con) { _connection = con; - } - - /** - * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the - * session, which filters the events handled by this handler. The filter chain consists of, handing off events - * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP. - * - * @param session The MINA session. - * - * @throws Exception Any underlying exceptions are allowed to fall through to MINA. - */ - public void sessionCreated(IoSession session) throws Exception - { - _logger.debug("Protocol session created for session " + System.identityHashCode(session)); - _failoverHandler = new FailoverHandler(this, session); - - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession)); - - if (Boolean.getBoolean("amqj.shared_read_write_pool")) - { - session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } - else - { - session.getFilterChain().addLast("protocolFilter", pcf); - } - // we only add the SSL filter where we have an SSL connection - if (_connection.getSSLConfiguration() != null) - { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = - new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); - SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); - sslFilter.setUseClientMode(true); - session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); - } - - try - { - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - threadModel.getAsynchronousReadFilter().createNewJobForSession(session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); - } - catch (RuntimeException e) - { - _logger.error(e.getMessage(), e); - } - - if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME)) - { - try - { - //Add IO Protection Filters - IoFilterChain chain = session.getFilterChain(); - - session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty( - ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty( - ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT))); - writefilter.attach(chain); - session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - - _logger.info("Using IO Read/Write Filter Protection"); - } - catch (Exception e) - { - _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); - } - } - _protocolSession = new AMQProtocolSession(this, session, _connection); - - _stateManager.setProtocolSession(_protocolSession); - - _protocolSession.init(); + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager = new AMQStateManager(_protocolSession); + _codecFactory = new AMQCodecFactory(false, _protocolSession); + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); + _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + _poolReference.acquireExecutorService(); + _failoverHandler = new FailoverHandler(this); } /** @@ -283,12 +224,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter * may be called first followed by this method. This depends on whether the client was trying to send data at the * time of the failure. * - * @param session The MINA session. - * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and * not otherwise? The above comment doesn't make that clear. */ - public void sessionClosed(IoSession session) + public void closed() { if (_connection.isClosed()) { @@ -327,7 +266,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _logger.debug("sessionClose() not allowed to failover"); _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", null)); + "Server closed connection and reconnection " + "not permitted.", + _stateManager.getLastException())); } else { @@ -350,43 +290,39 @@ public class AMQProtocolHandler extends IoHandlerAdapter failoverThread.start(); } - public void sessionIdle(IoSession session, IdleStatus status) throws Exception + @Override + public void readerIdle() { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - // write heartbeat frame: - _logger.debug("Sent heartbeat"); - session.write(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - // failover: - HeartbeatDiagnostics.timeout(); - _logger.warn("Timed out while waiting for heartbeat from peer."); - session.close(); - } + _logger.debug("Protocol Session [" + this + "] idle: reader"); + // failover: + HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + _networkDriver.close(); + } + + @Override + public void writerIdle() + { + _logger.debug("Protocol Session [" + this + "] idle: reader"); + writeFrame(HeartbeatBody.FRAME); + HeartbeatDiagnostics.sent(); } /** - * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an - * IOException, MINA will close the connection automatically. - * - * @param session The MINA session. - * @param cause The exception that triggered this event. + * Invoked when any exception is thrown by the NetworkDriver */ - public void exceptionCaught(IoSession session, Throwable cause) + public void exception(Throwable cause) { + _logger.info("AS: HELLO"); if (_failoverState == FailoverState.NOT_STARTED) { // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attemp failover - - sessionClosed(session); + // this will attempt failover + _networkDriver.close(); + closed(); } else { @@ -437,6 +373,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void propagateExceptionToAllWaiters(Exception e) { getStateManager().error(e); + propagateExceptionToFrameListeners(e); } @@ -490,48 +427,84 @@ public class AMQProtocolHandler extends IoHandlerAdapter private static int _messageReceivedCount; - public void messageReceived(IoSession session, Object message) throws Exception - { - if (PROTOCOL_DEBUG) - { - _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); - } - if(message instanceof AMQFrame) + @Override + public void received(ByteBuffer msg) + { + try { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; + _readBytes += msg.remaining(); + final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - if (debug && ((msgNumber % 1000) == 0)) + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } - - AMQFrame frame = (AMQFrame) message; - - final AMQBody bodyFrame = frame.getBodyFrame(); - - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + @Override + public void run() + { + // Decode buffer - bodyFrame.handle(frame.getChannel(), _protocolSession); + for (AMQDataBlock message : dataBlocks) + { - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + try + { + if (PROTOCOL_DEBUG) + { + _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); + } + + if(message instanceof AMQFrame) + { + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; + + if (debug && ((msgNumber % 1000) == 0)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } + + AMQFrame frame = (AMQFrame) message; + + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + + bodyFrame.handle(frame.getChannel(), _protocolSession); + + _connection.bytesReceived(_readBytes); + } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + ProtocolVersion pv = protocolInit.checkVersion(); + getConnection().setProtocolVersion(pv); + + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + } + catch (Exception e) + { + e.printStackTrace(); + _logger.error("Exception processing frame", e); + propagateExceptionToFrameListeners(e); + exception(e); + } + } + } + })); } - else if (message instanceof ProtocolInitiation) + catch (Exception e) { - // We get here if the server sends a response to our initial protocol header - // suggesting an alternate ProtocolVersion; the server will then close the - // connection. - ProtocolInitiation protocolInit = (ProtocolInitiation) message; - ProtocolVersion pv = protocolInit.checkVersion(); - getConnection().setProtocolVersion(pv); - - // get round a bug in old versions of qpid whereby the connection is not closed - _stateManager.changeState(AMQState.CONNECTION_CLOSED); + propagateExceptionToFrameListeners(e); + exception(e); } } - public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session) + public void methodBodyReceived(final int channelId, final AMQBody bodyFrame) throws AMQException { @@ -571,32 +544,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter { propagateExceptionToFrameListeners(e); - exceptionCaught(session, e); + exception(e); } } private static int _messagesOut; - public void messageSent(IoSession session, Object message) throws Exception - { - if (PROTOCOL_DEBUG) - { - _protocolLogger.debug(String.format("SEND: [%s] %s", this, message)); - } - - final long sentMessages = _messagesOut++; - - final boolean debug = _logger.isDebugEnabled(); - - if (debug && ((sentMessages % 1000) == 0)) - { - _logger.debug("Sent " + _messagesOut + " protocol messages"); - } - - _connection.bytesSent(session.getWrittenBytes()); - } - public StateWaiter createWaiter(Set states) throws AMQException { return getStateManager().createWaiter(states); @@ -610,12 +564,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void writeFrame(AMQDataBlock frame) { - _protocolSession.writeFrame(frame); + writeFrame(frame, false); } public void writeFrame(AMQDataBlock frame, boolean wait) { - _protocolSession.writeFrame(frame, wait); + ByteBuffer buf = frame.toNioByteBuffer(); + _writtenBytes += buf.remaining(); + _networkDriver.send(buf); + if (PROTOCOL_DEBUG) + { + _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); + } + + final long sentMessages = _messagesOut++; + + final boolean debug = _logger.isDebugEnabled(); + + if (debug && ((sentMessages % 1000) == 0)) + { + _logger.debug("Sent " + _messagesOut + " protocol messages"); + } + + _connection.bytesSent(_writtenBytes); + + if (wait) + { + _networkDriver.flush(); + } } /** @@ -673,7 +649,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter //FIXME: At this point here we should check or before add we should check _stateManager is in an open // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 } - _protocolSession.writeFrame(frame); + writeFrame(frame); return listener.blockForFrame(timeout); // When control resumes before this line, a reply will have been received @@ -723,20 +699,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter final AMQFrame frame = body.generateFrame(0); //If the connection is already closed then don't do a syncWrite - if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) - { - _protocolSession.closeProtocolSession(false); - } - else + if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) { try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _protocolSession.closeProtocolSession(); + _networkDriver.close(); + closed(); } catch (AMQTimeoutException e) { - _protocolSession.closeProtocolSession(false); + closed(); } catch (FailoverException e) { @@ -748,13 +721,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** @return the number of bytes read from this protocol session */ public long getReadBytes() { - return _protocolSession.getIoSession().getReadBytes(); + return _readBytes; } /** @return the number of bytes written to this protocol session */ public long getWrittenBytes() { - return _protocolSession.getIoSession().getWrittenBytes(); + return _writtenBytes; } public void failover(String host, int port) @@ -807,6 +780,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; + _stateManager.setProtocolSession(_protocolSession); } public AMQProtocolSession getProtocolSession() @@ -843,4 +817,35 @@ public class AMQProtocolHandler extends IoHandlerAdapter { return _protocolSession.getProtocolVersion(); } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + /** @param delay delay in seconds (not ms) */ + void initHeartbeats(int delay) + { + if (delay > 0) + { + getNetworkDriver().setMaxWriteIdle(delay); + getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); + } + } + + public NetworkDriver getNetworkDriver() + { + return _networkDriver; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 0e872170aa..cd049c24a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client.protocol; -import org.apache.commons.lang.StringUtils; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang.StringUtils; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final String SASL_CLIENT = "SASLClient"; - protected final IoSession _minaProtocolSession; - - protected WriteFuture _lastWriteFuture; - /** * The handler from which this session was created and which is used to handle protocol events. We send failover * events to the handler. @@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected final AMQConnection _connection; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private ConnectionTuneParameters _connectionTuneParameters; - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) - { - _protocolHandler = protocolHandler; - _minaProtocolSession = protocolSession; - _minaProtocolSession.setAttachment(this); - // properties of the connection are made available to the event handlers - _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - // fixme - real value needed - _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _protocolVersion = connection.getProtocolVersion(); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - this); - _connection = connection; + private SaslClient _saslClient; - } + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - _protocolHandler = protocolHandler; - _minaProtocolSession = null; + _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); @@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); } public String getClientID() @@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return getAMQConnection().getPassword(); } - public IoSession getIoSession() - { - return _minaProtocolSession; - } - public SaslClient getSaslClient() { - return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); + return _saslClient; } /** @@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void setSaslClient(SaslClient client) { - if (client == null) - { - _minaProtocolSession.removeAttribute(SASL_CLIENT); - } - else - { - _minaProtocolSession.setAttribute(SASL_CLIENT, client); - } + _saslClient = client; } public ConnectionTuneParameters getConnectionTuneParameters() { - return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS); + return _connectionTuneParameters; } public void setConnectionTuneParameters(ConnectionTuneParameters params) { - _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params); + _connectionTuneParameters = params; AMQConnection con = getAMQConnection(); con.setMaximumChannelCount(params.getChannelMax()); con.setMaximumFrameSize(params.getFrameMax()); - initHeartbeats((int) params.getHeartbeat()); + _protocolHandler.initHeartbeats((int) params.getHeartbeat()); } /** @@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void writeFrame(AMQDataBlock frame) { - writeFrame(frame, false); + _protocolHandler.writeFrame(frame); } public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f = _minaProtocolSession.write(frame); - if (wait) - { - // fixme -- time out? - f.join(); - } - else - { - _lastWriteFuture = f; - } + _protocolHandler.writeFrame(frame, wait); } /** @@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public AMQConnection getAMQConnection() { - return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); + return _connection; } - public void closeProtocolSession() + public void closeProtocolSession() throws AMQException { - closeProtocolSession(true); - } - - public void closeProtocolSession(boolean waitLast) - { - _logger.debug("Waiting for last write to join."); - if (waitLast && (_lastWriteFuture != null)) - { - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("Closing protocol session"); - - final CloseFuture future = _minaProtocolSession.close(); - - // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED - // then wait for the connection to close. - // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any - // error now shouldn't matter. - - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + _protocolHandler.closeConnection(0); } public void failover(String host, int port) @@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession id = _queueId++; } // get rid of / and : and ; from address for spec conformance - String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", ""); + String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", ""); return new AMQShortString("tmp_" + localAddress + "_" + id); } - /** @param delay delay in seconds (not ms) */ - void initHeartbeats(int delay) - { - if (delay > 0) - { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } - public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) { final AMQSession session = getSession(channelId); @@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException { - _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + _protocolHandler.methodBodyReceived(channel, amqMethodBody); } public void notifyError(Exception error) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index b2f7ae8395..77c9c40e82 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -24,23 +24,33 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.SSLFilter; import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.client.SSLConfiguration; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.net.InetAddressCachePolicy; + import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.security.GeneralSecurityException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.net.ssl.SSLEngine; + public class SocketTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); @@ -71,61 +81,27 @@ public class SocketTransportConnection implements ITransportConnection } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); - if (readWriteThreading) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); - scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); - scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize()); - final InetSocketAddress address; if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) { address = null; - - Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); - - if (socket != null) - { - _logger.info("Using existing Socket:" + socket); - - ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://' transport:" + brokerDetail); - } } else { address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); _logger.info("Attempting connection to " + address); } - - - ConnectFuture future = ioConnector.connect(address, protocolHandler); - - // wait for connection to complete - if (future.join(brokerDetail.getTimeout())) - { - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - } - else + + SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) { - throw new IOException("Timeout waiting for connection."); + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); } + + MINANetworkDriver driver = new MINANetworkDriver(ioConnector); + driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); + protocolHandler.setNetworkDriver(driver); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 4ff24c3607..45194750dc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -79,7 +79,7 @@ public class TransportConnection return _openSocketRegister.remove(socketID); } - public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException + public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -95,7 +95,22 @@ public class TransportConnection { public IoConnector newSocketConnector() { - return new ExistingSocketConnector(1,new QpidThreadExecutor()); + ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor()); + + Socket socket = TransportConnection.removeOpenSocket(details.getHost()); + + if (socket != null) + { + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) connector).setOpenSocket(socket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket://' transport:" + details); + } + return connector; } }); case TCP: diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index dca6efba67..3de6f9b9ea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -28,6 +28,7 @@ import org.apache.mina.transport.vmpipe.VmPipeConnector; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,8 @@ public class VmPipeTransportConnection implements ITransportConnection private static int _port; + private MINANetworkDriver _networkDriver; + public VmPipeTransportConnection(int port) { _port = port; @@ -47,16 +50,16 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new QpidVmPipeConnector(); - final IoServiceConfig cfg = ioConnector.getDefaultConfig(); - - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); - ConnectFuture future = ioConnector.connect(address, protocolHandler); + _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler); + protocolHandler.setNetworkDriver(_networkDriver); + ConnectFuture future = ioConnector.connect(address, _networkDriver); // wait for connection to complete future.join(); // we call getSession which throws an IOException if there has been an error connecting future.getSession(); + _networkDriver.setProtocolEngine(protocolHandler); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index fc7f8148f0..f520a21ba0 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.TestNetworkDriver; import org.apache.qpid.client.MockAMQConnection; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.state.AMQState; @@ -72,9 +73,7 @@ public class AMQProtocolHandlerTest extends TestCase { //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); - - _handler.sessionCreated(new MockIoSession()); - + _handler.setNetworkDriver(new TestNetworkDriver()); AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); _blockFrame = new AMQFrame(0, body); -- cgit v1.2.1 From 93fa7d17feecb3d27cead67e11b250af1fcc595e Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 16 Sep 2009 11:32:28 +0000 Subject: QPID-2015: Remove AMQIoTransportProtocolSession. Release the executor service in the same class as it's acquired git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815729 13f79535-47bb-0310-9956-ffa450edef68 --- .../protocol/AMQIoTransportProtocolSession.java | 146 --------------------- .../qpid/client/protocol/AMQProtocolHandler.java | 1 + 2 files changed, 1 insertion(+), 146 deletions(-) delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java deleted file mode 100644 index 8782e00a12..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java +++ /dev/null @@ -1,146 +0,0 @@ -package org.apache.qpid.client.protocol; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.UUID; - -import javax.security.sasl.SaslClient; - -import org.apache.commons.lang.StringUtils; -import org.apache.mina.common.IdleStatus; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.Sender; - -public class AMQIoTransportProtocolSession extends AMQProtocolSession -{ - - protected Sender _ioSender; - private SaslClient _saslClient; - private ConnectionTuneParameters _connectionTuneParameters; - - public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) - { - super(protocolHandler, connection); - } - - @Override - public void closeProtocolSession(boolean waitLast) - { - _ioSender.close(); - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - } - - @Override - public void init() - { - _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer()); - _ioSender.flush(); - } - - @Override - protected AMQShortString generateQueueName() - { - int id; - synchronized (_queueIdLock) - { - id = _queueId++; - } - return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id); - } - - @Override - public AMQConnection getAMQConnection() - { - return _connection; - } - - @Override - public SaslClient getSaslClient() - { - return _saslClient; - } - - @Override - public void setSaslClient(SaslClient client) - { - _saslClient = client; - } - - /** @param delay delay in seconds (not ms) */ - @Override - void initHeartbeats(int delay) - { - if (delay > 0) - { - // FIXME: actually do something here - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } - - @Override - public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException - { - // FIXME? - _protocolHandler.methodBodyReceived(channel, amqMethodBody, null); - } - - @Override - public void writeFrame(AMQDataBlock frame, boolean wait) - { - _ioSender.send(frame.toNioByteBuffer()); - if (wait) - { - _ioSender.flush(); - } - } - - @Override - public void setSender(Sender sender) - { - _ioSender = sender; - } - - @Override - public ConnectionTuneParameters getConnectionTuneParameters() - { - return _connectionTuneParameters; - } - - @Override - public void setConnectionTuneParameters(ConnectionTuneParameters params) - { - _connectionTuneParameters = params; - AMQConnection con = getAMQConnection(); - con.setMaximumChannelCount(params.getChannelMax()); - con.setMaximumFrameSize(params.getFrameMax()); - initHeartbeats((int) params.getHeartbeat()); - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c7e2493025..99366101d1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -716,6 +716,7 @@ public class AMQProtocolHandler implements ProtocolEngine _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); } } + _poolReference.releaseExecutorService(); } /** @return the number of bytes read from this protocol session */ -- cgit v1.2.1 From 31bbc100ac6b3a31eb25d29f407d60ff23334d1f Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 17 Sep 2009 15:19:54 +0000 Subject: QPID-2024 QPID-2105: Remove now unnecessary classes like Event, PoolingFilter, ReadWriteThreadModel. Move the couple of necessary methods to Job. Fix imports. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816232 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/protocol/AMQProtocolHandler.java | 53 ++++++++++------------ .../transport/SocketTransportConnection.java | 23 ++-------- .../qpid/client/transport/TransportConnection.java | 14 +++--- .../transport/VmPipeTransportConnection.java | 6 +-- 4 files changed, 35 insertions(+), 61 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 99366101d1..be75fc150f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,20 +20,22 @@ */ package org.apache.qpid.client.protocol; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; + import org.apache.mina.filter.codec.ProtocolCodecException; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; @@ -42,32 +44,29 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.Event; import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the @@ -107,9 +106,6 @@ import java.util.concurrent.CountDownLatch; * *

*
CRC Card
Responsibilities Collaborations - *
Create the filter chain to filter this handlers events. - * {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. - * *
Maintain fail-over state. *
*
@@ -191,9 +187,8 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); - _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _poolReference.acquireExecutorService(); _failoverHandler = new FailoverHandler(this); } @@ -436,7 +431,7 @@ public class AMQProtocolHandler implements ProtocolEngine _readBytes += msg.remaining(); final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { @Override public void run() @@ -495,7 +490,7 @@ public class AMQProtocolHandler implements ProtocolEngine } } } - })); + }); } catch (Exception e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 77c9c40e82..1ac8f62e32 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -20,37 +20,20 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; - import org.apache.qpid.client.SSLConfiguration; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.network.mina.MINANetworkDriver; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.net.InetAddressCachePolicy; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.security.GeneralSecurityException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.net.ssl.SSLEngine; - public class SocketTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 45194750dc..a4f8bb0166 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; @@ -30,20 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.thread.QpidThreadExecutor; import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.net.Socket; - /** * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 3de6f9b9ea..504d475740 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -20,20 +20,18 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; + import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class VmPipeTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class); -- cgit v1.2.1 From 2296769193754d1bc09e1dc3b998709a5808ecbb Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 18 Sep 2009 12:54:23 +0000 Subject: QPID-2104 AMQProtocolHandler: hand the actual write off to a seperate thread git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816612 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/protocol/AMQProtocolHandler.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index be75fc150f..06a1fe2696 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -564,9 +564,16 @@ public class AMQProtocolHandler implements ProtocolEngine public void writeFrame(AMQDataBlock frame, boolean wait) { - ByteBuffer buf = frame.toNioByteBuffer(); + final ByteBuffer buf = frame.toNioByteBuffer(); _writtenBytes += buf.remaining(); - _networkDriver.send(buf); + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() + { + @Override + public void run() + { + _networkDriver.send(buf); + } + }); if (PROTOCOL_DEBUG) { _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); -- cgit v1.2.1 From 98cc985dbd81a84cd0b0a969c57cb941680ec81f Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Sun, 11 Oct 2009 23:22:08 +0000 Subject: Merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@824198 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/client/src/main/java/log4j.xml | 36 +++++ .../java/org/apache/qpid/client/AMQConnection.java | 12 +- .../apache/qpid/client/AMQConnectionDelegate.java | 9 ++ .../qpid/client/AMQConnectionDelegate_0_10.java | 13 +- .../qpid/client/AMQConnectionDelegate_8_0.java | 13 +- .../java/org/apache/qpid/client/AMQSession.java | 152 +++++++++++++++------ .../org/apache/qpid/client/AMQSession_0_10.java | 3 - .../org/apache/qpid/client/AMQSession_0_8.java | 11 +- .../apache/qpid/client/BasicMessageConsumer.java | 1 + .../apache/qpid/client/BasicMessageProducer.java | 2 +- .../org/apache/qpid/client/XAConnectionImpl.java | 2 +- .../client/configuration/ClientProperties.java | 2 +- .../qpid/client/failover/FailoverHandler.java | 2 + .../client/handler/ClientMethodDispatcherImpl.java | 4 +- .../qpid/client/protocol/AMQProtocolHandler.java | 1 - .../apache/qpid/client/util/BlockingWaiter.java | 2 +- .../qpid/jms/failover/FailoverExchangeMethod.java | 13 +- 17 files changed, 216 insertions(+), 62 deletions(-) create mode 100644 qpid/java/client/src/main/java/log4j.xml (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/log4j.xml b/qpid/java/client/src/main/java/log4j.xml new file mode 100644 index 0000000000..c27acba818 --- /dev/null +++ b/qpid/java/client/src/main/java/log4j.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0d2adcec8a..ed122a772e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -313,7 +313,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages - protected int _maxPrefetch; + private int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; @@ -450,7 +450,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - // use the defaul value set for all connections + // use the default value set for all connections _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } @@ -512,7 +512,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect boolean retryAllowed = true; Exception connectionException = null; - while (!_connected && retryAllowed) + while (!_connected && retryAllowed && brokerDetails != null) { ProtocolVersion pe = null; try @@ -691,12 +691,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean attemptReconnection() { - while (_failoverPolicy.failoverAllowed()) + BrokerDetails broker = null; + while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { try { - makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); - + makeBrokerConnection(broker); return true; } catch (Exception e) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index e5980d8b7d..e6c3473cb1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -39,6 +39,15 @@ public interface AMQConnectionDelegate Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException; + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + XASession createXASession() throws JMSException; + XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; void failoverPrep(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 927929c94a..4d10180667 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -99,6 +99,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return session; } + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + /** * create an XA Session and start it if required. */ @@ -285,7 +297,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.setIdleTimeout(l); } - @Override public int getMaxChannelID() { return Integer.MAX_VALUE; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 9876393d4c..97d0d0516e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -191,6 +191,18 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate }, _conn).execute(); } + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException, FailoverException { @@ -290,7 +302,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public void setIdleTimeout(long l){} - @Override public int getMaxChannelID() { return (int) (Math.pow(2, 16)-1); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2e3e417c95..dd9a00ce10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,7 +21,6 @@ package org.apache.qpid.client; import java.io.Serializable; -import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -60,6 +59,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -113,7 +113,6 @@ import org.slf4j.LoggerFactory; public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -198,16 +197,32 @@ public abstract class AMQSession= System.currentTimeMillis() ) + { + + _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + } + if(!_flowControl.getFlowControl()) { - _flowControl.wait(); + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 0644bd88a8..1587d6a6bf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession extends Closeable implements Messa else { _session.addDeliveredMessage(msg.getDeliveryTag()); + _session.markDirty(); } break; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 5ff6066ddc..44ce59975a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -60,7 +60,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac /** * Priority of messages created by this producer. */ - private int _messagePriority; + private int _messagePriority = Message.DEFAULT_PRIORITY; /** * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index 20fa68605a..43025bd724 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -47,7 +47,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public synchronized XASession createXASession() throws JMSException { checkNotClosed(); - return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2); + return _delegate.createXASession(); } //-- Interface XAQueueConnection diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 3627618e68..be0d283470 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -39,7 +39,7 @@ public class ClientProperties * type: long */ public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch"; - public static final String MAX_PREFETCH_DEFAULT = "5000"; + public static final String MAX_PREFETCH_DEFAULT = "500"; /** * When true a sync command is sent after every persistent messages. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 8223cd5394..7761215450 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.failover; import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; @@ -134,6 +135,7 @@ public class FailoverHandler implements Runnable // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); + // Use a fresh new StateManager for the reconnection attempts _amqProtocolHandler.setStateManager(new AMQStateManager()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 9c791730ca..0e3333940a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance(); private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); @@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException { - return false; + _channelFlowMethodHandler.methodReceived(_session, body, channelId); + return true; } public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 06a1fe2696..e645f4f214 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -308,7 +308,6 @@ public class AMQProtocolHandler implements ProtocolEngine */ public void exception(Throwable cause) { - _logger.info("AS: HELLO"); if (_failoverState == FailoverState.NOT_STARTED) { // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 67cda957fb..a3d015eadc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -253,7 +253,7 @@ public abstract class BlockingWaiter } else { - System.err.println("WARNING: new error arrived while old one not yet processed"); + System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage()); } try diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java index e05a7ab6e2..960661daea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -189,7 +189,8 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener { synchronized (_brokerListLock) { - return _connectionDetails.getBrokerDetails(_currentBrokerIndex); + _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + return _currentBrokerDetail; } } @@ -214,7 +215,15 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener broker.getHost().equals(_currentBrokerDetail.getHost()) && broker.getPort() == _currentBrokerDetail.getPort()) { - return getNextBrokerDetails(); + if (_connectionDetails.getBrokerCount() > 1) + { + return getNextBrokerDetails(); + } + else + { + _failedAttemps ++; + return null; + } } String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); -- cgit v1.2.1 From ebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 11 Dec 2014 10:11:03 +0000 Subject: Allow the transport to inform the model that encryption is being used git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644586 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c61469559a..0fe2ce232e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -316,6 +316,11 @@ public class AMQProtocolHandler implements ProtocolEngine } } + @Override + public void encryptedTransport() + { + } + public void readerIdle() { _logger.debug("Protocol Session [" + this + "] idle: reader"); -- cgit v1.2.1 From 57f62fb2e1dc7bb04a2bee4072992fb00f310126 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 12 Dec 2014 16:04:52 +0000 Subject: Remove dead setIdleTimeout method git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644950 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java | 2 +- .../src/test/java/org/apache/qpid/client/transport/MockSender.java | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 8a7e6ab084..c7dee5b985 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -670,7 +670,7 @@ public class AMQSession_0_10Test extends QpidTestCase { private List _sendEvents = new ArrayList(); - public void setIdleTimeout(int i) + private void setIdleTimeout(int i) { } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java index 7c3988c19a..11b34d3dff 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java @@ -27,11 +27,6 @@ import org.apache.qpid.transport.Sender; public class MockSender implements Sender { - public void setIdleTimeout(int i) - { - - } - public void send(ByteBuffer msg) { -- cgit v1.2.1 From 26eab7ed4556717fca50ad93025fdc8d112f9715 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 31 Jan 2015 20:07:36 +0000 Subject: Separate Byte and ProtocolEvent sender/receivers, add server specific 0-10 encoder git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656248 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/client/AMQConnectionDelegate_8_0.java | 8 ++++---- .../client/handler/ConnectionCloseMethodHandler.java | 6 ++---- .../apache/qpid/client/protocol/AMQProtocolHandler.java | 8 ++++---- .../apache/qpid/client/protocol/AMQProtocolSession.java | 7 +++---- .../java/org/apache/qpid/client/AMQSession_0_10Test.java | 2 +- .../org/apache/qpid/client/transport/MockSender.java | 4 ++-- .../qpid/client/transport/TestNetworkConnection.java | 16 ++++++++-------- 7 files changed, 24 insertions(+), 27 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 35582d92b7..d0d9d28398 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.XASession; -import org.apache.qpid.transport.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +59,7 @@ import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.Session; import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; @@ -527,12 +527,12 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } - private static class ReceiverClosedWaiter implements Receiver + private static class ReceiverClosedWaiter implements ByteBufferReceiver { private final CountDownLatch _closedWatcher; - private final Receiver _receiver; + private final ByteBufferReceiver _receiver; - public ReceiverClosedWaiter(Receiver receiver) + public ReceiverClosedWaiter(ByteBufferReceiver receiver) { _receiver = receiver; _closedWatcher = new CountDownLatch(1); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index f038fc6e4f..17b0fe1abb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client.handler; -import java.nio.ByteBuffer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +33,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.TransportException; public class ConnectionCloseMethodHandler implements StateAwareMethodListener @@ -95,7 +93,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener sender = session.getSender(); + ByteBufferSender sender = session.getSender(); if (error != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d5e3027601..f50447b930 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -66,8 +66,8 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; @@ -179,7 +179,7 @@ public class AMQProtocolHandler implements ProtocolEngine private NetworkConnection _network; - private Sender _sender; + private ByteBufferSender _sender; private long _lastReadTime = System.currentTimeMillis(); private long _lastWriteTime = System.currentTimeMillis(); private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; @@ -905,7 +905,7 @@ public class AMQProtocolHandler implements ProtocolEngine setNetworkConnection(network, network.getSender()); } - public void setNetworkConnection(NetworkConnection network, Sender sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; _sender = sender; @@ -923,7 +923,7 @@ public class AMQProtocolHandler implements ProtocolEngine return _lastWriteTime; } - protected Sender getSender() + protected ByteBufferSender getSender() { return _sender; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index e5765ee00f..9b0b21f06e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.client.protocol; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -52,8 +51,8 @@ import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; /** @@ -382,7 +381,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } - public Sender getSender() + public ByteBufferSender getSender() { return _protocolHandler.getSender(); } @@ -476,7 +475,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _protocolHandler.propagateExceptionToAllWaiters(error); } - public void setSender(Sender sender) + public void setSender(ByteBufferSender sender) { // No-op, interface munging } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index c7dee5b985..2543c5b500 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -666,7 +666,7 @@ public class AMQSession_0_10Test extends QpidTestCase } } - class MockSender implements Sender + class MockSender implements ProtocolEventSender { private List _sendEvents = new ArrayList(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java index 11b34d3dff..ee6704bb39 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java @@ -22,9 +22,9 @@ package org.apache.qpid.client.transport; import java.nio.ByteBuffer; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; -public class MockSender implements Sender +public class MockSender implements ByteBufferSender { public void send(ByteBuffer msg) diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java index c9af1de6a7..cdfa83571b 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java @@ -20,18 +20,18 @@ */ package org.apache.qpid.client.transport; -import java.security.Principal; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; - import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.Principal; + +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.network.NetworkConnection; /** * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, @@ -147,7 +147,7 @@ public class TestNetworkConnection implements NetworkConnection _remoteAddress = address; } - public Sender getSender() + public ByteBufferSender getSender() { return _sender; } -- cgit v1.2.1 From 762aa71a99d13cb3f7efd29cb95098eadafb5396 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 12 Feb 2015 20:25:53 +0000 Subject: merged from trunk r1659391 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659392 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 119 ++++----------------- .../org/apache/qpid/client/AMQSession_0_10.java | 6 +- .../org/apache/qpid/client/AMQSession_0_8.java | 7 +- .../apache/qpid/client/ChannelToSessionMap.java | 93 +++------------- .../org/apache/qpid/client/XAConnectionImpl.java | 2 +- .../client/messaging/address/QpidQueueOptions.java | 13 --- 6 files changed, 42 insertions(+), 198 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 12e9285af8..8f5e9524b6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -133,7 +133,7 @@ public abstract class AMQSession _producers = new ConcurrentHashMap(); + private final Map _producers = new ConcurrentHashMap(); /** * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume @@ -195,7 +195,7 @@ public abstract class AMQSession _consumers = new IdToConsumerMap(); + private final Map _consumers = new ConcurrentHashMap<>(); /** * Contains a list of consumers which have been removed but which might still have @@ -224,7 +224,7 @@ public abstract class AMQSession getConsumers() + protected Collection getConsumers() { - return _consumers; + return new ArrayList(_consumers.values()); } protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) @@ -317,83 +316,6 @@ public abstract class AMQSession - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentMap _slowAccessConsumers = new ConcurrentHashMap(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return oldVal; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } - - public Collection values() - { - ArrayList values = new ArrayList(); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); - - return values; - } - - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } - /** * Creates a new session on a connection. * @@ -2490,7 +2412,7 @@ public abstract class AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 143de271a1..5fb9329af7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -27,6 +27,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -330,10 +331,9 @@ public class AMQSession_0_8 extends AMQSession consumersToCheck = new ArrayList(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; - for(BasicMessageConsumer_0_8 consumer : consumersToCheck) + for(BasicMessageConsumer_0_8 consumer : getConsumers()) { if (consumer.isMessageListenerSet()) { @@ -344,7 +344,6 @@ public class AMQSession_0_8 extends AMQSession _slowAccessSessions = new LinkedHashMap(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private final Map _sessionMap = new ConcurrentHashMap<>(); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; public AMQSession get(int channelId) { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } + return _sessionMap.get(channelId); } - public AMQSession put(int channelId, AMQSession session) + public void put(int channelId, AMQSession session) { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - + _sessionMap.put(channelId, session); } - public AMQSession remove(int channelId) + public void remove(int channelId) { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - + _sessionMap.remove(channelId); } public Collection values() { - ArrayList values = new ArrayList(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; + return new ArrayList<>(_sessionMap.values()); } public int size() { - return _size; + return _sessionMap.size(); } public void clear() { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } + _sessionMap.clear(); } /* @@ -141,14 +80,8 @@ public final class ChannelToSessionMap //go back to the start _idFactory.set(_minChannelID); } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } + + done = (!_sessionMap.keySet().contains(id)); } return id; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index d9514338ce..d625a9ae69 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -29,7 +29,7 @@ import javax.jms.XATopicConnection; import javax.jms.XATopicSession; /** - * This class implements the javax.njms.XAConnection interface + * This class implements the javax.jms.XAConnection interface */ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java index 5b6c027f4a..24295a0832 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java @@ -30,7 +30,6 @@ public class QpidQueueOptions extends HashMap public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; public static final String QPID_LAST_VALUE_QUEUE_NO_BROWSE = "qpid.last_value_queue_no_browse"; - public static final String QPID_QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"; public void validatePolicyType(String type) { @@ -83,16 +82,4 @@ public class QpidQueueOptions extends HashMap this.put(QPID_LVQ_KEY, key); } - public void setQueueEvents(String value) - { - if (value != null && (value.equals("1") || value.equals("2"))) - { - this.put(QPID_QUEUE_EVENT_GENERATION, value); - } - else - { - throw new IllegalArgumentException("Invalid value for " + - QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}"); - } - } } -- cgit v1.2.1 From 9dc57fe738f366d875c2319dafdfa2c50ce2f20b Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 3 Mar 2015 14:56:40 +0000 Subject: merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663717 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQBrokerDetails.java | 20 +++- .../java/org/apache/qpid/client/AMQConnection.java | 70 +++++++---- .../qpid/client/AMQConnectionDelegate_0_10.java | 128 ++++++++++++++++++--- .../org/apache/qpid/client/AMQDestination.java | 8 ++ .../apache/qpid/client/BasicMessageConsumer.java | 5 + .../qpid/client/BasicMessageConsumer_0_8.java | 1 + .../failover/ConnectionRedirectException.java | 46 ++++++++ .../qpid/client/failover/FailoverHandler.java | 6 +- .../handler/ConnectionRedirectMethodHandler.java | 21 +++- .../qpid/client/protocol/AMQProtocolHandler.java | 8 -- .../qpid/client/protocol/AMQProtocolSession.java | 5 - .../apache/qpid/client/state/AMQStateManager.java | 8 +- .../java/org/apache/qpid/jms/BrokerDetails.java | 2 +- 13 files changed, 259 insertions(+), 69 deletions(-) create mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index bde20d0550..9d9278b74d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -20,18 +20,18 @@ */ package org.apache.qpid.client; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + public class AMQBrokerDetails implements BrokerDetails, Serializable { private static final long serialVersionUID = 8450786374975932890L; @@ -42,6 +42,14 @@ public class AMQBrokerDetails implements BrokerDetails, Serializable private Map _options = new HashMap(); + public AMQBrokerDetails(BrokerDetails details) + { + _host = details.getHost(); + _port = details.getPort(); + _transport = details.getTransport(); + _options = new HashMap<>(details.getProperties()); + } + public AMQBrokerDetails(){} public AMQBrokerDetails(String url) throws URLSyntaxException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 8e7b5b90d8..ec60bd2914 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -62,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.client.failover.ConnectionRedirectException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -462,9 +463,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else if (!isConnected()) { - retryAllowed = _failoverPolicy.failoverAllowed(); - brokerDetails = _failoverPolicy.getNextBrokerDetails(); - _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + if(connectionException instanceof ConnectionRedirectException) + { + ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException; + retryAllowed = true; + brokerDetails = new AMQBrokerDetails(brokerDetails); + brokerDetails.setHost(redirect.getHost()); + brokerDetails.setPort(redirect.getPort()); + _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + + } + else + { + retryAllowed = _failoverPolicy.failoverAllowed(); + brokerDetails = _failoverPolicy.getNextBrokerDetails(); + _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + } } } @@ -599,9 +613,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _virtualHost = virtualHost; } - public boolean attemptReconnection(String host, int port) + public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure) { - BrokerDetails bd = new AMQBrokerDetails(host, port); + BrokerDetails bd = new AMQBrokerDetails(_failoverPolicy.getCurrentBrokerDetails()); + bd.setHost(host); + bd.setPort(port); _failoverPolicy.setBroker(bd); @@ -618,10 +634,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.info("Unable to connect to broker at " + bd); } - attemptReconnection(); + return useFailoverConfigOnFailure && attemptReconnection(); } - return false; } public boolean attemptReconnection() @@ -629,32 +644,41 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect BrokerDetails broker = null; while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { - try + if (attemptConnection(broker)) { - makeBrokerConnection(broker); return true; } - catch (Exception e) + } + + // connection unsuccessful + return false; + } + + private boolean attemptConnection(final BrokerDetails broker) + { + try + { + makeBrokerConnection(broker); + return true; + } + catch (Exception e) + { + if (!(e instanceof AMQException)) { - if (!(e instanceof AMQException)) + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); - } + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); } - else + } + else + { + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info(e.getMessage() + ":Unable to connect to broker at " - + _failoverPolicy.getCurrentBrokerDetails()); - } + _logger.info(e.getMessage() + ":Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails()); } } } - - // connection unsuccessful return false; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index e22a341205..2c10c585fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -61,6 +61,8 @@ import org.apache.qpid.transport.TransportException; public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { + private static final int DEFAULT_PORT = 5672; + /** * This class logger. */ @@ -238,7 +240,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); } - String msg = "Cannot connect to broker: " + ce.getMessage(); + String msg = "Cannot connect to broker ("+brokerDetail+"): " + ce.getMessage(); throw new AMQException(code, msg, ce); } @@ -314,25 +316,39 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec @Override public void run() { - try - { - if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + try { - failoverPrep(); - _conn.resubscribeSessions(); - _conn.fireFailoverComplete(); - failoverDone.set(true); + boolean preFailover = _conn.firePreFailover(false); + if (preFailover) + { + boolean reconnected; + if(exc instanceof RedirectConnectionException) + { + RedirectConnectionException redirect = (RedirectConnectionException)exc; + reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts()); + } + else + { + reconnected = _conn.attemptReconnection(); + } + if(reconnected) + { + failoverPrep(); + _conn.resubscribeSessions(); + _conn.fireFailoverComplete(); + failoverDone.set(true); + } + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); } - } - catch (Exception e) - { - _logger.error("error during failover", e); - } - finally - { - _conn.getProtocolHandler().getFailoverLatch().countDown(); - _conn.getProtocolHandler().setFailoverLatch(null); - } } }); @@ -376,6 +392,58 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + @Override + public boolean redirect(final String host, final List knownHosts) + { + exception = new RedirectConnectionException(host,knownHosts); + + return false; + } + + private boolean attemptRedirection(String host, List knownHosts) + { + + boolean redirected = host != null && attemptRedirection(host); + if(knownHosts != null) + { + for(Object knownHost : knownHosts) + { + redirected = attemptRedirection(String.valueOf(knownHost)); + if(redirected) + { + break; + } + } + } + return redirected; + } + + private boolean attemptRedirection(String host) + { + int portIndex = host.indexOf(':'); + + int port; + if (portIndex == -1) + { + port = DEFAULT_PORT; + } + else + { + try + { + port = Integer.parseInt(host.substring(portIndex + 1)); + } + catch(NumberFormatException e) + { + _logger.info("Unable to redirect to " + host + " - does not look like a valid address"); + return false; + } + host = host.substring(0, portIndex); + + } + return _conn.attemptReconnection(host,port,false); + } + public T executeRetrySupport(FailoverProtectedOperation operation) throws E { if (_conn.isFailingOver()) @@ -538,4 +606,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return _qpidConnection.isMessageCompressionSupported(); } + + private class RedirectConnectionException extends ConnectionException + { + private final String _host; + private final List _knownHosts; + + public RedirectConnectionException(final String host, + final List knownHosts) + { + super("Connection redirected to " + host + " alternates " + knownHosts); + _host = host; + _knownHosts = knownHosts; + } + + public String getHost() + { + return _host; + } + + public List getKnownHosts() + { + return _knownHosts; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index a71555480f..2eeea4c967 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -89,6 +89,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte private RejectBehaviour _rejectBehaviour; + private Map _consumerArguments; + public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; @@ -299,6 +301,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR); _rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase()); + _consumerArguments = binding.getConsumerOptions(); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) @@ -718,6 +721,11 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte return result; } + public Map getConsumerArguments() + { + return _consumerArguments; + } + public Reference getReference() throws NamingException { return new Reference( diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 9cef1f8dce..3c947043c6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -187,6 +187,10 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } final FieldTable ft = FieldTableFactory.newFieldTable(); + if(destination.getConsumerArguments() != null) + { + ft.addAll(FieldTable.convertToFieldTable(destination.getConsumerArguments())); + } // rawSelector is used by HeadersExchange and is not a JMS Selector if (rawSelector != null) { @@ -203,6 +207,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal); } + _arguments = ft; _addressType = _destination.getAddressType(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 1d7bb6087a..4f2715bd7b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -69,6 +69,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer { @@ -65,7 +70,21 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener } - session.failover(host, port); + session.notifyError(new ConnectionRedirectException(host,port)); + + ByteBufferSender sender = session.getSender(); + + // Close the open TCP connection + try + { + sender.close(); + } + catch(TransportException e) + { + //Ignore, they are already logged by the Sender and this + //is a connection-close being processed by the IoReceiver + //which will as it closes initiate failover if necessary. + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index f50447b930..200b1d72a4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -799,14 +799,6 @@ public class AMQProtocolHandler implements ProtocolEngine return _writtenBytes; } - public void failover(String host, int port) - { - _failoverHandler.setHost(host); - _failoverHandler.setPort(port); - // see javadoc for FailoverHandler to see rationale for separate thread - startFailoverThread(); - } - public void blockUntilNotFailingOver() throws InterruptedException { synchronized(_failoverLatchChange) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 9b0b21f06e..15cb908807 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -386,11 +386,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return _protocolHandler.getSender(); } - public void failover(String host, int port) - { - _protocolHandler.failover(host, port); - } - protected AMQShortString generateQueueName() { int id; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index fab0bcd71f..a44b6a1ff3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.state; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +33,6 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; - /** * The state manager is responsible for managing the state of the protocol session. *

diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 808b2781c4..9faa58f11d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -98,7 +98,7 @@ public interface BrokerDetails /** * Sets the properties associated with this connection * - * @param props the new p[roperties. + * @param props the new properties. */ public void setProperties(Map props); -- cgit v1.2.1