From 38194151e929fef7fa8adeb08badfa85a17d8404 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Feb 2015 15:11:13 +0000 Subject: refactoring git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658981 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/plugin/ProtocolEngineCreator.java | 2 +- .../protocol/MultiVersionProtocolEngine.java | 1 - .../server/transport/NonBlockingConnection.java | 634 +++++++++++++++++++++ .../transport/NonBlockingNetworkTransport.java | 187 ++++++ .../qpid/server/transport/SelectorThread.java | 302 ++++++++++ .../qpid/server/transport/TCPandSSLTransport.java | 1 - .../server/protocol/v0_10/CreditCreditManager.java | 2 +- .../protocol/v0_10/ProtocolEngineCreator_0_10.java | 2 +- .../server/protocol/v0_10/ProtocolEngine_0_10.java | 2 +- .../server/protocol/v0_10/ServerConnection.java | 2 +- .../protocol/v0_10/ServerSessionDelegate.java | 2 +- .../server/protocol/v0_10/WindowCreditManager.java | 2 +- .../protocol/v0_10/WindowCreditManagerTest.java | 2 +- .../server/protocol/v0_8/AMQProtocolEngine.java | 2 +- .../server/protocol/v0_8/NoAckCreditManager.java | 2 +- .../server/protocol/v0_8/Pre0_10CreditManager.java | 2 +- .../protocol/v0_8/ProtocolEngineCreator_0_8.java | 2 +- .../protocol/v0_8/ProtocolEngineCreator_0_9.java | 2 +- .../protocol/v0_8/ProtocolEngineCreator_0_9_1.java | 2 +- .../qpid/server/protocol/v1_0/Connection_1_0.java | 2 +- .../server/protocol/v1_0/ConsumerTarget_1_0.java | 2 +- .../v1_0/ProtocolEngineCreator_1_0_0_SASL.java | 2 +- .../protocol/v1_0/ProtocolEngine_1_0_0_SASL.java | 2 +- .../apache/qpid/protocol/ServerProtocolEngine.java | 43 -- .../transport/network/io/IdleTimeoutTicker.java | 2 +- .../network/io/NonBlockingConnection.java | 187 ------ .../network/io/NonBlockingNetworkTransport.java | 185 ------ .../network/io/NonBlockingSenderReceiver.java | 551 ------------------ .../qpid/transport/network/io/SelectorThread.java | 302 ---------- .../MultiVersionProtocolEngineFactoryTest.java | 1 - 30 files changed, 1142 insertions(+), 1290 deletions(-) create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java index 6e1b6529d8..6100a2eb80 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java @@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/* * */ -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 946992cbb6..3b7883b9b9 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -30,7 +30,6 @@ import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java new file mode 100644 index 0000000000..331c2e697d --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -0,0 +1,634 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLPeerUnverifiedException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderClosedException; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; +import org.apache.qpid.util.SystemUtils; + +public class NonBlockingConnection implements NetworkConnection, ByteBufferSender +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); + private final SocketChannel _socketChannel; + private final long _timeout; + private final Ticker _ticker; + private final SelectorThread _selector; + private int _maxReadIdle; + private int _maxWriteIdle; + private Principal _principal; + private boolean _principalChecked; + private final Object _lock = new Object(); + + public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; + + private final ConcurrentLinkedQueue _buffers = new ConcurrentLinkedQueue<>(); + private final List _encryptedOutput = new ArrayList<>(); + + private final String _remoteSocketAddress; + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final ServerProtocolEngine _protocolEngine; + private final int _receiveBufSize; + private final Set _encryptionSet; + private final SSLContext _sslContext; + private final Runnable _onTransportEncryptionAction; + private ByteBuffer _netInputBuffer; + private SSLEngine _sslEngine; + + private ByteBuffer _currentBuffer; + + private TransportEncryption _transportEncryption; + private SSLEngineResult _status; + private volatile boolean _fullyWritten = true; + private AtomicBoolean _stateChanged = new AtomicBoolean(); + private boolean _workDone; + + + public NonBlockingConnection(SocketChannel socketChannel, + ServerProtocolEngine delegate, + int sendBufferSize, + int receiveBufferSize, + long timeout, + Ticker ticker, + final Set encryptionSet, + final SSLContext sslContext, + final boolean wantClientAuth, + final boolean needClientAuth, + final Collection enabledCipherSuites, + final Collection disabledCipherSuites, + final Runnable onTransportEncryptionAction, + final SelectorThread selectorThread) + { + _socketChannel = socketChannel; + _timeout = timeout; + _ticker = ticker; + _selector = selectorThread; + + _protocolEngine = delegate; + _receiveBufSize = receiveBufferSize; + _encryptionSet = encryptionSet; + _sslContext = sslContext; + _onTransportEncryptionAction = onTransportEncryptionAction; + + if(encryptionSet.size() == 1) + { + _transportEncryption = _encryptionSet.iterator().next(); + if (_transportEncryption == TransportEncryption.TLS) + { + onTransportEncryptionAction.run(); + } + } + + if(encryptionSet.contains(TransportEncryption.TLS)) + { + _sslEngine = _sslContext.createSSLEngine(); + _sslEngine.setUseClientMode(false); + SSLUtil.removeSSLv3Support(_sslEngine); + SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites); + + if(needClientAuth) + { + _sslEngine.setNeedClientAuth(true); + } + else if(wantClientAuth) + { + _sslEngine.setWantClientAuth(true); + } + _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); + } + + try + { + _remoteSocketAddress = _socketChannel.getRemoteAddress().toString(); + _socketChannel.configureBlocking(false); + } + catch (IOException e) + { + throw new SenderException("Unable to prepare the channel for non-blocking IO", e); + } + + + } + + + public Ticker getTicker() + { + return _ticker; + } + + public SocketChannel getSocketChannel() + { + return _socketChannel; + } + + public void start() + { + } + + public ByteBufferSender getSender() + { + return this; + } + + public void close() + { + LOGGER.debug("Closing " + _remoteSocketAddress); + if(_closed.compareAndSet(false,true)) + { + _stateChanged.set(true); + getSelector().wakeup(); + } + } + + public SocketAddress getRemoteAddress() + { + return _socketChannel.socket().getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socketChannel.socket().getLocalSocketAddress(); + } + + public void setMaxWriteIdle(int sec) + { + _maxWriteIdle = sec; + } + + public void setMaxReadIdle(int sec) + { + _maxReadIdle = sec; + } + + @Override + public Principal getPeerPrincipal() + { + synchronized (_lock) + { + if(!_principalChecked) + { + if (_sslEngine != null) + { + try + { + _principal = _sslEngine.getSession().getPeerPrincipal(); + } + catch (SSLPeerUnverifiedException e) + { + return null; + } + } + + _principalChecked = true; + } + + return _principal; + } + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } + + public boolean canRead() + { + return true; + } + + public boolean waitingForWrite() + { + return !_fullyWritten; + } + + public boolean isStateChanged() + { + + return _stateChanged.get(); + } + + public boolean doWork() + { + _stateChanged.set(false); + boolean closed = _closed.get(); + if (!closed) + { + try + { + _workDone = false; + + long currentTime = System.currentTimeMillis(); + int tick = _ticker.getTimeToNextTick(currentTime); + if (tick <= 0) + { + _ticker.tick(currentTime); + } + + _protocolEngine.setMessageAssignmentSuspended(true); + + _protocolEngine.processPendingMessages(); + + _protocolEngine.setTransportBlockedForWriting(!doWrite()); + boolean dataRead = doRead(); + _fullyWritten = doWrite(); + _protocolEngine.setTransportBlockedForWriting(!_fullyWritten); + + if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) + { + _stateChanged.set(true); + } + + // tell all consumer targets that it is okay to accept more + _protocolEngine.setMessageAssignmentSuspended(false); + } + catch (IOException e) + { + LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); + LOGGER.debug("Closing " + _remoteSocketAddress); + if(_closed.compareAndSet(false,true)) + { + _stateChanged.set(true); + getSelector().wakeup(); + } + } + } + else + { + + if(!SystemUtils.isWindows()) + { + try + { + _socketChannel.shutdownInput(); + } + catch (IOException e) + { + LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e); + + } + } + try + { + while(!doWrite()) + { + } + } + catch (IOException e) + { + LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); + + } + LOGGER.debug("Closing receiver"); + _protocolEngine.closed(); + + try + { + if(!SystemUtils.isWindows()) + { + _socketChannel.shutdownOutput(); + } + + _socketChannel.close(); + } + catch (IOException e) + { + LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e); + } + } + + return closed; + + } + + public SelectorThread getSelector() + { + return _selector; + } + + public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == -128 && + headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[4] == 0 || // SSL 3.0 + headerBytes[4] == 1 || // TLS 1.0 + headerBytes[4] == 2 || // TLS 1.1 + headerBytes[4] == 3); + } + + public boolean doRead() throws IOException + { + boolean readData = false; + if(_transportEncryption == TransportEncryption.NONE) + { + int remaining = 0; + while (remaining == 0 && !_closed.get()) + { + if (_currentBuffer == null || _currentBuffer.remaining() == 0) + { + _currentBuffer = ByteBuffer.allocate(_receiveBufSize); + } + int read = _socketChannel.read(_currentBuffer); + if(read > 0) + { + readData = true; + } + if (read == -1) + { + _closed.set(true); + } + remaining = _currentBuffer.remaining(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " byte(s)"); + } + ByteBuffer dup = _currentBuffer.duplicate(); + dup.flip(); + _currentBuffer = _currentBuffer.slice(); + _protocolEngine.received(dup); + } + } + else if(_transportEncryption == TransportEncryption.TLS) + { + int read = 1; + while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) + { + read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + else if(read > 0) + { + readData = true; + } + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " encrypted bytes "); + } + + _netInputBuffer.flip(); + + + int unwrapped = 0; + boolean tasksRun; + do + { + ByteBuffer appInputBuffer = + ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); + + _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); + tasksRun = runSSLEngineTasks(_status); + + appInputBuffer.flip(); + unwrapped = appInputBuffer.remaining(); + if(unwrapped > 0) + { + readData = true; + } + _protocolEngine.received(appInputBuffer); + } + while(unwrapped > 0 || tasksRun); + + _netInputBuffer.compact(); + + } + } + else + { + int read = 1; + while (!_closed.get() && read > 0) + { + + read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); + } + + if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK) + { + _netInputBuffer.flip(); + final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK]; + ByteBuffer dup = _netInputBuffer.duplicate(); + dup.get(headerBytes); + + _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE; + LOGGER.debug("Identified transport encryption as " + _transportEncryption); + + if (_transportEncryption == TransportEncryption.NONE) + { + _protocolEngine.received(_netInputBuffer); + } + else + { + _onTransportEncryptionAction.run(); + _netInputBuffer.compact(); + readData = doRead(); + } + break; + } + } + } + return readData; + } + + public boolean doWrite() throws IOException + { + + ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; + Iterator bufferIterator = _buffers.iterator(); + for (int i = 0; i < bufArray.length; i++) + { + bufArray[i] = bufferIterator.next(); + } + + int byteBuffersWritten = 0; + + if(_transportEncryption == TransportEncryption.NONE) + { + + + long written = _socketChannel.write(bufArray); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " bytes"); + } + + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + + + return bufArray.length == byteBuffersWritten; + } + else if(_transportEncryption == TransportEncryption.TLS) + { + int remaining = 0; + do + { + if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) + { + _workDone = true; + final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); + _status = _sslEngine.wrap(bufArray, netBuffer); + runSSLEngineTasks(_status); + + netBuffer.flip(); + remaining = netBuffer.remaining(); + if (remaining != 0) + { + _encryptedOutput.add(netBuffer); + } + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + } + + } + while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); + ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); + long written = _socketChannel.write(encryptedBuffers); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " encrypted bytes"); + } + ListIterator iter = _encryptedOutput.listIterator(); + while(iter.hasNext()) + { + ByteBuffer buf = iter.next(); + if(buf.remaining() == 0) + { + iter.remove(); + } + else + { + break; + } + } + + return bufArray.length == byteBuffersWritten; + + } + else + { + return true; + } + } + + public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == 22 && // SSL Handshake + (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[2] == 0 || // SSL 3.0 + headerBytes[2] == 1 || // TLS 1.0 + headerBytes[2] == 2 || // TLS 1.1 + headerBytes[2] == 3)) && // TLS1.2 + (headerBytes[5] == 1); // client_hello + } + + public boolean runSSLEngineTasks(final SSLEngineResult status) + { + if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) + { + Runnable task; + while((task = _sslEngine.getDelegatedTask()) != null) + { + task.run(); + } + return true; + } + return false; + } + + public boolean looksLikeSSL(final byte[] headerBytes) + { + return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); + } + + @Override + public void send(final ByteBuffer msg) + { + if (_closed.get()) + { + throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); + } + // append to list and do selector wakeup + _buffers.add(msg); + _stateChanged.set(true); + } + + @Override + public void flush() + { + _stateChanged.set(true); + getSelector().wakeup(); + + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java new file mode 100644 index 0000000000..79313712a5 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Set; + +import javax.net.ssl.SSLContext; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.CommonProperties; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.io.AbstractNetworkTransport; +import org.apache.qpid.transport.network.io.IdleTimeoutTicker; + +public class NonBlockingNetworkTransport +{ + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, + CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private SelectorThread _selector; + + + private Set _encryptionSet; + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocketChannel _serverSocket; + private int _timeout; + + public void close() + { + if(_selector != null) + { + try + { + if (_serverSocket != null) + { + _selector.cancelAcceptingSocket(_serverSocket); + _serverSocket.close(); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + finally + { + + _selector.close(); + } + } + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext, + final Set encryptionSet) + { + try + { + + _config = config; + _factory = factory; + _sslContext = sslContext; + _timeout = TIMEOUT; + + InetSocketAddress address = config.getAddress(); + + _serverSocket = ServerSocketChannel.open(); + + _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); + _serverSocket.bind(address); + _serverSocket.configureBlocking(false); + _encryptionSet = encryptionSet; + + _selector = new SelectorThread(config.getAddress().toString(), this); + _selector.start(); + _selector.addAcceptingSocket(_serverSocket); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + + + } + + public int getAcceptingPort() + { + return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); + } + + public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException + { + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); + + if(engine != null) + { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NonBlockingConnection connection = + new NonBlockingConnection(socketChannel, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + _config.getEnabledCipherSuites(), + _config.getDisabledCipherSuites(), + new Runnable() + { + + @Override + public void run() + { + engine.encryptedTransport(); + } + }, + _selector); + + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + connection.start(); + + _selector.addConnection(connection); + + } + else + { + socketChannel.close(); + } + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java new file mode 100644 index 0000000000..786f1915a7 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** +* Created by keith on 28/01/2015. +*/ +public class SelectorThread extends Thread +{ + private final Queue _tasks = new ConcurrentLinkedQueue<>(); + private final Queue _unregisteredConnections = new ConcurrentLinkedQueue<>(); + private final Set _unscheduledConnections = new HashSet<>(); + private final Selector _selector; + private final AtomicBoolean _closed = new AtomicBoolean(); + private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); + private final NonBlockingNetworkTransport _transport; + + SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) + { + super("SelectorThread-"+name); + _transport = nonBlockingNetworkTransport; + try + { + _selector = Selector.open(); + } + catch (IOException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void addAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + + try + { + socketChannel.register(_selector, SelectionKey.OP_ACCEPT); + } + catch (ClosedChannelException e) + { + // TODO + e.printStackTrace(); + } + } + }); + _selector.wakeup(); + } + + public void cancelAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + SelectionKey selectionKey = socketChannel.keyFor(_selector); + if(selectionKey != null) + { + selectionKey.cancel(); + } + } + }); + _selector.wakeup(); + } + + @Override + public void run() + { + + long nextTimeout = 0; + + try + { + while (!_closed.get()) + { + + _selector.select(nextTimeout); + + while(_tasks.peek() != null) + { + Runnable task = _tasks.poll(); + task.run(); + } + + List toBeScheduled = new ArrayList<>(); + + + Set selectionKeys = _selector.selectedKeys(); + for (SelectionKey key : selectionKeys) + { + if(key.isAcceptable()) + { + // todo - should we schedule this rather than running in this thread? + SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); + _transport.acceptSocketChannel(acceptedChannel); + } + else + { + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + + key.channel().register(_selector, 0); + + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); + } + + } + selectionKeys.clear(); + + while (_unregisteredConnections.peek() != null) + { + NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); + _unscheduledConnections.add(unregisteredConnection); + + + final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) + | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); + unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + + } + + long currentTime = System.currentTimeMillis(); + Iterator iterator = _unscheduledConnections.iterator(); + nextTimeout = Integer.MAX_VALUE; + while (iterator.hasNext()) + { + NonBlockingConnection connection = iterator.next(); + + int period = connection.getTicker().getTimeToNextTick(currentTime); + if (period < 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); + connection.getSocketChannel().register(_selector, 0).cancel(); + iterator.remove(); + } + else + { + nextTimeout = Math.min(period, nextTimeout); + } + } + + for (NonBlockingConnection connection : toBeScheduled) + { + _scheduler.schedule(connection); + } + + } + } + catch (IOException e) + { + //TODO + e.printStackTrace(); + } + finally + { + try + { + _selector.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + + + + } + + public void addConnection(final NonBlockingConnection connection) + { + _unregisteredConnections.add(connection); + _selector.wakeup(); + + } + + public void wakeup() + { + _selector.wakeup(); + } + + public void close() + { + _closed.set(true); + _selector.wakeup(); + _scheduler.close(); + } + + private class NetworkConnectionScheduler + { + private final ScheduledThreadPoolExecutor _executor; + private final AtomicInteger _running = new AtomicInteger(); + private final int _poolSize; + + private NetworkConnectionScheduler() + { + _poolSize = Runtime.getRuntime().availableProcessors(); + _executor = new ScheduledThreadPoolExecutor(_poolSize); + _executor.prestartAllCoreThreads(); + } + + public void processConnection(final NonBlockingConnection connection) + { + try + { + _running.incrementAndGet(); + boolean rerun; + do + { + rerun = false; + boolean closed = connection.doWork(); + + if (!closed) + { + + if (connection.isStateChanged()) + { + if (_running.get() == _poolSize) + { + schedule(connection); + } + else + { + rerun = true; + } + } + else + { + SelectorThread.this.addConnection(connection); + } + } + + } while (rerun); + } + finally + { + _running.decrementAndGet(); + } + } + + public void schedule(final NonBlockingConnection connection) + { + _executor.submit(new Runnable() + { + @Override + public void run() + { + processConnection(connection); + } + }); + } + + public void close() + { + _executor.shutdown(); + } + + + + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index 5f5467db07..7874437a2f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -36,7 +36,6 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.TransportEncryption; -import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport; class TCPandSSLTransport implements AcceptingTransport { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java index e670c1f88b..dd43ae7e11 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index 5c919252b8..4231045afd 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import java.net.InetSocketAddress; import java.net.SocketAddress; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 75a162deb8..5f227e5f18 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -29,7 +29,7 @@ import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Consumer; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index d9b4495d6e..a2f1f1a4ba 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 8fdee7a0f7..d33297fbf6 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -35,7 +35,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java index e11d2ce9bb..a7b08e3f83 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java index b05edc5d04..b9f013d253 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.test.utils.QpidTestCase; public class WindowCreditManagerTest extends QpidTestCase diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 945e18e560..ed075038e6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -58,7 +58,7 @@ import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java index 2d32617106..6e5aab2dd5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java @@ -19,7 +19,7 @@ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class NoAckCreditManager extends AbstractFlowCreditManager diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java index e63645ed09..a869a707e1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; import org.apache.qpid.server.flow.FlowCreditManager; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java index 0058fe86a9..e8cf028069 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java index 7253111114..8817e79aff 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java index e72cc4d058..af37b17d85 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index b6c23dff7a..a442b5c437 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -44,7 +44,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.End; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 589bd0ec04..ebd23f31ae 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -40,7 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java index fa8134cb55..e72dc17b57 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 22cf76a3ea..d361dce682 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -52,7 +52,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Consumer; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java deleted file mode 100644 index df4d7c7721..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.protocol; - -import javax.security.auth.Subject; - -public interface ServerProtocolEngine extends ProtocolEngine -{ - /** - * Gets the connection ID associated with this ProtocolEngine - */ - long getConnectionId(); - - Subject getSubject(); - - boolean isTransportBlockedForWriting(); - - void setTransportBlockedForWriting(boolean blocked); - - void setMessageAssignmentSuspended(boolean value); - - boolean isMessageAssignmentSuspended(); - - void processPendingMessages(); -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java index 54a2a360bb..71704fca3a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -25,7 +25,7 @@ import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportActivity; -class IdleTimeoutTicker implements Ticker +public class IdleTimeoutTicker implements Ticker { private final TransportActivity _transport; private final int _defaultTimeout; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java deleted file mode 100644 index 7c7c6929d1..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ /dev/null @@ -1,187 +0,0 @@ -/* -* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import java.net.SocketAddress; -import java.nio.channels.SocketChannel; -import java.security.Principal; -import java.util.Collection; -import java.util.Set; - -import javax.net.ssl.SSLContext; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.ByteBufferSender; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.Ticker; -import org.apache.qpid.transport.network.TransportEncryption; - -public class NonBlockingConnection implements NetworkConnection -{ - private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); - private final SocketChannel _socketChannel; - private final long _timeout; - private final NonBlockingSenderReceiver _nonBlockingSenderReceiver; - private final Ticker _ticker; - private final SelectorThread _selector; - private int _maxReadIdle; - private int _maxWriteIdle; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public NonBlockingConnection(SocketChannel socketChannel, - ServerProtocolEngine delegate, - int sendBufferSize, - int receiveBufferSize, - long timeout, - Ticker ticker, - final Set encryptionSet, - final SSLContext sslContext, - final boolean wantClientAuth, - final boolean needClientAuth, - final Collection enabledCipherSuites, - final Collection disabledCipherSuites, - final Runnable onTransportEncryptionAction, - final SelectorThread selectorThread) - { - _socketChannel = socketChannel; - _timeout = timeout; - _ticker = ticker; - _selector = selectorThread; - - _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this, - delegate, - receiveBufferSize, - ticker, - encryptionSet, - sslContext, - wantClientAuth, - needClientAuth, - enabledCipherSuites, - disabledCipherSuites, - onTransportEncryptionAction); - - } - - - public Ticker getTicker() - { - return _ticker; - } - - public SocketChannel getSocketChannel() - { - return _socketChannel; - } - - public void start() - { - } - - public ByteBufferSender getSender() - { - return _nonBlockingSenderReceiver; - } - - public void close() - { - _nonBlockingSenderReceiver.close(); - } - - public SocketAddress getRemoteAddress() - { - return _socketChannel.socket().getRemoteSocketAddress(); - } - - public SocketAddress getLocalAddress() - { - return _socketChannel.socket().getLocalSocketAddress(); - } - - public void setMaxWriteIdle(int sec) - { - _maxWriteIdle = sec; - } - - public void setMaxReadIdle(int sec) - { - _maxReadIdle = sec; - } - - @Override - public Principal getPeerPrincipal() - { - synchronized (_lock) - { - if(!_principalChecked) - { - - _principal = _nonBlockingSenderReceiver.getPeerPrincipal(); - - _principalChecked = true; - } - - return _principal; - } - } - - @Override - public int getMaxReadIdle() - { - return _maxReadIdle; - } - - @Override - public int getMaxWriteIdle() - { - return _maxWriteIdle; - } - - public boolean canRead() - { - return _nonBlockingSenderReceiver.canRead(); - } - - public boolean waitingForWrite() - { - return _nonBlockingSenderReceiver.waitingForWrite(); - } - - public boolean isStateChanged() - { - - return _nonBlockingSenderReceiver.isStateChanged(); - } - - public boolean doWork() - { - return _nonBlockingSenderReceiver.doWork(); - } - - public SelectorThread getSelector() - { - return _selector; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java deleted file mode 100644 index 1c49efc294..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.Set; - -import javax.net.ssl.SSLContext; - -import org.slf4j.LoggerFactory; - -import org.apache.qpid.configuration.CommonProperties; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.TransportEncryption; - -public class NonBlockingNetworkTransport -{ - - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); - private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, - CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); - private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , - CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - private SelectorThread _selector; - - - private Set _encryptionSet; - private volatile boolean _closed = false; - private NetworkTransportConfiguration _config; - private ProtocolEngineFactory _factory; - private SSLContext _sslContext; - private ServerSocketChannel _serverSocket; - private int _timeout; - - public void close() - { - if(_selector != null) - { - try - { - if (_serverSocket != null) - { - _selector.cancelAcceptingSocket(_serverSocket); - _serverSocket.close(); - } - } - catch (IOException e) - { - // TODO - e.printStackTrace(); - } - finally - { - - _selector.close(); - } - } - } - - public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext, - final Set encryptionSet) - { - try - { - - _config = config; - _factory = factory; - _sslContext = sslContext; - _timeout = TIMEOUT; - - InetSocketAddress address = config.getAddress(); - - _serverSocket = ServerSocketChannel.open(); - - _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); - _serverSocket.bind(address); - _serverSocket.configureBlocking(false); - _encryptionSet = encryptionSet; - - _selector = new SelectorThread(config.getAddress().toString(), this); - _selector.start(); - _selector.addAcceptingSocket(_serverSocket); - } - catch (IOException e) - { - throw new TransportException("Failed to start AMQP on port : " + config, e); - } - - - } - - public int getAcceptingPort() - { - return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); - } - - public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException - { - final ServerProtocolEngine engine = - (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() - .getRemoteSocketAddress()); - - if(engine != null) - { - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - - NonBlockingConnection connection = - new NonBlockingConnection(socketChannel, - engine, - sendBufferSize, - receiveBufferSize, - _timeout, - ticker, - _encryptionSet, - _sslContext, - _config.wantClientAuth(), - _config.needClientAuth(), - _config.getEnabledCipherSuites(), - _config.getDisabledCipherSuites(), - new Runnable() - { - - @Override - public void run() - { - engine.encryptedTransport(); - } - }, - _selector); - - engine.setNetworkConnection(connection, connection.getSender()); - connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); - - ticker.setConnection(connection); - - connection.start(); - - _selector.addConnection(connection); - - } - else - { - socketChannel.close(); - } - } - - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java deleted file mode 100644 index ee99233063..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.security.Principal; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLPeerUnverifiedException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.ByteBufferSender; -import org.apache.qpid.transport.SenderClosedException; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.Ticker; -import org.apache.qpid.transport.network.TransportEncryption; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; -import org.apache.qpid.util.SystemUtils; - -public class NonBlockingSenderReceiver implements ByteBufferSender -{ - private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); - public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; - - private final SocketChannel _socketChannel; - - private final ConcurrentLinkedQueue _buffers = new ConcurrentLinkedQueue<>(); - private final List _encryptedOutput = new ArrayList<>(); - - private final String _remoteSocketAddress; - private final AtomicBoolean _closed = new AtomicBoolean(false); - private final ServerProtocolEngine _protocolEngine; - private final int _receiveBufSize; - private final Ticker _ticker; - private final Set _encryptionSet; - private final SSLContext _sslContext; - private final Runnable _onTransportEncryptionAction; - private final NonBlockingConnection _connection; - private ByteBuffer _netInputBuffer; - private SSLEngine _sslEngine; - - private ByteBuffer _currentBuffer; - - private TransportEncryption _transportEncryption; - private SSLEngineResult _status; - private volatile boolean _fullyWritten = true; - private AtomicBoolean _stateChanged = new AtomicBoolean(); - private boolean _workDone; - - - public NonBlockingSenderReceiver(final NonBlockingConnection connection, - ServerProtocolEngine protocolEngine, - int receiveBufSize, - Ticker ticker, - final Set encryptionSet, - final SSLContext sslContext, - final boolean wantClientAuth, - final boolean needClientAuth, - final Collection enabledCipherSuites, - final Collection disabledCipherSuites, - final Runnable onTransportEncryptionAction) - { - _connection = connection; - _socketChannel = connection.getSocketChannel(); - _protocolEngine = protocolEngine; - _receiveBufSize = receiveBufSize; - _ticker = ticker; - _encryptionSet = encryptionSet; - _sslContext = sslContext; - _onTransportEncryptionAction = onTransportEncryptionAction; - - if(encryptionSet.size() == 1) - { - _transportEncryption = _encryptionSet.iterator().next(); - if (_transportEncryption == TransportEncryption.TLS) - { - onTransportEncryptionAction.run(); - } - } - - if(encryptionSet.contains(TransportEncryption.TLS)) - { - _sslEngine = _sslContext.createSSLEngine(); - _sslEngine.setUseClientMode(false); - SSLUtil.removeSSLv3Support(_sslEngine); - SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites); - - if(needClientAuth) - { - _sslEngine.setNeedClientAuth(true); - } - else if(wantClientAuth) - { - _sslEngine.setWantClientAuth(true); - } - _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); - } - - try - { - _remoteSocketAddress = _socketChannel.getRemoteAddress().toString(); - _socketChannel.configureBlocking(false); - } - catch (IOException e) - { - throw new SenderException("Unable to prepare the channel for non-blocking IO", e); - } - - } - - @Override - public void send(final ByteBuffer msg) - { - if (_closed.get()) - { - throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); - } - // append to list and do selector wakeup - _buffers.add(msg); - _stateChanged.set(true); - } - - - public boolean doWork() - { - _stateChanged.set(false); - boolean closed = _closed.get(); - if (!closed) - { - try - { - _workDone = false; - - long currentTime = System.currentTimeMillis(); - int tick = _ticker.getTimeToNextTick(currentTime); - if (tick <= 0) - { - _ticker.tick(currentTime); - } - - _protocolEngine.setMessageAssignmentSuspended(true); - - _protocolEngine.processPendingMessages(); - - _protocolEngine.setTransportBlockedForWriting(!doWrite()); - boolean dataRead = doRead(); - _fullyWritten = doWrite(); - _protocolEngine.setTransportBlockedForWriting(!_fullyWritten); - - if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) - { - _stateChanged.set(true); - } - - // tell all consumer targets that it is okay to accept more - _protocolEngine.setMessageAssignmentSuspended(false); - } - catch (IOException e) - { - LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); - close(); - } - } - else - { - - if(!SystemUtils.isWindows()) - { - try - { - _socketChannel.shutdownInput(); - } - catch (IOException e) - { - LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e); - - } - } - try - { - while(!doWrite()) - { - } - } - catch (IOException e) - { - LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); - - } - LOGGER.debug("Closing receiver"); - _protocolEngine.closed(); - - try - { - if(!SystemUtils.isWindows()) - { - _socketChannel.shutdownOutput(); - } - - _socketChannel.close(); - } - catch (IOException e) - { - LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e); - } - } - - return closed; - - } - - @Override - public void flush() - { - _stateChanged.set(true); - _connection.getSelector().wakeup(); - - } - - @Override - public void close() - { - LOGGER.debug("Closing " + _remoteSocketAddress); - if(_closed.compareAndSet(false,true)) - { - _stateChanged.set(true); - _connection.getSelector().wakeup(); - } - } - - private boolean doWrite() throws IOException - { - - ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; - Iterator bufferIterator = _buffers.iterator(); - for (int i = 0; i < bufArray.length; i++) - { - bufArray[i] = bufferIterator.next(); - } - - int byteBuffersWritten = 0; - - if(_transportEncryption == TransportEncryption.NONE) - { - - - long written = _socketChannel.write(bufArray); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Written " + written + " bytes"); - } - - for (ByteBuffer buf : bufArray) - { - if (buf.remaining() == 0) - { - byteBuffersWritten++; - _buffers.poll(); - } - } - - - return bufArray.length == byteBuffersWritten; - } - else if(_transportEncryption == TransportEncryption.TLS) - { - int remaining = 0; - do - { - if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) - { - _workDone = true; - final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); - _status = _sslEngine.wrap(bufArray, netBuffer); - runSSLEngineTasks(_status); - - netBuffer.flip(); - remaining = netBuffer.remaining(); - if (remaining != 0) - { - _encryptedOutput.add(netBuffer); - } - for (ByteBuffer buf : bufArray) - { - if (buf.remaining() == 0) - { - byteBuffersWritten++; - _buffers.poll(); - } - } - } - - } - while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); - ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); - long written = _socketChannel.write(encryptedBuffers); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Written " + written + " encrypted bytes"); - } - ListIterator iter = _encryptedOutput.listIterator(); - while(iter.hasNext()) - { - ByteBuffer buf = iter.next(); - if(buf.remaining() == 0) - { - iter.remove(); - } - else - { - break; - } - } - - return bufArray.length == byteBuffersWritten; - - } - else - { - return true; - } - } - - private boolean doRead() throws IOException - { - boolean readData = false; - if(_transportEncryption == TransportEncryption.NONE) - { - int remaining = 0; - while (remaining == 0 && !_closed.get()) - { - if (_currentBuffer == null || _currentBuffer.remaining() == 0) - { - _currentBuffer = ByteBuffer.allocate(_receiveBufSize); - } - int read = _socketChannel.read(_currentBuffer); - if(read > 0) - { - readData = true; - } - if (read == -1) - { - _closed.set(true); - } - remaining = _currentBuffer.remaining(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Read " + read + " byte(s)"); - } - ByteBuffer dup = _currentBuffer.duplicate(); - dup.flip(); - _currentBuffer = _currentBuffer.slice(); - _protocolEngine.received(dup); - } - } - else if(_transportEncryption == TransportEncryption.TLS) - { - int read = 1; - while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) - { - read = _socketChannel.read(_netInputBuffer); - if (read == -1) - { - _closed.set(true); - } - else if(read > 0) - { - readData = true; - } - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Read " + read + " encrypted bytes "); - } - - _netInputBuffer.flip(); - - - int unwrapped = 0; - boolean tasksRun; - do - { - ByteBuffer appInputBuffer = - ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); - - _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); - tasksRun = runSSLEngineTasks(_status); - - appInputBuffer.flip(); - unwrapped = appInputBuffer.remaining(); - if(unwrapped > 0) - { - readData = true; - } - _protocolEngine.received(appInputBuffer); - } - while(unwrapped > 0 || tasksRun); - - _netInputBuffer.compact(); - - } - } - else - { - int read = 1; - while (!_closed.get() && read > 0) - { - - read = _socketChannel.read(_netInputBuffer); - if (read == -1) - { - _closed.set(true); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); - } - - if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK) - { - _netInputBuffer.flip(); - final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK]; - ByteBuffer dup = _netInputBuffer.duplicate(); - dup.get(headerBytes); - - _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE; - LOGGER.debug("Identified transport encryption as " + _transportEncryption); - - if (_transportEncryption == TransportEncryption.NONE) - { - _protocolEngine.received(_netInputBuffer); - } - else - { - _onTransportEncryptionAction.run(); - _netInputBuffer.compact(); - readData = doRead(); - } - break; - } - } - } - return readData; - } - - private boolean runSSLEngineTasks(final SSLEngineResult status) - { - if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) - { - Runnable task; - while((task = _sslEngine.getDelegatedTask()) != null) - { - task.run(); - } - return true; - } - return false; - } - - private boolean looksLikeSSL(byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); - } - - private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - public Principal getPeerPrincipal() - { - - if (_sslEngine != null) - { - try - { - return _sslEngine.getSession().getPeerPrincipal(); - } - catch (SSLPeerUnverifiedException e) - { - return null; - } - } - - return null; - } - - public boolean canRead() - { - return true; - } - - public boolean waitingForWrite() - { - return !_fullyWritten; - } - - public boolean isStateChanged() - { - return _stateChanged.get(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java deleted file mode 100644 index bd8d3ad804..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -/** -* Created by keith on 28/01/2015. -*/ -public class SelectorThread extends Thread -{ - private final Queue _tasks = new ConcurrentLinkedQueue<>(); - private final Queue _unregisteredConnections = new ConcurrentLinkedQueue<>(); - private final Set _unscheduledConnections = new HashSet<>(); - private final Selector _selector; - private final AtomicBoolean _closed = new AtomicBoolean(); - private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); - private final NonBlockingNetworkTransport _transport; - - SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) - { - super("SelectorThread-"+name); - _transport = nonBlockingNetworkTransport; - try - { - _selector = Selector.open(); - } - catch (IOException e) - { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - - public void addAcceptingSocket(final ServerSocketChannel socketChannel) - { - _tasks.add(new Runnable() - { - @Override - public void run() - { - - try - { - socketChannel.register(_selector, SelectionKey.OP_ACCEPT); - } - catch (ClosedChannelException e) - { - // TODO - e.printStackTrace(); - } - } - }); - _selector.wakeup(); - } - - public void cancelAcceptingSocket(final ServerSocketChannel socketChannel) - { - _tasks.add(new Runnable() - { - @Override - public void run() - { - SelectionKey selectionKey = socketChannel.keyFor(_selector); - if(selectionKey != null) - { - selectionKey.cancel(); - } - } - }); - _selector.wakeup(); - } - - @Override - public void run() - { - - long nextTimeout = 0; - - try - { - while (!_closed.get()) - { - - _selector.select(nextTimeout); - - while(_tasks.peek() != null) - { - Runnable task = _tasks.poll(); - task.run(); - } - - List toBeScheduled = new ArrayList<>(); - - - Set selectionKeys = _selector.selectedKeys(); - for (SelectionKey key : selectionKeys) - { - if(key.isAcceptable()) - { - // todo - should we schedule this rather than running in this thread? - SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); - _transport.acceptSocketChannel(acceptedChannel); - } - else - { - NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - - key.channel().register(_selector, 0); - - toBeScheduled.add(connection); - _unscheduledConnections.remove(connection); - } - - } - selectionKeys.clear(); - - while (_unregisteredConnections.peek() != null) - { - NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); - _unscheduledConnections.add(unregisteredConnection); - - - final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) - | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); - unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); - - } - - long currentTime = System.currentTimeMillis(); - Iterator iterator = _unscheduledConnections.iterator(); - nextTimeout = Integer.MAX_VALUE; - while (iterator.hasNext()) - { - NonBlockingConnection connection = iterator.next(); - - int period = connection.getTicker().getTimeToNextTick(currentTime); - if (period < 0 || connection.isStateChanged()) - { - toBeScheduled.add(connection); - connection.getSocketChannel().register(_selector, 0).cancel(); - iterator.remove(); - } - else - { - nextTimeout = Math.min(period, nextTimeout); - } - } - - for (NonBlockingConnection connection : toBeScheduled) - { - _scheduler.schedule(connection); - } - - } - } - catch (IOException e) - { - //TODO - e.printStackTrace(); - } - finally - { - try - { - _selector.close(); - } - catch (IOException e) - { - e.printStackTrace(); - } - } - - - - - } - - public void addConnection(final NonBlockingConnection connection) - { - _unregisteredConnections.add(connection); - _selector.wakeup(); - - } - - public void wakeup() - { - _selector.wakeup(); - } - - public void close() - { - _closed.set(true); - _selector.wakeup(); - _scheduler.close(); - } - - private class NetworkConnectionScheduler - { - private final ScheduledThreadPoolExecutor _executor; - private final AtomicInteger _running = new AtomicInteger(); - private final int _poolSize; - - private NetworkConnectionScheduler() - { - _poolSize = Runtime.getRuntime().availableProcessors(); - _executor = new ScheduledThreadPoolExecutor(_poolSize); - _executor.prestartAllCoreThreads(); - } - - public void processConnection(final NonBlockingConnection connection) - { - try - { - _running.incrementAndGet(); - boolean rerun; - do - { - rerun = false; - boolean closed = connection.doWork(); - - if (!closed) - { - - if (connection.isStateChanged()) - { - if (_running.get() == _poolSize) - { - schedule(connection); - } - else - { - rerun = true; - } - } - else - { - SelectorThread.this.addConnection(connection); - } - } - - } while (rerun); - } - finally - { - _running.decrementAndGet(); - } - } - - public void schedule(final NonBlockingConnection connection) - { - _executor.submit(new Runnable() - { - @Override - public void run() - { - processConnection(connection); - } - }); - } - - public void close() - { - _executor.shutdown(); - } - - - - } -} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 84eb761899..a1e30ac83e 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -33,7 +33,6 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.Set; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; -- cgit v1.2.1