summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-09-07 08:55:12 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-09-07 08:55:12 +0000
commitc8b145b9cf5282d534f204a5be48d19fe507ecb3 (patch)
tree2957d1a3121f0e446569487d87987aa4a8eaaff7 /java
parente70492626c601e6c610ccea3e33b2a91188f3f76 (diff)
downloadqpid-python-c8b145b9cf5282d534f204a5be48d19fe507ecb3.tar.gz
QPID-3473 : Replace use of MINA IO with transport IO (joint work with Robbie Gemmel)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1166069 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java37
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Broker.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java146
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java43
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java6
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java26
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java2
-rw-r--r--java/build.xml2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java151
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java115
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java312
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java188
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java66
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java18
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java18
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java253
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java42
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SenderClosedException.java52
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Transport.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java146
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java19
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java81
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java157
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java221
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java79
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java147
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java161
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java13
-rw-r--r--java/common/src/test/java/org/apache/qpid/session/TestSession.java277
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java22
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java5
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java540
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java42
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java104
-rw-r--r--java/test-profiles/Excludes2
57 files changed, 771 insertions, 2899 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8141533045..97f999484f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -75,6 +75,7 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TransportException;
import java.util.ArrayList;
import java.util.Collection;
@@ -137,7 +138,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
-
+
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -201,12 +202,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
-
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
}
-
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -216,7 +217,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_txnCount.compareAndSet(0,1);
}
}
-
+
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -314,7 +315,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
try
{
_currentMessage.getStoredMessage().flushToStore();
-
+
final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
@@ -385,6 +386,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_currentMessage = null;
throw e;
}
+ catch (RuntimeException e)
+ {
+ // we want to make sure we don't keep a reference to the message in the
+ // event of an error
+ _currentMessage = null;
+ throw e;
+ }
}
protected void routeCurrentMessage() throws AMQException
@@ -435,7 +443,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
-
+
Subscription subscription =
SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
@@ -456,6 +464,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_tag2SubscriptionMap.remove(tag);
throw e;
}
+ catch (RuntimeException e)
+ {
+ _tag2SubscriptionMap.remove(tag);
+ throw e;
+ }
return tag;
}
@@ -513,7 +526,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
catch (AMQException e)
{
- _logger.error("Caught AMQException whilst attempting to reque:" + e);
+ _logger.error("Caught AMQException whilst attempting to requeue:" + e);
+ }
+ catch (TransportException e)
+ {
+ _logger.error("Caught TransportException whilst attempting to requeue:" + e);
}
getConfigStore().removeConfiguredObject(this);
@@ -944,7 +961,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
finally
{
_rollingBack = false;
-
+
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
@@ -1425,12 +1442,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
return _createTime;
}
-
+
public void mgmtClose() throws AMQException
{
_session.mgmtCloseChannel(_channelId);
}
-
+
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
if (inTransaction())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index a4e365005e..75891d02a3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -56,7 +56,6 @@ import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
-import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
public class Broker
{
@@ -83,7 +82,7 @@ public class Broker
startup(new BrokerOptions());
}
- public void startup(BrokerOptions options) throws Exception
+ public void startup(final BrokerOptions options) throws Exception
{
try
{
@@ -193,9 +192,9 @@ public class Broker
{
for(int port : ports)
{
- final Set<AmqpProtocolVersion> supported =
+ final Set<AmqpProtocolVersion> supported =
getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8);
- final NetworkTransportConfiguration settings =
+ final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
@@ -218,12 +217,12 @@ public class Broker
for(int sslPort : sslPorts)
{
- final Set<AmqpProtocolVersion> supported =
+ final Set<AmqpProtocolVersion> supported =
getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8);
- final NetworkTransportConfiguration settings =
+ final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP);
- final IncomingNetworkTransport transport = new MinaNetworkTransport();
+ final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(hostName, supported);
@@ -244,7 +243,7 @@ public class Broker
}
private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10,
- final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9,
+ final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9,
final Set<Integer> exclude_0_8)
{
final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
@@ -268,7 +267,7 @@ public class Broker
return supported;
}
-
+
private File getConfigFile(final String fileName,
final String defaultFileName,
final String qpidHome, boolean throwOnFileNotFound) throws InitException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 3786c2020c..1c01ce465d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -29,6 +29,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.transport.TransportException;
public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
@@ -44,19 +46,24 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
/** Close all of the currently open connections. */
public void close()
{
+ _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
while (!_registry.isEmpty())
{
AMQConnectionModel connection = _registry.get(0);
- closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down");
+ closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down");
}
}
-
+
public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
{
try
{
connection.close(cause, message);
}
+ catch (TransportException e)
+ {
+ _logger.warn("Error closing connection:" + e.getMessage());
+ }
catch (AMQException e)
{
_logger.warn("Error closing connection:" + e.getMessage());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index dade5d5f54..f8e4eab0b6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -68,5 +68,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
session.writeFrame(responseBody.generateFrame(channelId));
+ session.closeProtocolSession();
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 36e0643f94..88022ba519 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -64,8 +64,6 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -95,6 +93,7 @@ import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
@@ -132,7 +131,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private Object _lastSent;
protected volatile boolean _closed;
-
+
// maximum number of channels this session should have
private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
@@ -158,22 +157,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private long _writtenBytes;
private long _readBytes;
- private Job _readJob;
- private Job _writeJob;
- private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
private final UUID _id;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
-
+
private ApplicationRegistry _registry;
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private final NetworkConnection _network;
- private final Sender<ByteBuffer> _sender;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
public ManagedObject getManagedObject()
{
@@ -184,11 +180,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_codecFactory = new AMQCodecFactory(true, this);
- _poolReference.acquireExecutorService();
- _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
- _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
- _network = network;
- _sender = _network.getSender();
+
+ setNetworkConnection(network);
_sessionID = connectionId;
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -204,6 +197,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
initialiseStatistics();
}
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ setNetworkConnection(network, network.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+ }
+
private AMQProtocolSessionMBean createMBean() throws JMException
{
return new AMQProtocolSessionMBean(this);
@@ -240,26 +244,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+ for (AMQDataBlock dataBlock : dataBlocks)
{
- public void run()
+ try
{
- // Decode buffer
-
- for (AMQDataBlock dataBlock : dataBlocks)
- {
- try
- {
- dataBlockReceived(dataBlock);
- }
- catch (Exception e)
- {
- _logger.error("Unexpected exception when processing datablock", e);
- closeProtocolSession();
- }
- }
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
}
- });
+ }
}
catch (Exception e)
{
@@ -337,6 +333,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
closeChannel(channelId);
throw e;
}
+ catch (TransportException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
}
finally
{
@@ -368,6 +369,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
mechanisms.getBytes(),
locales.getBytes());
_sender.send(responseBody.generateFrame(0).toNioByteBuffer());
+ _sender.flush();
}
catch (AMQException e)
@@ -375,6 +377,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
_sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+ _sender.flush();
}
}
@@ -430,19 +433,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
AMQConstant.CHANNEL_ERROR.getName().toString());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce, false);
+ closeConnection(channelId, ce);
}
}
catch (AMQConnectionException e)
{
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, e, false);
+ closeConnection(channelId, e);
}
catch (AMQSecurityException e)
{
AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce, false);
+ closeConnection(channelId, ce);
}
}
catch (Exception e)
@@ -485,19 +488,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*
* @param frame the frame to write
*/
- public void writeFrame(AMQDataBlock frame)
+ public synchronized void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
- {
- public void run()
- {
- _sender.send(buf);
- }
- });
+ _sender.send(buf);
+ _sender.flush();
}
public AMQShortString getContextKey()
@@ -729,7 +727,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
closeAllChannels();
-
+
getConfigStore().removeConfiguredObject(this);
if (_managedObject != null)
@@ -749,7 +747,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_closed = true;
notifyAll();
}
- _poolReference.releaseExecutorService();
CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
}
}
@@ -772,27 +769,32 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
- public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+ public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
{
- if (_logger.isInfoEnabled())
+ try
{
- _logger.info("Closing connection due to: " + e);
- }
-
- markChannelAwaitingCloseOk(channelId);
- closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e);
+ }
- if (closeProtocolSession)
+ markChannelAwaitingCloseOk(channelId);
+ closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+ }
+ finally
{
closeProtocolSession();
}
+
+
}
public void closeProtocolSession()
{
- _sender.close();
+ _network.close();
+
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -801,6 +803,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_logger.info(e.getMessage());
}
+ catch (TransportException e)
+ {
+ _logger.info(e.getMessage());
+ }
}
public String toString()
@@ -923,7 +929,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
-
+
_configStore.addConfiguredObject(this);
try
@@ -960,12 +966,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
_authorizedSubject = authorizedSubject;
}
-
+
public Subject getAuthorizedSubject()
{
return _authorizedSubject;
}
-
+
public Principal getAuthorizedPrincipal()
{
return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
@@ -1001,6 +1007,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_logger.error("Could not close protocol engine", e);
}
+ catch (TransportException e)
+ {
+ _logger.error("Could not close protocol engine", e);
+ }
}
public void readerIdle()
@@ -1154,7 +1164,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
return false;
}
-
+
public void mgmtClose()
{
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1256,7 +1266,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
new AMQShortString(message),
0,0);
- writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ writeFrame(responseBody.generateFrame((Integer)session.getID()));
}
public void close(AMQConstant cause, String message) throws AMQException
@@ -1264,12 +1274,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
getProtocolOutputConverter().getProtocolMajorVersion(),
getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null), true);
+ (Throwable) null));
}
public List<AMQSessionModel> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
for (AMQChannel channel : getChannels())
{
sessions.add((AMQSessionModel) channel);
@@ -1301,27 +1311,27 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
_virtualHost.registerMessageReceived(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
@@ -1334,7 +1344,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
_registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 9116bf2767..c1b5b02f8f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -163,8 +163,10 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
void closeSession() throws AMQException;
+ void closeProtocolSession();
+
/** This must be called to close the session in order to free up any resources managed by the session. */
- void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+ void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
/** @return a key that uniquely identifies this session */
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index bd1c19d30e..7033bf755d 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -44,7 +44,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private IApplicationRegistry _appRegistry;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
-
+
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
@@ -53,14 +53,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
NetworkConnection network,
long id)
{
+ this(appRegistry,fqdn,supported,id);
+ setNetworkConnection(network);
+ }
+
+ public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
+ String fqdn,
+ Set<AmqpProtocolVersion> supported,
+ long id)
+ {
_id = id;
_appRegistry = appRegistry;
_fqdn = fqdn;
_supported = supported;
- _network = network;
- _sender = _network.getSender();
+
}
+
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
@@ -96,6 +105,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_delegate.readerIdle();
}
+
public void received(ByteBuffer msg)
{
_delegate.received(msg);
@@ -158,6 +168,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
(byte) 10
};
+ public void setNetworkConnection(NetworkConnection networkConnection)
+ {
+ setNetworkConnection(networkConnection, networkConnection.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+ }
+
+
private static interface DelegateCreator
{
AmqpProtocolVersion getVersion();
@@ -302,6 +324,11 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+
+ }
+
public long getConnectionId()
{
return _id;
@@ -380,14 +407,19 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
if(newDelegate == null)
{
_sender.send(ByteBuffer.wrap(newestSupported));
+ _sender.flush();
_delegate = new ClosedDelegateProtocolEngine();
+
+ _network.close();
+
}
else
{
_delegate = newDelegate;
_header.flip();
+ _delegate.setNetworkConnection(_network, _sender);
_delegate.received(_header);
if(msg.hasRemaining())
{
@@ -423,5 +455,10 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
}
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 8a7159bdc2..7e327b221f 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -48,4 +48,10 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
}
+
+ public ServerProtocolEngine newProtocolEngine()
+ {
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index 3ff650a480..48a8a1bf42 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
@@ -31,6 +32,7 @@ import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.registry.IApplicationRegistry;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import java.util.UUID;
public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig
@@ -52,11 +54,16 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
super(new Assembler(conn));
_connection = conn;
_connection.setConnectionConfig(this);
- _network = network;
+
_id = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
- _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE));
+ if(network != null)
+ {
+ setNetworkConnection(network);
+ }
+
+
_connection.onOpen(new Runnable()
{
public void run()
@@ -65,6 +72,19 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
});
+ }
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ setNetworkConnection(network, network.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+
+ _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
_connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
@@ -186,7 +206,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
return false;
}
-
+
public void mgmtClose()
{
_connection.mgmtClose();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index c058d0b0d8..5a411c6807 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -196,7 +196,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
return _closed;
}
- public void closeProtocolSession(boolean waitLast)
+ public void closeProtocolSession()
{
// Override as we don't have a real IOSession to close.
// The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
diff --git a/java/build.xml b/java/build.xml
index 9d07f4e4cc..84baa4237e 100644
--- a/java/build.xml
+++ b/java/build.xml
@@ -77,7 +77,7 @@
<iterate target="compile-tests"/>
</target>
- <target name="test" description="execute tests">
+ <target name="test" description="execute tests" depends="manifest">
<delete file="${build.failed}"/>
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 6b9cf909a8..6dfbb969e7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -1,6 +1,6 @@
package org.apache.qpid.client;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.qpid.client;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
@@ -43,6 +43,7 @@ import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionCloseCode;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
@@ -58,7 +59,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
* This class logger.
*/
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
-
+
/**
* The name of the UUID property
*/
@@ -273,10 +274,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
ConnectionClose close = exc.getClose();
- if (close == null)
+ if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED)
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
-
+
try
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
@@ -345,12 +346,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
return ProtocolVersion.v0_10;
}
-
+
public String getUUID()
{
- return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
+ return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
}
-
+
private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
{
ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();
@@ -358,7 +359,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
conSettings.setVhost(_conn.getVirtualHost());
conSettings.setUsername(_conn.getUsername());
conSettings.setPassword(_conn.getPassword());
-
+
// Pass client name from connection URL
Map<String, Object> clientProps = new HashMap<String, Object>();
try
@@ -375,7 +376,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return conSettings;
}
-
+
// The idle_timeout prop is in milisecs while
// the new heartbeat prop is in secs
private int getHeartbeatInterval(BrokerDetails brokerDetail)
@@ -390,7 +391,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
}
- else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
+ else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
{
heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
_logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 8bc9889050..948f5178a6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.security.GeneralSecurityException;
+import java.security.Security;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -55,6 +56,8 @@ import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,9 +121,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
+ SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
+
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
- NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslContext);
- _conn._protocolHandler.setNetworkConnection(network);
+ NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn._protocolHandler), sslContext);
+ _conn._protocolHandler.setNetworkConnection(network, securityLayer.sender(network.getSender()));
_conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
index e9e52cc97c..28d19ce817 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
* <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
* </table>
*
- * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
- * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos)
+ * Then have a wrapping continuation (this), which blocks on an arbitrary
* Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
* Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
* to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 34c6468629..6d6cd9cae5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -31,7 +31,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -57,8 +56,6 @@ import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -169,14 +166,12 @@ public class AMQProtocolHandler implements ProtocolEngine
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
private AMQCodecFactory _codecFactory;
- private Job _readJob;
- private Job _writeJob;
- private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
- private NetworkTransport _transport;
+
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
@@ -191,24 +186,6 @@ public class AMQProtocolHandler implements ProtocolEngine
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
- _poolReference.setThreadFactory(new ThreadFactory()
- {
-
- public Thread newThread(final Runnable runnable)
- {
- try
- {
- return Threading.getThreadFactory().createThread(runnable);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to create thread", e);
- }
- }
- });
- _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
- _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
- _poolReference.acquireExecutorService();
_failoverHandler = new FailoverHandler(this);
}
@@ -329,17 +306,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
else
{
-
- if (cause instanceof ProtocolCodecException)
- {
- _logger.info("Protocol Exception caught NOT going to attempt failover as " +
- "cause isn't AMQConnectionClosedException: " + cause, cause);
-
- AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToAllWaiters(amqe);
- }
_connection.exceptionReceived(cause);
-
}
// FIXME Need to correctly handle other exceptions. Things like ...
@@ -433,76 +400,63 @@ public class AMQProtocolHandler implements ProtocolEngine
public void received(ByteBuffer msg)
{
+ _readBytes += msg.remaining();
try
{
- _readBytes += msg.remaining();
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+ // Decode buffer
+
+ for (AMQDataBlock message : dataBlocks)
{
- public void run()
- {
- // Decode buffer
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
- for (AMQDataBlock message : dataBlocks)
+ if(message instanceof AMQFrame)
{
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
- try
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
-
- if(message instanceof AMQFrame)
- {
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
-
- if (debug && ((msgNumber % 1000) == 0))
- {
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
- bodyFrame.handle(frame.getChannel(), _protocolSession);
-
- _connection.bytesReceived(_readBytes);
- }
- else if (message instanceof ProtocolInitiation)
- {
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- _suggestedProtocolVersion = protocolInit.checkVersion();
- _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
- }
- }
- catch (Exception e)
+ if (debug && ((msgNumber % 1000) == 0))
{
- _logger.error("Exception processing frame", e);
- propagateExceptionToFrameListeners(e);
- exception(e);
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ _suggestedProtocolVersion = protocolInit.checkVersion();
+ _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
- });
}
catch (Exception e)
{
+ _logger.error("Exception processing frame", e);
propagateExceptionToFrameListeners(e);
exception(e);
}
+
+
}
public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
@@ -568,17 +522,13 @@ public class AMQProtocolHandler implements ProtocolEngine
writeFrame(frame, false);
}
- public void writeFrame(AMQDataBlock frame, boolean wait)
+ public synchronized void writeFrame(AMQDataBlock frame, boolean wait)
{
final ByteBuffer buf = frame.toNioByteBuffer();
_writtenBytes += buf.remaining();
- Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
- {
- public void run()
- {
- _sender.send(buf);
- }
- });
+ _sender.send(buf);
+ _sender.flush();
+
if (PROTOCOL_DEBUG)
{
_protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
@@ -595,10 +545,6 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection.bytesSent(_writtenBytes);
- if (wait)
- {
- _sender.flush();
- }
}
/**
@@ -723,7 +669,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
}
}
- _poolReference.releaseExecutorService();
+
}
/** @return the number of bytes read from this protocol session */
@@ -841,8 +787,13 @@ public class AMQProtocolHandler implements ProtocolEngine
public void setNetworkConnection(NetworkConnection network)
{
+ setNetworkConnection(network, network.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
_network = network;
- _sender = network.getSender();
+ _sender = sender;
}
/** @param delay delay in seconds (not ms) */
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
deleted file mode 100644
index bbd0a7b144..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message
- * when a threshold has been exceeded, and has a frequency configuration so that messages are not output
- * too often.
- *
- */
-public class ProtocolBufferMonitorFilter extends IoFilterAdapter
-{
- private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class);
-
- public static final long DEFAULT_FREQUENCY = 5000;
-
- public static final int DEFAULT_THRESHOLD = 3000;
-
- private int _bufferedMessages = 0;
-
- private int _threshold;
-
- private long _lastMessageOutputTime;
-
- private long _outputFrequencyInMillis;
-
- public ProtocolBufferMonitorFilter()
- {
- _threshold = DEFAULT_THRESHOLD;
- _outputFrequencyInMillis = DEFAULT_FREQUENCY;
- }
-
- public ProtocolBufferMonitorFilter(int threshold, long frequency)
- {
- _threshold = threshold;
- _outputFrequencyInMillis = frequency;
- }
-
- public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception
- {
- _bufferedMessages++;
- if (_bufferedMessages > _threshold)
- {
- long now = System.currentTimeMillis();
- if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis)
- {
- _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: "
- + _bufferedMessages);
- _lastMessageOutputTime = now;
- }
- }
-
- nextFilter.messageReceived(session, message);
- }
-
- public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
- {
- _bufferedMessages--;
- nextFilter.messageSent(session, message);
- }
-
- public int getBufferedMessages()
- {
- return _bufferedMessages;
- }
-
- public int getThreshold()
- {
- return _threshold;
- }
-
- public void setThreshold(int threshold)
- {
- _threshold = threshold;
- }
-
- public long getOutputFrequencyInMillis()
- {
- return _outputFrequencyInMillis;
- }
-
- public void setOutputFrequencyInMillis(long outputFrequencyInMillis)
- {
- _outputFrequencyInMillis = outputFrequencyInMillis;
- }
-
- public long getLastMessageOutputTime()
- {
- return _lastMessageOutputTime;
- }
-}
diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java b/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java
deleted file mode 100644
index f0938a4bc0..0000000000
--- a/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.mina.common.*;
-import org.apache.mina.common.support.DefaultCloseFuture;
-import org.apache.mina.common.support.DefaultWriteFuture;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-
-import java.net.SocketAddress;
-import java.net.InetSocketAddress;
-import java.util.Set;
-
-public class MockIoSession implements IoSession
-{
- private AMQProtocolSession _protocolSession;
-
- /**
- * Stores the last response written
- */
- private Object _lastWrittenObject;
-
- private boolean _closing;
- private IoFilterChain _filterChain;
-
- public MockIoSession()
- {
- _filterChain = new AbstractIoFilterChain(this)
- {
- protected void doWrite(IoSession ioSession, IoFilter.WriteRequest writeRequest) throws Exception
- {
-
- }
-
- protected void doClose(IoSession ioSession) throws Exception
- {
-
- }
- };
- }
-
- public Object getLastWrittenObject()
- {
- return _lastWrittenObject;
- }
-
- public IoService getService()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return null;
- }
-
- public IoHandler getHandler()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoSessionConfig getConfig()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoFilterChain getFilterChain()
- {
- return _filterChain;
- }
-
- public WriteFuture write(Object message)
- {
- WriteFuture wf = new DefaultWriteFuture(null);
- _lastWrittenObject = message;
- return wf;
- }
-
- public CloseFuture close()
- {
- _closing = true;
- CloseFuture cf = new DefaultCloseFuture(null);
- cf.setClosed();
- return cf;
- }
-
- public Object getAttachment()
- {
- return _protocolSession;
- }
-
- public Object setAttachment(Object attachment)
- {
- Object current = _protocolSession;
- _protocolSession = (AMQProtocolSession) attachment;
- return current;
- }
-
- public Object getAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object setAttribute(String key, Object value)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object setAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object removeAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean containsAttribute(String key)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Set getAttributeKeys()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TransportType getTransportType()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isConnected()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isClosing()
- {
- return _closing;
- }
-
- public CloseFuture getCloseFuture()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getRemoteAddress()
- {
- return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getLocalAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getServiceAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getIdleTime(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getIdleTimeInMillis(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setIdleTime(IdleStatus status, int idleTime)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getWriteTimeout()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getWriteTimeoutInMillis()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setWriteTimeout(int writeTimeout)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TrafficMask getTrafficMask()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setTrafficMask(TrafficMask trafficMask)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void suspendRead()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void suspendWrite()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resumeRead()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resumeWrite()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getReadBytes()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getWrittenBytes()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getReadMessages()
- {
- return 0L;
- }
-
- public long getWrittenMessages()
- {
- return 0L;
- }
-
- public long getWrittenWriteRequests()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getScheduledWriteRequests()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getScheduledWriteBytes()
- {
- return 0; //TODO
- }
-
- public long getCreationTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastIoTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastReadTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastWriteTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isIdle(IdleStatus status)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getIdleCount(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastIdleTime(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
index 591dbd085b..c81af9760b 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.codec;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
@@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations.
- * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
* <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
* </table>
*/
-public class AMQCodecFactory implements ProtocolCodecFactory
+public class AMQCodecFactory
{
- /** Holds the protocol encoder. */
- private final AMQEncoder _encoder = new AMQEncoder();
/** Holds the protocol decoder. */
private final AMQDecoder _frameDecoder;
@@ -56,15 +50,6 @@ public class AMQCodecFactory implements ProtocolCodecFactory
_frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
}
- /**
- * Gets the AMQP encoder.
- *
- * @return The AMQP encoder.
- */
- public ProtocolEncoder getEncoder()
- {
- return _encoder;
- }
/**
* Gets the AMQP decoder.
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 281c0761d9..7732ff2fd5 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -23,10 +23,7 @@ package org.apache.qpid.codec;
import java.util.ArrayList;
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
@@ -54,11 +51,8 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
* @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
* per-session overhead.
*/
-public class AMQDecoder extends CumulativeProtocolDecoder
+public class AMQDecoder
{
-
- private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
-
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -84,98 +78,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
_bodyFactory = new AMQMethodBodyFactory(session);
}
- /**
- * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
- * intiation decoders.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
- {
-
- boolean decoded;
- if (_expectProtocolInitiation
- || (firstDecode
- && (in.remaining() > 0)
- && (in.get(in.position()) == (byte)'A')))
- {
- decoded = doDecodePI(session, in, out);
- }
- else
- {
- decoded = doDecodeDataBlock(session, in, out);
- }
- if(firstDecode && decoded)
- {
- firstDecode = false;
- }
- return decoded;
- }
-
- /**
- * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
- {
- int pos = in.position();
- boolean enoughData = _dataBlockDecoder.decodable(in.buf());
- in.position(pos);
- if (!enoughData)
- {
- // returning false means it will leave the contents in the buffer and
- // call us again when more data has been read
- return false;
- }
- else
- {
- _dataBlockDecoder.decode(session, in, out);
- return true;
- }
- }
-
- /**
- * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
- {
- boolean enoughData = _piDecoder.decodable(in.buf());
- if (!enoughData)
- {
- // returning false means it will leave the contents in the buffer and
- // call us again when more data has been read
- return false;
- }
- else
- {
- ProtocolInitiation pi = new ProtocolInitiation(in.buf());
- out.write(pi);
-
- return true;
- }
- }
/**
* Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
@@ -190,97 +93,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder
}
- /**
- * Cumulates content of <tt>in</tt> into internal buffer and forwards
- * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
- * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
- * and the cumulative buffer is compacted after decoding ends.
- *
- * @throws IllegalStateException if your <tt>doDecode()</tt> returned
- * <tt>true</tt> not consuming the cumulative buffer.
- */
- public void decode( IoSession session, ByteBuffer in,
- ProtocolDecoderOutput out ) throws Exception
- {
- ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
- // if we have a session buffer, append data to that otherwise
- // use the buffer read from the network directly
- if( buf != null )
- {
- buf.put( in );
- buf.flip();
- }
- else
- {
- buf = in;
- }
-
- for( ;; )
- {
- int oldPos = buf.position();
- boolean decoded = doDecode( session, buf, out );
- if( decoded )
- {
- if( buf.position() == oldPos )
- {
- throw new IllegalStateException(
- "doDecode() can't return true when buffer is not consumed." );
- }
-
- if( !buf.hasRemaining() )
- {
- break;
- }
- }
- else
- {
- break;
- }
- }
-
- // if there is any data left that cannot be decoded, we store
- // it in a buffer in the session and next time this decoder is
- // invoked the session buffer gets appended to
- if ( buf.hasRemaining() )
- {
- storeRemainingInSession( buf, session );
- }
- else
- {
- removeSessionBuffer( session );
- }
- }
-
- /**
- * Releases the cumulative buffer used by the specified <tt>session</tt>.
- * Please don't forget to call <tt>super.dispose( session )</tt> when
- * you override this method.
- */
- public void dispose( IoSession session ) throws Exception
- {
- removeSessionBuffer( session );
- }
-
- private void removeSessionBuffer(IoSession session)
- {
- ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
- if( buf != null )
- {
- buf.release();
- session.removeAttribute( BUFFER );
- }
- }
-
private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
- private void storeRemainingInSession(ByteBuffer buf, IoSession session)
- {
- ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
- remainingBuf.setAutoExpand( true );
- remainingBuf.put( buf );
- session.setAttribute( BUFFER, remainingBuf );
- }
-
public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
{
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java
deleted file mode 100644
index 53f48ae1c8..0000000000
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.codec;
-
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-
-import org.apache.qpid.framing.AMQDataBlockEncoder;
-
-/**
- * AMQEncoder delegates encoding of AMQP to a data encoder.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Delegate AMQP encoding. <td> {@link AMQDataBlockEncoder}
- * </table>
- *
- * @todo This class just delegates to another, so seems to be pointless. Unless it is going to handle some
- * responsibilities in the future, then drop it.
- */
-public class AMQEncoder implements ProtocolEncoder
-{
- /** The data encoder that is delegated to. */
- private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder();
-
- /**
- * Encodes AMQP.
- *
- * @param session The Mina session.
- * @param message The data object to encode.
- * @param out The Mina writer to output the raw byte data to.
- *
- * @throws Exception If the data cannot be encoded for any reason.
- */
- public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
- {
- _dataBlockEncoder.encode(session, message, out);
- }
-
- /**
- * Does nothing. Called by Mina to allow this to clean up resources when it is no longer needed.
- *
- * @param session The Mina session.
- */
- public void dispose(IoSession session)
- { }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 228867b2b0..0187fa96a9 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -21,17 +21,12 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQDataBlockDecoder
{
- private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
@@ -106,19 +101,6 @@ public class AMQDataBlockDecoder
return frame;
}
- public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
- {
- AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
- if (bodyFactory == null)
- {
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQMethodBodyFactory(protocolSession);
- session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
- }
-
- out.write(createAndPopulateFrame(bodyFactory, in));
- }
-
public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
{
return decodable(msg.buf());
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
index 374644b4f2..d3b8ecf8bd 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -21,9 +21,6 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.apache.mina.filter.codec.demux.MessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Set;
-public final class AMQDataBlockEncoder implements MessageEncoder
+public final class AMQDataBlockEncoder
{
private static final Logger _logger = LoggerFactory.getLogger(AMQDataBlockEncoder.class);
@@ -40,19 +37,6 @@ public final class AMQDataBlockEncoder implements MessageEncoder
public AMQDataBlockEncoder()
{ }
- public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
- {
- final AMQDataBlock frame = (AMQDataBlock) message;
-
- final ByteBuffer buffer = frame.toByteBuffer();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
- }
-
- out.write(buffer);
- }
public Set getMessageTypes()
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 30db3b8be7..7526d4c756 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -138,4 +138,15 @@ public class ContentHeaderBody implements AMQBody
{
properties = props;
}
+
+ @Override
+ public String toString()
+ {
+ return "ContentHeaderBody{" +
+ "classId=" + classId +
+ ", weight=" + weight +
+ ", bodySize=" + bodySize +
+ ", properties=" + properties +
+ '}';
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
deleted file mode 100644
index 82b600de88..0000000000
--- a/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pool;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
- * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially
- * processing all of its aggregated events.
- *
- * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when
- * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events
- * may be processed in each run of the job, several runs may be required to clear the queue.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Aggregate many coninuations together into a single continuation.
- * <tr><td> Sequentially process aggregated continuations. <td> {@link Event}
- * <tr><td> Provide running and completion status of the aggregate continuation.
- * <tr><td> Execute a terminal continuation upon job completion. <td> {@link JobCompletionHandler}
- * </table>
- *
- * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a
- * continuation. Job is a continuation that aggregates other continuations and as such is a usefull re-usable
- * piece of code. There may be other palces than the mina filter chain where continuation batching is used within
- * qpid, so abstracting this out could provide a usefull building block. This also opens the way to different
- * kinds of job with a common interface, e.g. parallel or sequential jobs etc.
- *
- * @todo For better re-usability could make the completion handler optional. Only run it when one is set.
- */
-public class Job implements ReadWriteRunnable
-{
-
- /** Defines the maximum number of events that will be batched into a single job. */
- public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
-
- /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
- private final int _maxEvents;
-
- /** Holds the queue of events that make up the job. */
- private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>();
-
- /** Holds a status flag, that indicates when the job is actively running. */
- private final AtomicBoolean _active = new AtomicBoolean();
-
- private final boolean _readJob;
-
- private ReferenceCountingExecutorService _poolReference;
-
- private final static Logger _logger = LoggerFactory.getLogger(Job.class);
-
- public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob)
- {
- _poolReference = poolReference;
- _maxEvents = maxEvents;
- _readJob = readJob;
- }
-
- /**
- * Enqueus a continuation for sequential processing by this job.
- *
- * @param evt The continuation to enqueue.
- */
- public void add(Runnable evt)
- {
- _eventQueue.add(evt);
- }
-
- /**
- * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job.
- */
- boolean processAll()
- {
- // limit the number of events processed in one run
- int i = _maxEvents;
- while( --i != 0 )
- {
- Runnable e = _eventQueue.poll();
- if (e == null)
- {
- return true;
- }
- else
- {
- e.run();
- }
- }
- return false;
- }
-
- /**
- * Tests if there are no more enqueued continuations to process.
- *
- * @return <tt>true</tt> if there are no enqueued continuations in this job, <tt>false</tt> otherwise.
- */
- public boolean isComplete()
- {
- return _eventQueue.peek() == null;
- }
-
- /**
- * Marks this job as active if it is inactive. This method is thread safe.
- *
- * @return <tt>true</tt> if this job was inactive and has now been marked as active, <tt>false</tt> otherwise.
- */
- public boolean activate()
- {
- return _active.compareAndSet(false, true);
- }
-
- /**
- * Marks this job as inactive. This method is thread safe.
- */
- public void deactivate()
- {
- _active.set(false);
- }
-
- /**
- * Processes a batch of aggregated continuations, marks this job as inactive and call the terminal continuation.
- */
- public void run()
- {
- if(processAll())
- {
- deactivate();
- completed();
- }
- else
- {
- notCompleted();
- }
- }
-
- public boolean isRead()
- {
- return _readJob;
- }
-
- /**
- * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
- *
- * @param job The job.
- * @param event The event to hand off asynchronously.
- */
- public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event)
- {
-
- job.add(event);
-
-
- if(pool == null)
- {
- return;
- }
-
- // rather than perform additional checks on pool to check that it hasn't shutdown.
- // catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
- }
-
- }
-
- /**
- * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
- * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
- *
- * @param session The Mina session to work in.
- * @param job The job that completed.
- */
- public void completed()
- {
- if (!isComplete())
- {
- final ExecutorService pool = _poolReference.getPool();
-
- if(pool == null)
- {
- return;
- }
-
-
- // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
- // Can the pool be shutdown at this point?
- if (activate())
- {
- try
- {
- pool.execute(this);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
-
- }
- }
- }
-
- public void notCompleted()
- {
- final ExecutorService pool = _poolReference.getPool();
-
- if(pool == null)
- {
- return;
- }
-
- try
- {
- pool.execute(this);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index f0f2652ce3..f9f6ca8444 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -80,7 +80,7 @@ public final class AMQConstant
/**
* An operator intervened to close the connection for some reason. The client may retry at some later date.
*/
- public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
+ public static final AMQConstant CONNECTION_FORCED = new AMQConstant(320, "connection forced", true);
/** The client tried to work with an unknown virtual host or cluster. */
public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 48a3df734a..fd651a2b66 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -21,8 +21,11 @@
package org.apache.qpid.protocol;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
@@ -53,4 +56,6 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
void readerIdle();
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+
} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
index 4e40b78440..7378edff0c 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
@@ -26,6 +26,6 @@ public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine(NetworkConnection network);
+ ProtocolEngine newProtocolEngine();
} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index c8b7ad2a5e..e421f06901 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -210,10 +210,7 @@ public class ClientDelegate extends ConnectionDelegate
}
else if (sc.getMechanismName().equals("EXTERNAL"))
{
- if (conn.getSecurityLayer() != null)
- {
- conn.setUserID(conn.getSecurityLayer().getUserID());
- }
+ conn.setUserID(conn.getSecurityLayer().getUserID());
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 82a6cdaa67..d6ddbaa061 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -47,6 +47,7 @@ import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
import org.apache.qpid.util.Strings;
@@ -72,6 +73,7 @@ public class Connection extends ConnectionInvoker
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
+
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
static class DefaultConnectionListener implements ConnectionListener
@@ -124,9 +126,9 @@ public class Connection extends ConnectionInvoker
private ConnectionSettings conSettings;
private SecurityLayer securityLayer;
private String _clientId;
-
+
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
-
+
public Connection() {}
public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -239,12 +241,22 @@ public class Connection extends ConnectionInvoker
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
- securityLayer = new SecurityLayer(this);
+ securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
- Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
- NetworkConnection network = transport.connect(settings, receiver, null);
- sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
+ Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+ if(secureReceiver instanceof ConnectionListener)
+ {
+ addConnectionListener((ConnectionListener)secureReceiver);
+ }
+
+ NetworkConnection network = transport.connect(settings, secureReceiver, null);
+ final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
+ if(secureSender instanceof ConnectionListener)
+ {
+ addConnectionListener((ConnectionListener)secureSender);
+ }
+ sender = new Disassembler(secureSender, settings.getMaxFrameSize());
send(new ProtocolHeader(1, 0, 10));
@@ -326,14 +338,14 @@ public class Connection extends ConnectionInvoker
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state != OPEN && error == null)
{
- w.await();
+ w.await();
}
-
+
if (state != OPEN)
{
throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
}
-
+
Session ssn = _sessionFactory.newSession(this, name, expiry);
sessions.put(name, ssn);
map(ssn);
@@ -475,13 +487,13 @@ public class Connection extends ConnectionInvoker
ssn.setState(Session.State.CLOSED);
}
else
- {
+ {
map(ssn);
ssn.attach();
ssn.resume();
}
}
-
+
for (Binary ssn_name : transactedSessions)
{
sessions.remove(ssn_name);
@@ -572,12 +584,12 @@ public class Connection extends ConnectionInvoker
{
close(ConnectionCloseCode.NORMAL, null);
}
-
+
public void mgmtClose()
{
close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
}
-
+
public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
{
synchronized (lock)
@@ -680,12 +692,12 @@ public class Connection extends ConnectionInvoker
{
return conSettings;
}
-
+
public SecurityLayer getSecurityLayer()
{
return securityLayer;
}
-
+
public boolean isConnectionResuming()
{
return connectionLost.get();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SenderClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SenderClosedException.java
new file mode 100644
index 0000000000..924c327861
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/SenderClosedException.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * SenderClosedException
+ *
+ */
+
+public class SenderClosedException extends SenderException
+{
+
+ public SenderClosedException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public SenderClosedException(String message)
+ {
+ super(message);
+ }
+
+ public SenderClosedException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ public void rethrow()
+ {
+ throw new SenderClosedException(getMessage(), this);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 1f69973b96..7384702525 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -29,6 +29,8 @@ public interface NetworkConnection
{
Sender<ByteBuffer> getSender();
+ void start();
+
void close();
/**
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
index 4610c2351e..f71d39c381 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
@@ -27,6 +27,4 @@ package org.apache.qpid.transport.network;
public interface NetworkTransport
{
public void close();
-
- public NetworkConnection getConnection();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
index 0bae46e8eb..c3c248761c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
@@ -29,5 +29,7 @@ import org.apache.qpid.transport.Receiver;
public interface OutgoingNetworkTransport extends NetworkTransport
{
+ public NetworkConnection getConnection();
+
public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext);
} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index 2c10a30f10..da4349ba86 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -37,7 +37,6 @@ public class Transport
public static final String QPID_BROKER_TRANSPORT_PROPNAME = "qpid.broker.transport";
// Can't reference the class directly here, as this would preclude the ability to bundle transports separately.
- private static final String MINA_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.mina.MinaNetworkTransport";
private static final String IO_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.io.IoNetworkTransport";
public static final String TCP = "tcp";
@@ -47,9 +46,9 @@ public class Transport
static
{
final Map<ProtocolVersion,String> map = new HashMap<ProtocolVersion, String>();
- map.put(ProtocolVersion.v8_0, MINA_TRANSPORT_CLASSNAME);
- map.put(ProtocolVersion.v0_9, MINA_TRANSPORT_CLASSNAME);
- map.put(ProtocolVersion.v0_91, MINA_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v8_0, IO_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_9, IO_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_91, IO_TRANSPORT_CLASSNAME);
map.put(ProtocolVersion.v0_10, IO_TRANSPORT_CLASSNAME);
OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map);
@@ -58,7 +57,7 @@ public class Transport
public static IncomingNetworkTransport getIncomingTransportInstance()
{
return (IncomingNetworkTransport) loadTransportClass(
- System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, MINA_TRANSPORT_CLASSNAME));
+ System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, IO_TRANSPORT_CLASSNAME));
}
public static OutgoingNetworkTransport getOutgoingTransportInstance(
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index cca1fc46c9..52cc6363a9 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -37,7 +37,7 @@ public class IoNetworkConnection implements NetworkConnection
private final long _timeout;
private final IoSender _ioSender;
private final IoReceiver _ioReceiver;
-
+
public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
int sendBufferSize, int receiveBufferSize, long timeout)
{
@@ -45,9 +45,15 @@ public class IoNetworkConnection implements NetworkConnection
_timeout = timeout;
_ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+
_ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
+
_ioSender.registerCloseListener(_ioReceiver);
+ }
+
+ public void start()
+ {
_ioReceiver.initiate();
_ioSender.initiate();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index 30e2856c59..7f0f04f9c4 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -21,22 +21,21 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
+import java.net.*;
import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.util.Logger;
-public class IoNetworkTransport implements OutgoingNetworkTransport
+public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
{
static
{
@@ -51,12 +50,13 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
private Socket _socket;
private IoNetworkConnection _connection;
private long _timeout = 60000;
-
+ private AcceptingThread _acceptor;
+
public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
{
int sendBufferSize = settings.getWriteBufferSize();
int receiveBufferSize = settings.getReadBufferSize();
-
+
try
{
_socket = new Socket();
@@ -84,6 +84,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
try
{
_connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+ _connection.start();
}
catch(Exception e)
{
@@ -104,11 +105,134 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
public void close()
{
- _connection.close();
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if(_acceptor != null)
+ {
+ _acceptor.close();
+ }
}
public NetworkConnection getConnection()
{
return _connection;
}
+
+ public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+ {
+
+ try
+ {
+ _acceptor = new AcceptingThread(config, factory, sslContext);
+
+ _acceptor.start();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Unable to start server socket", e);
+ }
+
+
+ }
+
+ private class AcceptingThread extends Thread
+ {
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContent;
+ private ServerSocket _serverSocket;
+
+ private AcceptingThread(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext)
+ throws IOException
+ {
+ _config = config;
+ _factory = factory;
+ _sslContent = sslContext;
+
+ InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
+
+ if(sslContext == null)
+ {
+ _serverSocket = new ServerSocket();
+ }
+ else
+ {
+ SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
+ _serverSocket = socketFactory.createServerSocket();
+ }
+
+ _serverSocket.bind(address);
+ _serverSocket.setReuseAddress(true);
+
+
+ }
+
+
+ /**
+ Close the underlying ServerSocket if it has not already been closed.
+ */
+ public void close()
+ {
+ if (!_serverSocket.isClosed())
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ try
+ {
+ Socket socket = _serverSocket.accept();
+ socket.setTcpNoDelay(_config.getTcpNoDelay());
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
+
+ ProtocolEngine engine = _factory.newProtocolEngine();
+
+ NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
+
+
+ engine.setNetworkConnection(connection, connection.getSender());
+
+ connection.start();
+
+
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
+ + _config.getPort());
+ }
+ }
+
+
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index fea87fc350..d4b5975e54 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -92,13 +92,23 @@ final class IoReceiver implements Runnable, Closeable
{
try
{
- if (shutdownBroken)
+ try
{
- socket.close();
+ if (shutdownBroken)
+ {
+ socket.close();
+ }
+ else
+ {
+ socket.shutdownInput();
+ }
}
- else
+ catch(SocketException se)
{
- socket.shutdownInput();
+ if(!socket.isClosed() && !socket.isInputShutdown())
+ {
+ throw se;
+ }
}
if (block && Thread.currentThread() != receiverThread)
{
@@ -117,6 +127,7 @@ final class IoReceiver implements Runnable, Closeable
{
throw new TransportException(e);
}
+
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 1bb515624c..473d4d95ff 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -59,7 +60,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
private final List<Closeable> _listeners = new ArrayList<Closeable>();
-
+
private volatile Throwable exception = null;
public IoSender(Socket socket, int bufferSize, long timeout)
@@ -80,13 +81,13 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
try
{
//Create but deliberately don't start the thread.
- senderThread = Threading.getThreadFactory().createThread(this);
+ senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
throw new Error("Error creating IOSender thread",e);
}
-
+
senderThread.setDaemon(true);
senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
}
@@ -110,7 +111,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
{
if (closed.get())
{
- throw new SenderException("sender is closed", exception);
+ throw new SenderClosedException("sender is closed", exception);
}
final int size = buffer.length;
@@ -143,7 +144,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
if (closed.get())
{
- throw new SenderException("sender is closed", exception);
+ throw new SenderClosedException("sender is closed", exception);
}
if (head - tail >= size)
@@ -255,7 +256,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
public void run()
{
- final int size = buffer.length;
+ final int size = buffer.length;
while (true)
{
final int hd = head;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java
deleted file mode 100644
index 0f433f6eeb..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.mina;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class MinaNetworkConnection implements NetworkConnection
-{
- private IoSession _session;
- private Sender<ByteBuffer> _sender;
-
- public MinaNetworkConnection(IoSession session)
- {
- _session = session;
- _sender = new MinaSender(_session);
- }
-
- public Sender<ByteBuffer> getSender()
- {
- return _sender;
- }
-
- public void close()
- {
- _session.close();
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _session.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _session.getLocalAddress();
- }
-
- public long getReadBytes()
- {
- return _session.getReadBytes();
- }
-
- public long getWrittenBytes()
- {
- return _session.getWrittenBytes();
- }
-
- public void setMaxWriteIdle(int sec)
- {
- _session.setIdleTime(IdleStatus.WRITER_IDLE, sec);
- }
-
- public void setMaxReadIdle(int sec)
- {
- _session.setIdleTime(IdleStatus.READER_IDLE, sec);
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
deleted file mode 100644
index 0e4492e31b..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.transport.network.mina;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.util.SessionUtil;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MinaNetworkHandler extends IoHandlerAdapter
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class);
-
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext = null;
- private boolean _useClientMode;
-
- static
- {
- boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers");
- LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers");
- ByteBuffer.setUseDirectBuffers(directBuffers);
-
- //override the MINA defaults to prevent use of the PooledByteBufferAllocator
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- }
-
- public MinaNetworkHandler(SSLContext sslContext, ProtocolEngineFactory factory)
- {
- _sslContext = sslContext;
- _factory = factory;
- if(_factory == null)
- {
- _useClientMode = true;
- }
- }
-
- public MinaNetworkHandler(SSLContext sslContext)
- {
- this(sslContext, null);
- }
-
- public void messageReceived(IoSession session, Object message)
- {
- ProtocolEngine engine = (ProtocolEngine) session.getAttachment();
- ByteBuffer buf = (ByteBuffer) message;
- try
- {
- engine.received(buf.buf());
- }
- catch (RuntimeException re)
- {
- engine.exception(re);
- }
- }
-
- public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception
- {
- ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
- if(engine != null)
- {
- LOGGER.error("Exception caught by Mina", throwable);
- engine.exception(throwable);
- }
- else
- {
- LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable);
- }
- }
-
- public void sessionCreated(IoSession ioSession) throws Exception
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Created session: " + ioSession.getRemoteAddress());
- }
-
- SessionUtil.initialize(ioSession);
-
- if (_sslContext != null)
- {
- SSLFilter sslFilter = new SSLFilter(_sslContext);
- sslFilter.setUseClientMode(_useClientMode);
-
- ioSession.getFilterChain().addFirst("sslFilter",sslFilter);
- }
-
- if (_factory != null)
- {
- NetworkConnection netConn = new MinaNetworkConnection(ioSession);
-
- ProtocolEngine engine = _factory.newProtocolEngine(netConn);
- ioSession.setAttachment(engine);
- }
- }
-
- public void sessionClosed(IoSession ioSession) throws Exception
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("closed: " + ioSession.getRemoteAddress());
- }
-
- ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
- if(engine != null)
- {
- engine.closed();
- }
- else
- {
- LOGGER.error("Unable to close ProtocolEngine as none was present");
- }
- }
-
-
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
- {
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- ((ProtocolEngine) session.getAttachment()).writerIdle();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- ((ProtocolEngine) session.getAttachment()).readerIdle();
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
deleted file mode 100644
index 85b42da2b2..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
-*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.mina;
-
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExecutorThreadModel;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
-
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.thread.QpidThreadExecutor;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SocketConnectorFactory;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
-{
- private static final int UNKNOWN = -1;
- private static final int TCP = 0;
-
- public NetworkConnection _connection;
- private SocketAcceptor _acceptor;
- private InetSocketAddress _address;
-
- public NetworkConnection connect(ConnectionSettings settings,
- Receiver<java.nio.ByteBuffer> delegate, SSLContext sslContext)
- {
- int transport = getTransport(settings.getProtocol());
-
- IoConnectorCreator stc;
- switch(transport)
- {
- case TCP:
- stc = new IoConnectorCreator(new SocketConnectorFactory()
- {
- public IoConnector newConnector()
- {
- return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
- }
- });
- _connection = stc.connect(delegate, settings, sslContext);
- break;
- case UNKNOWN:
- default:
- throw new TransportException("Unknown protocol: " + settings.getProtocol());
- }
-
- return _connection;
- }
-
- private static int getTransport(String transport)
- {
- if (transport.equals(Transport.TCP))
- {
- return TCP;
- }
-
- return UNKNOWN;
- }
-
- public void close()
- {
- if(_connection != null)
- {
- _connection.close();
- }
- if (_acceptor != null)
- {
- _acceptor.unbindAll();
- }
- }
-
- public NetworkConnection getConnection()
- {
- return _connection;
- }
-
- public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory,
- final SSLContext sslContext)
- {
- int processors = config.getConnectorProcessors();
-
- if (Transport.TCP.equalsIgnoreCase(config.getTransport()))
- {
- _acceptor = new SocketAcceptor(processors, new NewThreadExecutor());
-
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
- sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)"));
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
- sc.setTcpNoDelay(config.getTcpNoDelay());
- sc.setSendBufferSize(config.getSendBufferSize());
- sc.setReceiveBufferSize(config.getReceiveBufferSize());
-
- if (config.getHost().equals(WILDCARD_ADDRESS))
- {
- _address = new InetSocketAddress(config.getPort());
- }
- else
- {
- _address = new InetSocketAddress(config.getHost(), config.getPort());
- }
- }
- else
- {
- throw new TransportException("Unknown transport: " + config.getTransport());
- }
-
- try
- {
- _acceptor.bind(_address, new MinaNetworkHandler(sslContext, factory));
- }
- catch (IOException e)
- {
- throw new TransportException("Could not bind to " + _address, e);
- }
- }
-
-
- private static class IoConnectorCreator
- {
- private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class);
-
- private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024;
-
- private SocketConnectorFactory _ioConnectorFactory;
-
- public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory)
- {
- _ioConnectorFactory = socketConnectorFactory;
- }
-
- public NetworkConnection connect(Receiver<java.nio.ByteBuffer> receiver, ConnectionSettings settings, SSLContext sslContext)
- {
- final IoConnector ioConnector = _ioConnectorFactory.newConnector();
- final SocketAddress address;
- final String protocol = settings.getProtocol();
- final int port = settings.getPort();
-
- if (Transport.TCP.equalsIgnoreCase(protocol))
- {
- address = new InetSocketAddress(settings.getHost(), port);
- }
- else
- {
- throw new TransportException("Unknown transport: " + protocol);
- }
-
- LOGGER.info("Attempting connection to " + address);
-
- if (ioConnector instanceof SocketConnector)
- {
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)"));
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay(true);
- scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
- scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
-
- // Don't have the connector's worker thread wait around for other
- // connections (we only use one SocketConnector per connection
- // at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- ((SocketConnector) ioConnector).setWorkerTimeout(0);
- }
-
- ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslContext), ioConnector.getDefaultConfig());
- future.join();
- if (!future.isConnected())
- {
- throw new TransportException("Could not open connection");
- }
-
- IoSession session = future.getSession();
- session.setAttachment(receiver);
-
- return new MinaNetworkConnection(session);
- }
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
deleted file mode 100644
index be114e2fa1..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.mina;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
-import org.apache.qpid.transport.Sender;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * MinaSender
- */
-public class MinaSender implements Sender<java.nio.ByteBuffer>
-{
- private static final Logger _log = LoggerFactory.getLogger(MinaSender.class);
-
- private final IoSession _session;
- private WriteFuture _lastWrite;
-
- public MinaSender(IoSession session)
- {
- _session = session;
- }
-
- public synchronized void send(java.nio.ByteBuffer msg)
- {
- _log.debug("sending data:");
- ByteBuffer mina = ByteBuffer.allocate(msg.limit());
- mina.put(msg);
- mina.flip();
- _lastWrite = _session.write(mina);
- _log.debug("sent data:");
- }
-
- public synchronized void flush()
- {
- if (_lastWrite != null)
- {
- _lastWrite.join();
- }
- }
-
- public void close()
- {
- // MINA will sometimes throw away in-progress writes when you ask it to close
- flush();
- CloseFuture closed = _session.close();
- closed.join();
- }
-
- public void setIdleTimeout(int i)
- {
- //TODO:
- //We are instead using the setMax[Read|Write]IdleTime methods in
- //MinaNetworkConnection for this. Should remove this method from
- //sender interface, but currently being used by IoSender for 0-10.
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index e80f8904a3..9fd65c6e51 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -27,7 +27,6 @@ import javax.net.ssl.SSLEngine;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
@@ -38,148 +37,12 @@ import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
import org.apache.qpid.transport.network.security.ssl.SSLSender;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-public class SecurityLayer
+public interface SecurityLayer
{
- ConnectionSettings settings;
- Connection con;
- SSLSecurityLayer sslLayer;
- SASLSecurityLayer saslLayer;
- public SecurityLayer(Connection con)
- {
- this.con = con;
- this.settings = con.getConnectionSettings();
- if (settings.isUseSSL())
- {
- sslLayer = new SSLSecurityLayer();
- }
- if (settings.isUseSASLEncryption())
- {
- saslLayer = new SASLSecurityLayer();
- }
- }
+ public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
+ public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
+ public String getUserID();
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
- {
- Sender<ByteBuffer> sender = delegate;
-
- if (settings.isUseSSL())
- {
- sender = sslLayer.sender(sender);
- }
-
- if (settings.isUseSASLEncryption())
- {
- sender = saslLayer.sender(sender);
- }
-
- return sender;
- }
-
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
- {
- Receiver<ByteBuffer> receiver = delegate;
-
- if (settings.isUseSSL())
- {
- receiver = sslLayer.receiver(receiver);
- }
-
- if (settings.isUseSASLEncryption())
- {
- receiver = saslLayer.receiver(receiver);
- }
-
- return receiver;
- }
-
- public String getUserID()
- {
- if (settings.isUseSSL())
- {
- return sslLayer.getUserID();
- }
- else
- {
- return null;
- }
- }
-
- class SSLSecurityLayer
- {
- final SSLEngine _engine;
- final SSLStatus _sslStatus = new SSLStatus();
-
- public SSLSecurityLayer()
- {
- SSLContext sslCtx;
- try
- {
- sslCtx = SSLContextFactory
- .buildClientContext(settings.getTrustStorePath(),
- settings.getTrustStorePassword(),
- settings.getTrustStoreCertType(),
- settings.getKeyStorePath(),
- settings.getKeyStorePassword(),
- settings.getKeyStoreCertType(),
- settings.getCertAlias());
- }
- catch (Exception e)
- {
- throw new TransportException("Error creating SSL Context", e);
- }
-
- try
- {
- _engine = sslCtx.createSSLEngine();
- _engine.setUseClientMode(true);
- }
- catch(Exception e)
- {
- throw new TransportException("Error creating SSL Engine", e);
- }
- }
-
- public SSLSender sender(Sender<ByteBuffer> delegate)
- {
- SSLSender sender = new SSLSender(_engine, delegate, _sslStatus);
- sender.setConnectionSettings(settings);
- return sender;
- }
-
- public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
- {
- SSLReceiver receiver = new SSLReceiver(_engine, delegate, _sslStatus);
- receiver.setConnectionSettings(settings);
- return receiver;
- }
-
- public String getUserID()
- {
- return SSLUtil.retriveIdentity(_engine);
- }
-
- }
-
- class SASLSecurityLayer
- {
- public SASLSecurityLayer()
- {
- }
-
- public SASLSender sender(Sender<ByteBuffer> delegate)
- {
- SASLSender sender = new SASLSender(delegate);
- con.addConnectionListener((ConnectionListener)sender);
- return sender;
- }
-
- public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
- {
- SASLReceiver receiver = new SASLReceiver(delegate);
- con.addConnectionListener((ConnectionListener)receiver);
- return receiver;
- }
-
- }
}
+
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
new file mode 100644
index 0000000000..17f89c34ef
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
@@ -0,0 +1,161 @@
+package org.apache.qpid.transport.network.security;
+
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
+import org.apache.qpid.transport.network.security.sasl.SASLSender;
+import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
+import org.apache.qpid.transport.network.security.ssl.SSLSender;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.nio.ByteBuffer;
+
+public class SecurityLayerFactory
+{
+ public static SecurityLayer newInstance(ConnectionSettings settings)
+ {
+
+ SecurityLayer layer = NullSecurityLayer.getInstance();
+
+ if (settings.isUseSSL())
+ {
+ layer = new SSLSecurityLayer(settings, layer);
+ }
+ if (settings.isUseSASLEncryption())
+ {
+ layer = new SASLSecurityLayer(layer);
+ }
+
+ return layer;
+
+ }
+
+ static class SSLSecurityLayer implements SecurityLayer
+ {
+
+ private final SSLEngine _engine;
+ private final SSLStatus _sslStatus = new SSLStatus();
+ private String _hostname;
+ private SecurityLayer _layer;
+
+
+ public SSLSecurityLayer(ConnectionSettings settings, SecurityLayer layer)
+ {
+
+ SSLContext sslCtx;
+ _layer = layer;
+ try
+ {
+ sslCtx = SSLContextFactory
+ .buildClientContext(settings.getTrustStorePath(),
+ settings.getTrustStorePassword(),
+ settings.getTrustStoreCertType(),
+ settings.getKeyStorePath(),
+ settings.getKeyStorePassword(),
+ settings.getKeyStoreCertType(),
+ settings.getCertAlias());
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error creating SSL Context", e);
+ }
+
+ if(settings.isVerifyHostname())
+ {
+ _hostname = settings.getHost();
+ }
+
+ try
+ {
+ _engine = sslCtx.createSSLEngine();
+ _engine.setUseClientMode(true);
+ }
+ catch(Exception e)
+ {
+ throw new TransportException("Error creating SSL Engine", e);
+ }
+
+ }
+
+ public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ {
+ SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus);
+ sender.setHostname(_hostname);
+ return sender;
+ }
+
+ public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ {
+ SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus);
+ receiver.setHostname(_hostname);
+ return receiver;
+ }
+
+ public String getUserID()
+ {
+ return SSLUtil.retriveIdentity(_engine);
+ }
+ }
+
+
+ static class SASLSecurityLayer implements SecurityLayer
+ {
+
+ private SecurityLayer _layer;
+
+ SASLSecurityLayer(SecurityLayer layer)
+ {
+ _layer = layer;
+ }
+
+ public SASLSender sender(Sender<ByteBuffer> delegate)
+ {
+ SASLSender sender = new SASLSender(_layer.sender(delegate));
+ return sender;
+ }
+
+ public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+ {
+ SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate));
+ return receiver;
+ }
+
+ public String getUserID()
+ {
+ return _layer.getUserID();
+ }
+ }
+
+
+ static class NullSecurityLayer implements SecurityLayer
+ {
+
+ private static final NullSecurityLayer INSTANCE = new NullSecurityLayer();
+
+ private NullSecurityLayer()
+ {
+ }
+
+ public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ {
+ return delegate;
+ }
+
+ public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ {
+ return delegate;
+ }
+
+ public String getUserID()
+ {
+ return null;
+ }
+
+ public static NullSecurityLayer getInstance()
+ {
+ return INSTANCE;
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index 878f0b2352..8ad40bbfd3 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -43,10 +43,11 @@ public class SSLReceiver implements Receiver<ByteBuffer>
private final int sslBufSize;
private final ByteBuffer localBuffer;
private final SSLStatus _sslStatus;
- private ConnectionSettings settings;
private ByteBuffer appData;
private boolean dataCached = false;
+ private String _hostname;
+
public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus)
{
this.engine = engine;
@@ -57,9 +58,9 @@ public class SSLReceiver implements Receiver<ByteBuffer>
_sslStatus = sslStatus;
}
- public void setConnectionSettings(ConnectionSettings settings)
+ public void setHostname(String hostname)
{
- this.settings = settings;
+ _hostname = hostname;
}
public void closed()
@@ -166,9 +167,9 @@ public class SSLReceiver implements Receiver<ByteBuffer>
handshakeStatus = engine.getHandshakeStatus();
case FINISHED:
- if (this.settings != null && this.settings.isVerifyHostname() )
+ if (_hostname != null)
{
- SSLUtil.verifyHostname(engine, this.settings.getHost());
+ SSLUtil.verifyHostname(engine, _hostname);
}
case NEED_WRAP:
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 5e0ee93cb8..6f5aa6d86e 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -44,8 +44,9 @@ public class SSLSender implements Sender<ByteBuffer>
private final ByteBuffer netData;
private final long timeout;
private final SSLStatus _sslStatus;
- private ConnectionSettings settings;
-
+
+ private String _hostname;
+
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -59,9 +60,9 @@ public class SSLSender implements Sender<ByteBuffer>
_sslStatus = sslStatus;
}
- public void setConnectionSettings(ConnectionSettings settings)
+ public void setHostname(String hostname)
{
- this.settings = settings;
+ _hostname = hostname;
}
public void close()
@@ -237,9 +238,9 @@ public class SSLSender implements Sender<ByteBuffer>
break;
case FINISHED:
- if (this.settings != null && this.settings.isVerifyHostname() )
+ if (_hostname != null)
{
- SSLUtil.verifyHostname(engine, this.settings.getHost());
+ SSLUtil.verifyHostname(engine, _hostname);
}
case NOT_HANDSHAKING:
diff --git a/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/java/common/src/test/java/org/apache/qpid/session/TestSession.java
deleted file mode 100644
index aafc91b03b..0000000000
--- a/java/common/src/test/java/org/apache/qpid/session/TestSession.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.session;
-
-import org.apache.mina.common.*;
-
-import java.net.SocketAddress;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class TestSession implements IoSession
-{
- private final ConcurrentMap attributes = new ConcurrentHashMap();
-
- public TestSession()
- {
- }
-
- public IoService getService()
- {
- return null; //TODO
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return null; //TODO
- }
-
- public IoHandler getHandler()
- {
- return null; //TODO
- }
-
- public IoSessionConfig getConfig()
- {
- return null; //TODO
- }
-
- public IoFilterChain getFilterChain()
- {
- return null; //TODO
- }
-
- public WriteFuture write(Object message)
- {
- return null; //TODO
- }
-
- public CloseFuture close()
- {
- return null; //TODO
- }
-
- public Object getAttachment()
- {
- return getAttribute("");
- }
-
- public Object setAttachment(Object attachment)
- {
- return setAttribute("",attachment);
- }
-
- public Object getAttribute(String key)
- {
- return attributes.get(key);
- }
-
- public Object setAttribute(String key, Object value)
- {
- return attributes.put(key,value);
- }
-
- public Object setAttribute(String key)
- {
- return attributes.put(key, Boolean.TRUE);
- }
-
- public Object removeAttribute(String key)
- {
- return attributes.remove(key);
- }
-
- public boolean containsAttribute(String key)
- {
- return attributes.containsKey(key);
- }
-
- public Set getAttributeKeys()
- {
- return attributes.keySet();
- }
-
- public TransportType getTransportType()
- {
- return null; //TODO
- }
-
- public boolean isConnected()
- {
- return false; //TODO
- }
-
- public boolean isClosing()
- {
- return false; //TODO
- }
-
- public CloseFuture getCloseFuture()
- {
- return null; //TODO
- }
-
- public SocketAddress getRemoteAddress()
- {
- return null; //TODO
- }
-
- public SocketAddress getLocalAddress()
- {
- return null; //TODO
- }
-
- public SocketAddress getServiceAddress()
- {
- return null; //TODO
- }
-
- public int getIdleTime(IdleStatus status)
- {
- return 0; //TODO
- }
-
- public long getIdleTimeInMillis(IdleStatus status)
- {
- return 0; //TODO
- }
-
- public void setIdleTime(IdleStatus status, int idleTime)
- {
- //TODO
- }
-
- public int getWriteTimeout()
- {
- return 0; //TODO
- }
-
- public long getWriteTimeoutInMillis()
- {
- return 0; //TODO
- }
-
- public void setWriteTimeout(int writeTimeout)
- {
- //TODO
- }
-
- public TrafficMask getTrafficMask()
- {
- return null; //TODO
- }
-
- public void setTrafficMask(TrafficMask trafficMask)
- {
- //TODO
- }
-
- public void suspendRead()
- {
- //TODO
- }
-
- public void suspendWrite()
- {
- //TODO
- }
-
- public void resumeRead()
- {
- //TODO
- }
-
- public void resumeWrite()
- {
- //TODO
- }
-
- public long getReadBytes()
- {
- return 0; //TODO
- }
-
- public long getWrittenBytes()
- {
- return 0; //TODO
- }
-
- public long getReadMessages()
- {
- return 0;
- }
-
- public long getWrittenMessages()
- {
- return 0;
- }
-
- public long getWrittenWriteRequests()
- {
- return 0; //TODO
- }
-
- public int getScheduledWriteRequests()
- {
- return 0; //TODO
- }
-
- public int getScheduledWriteBytes()
- {
- return 0; //TODO
- }
-
- public long getCreationTime()
- {
- return 0; //TODO
- }
-
- public long getLastIoTime()
- {
- return 0; //TODO
- }
-
- public long getLastReadTime()
- {
- return 0; //TODO
- }
-
- public long getLastWriteTime()
- {
- return 0; //TODO
- }
-
- public boolean isIdle(IdleStatus status)
- {
- return false; //TODO
- }
-
- public int getIdleCount(IdleStatus status)
- {
- return 0; //TODO
- }
-
- public long getLastIdleTime(IdleStatus status)
- {
- return 0; //TODO
- }
-}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
index 8686c17414..8533c64fab 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
@@ -49,10 +49,12 @@ public class TestNetworkConnection implements NetworkConnection
_sender = new MockSender();
}
+
+
public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException
{
-
+
}
public SocketAddress getLocalAddress()
@@ -68,37 +70,37 @@ public class TestNetworkConnection implements NetworkConnection
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config,
SSLContextFactory sslFactory) throws OpenException
{
-
+
}
public void setMaxReadIdle(int idleTime)
{
-
+
}
public void setMaxWriteIdle(int idleTime)
{
-
+
}
public void close()
{
-
+
}
public void flush()
{
-
+
}
public void send(ByteBuffer msg)
{
-
+
}
public void setIdleTimeout(int i)
{
-
+
}
public void setPort(int port)
@@ -135,4 +137,8 @@ public class TestNetworkConnection implements NetworkConnection
{
return _sender;
}
+
+ public void start()
+ {
+ }
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
index d2fab7d163..7039b904e3 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
@@ -33,7 +33,6 @@ import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.io.IoNetworkTransport;
-import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
public class TransportTest extends QpidTestCase
{
@@ -44,7 +43,7 @@ public class TransportTest extends QpidTestCase
{
final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0);
assertNotNull(networkTransport);
- assertTrue(networkTransport instanceof MinaNetworkTransport);
+ assertTrue(networkTransport instanceof IoNetworkTransport);
}
public void testGloballyOverriddenOutgoingTransportForv0_8() throws Exception
@@ -76,7 +75,7 @@ public class TransportTest extends QpidTestCase
{
final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance();
assertNotNull(networkTransport);
- assertTrue(networkTransport instanceof MinaNetworkTransport);
+ assertTrue(networkTransport instanceof IoNetworkTransport);
}
public void testOverriddenGetIncomingTransport() throws Exception
diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
deleted file mode 100644
index 976b141fc0..0000000000
--- a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.transport.network.mina;
-
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
-
-public class MinaNetworkHandlerTest extends QpidTestCase
-{
-
- private static final String TEST_DATA = "YHALOTHAR";
- private int _testPort;
- private IncomingNetworkTransport _server;
- private OutgoingNetworkTransport _client;
- private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read
- private Exception _thrownEx;
- private ConnectionSettings _clientSettings;
- private NetworkConnection _network;
- private TestNetworkTransportConfiguration _brokerSettings;
-
- @Override
- public void setUp() throws Exception
- {
- String host = InetAddress.getLocalHost().getHostName();
- _testPort = findFreePort();
-
- _clientSettings = new ConnectionSettings();
- _clientSettings.setHost(host);
- _clientSettings.setPort(_testPort);
-
- _brokerSettings = new TestNetworkTransportConfiguration(_testPort, host);
-
- _server = new MinaNetworkTransport();
- _client = new MinaNetworkTransport();
- _thrownEx = null;
- _countingEngine = new CountingProtocolEngine();
- }
-
- @Override
- public void tearDown()
- {
- if (_server != null)
- {
- _server.close();
- }
-
- if (_client != null)
- {
- _client.close();
- }
- }
-
- /**
- * Tests that a socket can't be opened if a driver hasn't been bound
- * to the port and can be opened if a driver has been bound.
- */
- public void testBindOpen() throws Exception
- {
- try
- {
- _client.connect(_clientSettings, _countingEngine, null);
- }
- catch (TransportException e)
- {
- _thrownEx = e;
- }
-
- assertNotNull("Open should have failed since no engine bound", _thrownEx);
-
- _server.accept(_brokerSettings, null, null);
-
- _client.connect(_clientSettings, _countingEngine, null);
- }
-
- /**
- * Tests that a socket can't be opened after a bound NetworkDriver has been closed
- */
- public void testBindOpenCloseOpen() throws Exception
- {
- _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
- _client.connect(_clientSettings, _countingEngine, null);
- _client.close();
- _server.close();
-
- try
- {
- _client.connect(_clientSettings, _countingEngine, null);
- }
- catch (TransportException e)
- {
- _thrownEx = e;
- }
- assertNotNull("Open should have failed", _thrownEx);
- }
-
- /**
- * Checks that the right exception is thrown when binding a NetworkDriver to an already
- * existing socket.
- */
- public void testBindPortInUse()
- {
- try
- {
- _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
- }
- catch (TransportException e)
- {
- fail("First bind should not fail");
- }
-
- try
- {
- IncomingNetworkTransport second = new MinaNetworkTransport();
- second.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
- }
- catch (TransportException e)
- {
- _thrownEx = e;
- }
- assertNotNull("Second bind should throw BindException", _thrownEx);
- }
-
- /**
- * Tests that binding to the wildcard address succeeds and a client can
- * connect via localhost.
- */
- public void testWildcardBind() throws Exception
- {
- TestNetworkTransportConfiguration serverSettings =
- new TestNetworkTransportConfiguration(_testPort, WILDCARD_ADDRESS);
-
- _server.accept(serverSettings, null, null);
-
- try
- {
- _client.connect(_clientSettings, _countingEngine, null);
- }
- catch (TransportException e)
- {
- fail("Open should have succeeded since we used a wildcard bind");
- }
- }
-
- /**
- * tests that bytes sent on a network driver are received at the other end
- */
- public void testSend() throws Exception
- {
- // Open a connection from a counting engine to an echo engine
- _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
- _network = _client.connect(_clientSettings, _countingEngine, null);
-
- // Tell the counting engine how much data we're sending
- _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
-
- // Send the data and wait for up to 2 seconds to get it back
- _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes()));
- _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
-
- // Check what we got
- assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes());
- }
-
- /**
- * Opens a connection with a low read idle and check that it gets triggered
- *
- */
- public void testSetReadIdle() throws Exception
- {
- // Open a connection from a counting engine to an echo engine
- _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
- _network = _client.connect(_clientSettings, _countingEngine, null);
- assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle());
- _network.setMaxReadIdle(1);
- sleepForAtLeast(1500);
- assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle());
- }
-
- /**
- * Opens a connection with a low write idle and check that it gets triggered
- *
- */
- public void testSetWriteIdle() throws Exception
- {
- // Open a connection from a counting engine to an echo engine
- _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
- _network = _client.connect(_clientSettings, _countingEngine, null);
- assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle());
- _network.setMaxWriteIdle(1);
- sleepForAtLeast(1500);
- assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle());
- }
-
-
- /**
- * Creates and then closes a connection from client to server and checks that the server
- * has its closed() method called. Then creates a new client and closes the server to check
- * that the client has its closed() method called.
- */
- public void testClosed() throws Exception
- {
- // Open a connection from a counting engine to an echo engine
- EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory();
- _server.accept(_brokerSettings, factory, null);
- _network = _client.connect(_clientSettings, _countingEngine, null);
- EchoProtocolEngine serverEngine = null;
- while (serverEngine == null)
- {
- serverEngine = factory.getEngine();
- if (serverEngine == null)
- {
- try
- {
- Thread.sleep(10);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- assertFalse("Server should not have been closed", serverEngine.getClosed());
- serverEngine.setNewLatch(1);
- _client.close();
- try
- {
- serverEngine.getLatch().await(2, TimeUnit.SECONDS);
- }
- catch (InterruptedException e)
- {
- }
- assertTrue("Server should have been closed", serverEngine.getClosed());
-
- _client.connect(_clientSettings, _countingEngine, null);
- _countingEngine.setClosed(false);
- assertFalse("Client should not have been closed", _countingEngine.getClosed());
- _countingEngine.setNewLatch(1);
- _server.close();
- try
- {
- _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
- }
- catch (InterruptedException e)
- {
- }
- assertTrue("Client should have been closed", _countingEngine.getClosed());
- }
-
- /**
- * Create a connection and instruct the client to throw an exception when it gets some data
- * and that the latch gets counted down.
- */
- public void testExceptionCaught() throws Exception
- {
- _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
- _network = _client.connect(_clientSettings, _countingEngine, null);
-
-
- assertEquals("Exception should not have been thrown", 1,
- _countingEngine.getExceptionLatch().getCount());
- _countingEngine.setErrorOnNextRead(true);
- _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
- _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes()));
- _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
- assertEquals("Exception should have been thrown", 0,
- _countingEngine.getExceptionLatch().getCount());
- }
-
- /**
- * Opens a connection and checks that the remote address is the one that was asked for
- */
- public void testGetRemoteAddress() throws Exception
- {
- _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
- _network = _client.connect(_clientSettings, _countingEngine, null);
- assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), _testPort),
- _network.getRemoteAddress());
- }
-
- private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory
- {
- private EchoProtocolEngine _engine = null;
-
- public ProtocolEngine newProtocolEngine(NetworkConnection network)
- {
- if (_engine == null)
- {
- _engine = new EchoProtocolEngine(network);
- }
- return getEngine();
- }
-
- public EchoProtocolEngine getEngine()
- {
- return _engine;
- }
- }
-
- public class CountingProtocolEngine implements ServerProtocolEngine
- {
- public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
- private int _readBytes;
- private CountDownLatch _latch = new CountDownLatch(0);
- private boolean _readerHasBeenIdle;
- private boolean _writerHasBeenIdle;
- private boolean _closed = false;
- private boolean _nextReadErrors = false;
- private CountDownLatch _exceptionLatch = new CountDownLatch(1);
-
- public void closed()
- {
- setClosed(true);
- _latch.countDown();
- }
-
- public void setErrorOnNextRead(boolean b)
- {
- _nextReadErrors = b;
- }
-
- public void setNewLatch(int length)
- {
- _latch = new CountDownLatch(length);
- }
-
- public long getReadBytes()
- {
- return _readBytes;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- public long getWrittenBytes()
- {
- return 0;
- }
-
- public void readerIdle()
- {
- _readerHasBeenIdle = true;
- }
-
- public void writeFrame(AMQDataBlock frame)
- {
-
- }
-
- public void writerIdle()
- {
- _writerHasBeenIdle = true;
- }
-
- public void exception(Throwable t)
- {
- _exceptionLatch.countDown();
- }
-
- public CountDownLatch getExceptionLatch()
- {
- return _exceptionLatch;
- }
-
- public void received(ByteBuffer msg)
- {
- // increment read bytes and count down the latch for that many
- int bytes = msg.remaining();
- _readBytes += bytes;
- for (int i = 0; i < bytes; i++)
- {
- _latch.countDown();
- }
-
- // Throw an error if we've been asked too, but we can still count
- if (_nextReadErrors)
- {
- throw new RuntimeException("Was asked to error");
- }
- }
-
- public CountDownLatch getLatch()
- {
- return _latch;
- }
-
- public boolean getWriterHasBeenIdle()
- {
- return _writerHasBeenIdle;
- }
-
- public boolean getReaderHasBeenIdle()
- {
- return _readerHasBeenIdle;
- }
-
- public void setClosed(boolean _closed)
- {
- this._closed = _closed;
- }
-
- public boolean getClosed()
- {
- return _closed;
- }
-
- public long getConnectionId()
- {
- return -1;
- }
-
- }
-
- private class EchoProtocolEngine extends CountingProtocolEngine
- {
- private NetworkConnection _echoNetwork;
-
- public EchoProtocolEngine(NetworkConnection network)
- {
- _echoNetwork = network;
- }
-
- public void received(ByteBuffer msg)
- {
- super.received(msg);
- msg.rewind();
- _echoNetwork.getSender().send(msg);
- }
- }
-
- public static void sleepForAtLeast(long period)
- {
- long start = System.currentTimeMillis();
- long timeLeft = period;
- while (timeLeft > 0)
- {
- try
- {
- Thread.sleep(timeLeft);
- }
- catch (InterruptedException e)
- {
- // Ignore it
- }
- timeLeft = period - (System.currentTimeMillis() - start);
- }
- }
-
- private static class TestNetworkTransportConfiguration implements NetworkTransportConfiguration
- {
- private int _port;
- private String _host;
-
- public TestNetworkTransportConfiguration(final int port, final String host)
- {
- _port = port;
- _host = host;
- }
-
- public Boolean getTcpNoDelay()
- {
- return true;
- }
-
- public Integer getReceiveBufferSize()
- {
- return 32768;
- }
-
- public Integer getSendBufferSize()
- {
- return 32768;
- }
-
- public Integer getPort()
- {
- return _port;
- }
-
- public String getHost()
- {
- return _host;
- }
-
- public String getTransport()
- {
- return Transport.TCP;
- }
-
- public Integer getConnectorProcessors()
- {
- return 4;
- }
-
- }
-} \ No newline at end of file
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
index 30d2d851a0..98cdf94ac9 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
@@ -139,7 +140,7 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL
public void onException(JMSException e)
{
- if (e.getLinkedException() instanceof AMQDisconnectedException)
+ if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException)
{
_logger.debug("Received AMQDisconnectedException");
_failoverComplete.countDown();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
index d1ba725721..782ca22965 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
@@ -67,14 +67,14 @@ public class ExternalACLTest extends AbstractACLTestCase
fail("Connection was not created due to:" + e);
}
}
-
+
public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception
{
//The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(), and so
- //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming
- //from a queue etc). In order to test the vhost-wide 'access' ACL right, the 'guest' user has been given
+ //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming
+ //from a queue etc). In order to test the vhost-wide 'access' ACL right, the 'guest' user has been given
//this right in the 'test2' vhost.
-
+
try
{
//get a connection to the 'test2' vhost using the guest user and perform various actions.
@@ -106,7 +106,7 @@ public class ExternalACLTest extends AbstractACLTestCase
fail("Test failed due to:" + e.getMessage());
}
}
-
+
public void testAccessNoRightsFailure() throws Exception
{
try
@@ -115,7 +115,7 @@ public class ExternalACLTest extends AbstractACLTestCase
Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
conn.start();
sess.rollback();
-
+
fail("Connection was created.");
}
catch (JMSException e)
@@ -126,11 +126,11 @@ public class ExternalACLTest extends AbstractACLTestCase
Throwable cause = linkedException.getCause();
assertNotNull("Cause was null", cause);
assertTrue("Wrong linked exception type", cause instanceof AMQException);
- AMQConstant errorCode = isBroker010() ? AMQConstant.CONTEXT_IN_USE : AMQConstant.ACCESS_REFUSED;
+ AMQConstant errorCode = isBroker010() ? AMQConstant.CONNECTION_FORCED : AMQConstant.ACCESS_REFUSED;
assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode());
}
}
-
+
public void testClientDeleteQueueSuccess() throws Exception
{
try
@@ -155,7 +155,7 @@ public class ExternalACLTest extends AbstractACLTestCase
fail("Test failed due to:" + e.getMessage());
}
}
-
+
public void testServerDeleteQueueFailure() throws Exception
{
try
@@ -207,13 +207,13 @@ public class ExternalACLTest extends AbstractACLTestCase
try
{
Connection conn = getConnection("test", "client", "guest");
-
+
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
sess.createConsumer(sess.createQueue("IllegalQueue"));
-
+
fail("Test failed as consumer was created.");
}
catch (JMSException e)
@@ -253,10 +253,10 @@ public class ExternalACLTest extends AbstractACLTestCase
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
-
+
//Create a Named Queue
((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false);
-
+
fail("Test failed as Queue creation succeded.");
//conn will be automatically closed
}
@@ -385,7 +385,7 @@ public class ExternalACLTest extends AbstractACLTestCase
try
{
Connection conn = getConnection("test", "client", "guest");
-
+
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
@@ -405,13 +405,13 @@ public class ExternalACLTest extends AbstractACLTestCase
try
{
Connection conn = getConnection("test", "server", "guest");
-
+
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
sess.createConsumer(sess.createTemporaryQueue());
-
+
fail("Test failed as consumer was created.");
}
catch (JMSException e)
@@ -446,7 +446,7 @@ public class ExternalACLTest extends AbstractACLTestCase
try
{
Connection conn = getConnection("test", "server", "guest");
-
+
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
@@ -480,13 +480,13 @@ public class ExternalACLTest extends AbstractACLTestCase
check403Exception(e.getLinkedException());
}
}
-
+
public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception
{
try
{
Connection connection = getConnection("test", "server", "guest");
-
+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
@@ -630,8 +630,8 @@ public class ExternalACLTest extends AbstractACLTestCase
check403Exception(e.getLinkedException());
}
}
-
-
+
+
@Override
public String getConfig()
{
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 97d825177c..d6caf05d33 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -59,7 +59,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
_queue = _clientSession.createQueue(getTestQueueName());
_clientSession.createConsumer(_queue).close();
-
+
//Ensure there are no messages on the queue to start with.
checkQueueDepth(0);
}
@@ -490,7 +490,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
}
}
- assertTrue("We should get atleast " + messages + " msgs.", msgCount >= messages);
+ assertTrue("We should get atleast " + messages + " msgs (found " + msgCount +").", msgCount >= messages);
if (_logger.isDebugEnabled())
{
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
index cec9d292cf..0057422c8f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
@@ -47,13 +47,15 @@ public class ExceptionListenerTest extends QpidBrokerTestCase
{
public void onException(JMSException e)
{
+ _logger.debug("&&&&&&&&&&&&&&&&&&&&&&&&&&&& Caught exception &&&&&&&&&&&&&&&&&&&&&&&&&&&& ", e);
fired.countDown();
}
});
-
+ _logger.debug("%%%%%%%%%%%%%%%% Stopping Broker %%%%%%%%%%%%%%%%%%%%%");
stopBroker();
+ _logger.debug("%%%%%%%%%%%%%%%% Stopped Broker %%%%%%%%%%%%%%%%%%%%%");
- if (!fired.await(3, TimeUnit.SECONDS))
+ if (!fired.await(5, TimeUnit.SECONDS))
{
fail("exception listener was not fired");
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 5701b5a1fd..836684c965 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -25,7 +25,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java b/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java
deleted file mode 100644
index f1eb8159b6..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.utils.protocol;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.TransportType;
-import org.apache.mina.common.support.BaseIoSession;
-
-public class TestIoSession extends BaseIoSession {
-
- private String _stringLocalAddress;
- private int _localPort;
-
- public SocketAddress getLocalAddress()
- {
- //create a new address for testing purposes using member variables
- return new InetSocketAddress(_stringLocalAddress,_localPort);
- }
-
- protected void updateTrafficMask() {
- //dummy
- }
-
- public IoService getService() {
- return null;
- }
-
- public IoServiceConfig getServiceConfig() {
- return null;
- }
-
- public IoHandler getHandler() {
- return null;
- }
-
- public IoSessionConfig getConfig() {
- return null;
- }
-
- public IoFilterChain getFilterChain() {
- return null;
- }
-
- public TransportType getTransportType() {
- return null;
- }
-
- public SocketAddress getRemoteAddress() {
- return null;
- }
-
- public SocketAddress getServiceAddress() {
- return null;
- }
-
- public int getScheduledWriteRequests() {
- return 0;
- }
-
- public int getScheduledWriteBytes() {
- return 0;
- }
-
- public String getStringLocalAddress() {
- return _stringLocalAddress;
- }
-
- public void setStringLocalAddress(String _stringLocalAddress) {
- this._stringLocalAddress = _stringLocalAddress;
- }
-
- public int getLocalPort() {
- return _localPort;
- }
-
- public void setLocalPort(int _localPort) {
- this._localPort = _localPort;
- }
-}
diff --git a/java/test-profiles/Excludes b/java/test-profiles/Excludes
index 56a256f191..b1edd07f87 100644
--- a/java/test-profiles/Excludes
+++ b/java/test-profiles/Excludes
@@ -43,3 +43,5 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
// QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail.
org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
+
+org.apache.qpid.client.ssl.SSLTest#testVerifyLocalHostLocalDomain