summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-09-07 08:55:12 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-09-07 08:55:12 +0000
commitc8b145b9cf5282d534f204a5be48d19fe507ecb3 (patch)
tree2957d1a3121f0e446569487d87987aa4a8eaaff7 /java/broker
parente70492626c601e6c610ccea3e33b2a91188f3f76 (diff)
downloadqpid-python-c8b145b9cf5282d534f204a5be48d19fe507ecb3.tar.gz
QPID-3473 : Replace use of MINA IO with transport IO (joint work with Robbie Gemmel)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1166069 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java37
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Broker.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java146
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java43
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java6
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java26
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java2
10 files changed, 197 insertions, 97 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8141533045..97f999484f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -75,6 +75,7 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TransportException;
import java.util.ArrayList;
import java.util.Collection;
@@ -137,7 +138,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
-
+
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -201,12 +202,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
-
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
}
-
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -216,7 +217,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_txnCount.compareAndSet(0,1);
}
}
-
+
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -314,7 +315,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
try
{
_currentMessage.getStoredMessage().flushToStore();
-
+
final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
@@ -385,6 +386,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_currentMessage = null;
throw e;
}
+ catch (RuntimeException e)
+ {
+ // we want to make sure we don't keep a reference to the message in the
+ // event of an error
+ _currentMessage = null;
+ throw e;
+ }
}
protected void routeCurrentMessage() throws AMQException
@@ -435,7 +443,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
-
+
Subscription subscription =
SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
@@ -456,6 +464,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_tag2SubscriptionMap.remove(tag);
throw e;
}
+ catch (RuntimeException e)
+ {
+ _tag2SubscriptionMap.remove(tag);
+ throw e;
+ }
return tag;
}
@@ -513,7 +526,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
catch (AMQException e)
{
- _logger.error("Caught AMQException whilst attempting to reque:" + e);
+ _logger.error("Caught AMQException whilst attempting to requeue:" + e);
+ }
+ catch (TransportException e)
+ {
+ _logger.error("Caught TransportException whilst attempting to requeue:" + e);
}
getConfigStore().removeConfiguredObject(this);
@@ -944,7 +961,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
finally
{
_rollingBack = false;
-
+
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
@@ -1425,12 +1442,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
return _createTime;
}
-
+
public void mgmtClose() throws AMQException
{
_session.mgmtCloseChannel(_channelId);
}
-
+
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
if (inTransaction())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index a4e365005e..75891d02a3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -56,7 +56,6 @@ import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
-import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
public class Broker
{
@@ -83,7 +82,7 @@ public class Broker
startup(new BrokerOptions());
}
- public void startup(BrokerOptions options) throws Exception
+ public void startup(final BrokerOptions options) throws Exception
{
try
{
@@ -193,9 +192,9 @@ public class Broker
{
for(int port : ports)
{
- final Set<AmqpProtocolVersion> supported =
+ final Set<AmqpProtocolVersion> supported =
getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8);
- final NetworkTransportConfiguration settings =
+ final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
@@ -218,12 +217,12 @@ public class Broker
for(int sslPort : sslPorts)
{
- final Set<AmqpProtocolVersion> supported =
+ final Set<AmqpProtocolVersion> supported =
getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8);
- final NetworkTransportConfiguration settings =
+ final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP);
- final IncomingNetworkTransport transport = new MinaNetworkTransport();
+ final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(hostName, supported);
@@ -244,7 +243,7 @@ public class Broker
}
private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10,
- final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9,
+ final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9,
final Set<Integer> exclude_0_8)
{
final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
@@ -268,7 +267,7 @@ public class Broker
return supported;
}
-
+
private File getConfigFile(final String fileName,
final String defaultFileName,
final String qpidHome, boolean throwOnFileNotFound) throws InitException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 3786c2020c..1c01ce465d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -29,6 +29,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.transport.TransportException;
public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
@@ -44,19 +46,24 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
/** Close all of the currently open connections. */
public void close()
{
+ _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
while (!_registry.isEmpty())
{
AMQConnectionModel connection = _registry.get(0);
- closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down");
+ closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down");
}
}
-
+
public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
{
try
{
connection.close(cause, message);
}
+ catch (TransportException e)
+ {
+ _logger.warn("Error closing connection:" + e.getMessage());
+ }
catch (AMQException e)
{
_logger.warn("Error closing connection:" + e.getMessage());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index dade5d5f54..f8e4eab0b6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -68,5 +68,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
session.writeFrame(responseBody.generateFrame(channelId));
+ session.closeProtocolSession();
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 36e0643f94..88022ba519 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -64,8 +64,6 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -95,6 +93,7 @@ import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
@@ -132,7 +131,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private Object _lastSent;
protected volatile boolean _closed;
-
+
// maximum number of channels this session should have
private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
@@ -158,22 +157,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private long _writtenBytes;
private long _readBytes;
- private Job _readJob;
- private Job _writeJob;
- private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
private final UUID _id;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
-
+
private ApplicationRegistry _registry;
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private final NetworkConnection _network;
- private final Sender<ByteBuffer> _sender;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
public ManagedObject getManagedObject()
{
@@ -184,11 +180,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_codecFactory = new AMQCodecFactory(true, this);
- _poolReference.acquireExecutorService();
- _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
- _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
- _network = network;
- _sender = _network.getSender();
+
+ setNetworkConnection(network);
_sessionID = connectionId;
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -204,6 +197,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
initialiseStatistics();
}
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ setNetworkConnection(network, network.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+ }
+
private AMQProtocolSessionMBean createMBean() throws JMException
{
return new AMQProtocolSessionMBean(this);
@@ -240,26 +244,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+ for (AMQDataBlock dataBlock : dataBlocks)
{
- public void run()
+ try
{
- // Decode buffer
-
- for (AMQDataBlock dataBlock : dataBlocks)
- {
- try
- {
- dataBlockReceived(dataBlock);
- }
- catch (Exception e)
- {
- _logger.error("Unexpected exception when processing datablock", e);
- closeProtocolSession();
- }
- }
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
}
- });
+ }
}
catch (Exception e)
{
@@ -337,6 +333,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
closeChannel(channelId);
throw e;
}
+ catch (TransportException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
}
finally
{
@@ -368,6 +369,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
mechanisms.getBytes(),
locales.getBytes());
_sender.send(responseBody.generateFrame(0).toNioByteBuffer());
+ _sender.flush();
}
catch (AMQException e)
@@ -375,6 +377,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
_sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+ _sender.flush();
}
}
@@ -430,19 +433,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
AMQConstant.CHANNEL_ERROR.getName().toString());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce, false);
+ closeConnection(channelId, ce);
}
}
catch (AMQConnectionException e)
{
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, e, false);
+ closeConnection(channelId, e);
}
catch (AMQSecurityException e)
{
AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce, false);
+ closeConnection(channelId, ce);
}
}
catch (Exception e)
@@ -485,19 +488,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*
* @param frame the frame to write
*/
- public void writeFrame(AMQDataBlock frame)
+ public synchronized void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
- {
- public void run()
- {
- _sender.send(buf);
- }
- });
+ _sender.send(buf);
+ _sender.flush();
}
public AMQShortString getContextKey()
@@ -729,7 +727,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
closeAllChannels();
-
+
getConfigStore().removeConfiguredObject(this);
if (_managedObject != null)
@@ -749,7 +747,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_closed = true;
notifyAll();
}
- _poolReference.releaseExecutorService();
CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
}
}
@@ -772,27 +769,32 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
- public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+ public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
{
- if (_logger.isInfoEnabled())
+ try
{
- _logger.info("Closing connection due to: " + e);
- }
-
- markChannelAwaitingCloseOk(channelId);
- closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e);
+ }
- if (closeProtocolSession)
+ markChannelAwaitingCloseOk(channelId);
+ closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+ }
+ finally
{
closeProtocolSession();
}
+
+
}
public void closeProtocolSession()
{
- _sender.close();
+ _network.close();
+
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -801,6 +803,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_logger.info(e.getMessage());
}
+ catch (TransportException e)
+ {
+ _logger.info(e.getMessage());
+ }
}
public String toString()
@@ -923,7 +929,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
-
+
_configStore.addConfiguredObject(this);
try
@@ -960,12 +966,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
_authorizedSubject = authorizedSubject;
}
-
+
public Subject getAuthorizedSubject()
{
return _authorizedSubject;
}
-
+
public Principal getAuthorizedPrincipal()
{
return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
@@ -1001,6 +1007,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_logger.error("Could not close protocol engine", e);
}
+ catch (TransportException e)
+ {
+ _logger.error("Could not close protocol engine", e);
+ }
}
public void readerIdle()
@@ -1154,7 +1164,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
return false;
}
-
+
public void mgmtClose()
{
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1256,7 +1266,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
new AMQShortString(message),
0,0);
- writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ writeFrame(responseBody.generateFrame((Integer)session.getID()));
}
public void close(AMQConstant cause, String message) throws AMQException
@@ -1264,12 +1274,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
getProtocolOutputConverter().getProtocolMajorVersion(),
getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null), true);
+ (Throwable) null));
}
public List<AMQSessionModel> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
for (AMQChannel channel : getChannels())
{
sessions.add((AMQSessionModel) channel);
@@ -1301,27 +1311,27 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
_virtualHost.registerMessageReceived(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
@@ -1334,7 +1344,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
_registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 9116bf2767..c1b5b02f8f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -163,8 +163,10 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
void closeSession() throws AMQException;
+ void closeProtocolSession();
+
/** This must be called to close the session in order to free up any resources managed by the session. */
- void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+ void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
/** @return a key that uniquely identifies this session */
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index bd1c19d30e..7033bf755d 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -44,7 +44,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private IApplicationRegistry _appRegistry;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
-
+
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
@@ -53,14 +53,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
NetworkConnection network,
long id)
{
+ this(appRegistry,fqdn,supported,id);
+ setNetworkConnection(network);
+ }
+
+ public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
+ String fqdn,
+ Set<AmqpProtocolVersion> supported,
+ long id)
+ {
_id = id;
_appRegistry = appRegistry;
_fqdn = fqdn;
_supported = supported;
- _network = network;
- _sender = _network.getSender();
+
}
+
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
@@ -96,6 +105,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_delegate.readerIdle();
}
+
public void received(ByteBuffer msg)
{
_delegate.received(msg);
@@ -158,6 +168,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
(byte) 10
};
+ public void setNetworkConnection(NetworkConnection networkConnection)
+ {
+ setNetworkConnection(networkConnection, networkConnection.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+ }
+
+
private static interface DelegateCreator
{
AmqpProtocolVersion getVersion();
@@ -302,6 +324,11 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+
+ }
+
public long getConnectionId()
{
return _id;
@@ -380,14 +407,19 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
if(newDelegate == null)
{
_sender.send(ByteBuffer.wrap(newestSupported));
+ _sender.flush();
_delegate = new ClosedDelegateProtocolEngine();
+
+ _network.close();
+
}
else
{
_delegate = newDelegate;
_header.flip();
+ _delegate.setNetworkConnection(_network, _sender);
_delegate.received(_header);
if(msg.hasRemaining())
{
@@ -423,5 +455,10 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
}
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 8a7159bdc2..7e327b221f 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -48,4 +48,10 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
}
+
+ public ServerProtocolEngine newProtocolEngine()
+ {
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index 3ff650a480..48a8a1bf42 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
@@ -31,6 +32,7 @@ import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.registry.IApplicationRegistry;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import java.util.UUID;
public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig
@@ -52,11 +54,16 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
super(new Assembler(conn));
_connection = conn;
_connection.setConnectionConfig(this);
- _network = network;
+
_id = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
- _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE));
+ if(network != null)
+ {
+ setNetworkConnection(network);
+ }
+
+
_connection.onOpen(new Runnable()
{
public void run()
@@ -65,6 +72,19 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
});
+ }
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ setNetworkConnection(network, network.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+
+ _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
_connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
@@ -186,7 +206,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
return false;
}
-
+
public void mgmtClose()
{
_connection.mgmtClose();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index c058d0b0d8..5a411c6807 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -196,7 +196,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
return _closed;
}
- public void closeProtocolSession(boolean waitLast)
+ public void closeProtocolSession()
{
// Override as we don't have a real IOSession to close.
// The alternative is to fully implement the TestIOSession to return a CloseFuture from close();