diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-09-07 08:55:12 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-09-07 08:55:12 +0000 |
| commit | c8b145b9cf5282d534f204a5be48d19fe507ecb3 (patch) | |
| tree | 2957d1a3121f0e446569487d87987aa4a8eaaff7 /java/broker | |
| parent | e70492626c601e6c610ccea3e33b2a91188f3f76 (diff) | |
| download | qpid-python-c8b145b9cf5282d534f204a5be48d19fe507ecb3.tar.gz | |
QPID-3473 : Replace use of MINA IO with transport IO (joint work with Robbie Gemmel)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1166069 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
10 files changed, 197 insertions, 97 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8141533045..97f999484f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -75,6 +75,7 @@ import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.TransportException; import java.util.ArrayList; import java.util.Collection; @@ -137,7 +138,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; - + private final AtomicLong _txnStarts = new AtomicLong(0); private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); @@ -201,12 +202,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel // theory return !(_transaction instanceof AutoCommitTransaction); } - + public boolean inTransaction() { return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; } - + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -216,7 +217,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _txnCount.compareAndSet(0,1); } } - + private void decrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -314,7 +315,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel try { _currentMessage.getStoredMessage().flushToStore(); - + final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); if(!checkMessageUserId(_currentMessage.getContentHeader())) @@ -385,6 +386,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _currentMessage = null; throw e; } + catch (RuntimeException e) + { + // we want to make sure we don't keep a reference to the message in the + // event of an error + _currentMessage = null; + throw e; + } } protected void routeCurrentMessage() throws AMQException @@ -435,7 +443,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { throw new AMQException("Consumer already exists with same tag: " + tag); } - + Subscription subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); @@ -456,6 +464,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _tag2SubscriptionMap.remove(tag); throw e; } + catch (RuntimeException e) + { + _tag2SubscriptionMap.remove(tag); + throw e; + } return tag; } @@ -513,7 +526,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } catch (AMQException e) { - _logger.error("Caught AMQException whilst attempting to reque:" + e); + _logger.error("Caught AMQException whilst attempting to requeue:" + e); + } + catch (TransportException e) + { + _logger.error("Caught TransportException whilst attempting to requeue:" + e); } getConfigStore().removeConfiguredObject(this); @@ -944,7 +961,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel finally { _rollingBack = false; - + _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); @@ -1425,12 +1442,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { return _createTime; } - + public void mgmtClose() throws AMQException { _session.mgmtCloseChannel(_channelId); } - + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { if (inTransaction()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java index a4e365005e..75891d02a3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -56,7 +56,6 @@ import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.Transport; -import org.apache.qpid.transport.network.mina.MinaNetworkTransport; public class Broker { @@ -83,7 +82,7 @@ public class Broker startup(new BrokerOptions()); } - public void startup(BrokerOptions options) throws Exception + public void startup(final BrokerOptions options) throws Exception { try { @@ -193,9 +192,9 @@ public class Broker { for(int port : ports) { - final Set<AmqpProtocolVersion> supported = + final Set<AmqpProtocolVersion> supported = getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); - final NetworkTransportConfiguration settings = + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); @@ -218,12 +217,12 @@ public class Broker for(int sslPort : sslPorts) { - final Set<AmqpProtocolVersion> supported = + final Set<AmqpProtocolVersion> supported = getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); - final NetworkTransportConfiguration settings = + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP); - final IncomingNetworkTransport transport = new MinaNetworkTransport(); + final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(hostName, supported); @@ -244,7 +243,7 @@ public class Broker } private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10, - final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9, + final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9, final Set<Integer> exclude_0_8) { final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); @@ -268,7 +267,7 @@ public class Broker return supported; } - + private File getConfigFile(final String fileName, final String defaultFileName, final String qpidHome, boolean throwOnFileNotFound) throws InitException diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 3786c2020c..1c01ce465d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -29,6 +29,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQProtocolEngine; +import org.apache.qpid.transport.TransportException; public class ConnectionRegistry implements IConnectionRegistry, Closeable { @@ -44,19 +46,24 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable /** Close all of the currently open connections. */ public void close() { + _logger.debug("Closing connection registry :" + _registry.size() + " connections."); while (!_registry.isEmpty()) { AMQConnectionModel connection = _registry.get(0); - closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down"); + closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down"); } } - + public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message) { try { connection.close(cause, message); } + catch (TransportException e) + { + _logger.warn("Error closing connection:" + e.getMessage()); + } catch (AMQException e) { _logger.warn("Error closing connection:" + e.getMessage()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index dade5d5f54..f8e4eab0b6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -68,5 +68,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); session.writeFrame(responseBody.generateFrame(channelId)); + session.closeProtocolSession(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 36e0643f94..88022ba519 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -64,8 +64,6 @@ import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -95,6 +93,7 @@ import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig @@ -132,7 +131,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private Object _lastSent; protected volatile boolean _closed; - + // maximum number of channels this session should have private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); @@ -158,22 +157,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private long _writtenBytes; private long _readBytes; - private Job _readJob; - private Job _writeJob; - private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); private final UUID _id; private final ConfigStore _configStore; private long _createTime = System.currentTimeMillis(); - + private ApplicationRegistry _registry; private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - private final NetworkConnection _network; - private final Sender<ByteBuffer> _sender; + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; public ManagedObject getManagedObject() { @@ -184,11 +180,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _stateManager = new AMQStateManager(virtualHostRegistry, this); _codecFactory = new AMQCodecFactory(true, this); - _poolReference.acquireExecutorService(); - _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); - _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); - _network = network; - _sender = _network.getSender(); + + setNetworkConnection(network); _sessionID = connectionId; _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -204,6 +197,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr initialiseStatistics(); } + public void setNetworkConnection(NetworkConnection network) + { + setNetworkConnection(network, network.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + _network = network; + _sender = sender; + } + private AMQProtocolSessionMBean createMBean() throws JMException { return new AMQProtocolSessionMBean(this); @@ -240,26 +244,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() + for (AMQDataBlock dataBlock : dataBlocks) { - public void run() + try { - // Decode buffer - - for (AMQDataBlock dataBlock : dataBlocks) - { - try - { - dataBlockReceived(dataBlock); - } - catch (Exception e) - { - _logger.error("Unexpected exception when processing datablock", e); - closeProtocolSession(); - } - } + dataBlockReceived(dataBlock); + } + catch (Exception e) + { + _logger.error("Unexpected exception when processing datablock", e); + closeProtocolSession(); } - }); + } } catch (Exception e) { @@ -337,6 +333,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr closeChannel(channelId); throw e; } + catch (TransportException e) + { + closeChannel(channelId); + throw e; + } } finally { @@ -368,6 +369,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr mechanisms.getBytes(), locales.getBytes()); _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.flush(); } catch (AMQException e) @@ -375,6 +377,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.flush(); } } @@ -430,19 +433,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr AMQConstant.CHANNEL_ERROR.getName().toString()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce, false); + closeConnection(channelId, ce); } } catch (AMQConnectionException e) { _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, e, false); + closeConnection(channelId, e); } catch (AMQSecurityException e) { AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce, false); + closeConnection(channelId, ce); } } catch (Exception e) @@ -485,19 +488,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr * * @param frame the frame to write */ - public void writeFrame(AMQDataBlock frame) + public synchronized void writeFrame(AMQDataBlock frame) { _lastSent = frame; final ByteBuffer buf = frame.toNioByteBuffer(); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); - Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() - { - public void run() - { - _sender.send(buf); - } - }); + _sender.send(buf); + _sender.flush(); } public AMQShortString getContextKey() @@ -729,7 +727,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } closeAllChannels(); - + getConfigStore().removeConfiguredObject(this); if (_managedObject != null) @@ -749,7 +747,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _closed = true; notifyAll(); } - _poolReference.releaseExecutorService(); CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE()); } } @@ -772,27 +769,32 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } - public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException + public void closeConnection(int channelId, AMQConnectionException e) throws AMQException { - if (_logger.isInfoEnabled()) + try { - _logger.info("Closing connection due to: " + e); - } - - markChannelAwaitingCloseOk(channelId); - closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e); + } - if (closeProtocolSession) + markChannelAwaitingCloseOk(channelId); + closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + } + finally { closeProtocolSession(); } + + } public void closeProtocolSession() { - _sender.close(); + _network.close(); + try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -801,6 +803,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.info(e.getMessage()); } + catch (TransportException e) + { + _logger.info(e.getMessage()); + } } public String toString() @@ -923,7 +929,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _virtualHost = virtualHost; _virtualHost.getConnectionRegistry().registerConnection(this); - + _configStore.addConfiguredObject(this); try @@ -960,12 +966,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } _authorizedSubject = authorizedSubject; } - + public Subject getAuthorizedSubject() { return _authorizedSubject; } - + public Principal getAuthorizedPrincipal() { return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject); @@ -1001,6 +1007,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.error("Could not close protocol engine", e); } + catch (TransportException e) + { + _logger.error("Could not close protocol engine", e); + } } public void readerIdle() @@ -1154,7 +1164,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { return false; } - + public void mgmtClose() { MethodRegistry methodRegistry = getMethodRegistry(); @@ -1256,7 +1266,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr new AMQShortString(message), 0,0); - writeFrame(responseBody.generateFrame((Integer)session.getID())); + writeFrame(responseBody.generateFrame((Integer)session.getID())); } public void close(AMQConstant cause, String message) throws AMQException @@ -1264,12 +1274,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getProtocolOutputConverter().getProtocolMajorVersion(), getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null), true); + (Throwable) null)); } public List<AMQSessionModel> getSessionModels() { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); for (AMQChannel channel : getChannels()) { sessions.add((AMQSessionModel) channel); @@ -1301,27 +1311,27 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } _virtualHost.registerMessageReceived(messageSize, timestamp); } - + public StatisticsCounter getMessageReceiptStatistics() { return _messagesReceived; } - + public StatisticsCounter getDataReceiptStatistics() { return _dataReceived; } - + public StatisticsCounter getMessageDeliveryStatistics() { return _messagesDelivered; } - + public StatisticsCounter getDataDeliveryStatistics() { return _dataDelivered; } - + public void resetStatistics() { _messagesDelivered.reset(); @@ -1334,7 +1344,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); - + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 9116bf2767..c1b5b02f8f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -163,8 +163,10 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth /** This must be called when the session is _closed in order to free up any resources managed by the session. */ void closeSession() throws AMQException; + void closeProtocolSession(); + /** This must be called to close the session in order to free up any resources managed by the session. */ - void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException; + void closeConnection(int channelId, AMQConnectionException e) throws AMQException; /** @return a key that uniquely identifies this session */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index bd1c19d30e..7033bf755d 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -44,7 +44,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private IApplicationRegistry _appRegistry; private NetworkConnection _network; private Sender<ByteBuffer> _sender; - + private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, @@ -53,14 +53,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine NetworkConnection network, long id) { + this(appRegistry,fqdn,supported,id); + setNetworkConnection(network); + } + + public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, + String fqdn, + Set<AmqpProtocolVersion> supported, + long id) + { _id = id; _appRegistry = appRegistry; _fqdn = fqdn; _supported = supported; - _network = network; - _sender = _network.getSender(); + } + public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); @@ -96,6 +105,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _delegate.readerIdle(); } + public void received(ByteBuffer msg) { _delegate.received(msg); @@ -158,6 +168,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine (byte) 10 }; + public void setNetworkConnection(NetworkConnection networkConnection) + { + setNetworkConnection(networkConnection, networkConnection.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + _network = network; + _sender = sender; + } + + private static interface DelegateCreator { AmqpProtocolVersion getVersion(); @@ -302,6 +324,11 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + + } + public long getConnectionId() { return _id; @@ -380,14 +407,19 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine if(newDelegate == null) { _sender.send(ByteBuffer.wrap(newestSupported)); + _sender.flush(); _delegate = new ClosedDelegateProtocolEngine(); + + _network.close(); + } else { _delegate = newDelegate; _header.flip(); + _delegate.setNetworkConnection(_network, _sender); _delegate.received(_header); if(msg.hasRemaining()) { @@ -423,5 +455,10 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { } + + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 8a7159bdc2..7e327b221f 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -48,4 +48,10 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement()); } + + public ServerProtocolEngine newProtocolEngine() + { + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement()); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 3ff650a480..48a8a1bf42 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; @@ -31,6 +32,7 @@ import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.registry.IApplicationRegistry; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.util.UUID; public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig @@ -52,11 +54,16 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol super(new Assembler(conn)); _connection = conn; _connection.setConnectionConfig(this); - _network = network; + _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE)); + if(network != null) + { + setNetworkConnection(network); + } + + _connection.onOpen(new Runnable() { public void run() @@ -65,6 +72,19 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } }); + } + + public void setNetworkConnection(NetworkConnection network) + { + setNetworkConnection(network, network.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + _network = network; + + _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE)); + // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); @@ -186,7 +206,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return false; } - + public void mgmtClose() { _connection.mgmtClose(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index c058d0b0d8..5a411c6807 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -196,7 +196,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr return _closed; } - public void closeProtocolSession(boolean waitLast) + public void closeProtocolSession() { // Override as we don't have a real IOSession to close. // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); |
