diff options
Diffstat (limited to 'qpid/java')
63 files changed, 1797 insertions, 1343 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 2e7f3eee7f..6587bc76b2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -64,12 +64,12 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; public abstract class AbstractExchange<T extends AbstractExchange<T>> extends AbstractConfiguredObject<T> @@ -510,7 +510,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> { if (_virtualHost.getState() != State.ACTIVE) { - throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); + throw new VirtualHostUnavailableException(this._virtualHost); } List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties); @@ -910,4 +910,5 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord()); } } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java deleted file mode 100644 index be3a13d2d3..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ /dev/null @@ -1,87 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -import java.util.concurrent.atomic.AtomicLong; - -public class BytesOnlyCreditManager extends AbstractFlowCreditManager -{ - private final AtomicLong _bytesCredit; - - public BytesOnlyCreditManager(long initialCredit) - { - _bytesCredit = new AtomicLong(initialCredit); - } - - public long getMessageCredit() - { - return -1L; - } - - public long getBytesCredit() - { - return _bytesCredit.get(); - } - - public void restoreCredit(long messageCredit, long bytesCredit) - { - _bytesCredit.addAndGet(bytesCredit); - setSuspended(false); - } - - public void removeAllCredit() - { - _bytesCredit.set(0L); - } - - public boolean hasCredit() - { - return _bytesCredit.get() > 0L; - } - - public boolean useCreditForMessage(long msgSize) - { - if(hasCredit()) - { - if(_bytesCredit.addAndGet(-msgSize) >= 0) - { - return true; - } - else - { - _bytesCredit.addAndGet(msgSize); - setSuspended(true); - return false; - } - } - else - { - return false; - } - - } - - public void setBytesCredit(long bytesCredit) - { - _bytesCredit.set( bytesCredit ); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index 280f2851a4..08aac0b511 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -24,10 +24,6 @@ package org.apache.qpid.server.flow; public interface FlowCreditManager { - long getMessageCredit(); - - long getBytesCredit(); - public static interface FlowCreditManagerListener { void creditStateChanged(boolean hasCredit); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java deleted file mode 100644 index 31c1fda968..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ /dev/null @@ -1,90 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - private long _messageCredit; - private long _bytesCredit; - - public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit) - { - _messageCredit = messageCredit; - _bytesCredit = bytesCredit; - } - - public synchronized long getMessageCredit() - { - return _messageCredit; - } - - public synchronized long getBytesCredit() - { - return _bytesCredit; - } - - public synchronized void restoreCredit(long messageCredit, long bytesCredit) - { - _messageCredit += messageCredit; - _bytesCredit += bytesCredit; - setSuspended(hasCredit()); - } - - public synchronized void removeAllCredit() - { - _messageCredit = 0L; - _bytesCredit = 0L; - setSuspended(true); - } - - public synchronized boolean hasCredit() - { - return (_messageCredit > 0L) && ( _bytesCredit > 0L ); - } - - public synchronized boolean useCreditForMessage(final long msgSize) - { - if(_messageCredit == 0L) - { - setSuspended(true); - return false; - } - else - { - if(msgSize > _bytesCredit) - { - setSuspended(true); - return false; - } - _messageCredit--; - _bytesCredit -= msgSize; - setSuspended(false); - return true; - } - - } - - public synchronized void setBytesCredit(long bytesCredit) - { - _bytesCredit = bytesCredit; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index f13af479ad..40aa1bbafd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -113,4 +113,6 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo * @return the time of the last activity or 0 if not in a transaction */ long getTransactionUpdateTime(); + + void transportStateChanged(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index dd5e01ebc5..3c25e0934c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -24,12 +24,8 @@ package org.apache.qpid.server.protocol; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.security.Principal; import java.util.Set; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.auth.Subject; import org.apache.log4j.Logger; @@ -43,21 +39,14 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender; -import org.apache.qpid.transport.network.security.ssl.SSLReceiver; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; public class MultiVersionProtocolEngine implements ServerProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); private final long _id; - private final SSLContext _sslContext; - private final boolean _wantClientAuth; - private final boolean _needClientAuth; private final AmqpPort<?> _port; - private final Transport _transport; + private Transport _transport; private final ProtocolEngineCreator[] _creators; private final Runnable _onCloseTask; @@ -71,9 +60,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(final Broker<?> broker, - SSLContext sslContext, - boolean wantClientAuth, - boolean needClientAuth, final Set<Protocol> supported, final Protocol defaultSupportedReply, AmqpPort<?> port, @@ -92,16 +78,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _broker = broker; _supported = supported; _defaultSupportedReply = defaultSupportedReply; - _sslContext = sslContext; - _wantClientAuth = wantClientAuth; - _needClientAuth = needClientAuth; _port = port; _transport = transport; _creators = creators; _onCloseTask = onCloseTask; } - public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); @@ -147,6 +129,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _delegate.readerIdle(); } + @Override + public void encryptedTransport() + { + _delegate.encryptedTransport(); + } + public void received(ByteBuffer msg) { @@ -169,6 +157,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return _delegate.isTransportBlockedForWriting(); + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _delegate.setTransportBlockedForWriting(blocked); + } + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) @@ -247,6 +247,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + @Override + public void encryptedTransport() + { + + } + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) { @@ -274,12 +280,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { return new Subject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } } private class SelfDelegateProtocolEngine implements ServerProtocolEngine { private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); - private long _lastReadTime; + private long _lastReadTime = System.currentTimeMillis(); public SocketAddress getRemoteAddress() { @@ -360,15 +377,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } } - - if(newDelegate == null && looksLikeSSL(headerBytes)) - { - if(_sslContext != null) - { - newDelegate = new SslDelegateProtocolEngine(); - } - } - // If no delegate is found then send back a supported protocol version id if(newDelegate == null) { @@ -423,6 +431,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } + public void exception(Throwable t) { _logger.error("Error establishing session", t); @@ -466,131 +485,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _network.close(); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) - { - - } - @Override - public long getLastReadTime() + public void encryptedTransport() { - return _lastReadTime; - } - - @Override - public long getLastWriteTime() - { - return 0; - } - } - - private class SslDelegateProtocolEngine implements ServerProtocolEngine - { - private final MultiVersionProtocolEngine _decryptEngine; - private final SSLEngine _engine; - private final SSLReceiver _sslReceiver; - private final SSLBufferingSender _sslSender; - private long _lastReadTime; - - private SslDelegateProtocolEngine() - { - - _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported, - _defaultSupportedReply, _port, Transport.SSL, _id, _creators, - null); - - _engine = _sslContext.createSSLEngine(); - _engine.setUseClientMode(false); - SSLUtil.removeSSLv3Support(_engine); - - if(_needClientAuth) - { - _engine.setNeedClientAuth(true); - } - else if(_wantClientAuth) + if(_transport == Transport.TCP) { - _engine.setWantClientAuth(true); + _transport = Transport.SSL; } - - SSLStatus sslStatus = new SSLStatus(); - _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus); - _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus); - _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender); - } - - @Override - public void received(ByteBuffer msg) - { - _lastReadTime = System.currentTimeMillis(); - _sslReceiver.received(msg); - _sslSender.send(); - _sslSender.flush(); } - @Override public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) { - //TODO - Implement - } - - @Override - public SocketAddress getRemoteAddress() - { - return _decryptEngine.getRemoteAddress(); - } - - @Override - public SocketAddress getLocalAddress() - { - return _decryptEngine.getLocalAddress(); - } - - @Override - public long getWrittenBytes() - { - return _decryptEngine.getWrittenBytes(); - } - - @Override - public long getReadBytes() - { - return _decryptEngine.getReadBytes(); - } - @Override - public void closed() - { - _decryptEngine.closed(); - } - - @Override - public void writerIdle() - { - _decryptEngine.writerIdle(); - } - - @Override - public void readerIdle() - { - _decryptEngine.readerIdle(); - } - - @Override - public void exception(Throwable t) - { - _decryptEngine.exception(t); - } - - @Override - public long getConnectionId() - { - return _decryptEngine.getConnectionId(); - } - - @Override - public Subject getSubject() - { - return _decryptEngine.getSubject(); } @Override @@ -602,132 +508,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine @Override public long getLastWriteTime() { - return _decryptEngine.getLastWriteTime(); + return 0; } } - private boolean looksLikeSSL(byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); - } - - private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - - private static class SSLNetworkConnection implements NetworkConnection - { - private final NetworkConnection _network; - private final SSLBufferingSender _sslSender; - private final SSLEngine _engine; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public SSLNetworkConnection(SSLEngine engine, NetworkConnection network, - SSLBufferingSender sslSender) - { - _engine = engine; - _network = network; - _sslSender = sslSender; - - } - - @Override - public Sender<ByteBuffer> getSender() - { - return _sslSender; - } - - @Override - public void start() - { - _network.start(); - } - - @Override - public void close() - { - _sslSender.close(); - - _network.close(); - } - - @Override - public SocketAddress getRemoteAddress() - { - return _network.getRemoteAddress(); - } - - @Override - public SocketAddress getLocalAddress() - { - return _network.getLocalAddress(); - } - - @Override - public void setMaxWriteIdle(int sec) - { - _network.setMaxWriteIdle(sec); - } - - @Override - public void setMaxReadIdle(int sec) - { - _network.setMaxReadIdle(sec); - } - - @Override - public Principal getPeerPrincipal() - { - synchronized (_lock) - { - if(!_principalChecked) - { - try - { - _principal = _engine.getSession().getPeerPrincipal(); - } - catch (SSLPeerUnverifiedException e) - { - _principal = null; - } - - _principalChecked = true; - } - - return _principal; - } - } - - @Override - public int getMaxReadIdle() - { - return _network.getMaxReadIdle(); - } - @Override - public int getMaxWriteIdle() - { - return _network.getMaxWriteIdle(); - } - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 5c704c5967..a51717e79e 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -27,10 +27,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import javax.net.ssl.SSLContext; - import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.PortMessages; import org.apache.qpid.server.logging.subjects.PortLogSubject; import org.apache.qpid.server.model.Broker; @@ -48,9 +45,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory private final Broker<?> _broker; private final Set<Protocol> _supported; private final Protocol _defaultSupportedReply; - private final SSLContext _sslContext; - private final boolean _wantClientAuth; - private final boolean _needClientAuth; private final AmqpPort<?> _port; private final Transport _transport; private final ProtocolEngineCreator[] _creators; @@ -58,9 +52,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory _connectionCountDecrementingTask = new ConnectionCountDecrementingTask(); public MultiVersionProtocolEngineFactory(Broker<?> broker, - SSLContext sslContext, - boolean wantClientAuth, - boolean needClientAuth, final Set<Protocol> supportedVersions, final Protocol defaultSupportedReply, AmqpPort<?> port, @@ -73,7 +64,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } _broker = broker; - _sslContext = sslContext; _supported = supportedVersions; _defaultSupportedReply = defaultSupportedReply; final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>(); @@ -83,18 +73,16 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } Collections.sort(creators, new ProtocolEngineCreatorComparator()); _creators = creators.toArray(new ProtocolEngineCreator[creators.size()]); - _wantClientAuth = wantClientAuth; - _needClientAuth = needClientAuth; _port = port; _transport = transport; } - public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress) + public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress) { if(_port.canAcceptNewConnection(remoteSocketAddress)) { _port.incrementConnectionCount(); - return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth, + return new MultiVersionProtocolEngine(_broker, _supported, _defaultSupportedReply, _port, _transport, ID_GENERATOR.getAndIncrement(), _creators, _connectionCountDecrementingTask); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index f905558f13..26cbf379a0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogMessage; @@ -2522,7 +2523,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { if (_virtualHost.getState() != State.ACTIVE) { - throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); + throw new VirtualHostUnavailableException(this._virtualHost); } if(!message.isReferenced(this)) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index b1f6b84b72..2fd10e4de4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -23,18 +23,20 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; import java.net.InetSocketAddress; +import java.util.EnumSet; import java.util.Set; import javax.net.ssl.SSLContext; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport; class TCPandSSLTransport implements AcceptingTransport { @@ -78,17 +80,25 @@ class TCPandSSLTransport implements AcceptingTransport } final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(); - _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance(); + _networkTransport = new NonBlockingNetworkTransport(); final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( - _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null, - settings.wantClientAuth(), settings.needClientAuth(), + _port.getParent(Broker.class), _supported, _defaultSupportedProtocolReply, _port, _transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL); - _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext); + EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class); + if(_transports.contains(Transport.TCP)) + { + encryptionSet.add(TransportEncryption.NONE); + } + if(_transports.contains(Transport.SSL)) + { + encryptionSet.add(TransportEncryption.TLS); + } + _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet); } public int getAcceptingPort() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java new file mode 100644 index 0000000000..a0bab0b0a6 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; + +public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException +{ + public VirtualHostUnavailableException(VirtualHostImpl<?, ?, ?> host) + { + super("Virtualhost state " + + host.getState() + + " prevents the message from being sent"); + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 1c42d9b6fe..47ed224133 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -456,6 +456,12 @@ public class MockConsumer implements ConsumerTarget { return 0; } + + @Override + public void transportStateChanged() + { + + } } private static class MockConnectionModel implements AMQConnectionModel @@ -663,5 +669,7 @@ public class MockConsumer implements ConsumerTarget { } + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 89d681111b..afa4fb8bc0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -158,6 +158,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC return _name; } + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + } public static class AddMessageDispositionListenerAction implements Runnable { @@ -555,10 +559,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC switch(flowMode) { case CREDIT: - _creditManager = new CreditCreditManager(0l,0l); + _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); break; case WINDOW: - _creditManager = new WindowCreditManager(0l,0l); + _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); break; default: // this should never happen, as 0-10 is finalised and so the enum should never change diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java index 8dddac9809..e670c1f88b 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java @@ -21,48 +21,27 @@ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { + private final ServerProtocolEngine _serverProtocolEngine; private volatile long _bytesCredit; private volatile long _messageCredit; - public CreditCreditManager(long bytesCredit, long messageCredit) + public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine) { + _serverProtocolEngine = serverProtocolEngine; _bytesCredit = bytesCredit; _messageCredit = messageCredit; setSuspended(!hasCredit()); } - - public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit) - { - _bytesCredit = bytesCredit; - _messageCredit = messageCredit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCredit == -1L - ? Long.MAX_VALUE - : _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit == -1L - ? Long.MAX_VALUE - : _bytesCredit; - } - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { + setSuspended(!hasCredit()); } @@ -107,12 +86,17 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl public synchronized boolean hasCredit() { // Note !=, if credit is < 0 that indicates infinite credit - return (_bytesCredit != 0L && _messageCredit != 0L); + return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting()); } public synchronized boolean useCreditForMessage(long msgSize) { - if(_messageCredit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCredit >= 0L) { if(_messageCredit > 0) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index 30aecdb2d2..5c919252b8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -86,7 +86,10 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator conn.setRemoteAddress(network.getRemoteAddress()); conn.setLocalAddress(network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, network); + ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network); + conn.setProtocolEngine(protocolEngine); + + return protocolEngine; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 854cd388b9..58b6a3abcb 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -52,8 +52,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private ServerConnection _connection; private long _createTime = System.currentTimeMillis(); - private long _lastReadTime; - private long _lastWriteTime; + private long _lastReadTime = _createTime; + private long _lastWriteTime = _createTime; + private volatile boolean _transportBlockedForWriting; public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) @@ -101,17 +102,13 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return new Sender<ByteBuffer>() { @Override - public void setIdleTimeout(int i) - { - sender.setIdleTimeout(i); - - } - - @Override public void send(ByteBuffer msg) { _lastWriteTime = System.currentTimeMillis(); - sender.send(msg); + ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]); + copy.put(msg); + copy.flip(); + sender.send(copy); } @@ -190,6 +187,11 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return _writtenBytes; } + @Override + public void encryptedTransport() + { + } + public void writerIdle() { _connection.doHeartBeat(); @@ -215,11 +217,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return getRemoteAddress().toString(); } - public String getAuthId() - { - return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName(); - } - public boolean isDurable() { return false; @@ -246,4 +243,18 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return _connection.getAuthorizedSubject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + } + } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 8567be37f0..cbd569d036 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; @@ -90,6 +91,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private int _messageCompressionThreshold; private int _maxMessageSize; + private ServerProtocolEngine _serverProtocolEngine; + public ServerConnection(final long connectionId, Broker<?> broker, final AmqpPort<?> port, @@ -189,6 +192,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S super.setConnectionDelegate(delegate); } + public ServerProtocolEngine getProtocolEngine() + { + return _serverProtocolEngine; + } + + public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine) + { + _serverProtocolEngine = serverProtocolEngine; + } + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; @@ -664,4 +677,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { return _maxMessageSize; } + + public void transportStateChanged() + { + for (AMQSessionModel ssn : getSessionModels()) + { + ssn.transportStateChanged(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 223de4f84e..1d8676edd6 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -874,6 +875,15 @@ public class ServerSession extends Session } @Override + public void transportStateChanged() + { + for(ConsumerTarget_0_10 consumerTarget : getSubscriptions()) + { + consumerTarget.transportStateChanged(); + } + } + + @Override public Object getConnectionReference() { return getConnection().getReference(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 288a4f946c..8fdee7a0f7 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -35,8 +35,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -243,8 +245,8 @@ public class ServerSessionDelegate extends SessionDelegate } else { - - FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); + ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine(); + FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine); FilterManager filterManager = null; try @@ -381,58 +383,69 @@ public class ServerSessionDelegate extends SessionDelegate new MessageTransferMessage(storeMessage, serverSession.getReference()); MessageReference<MessageTransferMessage> reference = message.newReference(); - final InstanceProperties instanceProperties = new InstanceProperties() + try { - @Override - public Object getProperty(final Property prop) + final InstanceProperties instanceProperties = new InstanceProperties() { - switch (prop) + @Override + public Object getProperty(final Property prop) { - case EXPIRATION: - return message.getExpiration(); - case IMMEDIATE: - return message.isImmediate(); - case MANDATORY: - return (delvProps == null || !delvProps.getDiscardUnroutable()) - && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; - case PERSISTENT: - return message.isPersistent(); - case REDELIVERED: - return delvProps.getRedelivered(); + switch (prop) + { + case EXPIRATION: + return message.getExpiration(); + case IMMEDIATE: + return message.isImmediate(); + case MANDATORY: + return (delvProps == null || !delvProps.getDiscardUnroutable()) + && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; + case PERSISTENT: + return message.isPersistent(); + case REDELIVERED: + return delvProps.getRedelivered(); + } + return null; } - return null; - } - }; + }; - int enqueues = serverSession.enqueue(message, instanceProperties, destination); + int enqueues = serverSession.enqueue(message, instanceProperties, destination); - if (enqueues == 0) - { - if ((delvProps == null || !delvProps.getDiscardUnroutable()) - && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + if (enqueues == 0) { - RangeSet rejects = RangeSetFactory.createRangeSet(); - rejects.add(xfr.getId()); - MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); - ssn.invoke(reject); + if ((delvProps == null || !delvProps.getDiscardUnroutable()) + && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + RangeSet rejects = RangeSetFactory.createRangeSet(); + rejects.add(xfr.getId()); + MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); + ssn.invoke(reject); + } + else + { + virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(), + messageMetaData.getRoutingKey())); + } + } + + if (serverSession.isTransactional()) + { + serverSession.processed(xfr); } else { - virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(), - messageMetaData.getRoutingKey())); + serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, + new CommandProcessedAction(serverSession, xfr)); } } - - if (serverSession.isTransactional()) + catch (VirtualHostUnavailableException e) { - serverSession.processed(xfr); + getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage()); } - else + finally { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, - new CommandProcessedAction(serverSession, xfr)); + reference.release(); } - reference.release(); + } } @@ -549,7 +562,7 @@ public class ServerSessionDelegate extends SessionDelegate { try { - ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend()); + ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend()); } catch (TimeoutDtxException e) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java index 8e48741b91..e11d2ce9bb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java @@ -21,11 +21,14 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; + +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class); + private final ServerProtocolEngine _serverProtocolEngine; private volatile long _bytesCreditLimit; private volatile long _messageCreditLimit; @@ -33,39 +36,22 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl private volatile long _bytesUsed; private volatile long _messageUsed; - public WindowCreditManager() - { - this(0L, 0L); - } - - public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit) + public WindowCreditManager(long bytesCreditLimit, + long messageCreditLimit, + ServerProtocolEngine serverProtocolEngine) { + _serverProtocolEngine = serverProtocolEngine; _bytesCreditLimit = bytesCreditLimit; _messageCreditLimit = messageCreditLimit; setSuspended(!hasCredit()); } - public long getBytesCreditLimit() - { - return _bytesCreditLimit; - } - public long getMessageCreditLimit() { return _messageCreditLimit; } - public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) - { - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - - setSuspended(!hasCredit()); - - } - - public long getMessageCredit() { return _messageCreditLimit == -1L @@ -121,12 +107,18 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl public synchronized boolean hasCredit() { return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed) - && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); + && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed) + && !_serverProtocolEngine.isTransportBlockedForWriting(); } public synchronized boolean useCreditForMessage(final long msgSize) { - if(_messageCreditLimit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCreditLimit >= 0L) { if(_messageUsed < _messageCreditLimit) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java index 1c4a694be6..b05edc5d04 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java @@ -20,17 +20,25 @@ */ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.server.protocol.v0_10.WindowCreditManager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.test.utils.QpidTestCase; public class WindowCreditManagerTest extends QpidTestCase { private WindowCreditManager _creditManager; + private ServerProtocolEngine _protocolEngine; protected void setUp() throws Exception { super.setUp(); - _creditManager = new WindowCreditManager(); + + _protocolEngine = mock(ServerProtocolEngine.class); + when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false); + + _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine); } /** diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index d52fb735a2..f7f65e29c2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -66,8 +66,6 @@ import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -133,7 +131,8 @@ public class AMQChannel private final int _channelId; - private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); + private final Pre0_10CreditManager _creditManager; + private final FlowCreditManager _noAckCreditManager; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -213,6 +212,9 @@ public class AMQChannel public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { + _creditManager = new Pre0_10CreditManager(0l,0l, connection); + _noAckCreditManager = new NoAckCreditManager(connection); + _connection = connection; _channelId = channelId; @@ -699,7 +701,7 @@ public class AMQChannel if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _noAckCreditManager); } else if(acks) { @@ -709,7 +711,7 @@ public class AMQChannel } else { - target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _noAckCreditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } @@ -1633,6 +1635,7 @@ public class AMQChannel } } + public synchronized void block(AMQQueue queue) { if(_blockingEntities.add(queue)) @@ -1661,6 +1664,13 @@ public class AMQChannel } @Override + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + _noAckCreditManager.restoreCredit(0, 0); + } + + @Override public Object getConnectionReference() { return getConnection().getReference(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 4212505d75..cea9b0930f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -170,7 +170,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private Sender<ByteBuffer> _sender; private volatile boolean _deferFlush; - private long _lastReceivedTime; + private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? private boolean _blocking; private final ReentrantLock _receivedLock; @@ -188,6 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private int _currentMethodId; private int _binaryDataLimit; private long _maxMessageSize; + private volatile boolean _transportBlockedForWriting; public AMQProtocolEngine(Broker<?> broker, final NetworkConnection network, @@ -250,6 +251,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _authorizedSubject; } + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + for(AMQChannel channel : _channelMap.values()) + { + channel.transportStateChanged(); + } + } + public void setNetworkConnection(NetworkConnection network) { setNetworkConnection(network, network.getSender()); @@ -514,20 +531,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, throw new ServerScopedRuntimeException(e); } - final ByteBuffer buf; - - if(size <= REUSABLE_BYTE_BUFFER_CAPACITY) - { - buf = _reusableByteBuffer; - buf.position(0); - } - else - { - buf = ByteBuffer.wrap(data); - } - buf.limit(_reusableDataOutput.length()); - - return buf; + final ByteBuffer copy = ByteBuffer.allocate(_reusableDataOutput.length()); + copy.put(data, 0, _reusableDataOutput.length()); + copy.flip(); + return copy; } @@ -1160,6 +1167,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } + @Override + public void encryptedTransport() + { + } + public void readerIdle() { Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 43982db2fd..d6642aef2e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -136,12 +136,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; - } - } public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, @@ -215,12 +209,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; - } - private static final ServerTransaction.Action NOOP = new ServerTransaction.Action() { @@ -250,11 +238,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } - public boolean allocateCredit(ServerMessage msg) - { - return getCreditManager().useCreditForMessage(msg.getSize()); - } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java index 1817e8ad31..af54c911dc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java @@ -18,10 +18,13 @@ * under the License. * */ -package org.apache.qpid.server.flow; +package org.apache.qpid.server.protocol.v0_8; import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager { private final AtomicLong _messageCredit; @@ -31,16 +34,6 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen _messageCredit = new AtomicLong(initialCredit); } - public long getMessageCredit() - { - return _messageCredit.get(); - } - - public long getBytesCredit() - { - return -1L; - } - public void restoreCredit(long messageCredit, long bytesCredit) { _messageCredit.addAndGet(messageCredit); @@ -48,12 +41,6 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen } - public void removeAllCredit() - { - setSuspended(true); - _messageCredit.set(0L); - } - public boolean hasCredit() { return _messageCredit.get() > 0L; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java new file mode 100644 index 0000000000..2d32617106 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; + +public class NoAckCreditManager extends AbstractFlowCreditManager +{ + private final ServerProtocolEngine _serverProtocolEngine; + + public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine) + { + _serverProtocolEngine = serverProtocolEngine; + } + + @Override + public void restoreCredit(final long messageCredit, final long bytesCredit) + { + setSuspended(!hasCredit()); + } + + @Override + public boolean hasCredit() + { + return !_serverProtocolEngine.isTransportBlockedForWriting(); + } + + @Override + public boolean useCreditForMessage(final long msgSize) + { + if (!hasCredit()) + { + setSuspended(true); + return false; + } + return true; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java index fc2d4bfb53..e63645ed09 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java @@ -18,20 +18,28 @@ * under the License. * */ -package org.apache.qpid.server.flow; +package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager { + private final ServerProtocolEngine _protocolEngine; private volatile long _bytesCreditLimit; private volatile long _messageCreditLimit; private volatile long _bytesCredit; private volatile long _messageCredit; - public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit) + public Pre0_10CreditManager(long bytesCreditLimit, + long messageCreditLimit, + ServerProtocolEngine protocolEngine) { + _protocolEngine = protocolEngine; _bytesCreditLimit = bytesCreditLimit; _messageCreditLimit = messageCreditLimit; _bytesCredit = bytesCreditLimit; @@ -39,6 +47,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } + public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) { long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit; @@ -80,16 +89,6 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } - public long getMessageCredit() - { - return _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit; - } - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { final long messageCreditLimit = _messageCreditLimit; @@ -119,22 +118,21 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } - public synchronized void removeAllCredit() - { - _bytesCredit = 0L; - _messageCredit = 0L; - setSuspended(!hasCredit()); - } - public synchronized boolean hasCredit() { return (_bytesCreditLimit == 0L || _bytesCredit > 0) - && (_messageCreditLimit == 0L || _messageCredit > 0); + && (_messageCreditLimit == 0L || _messageCredit > 0) + && !_protocolEngine.isTransportBlockedForWriting(); } public synchronized boolean useCreditForMessage(final long msgSize) { - if(_messageCreditLimit != 0L) + if (_protocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCreditLimit != 0L) { if(_messageCredit != 0L) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 9326f16703..55fc865850 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -31,8 +31,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; @@ -328,7 +326,7 @@ public class AckTest extends QpidTestCase public void testMessageDequeueRestoresCreditTest() throws Exception { // Send 10 messages - Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); + Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine); _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 7407890b58..622cf32d17 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -288,10 +288,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { _sender = new Sender<ByteBuffer>() { - public void setIdleTimeout(int i) - { - } - public void send(ByteBuffer msg) { } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java index 89fc60666b..c4c89ac24a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java @@ -18,20 +18,14 @@ * under the License. * */ -package org.apache.qpid.server.flow; +package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager { - public long getMessageCredit() - { - return -1L; - } - - public long getBytesCredit() - { - return -1L; - } public void restoreCredit(long messageCredit, long bytesCredit) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 8e24d55da0..b55bd03a91 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -44,6 +44,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.End; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; @@ -64,6 +65,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final AmqpPort<?> _port; private final Broker<?> _broker; private final SubjectCreator _subjectCreator; + private final ServerProtocolEngine _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -101,12 +103,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private boolean _closedOnOpen; + public Connection_1_0(Broker<?> broker, ConnectionEndpoint conn, long connectionId, AmqpPort<?> port, - Transport transport, final SubjectCreator subjectCreator) + Transport transport, + final SubjectCreator subjectCreator, + final ServerProtocolEngine protocolEngine) { + _protocolEngine = protocolEngine; _broker = broker; _port = port; _transport = transport; @@ -363,6 +369,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod return _port; } + public ServerProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + @Override public Transport getTransport() { @@ -480,4 +491,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } + public void transportStateChanged() + { + for (Session_1_0 session : _sessions) + { + session.transportStateChanged(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index c5d9a5e35d..b5e1bdafbb 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; @@ -84,7 +85,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget public boolean isSuspended() { - return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend(); + return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE; } @@ -290,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized (_link.getLock()) { - final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); + + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting(); if(!hasCredit && getState() == State.ACTIVE) { suspend(); @@ -330,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized(_link.getLock()) { - if(isSuspended() && getEndpoint() != null) + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting()) { updateState(State.SUSPENDED, State.ACTIVE); _transactionId = _link.getTransactionId(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 740b01e459..b2783a2da2 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -118,6 +118,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private NetworkConnection _network; private Sender<ByteBuffer> _sender; private Connection_1_0 _connection; + private volatile boolean _transportBlockedForWriting; static enum State { @@ -179,6 +180,11 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut //Todo } + @Override + public void encryptedTransport() + { + } + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) { _network = network; @@ -211,7 +217,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _endpoint.setProperties(serverProperties); _endpoint.setRemoteAddress(getRemoteAddress()); - _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator); + _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this); _endpoint.setConnectionEventListener(_connection); _endpoint.setFrameOutputHandler(this); @@ -524,6 +530,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + + public void close() { _sender.close(); @@ -554,4 +562,18 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { return _lastWriteTime; } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + + } + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 2cfe431979..f8e4853099 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -728,4 +728,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _consumer; } + + public ConsumerTarget_1_0 getConsumerTarget() + { + return _target; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 1820de9d3a..01c11b9eca 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private final Subject _subject = new Subject(); private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; @@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio ); sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); - registerConsumer(sendingLink.getConsumer()); + registerConsumer(sendingLink); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) @@ -411,12 +412,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } } - private void registerConsumer(final ConsumerImpl consumer) + private void registerConsumer(final SendingLink_1_0 link) { + ConsumerImpl consumer = link.getConsumer(); if(consumer instanceof Consumer<?>) { Consumer<?> modelConsumer = (Consumer<?>) consumer; _consumers.add(modelConsumer); + _sendingLinks.add(link); modelConsumer.addChangeListener(_consumerClosedListener); consumerAdded(modelConsumer); } @@ -614,6 +617,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } @Override + public void transportStateChanged() + { + for(SendingLink_1_0 link : _sendingLinks) + { + ConsumerTarget_1_0 target = link.getConsumerTarget(); + target.flowStateChanged(); + + + } + + + } + + @Override public LogSubject getLogSubject() { return this; diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index a194ac70f9..940e24d7cf 100644 --- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -81,9 +81,7 @@ class WebSocketProvider implements AcceptingTransport _supported = supported; _defaultSupportedProtocolReply = defaultSupportedProtocolReply; _factory = new MultiVersionProtocolEngineFactory( - _port.getParent(Broker.class), null, - _port.getWantClientAuth(), - _port.getNeedClientAuth(), + _port.getParent(Broker.class), _supported, _defaultSupportedProtocolReply, _port, @@ -273,12 +271,6 @@ class WebSocketProvider implements AcceptingTransport } @Override - public void setIdleTimeout(final int i) - { - - } - - @Override public void send(final ByteBuffer msg) { try diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c2582accdf..d5e3027601 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -316,6 +316,11 @@ public class AMQProtocolHandler implements ProtocolEngine } } + @Override + public void encryptedTransport() + { + } + public void readerIdle() { _logger.debug("Protocol Session [" + this + "] idle: reader"); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 8a7e6ab084..c7dee5b985 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -670,7 +670,7 @@ public class AMQSession_0_10Test extends QpidTestCase { private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>(); - public void setIdleTimeout(int i) + private void setIdleTimeout(int i) { } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java index 7c3988c19a..11b34d3dff 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java @@ -27,11 +27,6 @@ import org.apache.qpid.transport.Sender; public class MockSender implements Sender<ByteBuffer> { - public void setIdleTimeout(int i) - { - - } - public void send(ByteBuffer msg) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 6774d0a45a..cad5461d83 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.protocol; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - /** * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. @@ -56,7 +56,8 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) void readerIdle(); + void encryptedTransport(); public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java index 5c6918e87d..35d262cdb3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java @@ -30,4 +30,8 @@ public interface ServerProtocolEngine extends ProtocolEngine long getConnectionId(); Subject getSubject(); + + boolean isTransportBlockedForWriting(); + + void setTransportBlockedForWriting(boolean blocked); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 1866e1fd15..c2a0911bc3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -153,11 +153,9 @@ public class ClientDelegate extends ConnectionDelegate maxFrameSize, actualHeartbeatInterval); - int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize); - conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); //0 means no implied limit, except available server resources diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index e33e007f6e..92ccdb84af 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -159,7 +159,6 @@ public class Connection extends ConnectionInvoker public void setSender(Sender<ProtocolEvent> sender) { this.sender = sender; - sender.setIdleTimeout(idleTimeout); } protected void setState(State state) @@ -675,20 +674,6 @@ public class Connection extends ConnectionInvoker } } - public void setIdleTimeout(int i) - { - idleTimeout = i; - if (sender != null) - { - sender.setIdleTimeout(i); - } - } - - public int getIdleTimeout() - { - return idleTimeout; - } - public String getUserID() { return userID; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java index 6519702c76..9a6f675d7f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java @@ -28,7 +28,6 @@ package org.apache.qpid.transport; public interface Sender<T> { - void setIdleTimeout(int i); void send(T msg); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 82a677b8f7..f8fd286f17 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -126,8 +126,11 @@ public class ServerDelegate extends ConnectionDelegate protected void connectionAuthFailed(final Connection conn, Exception e) { - conn.exception(e); - conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()); + if (e != null) + { + conn.exception(e); + } + conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e == null ? "Authentication failed" : e.getMessage()); } protected void connectionAuthContinue(final Connection conn, byte[] challenge) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index a804cb2f9d..81a4c781a4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -251,11 +251,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega throw new IllegalArgumentException(String.valueOf(error)); } - public void setIdleTimeout(int i) - { - sender.setIdleTimeout(i); - } - @Override public void setMaxFrameSize(final int maxFrame) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java index e0cd9cac1a..f378c54026 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.transport.network; +import java.util.Set; + import javax.net.ssl.SSLContext; import org.apache.qpid.protocol.ProtocolEngineFactory; @@ -29,7 +31,8 @@ public interface IncomingNetworkTransport extends NetworkTransport { public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, - SSLContext sslContext); + SSLContext sslContext, + final Set<TransportEncryption> encryptionSet); public int getAcceptingPort(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java new file mode 100644 index 0000000000..b3f1f1c7dd --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +public enum TransportEncryption +{ + NONE, TLS +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java new file mode 100644 index 0000000000..9d0fe5ddf6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java @@ -0,0 +1,343 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.Set; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLServerSocketFactory; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.CommonProperties; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.TransportActivity; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; + +// TODO we are no longer using the IncomingNetworkTransport +public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport +{ + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, + CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private Socket _socket; + private NetworkConnection _connection; + private AcceptingThread _acceptor; + + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity) + { + int sendBufferSize = settings.getWriteBufferSize(); + int receiveBufferSize = settings.getReadBufferSize(); + + try + { + _socket = new Socket(); + _socket.setReuseAddress(true); + _socket.setTcpNoDelay(settings.isTcpNodelay()); + _socket.setSendBufferSize(sendBufferSize); + _socket.setReceiveBufferSize(receiveBufferSize); + + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize()); + LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize()); + LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay()); + } + + InetAddress address = InetAddress.getByName(settings.getHost()); + + _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout()); + } + catch (SocketException e) + { + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + + try + { + IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); + _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); + ticker.setConnection(_connection); + _connection.start(); + } + catch(Exception e) + { + try + { + _socket.close(); + } + catch(IOException ioe) + { + //ignored, throw based on original exception + } + + throw new TransportException("Error creating network connection", e); + } + + return _connection; + } + + public void close() + { + if(_connection != null) + { + _connection.close(); + } + if(_acceptor != null) + { + _acceptor.close(); + } + } + + public NetworkConnection getConnection() + { + return _connection; + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext, final Set<TransportEncryption> encryptionSet) + { + try + { + _acceptor = new AcceptingThread(config, factory, sslContext); + _acceptor.setDaemon(false); + _acceptor.start(); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + } + + public int getAcceptingPort() + { + return _acceptor == null ? -1 : _acceptor.getPort(); + } + + protected abstract NetworkConnection createNetworkConnection(Socket socket, + Receiver<ByteBuffer> engine, + Integer sendBufferSize, + Integer receiveBufferSize, + int timeout, + IdleTimeoutTicker ticker); + + private class AcceptingThread extends Thread + { + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocket _serverSocket; + private int _timeout; + + private AcceptingThread(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) throws IOException + { + _config = config; + _factory = factory; + _sslContext = sslContext; + _timeout = TIMEOUT; + + InetSocketAddress address = config.getAddress(); + + if(sslContext == null) + { + _serverSocket = new ServerSocket(); + } + else + { + SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); + _serverSocket = socketFactory.createServerSocket(); + + SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket; + + SSLUtil.removeSSLv3Support(sslServerSocket); + + if(config.needClientAuth()) + { + sslServerSocket.setNeedClientAuth(true); + } + else if(config.wantClientAuth()) + { + sslServerSocket.setWantClientAuth(true); + } + + } + + _serverSocket.setReuseAddress(true); + _serverSocket.bind(address); + } + + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() + { + LOGGER.debug("Shutting down the Acceptor"); + _closed = true; + + if (!_serverSocket.isClosed()) + { + try + { + _serverSocket.close(); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + + private int getPort() + { + return _serverSocket.getLocalPort(); + } + + @Override + public void run() + { + try + { + while (!_closed) + { + Socket socket = null; + try + { + socket = _serverSocket.accept(); + + ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress()); + + if(engine != null) + { + socket.setTcpNoDelay(_config.getTcpNoDelay()); + socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socket.setSendBufferSize(sendBufferSize); + socket.setReceiveBufferSize(receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NetworkConnection connection = + createNetworkConnection(socket, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker); + + connection.setMaxReadIdle(HANSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + engine.setNetworkConnection(connection, connection.getSender()); + + connection.start(); + } + else + { + socket.close(); + } + } + catch(RuntimeException e) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket); + } + catch(IOException e) + { + if(!_closed) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket); + try + { + //Delay to avoid tight spinning the loop during issues such as too many open files + Thread.sleep(1000); + } + catch (InterruptedException ie) + { + LOGGER.debug("Stopping acceptor due to interrupt request"); + _closed = true; + } + } + } + } + } + finally + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + + _config.getAddress()); + } + } + } + + private void closeSocketIfNecessary(final Socket socket) + { + if(socket != null) + { + try + { + socket.close(); + } + catch (IOException e) + { + LOGGER.debug("Exception while closing socket", e); + } + } + } + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index e5bc9fa977..f33f626601 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -20,312 +20,25 @@ */ package org.apache.qpid.transport.network.io; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketException; import java.nio.ByteBuffer; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLServerSocket; -import javax.net.ssl.SSLServerSocketFactory; - -import org.slf4j.LoggerFactory; - -import org.apache.qpid.configuration.CommonProperties; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.TransportActivity; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; -public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport +public class IoNetworkTransport extends AbstractNetworkTransport { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class); - private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, - CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); - private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , - CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - - - private Socket _socket; - private IoNetworkConnection _connection; - private AcceptingThread _acceptor; - - public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, - TransportActivity transportActivity) - { - int sendBufferSize = settings.getWriteBufferSize(); - int receiveBufferSize = settings.getReadBufferSize(); - - try - { - _socket = new Socket(); - _socket.setReuseAddress(true); - _socket.setTcpNoDelay(settings.isTcpNodelay()); - _socket.setSendBufferSize(sendBufferSize); - _socket.setReceiveBufferSize(receiveBufferSize); - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize()); - LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize()); - LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay()); - } - - InetAddress address = InetAddress.getByName(settings.getHost()); - - _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout()); - } - catch (SocketException e) - { - throw new TransportException("Error connecting to broker", e); - } - catch (IOException e) - { - throw new TransportException("Error connecting to broker", e); - } - - try - { - IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); - ticker.setConnection(_connection); - _connection.start(); - } - catch(Exception e) - { - try - { - _socket.close(); - } - catch(IOException ioe) - { - //ignored, throw based on original exception - } - - throw new TransportException("Error creating network connection", e); - } - - return _connection; - } - - public void close() - { - if(_connection != null) - { - _connection.close(); - } - if(_acceptor != null) - { - _acceptor.close(); - } - } - - public NetworkConnection getConnection() - { - return _connection; - } - - public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext) - { - try - { - _acceptor = new AcceptingThread(config, factory, sslContext); - _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress())); - _acceptor.setDaemon(false); - _acceptor.start(); - } - catch (IOException e) - { - throw new TransportException("Failed to start AMQP on port : " + config, e); - } - } - - public int getAcceptingPort() - { - return _acceptor == null ? -1 : _acceptor.getPort(); - } - private class AcceptingThread extends Thread + @Override + protected IoNetworkConnection createNetworkConnection(final Socket socket, + final Receiver<ByteBuffer> engine, + final Integer sendBufferSize, + final Integer receiveBufferSize, + final int timeout, + final IdleTimeoutTicker ticker) { - private volatile boolean _closed = false; - private NetworkTransportConfiguration _config; - private ProtocolEngineFactory _factory; - private SSLContext _sslContext; - private ServerSocket _serverSocket; - private int _timeout; - - private AcceptingThread(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext) throws IOException - { - _config = config; - _factory = factory; - _sslContext = sslContext; - _timeout = TIMEOUT; - - InetSocketAddress address = config.getAddress(); - - if(sslContext == null) - { - _serverSocket = new ServerSocket(); - } - else - { - SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); - _serverSocket = socketFactory.createServerSocket(); - - SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket; - - SSLUtil.removeSSLv3Support(sslServerSocket); - - if(config.needClientAuth()) - { - sslServerSocket.setNeedClientAuth(true); - } - else if(config.wantClientAuth()) - { - sslServerSocket.setWantClientAuth(true); - } - - } - - _serverSocket.setReuseAddress(true); - _serverSocket.bind(address); - } - - - /** - Close the underlying ServerSocket if it has not already been closed. - */ - public void close() - { - LOGGER.debug("Shutting down the Acceptor"); - _closed = true; - - if (!_serverSocket.isClosed()) - { - try - { - _serverSocket.close(); - } - catch (IOException e) - { - throw new TransportException(e); - } - } - } - - private int getPort() - { - return _serverSocket.getLocalPort(); - } - - @Override - public void run() - { - try - { - while (!_closed) - { - Socket socket = null; - try - { - socket = _serverSocket.accept(); - - ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress()); - - if(engine != null) - { - socket.setTcpNoDelay(_config.getTcpNoDelay()); - socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socket.setSendBufferSize(sendBufferSize); - socket.setReceiveBufferSize(receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - NetworkConnection connection = - new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, - ticker); - - connection.setMaxReadIdle(HANSHAKE_TIMEOUT); - - ticker.setConnection(connection); - - engine.setNetworkConnection(connection, connection.getSender()); - - connection.start(); - } - else - { - socket.close(); - } - } - catch(RuntimeException e) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socket); - } - catch(IOException e) - { - if(!_closed) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socket); - try - { - //Delay to avoid tight spinning the loop during issues such as too many open files - Thread.sleep(1000); - } - catch (InterruptedException ie) - { - LOGGER.debug("Stopping acceptor due to interrupt request"); - _closed = true; - } - } - } - } - } - finally - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress()); - } - } - } - - private void closeSocketIfNecessary(final Socket socket) - { - if(socket != null) - { - try - { - socket.close(); - } - catch (IOException e) - { - LOGGER.debug("Exception while closing socket", e); - } - } - } - + return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, + ticker); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index b52b59aa15..467115c76f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -78,7 +78,7 @@ final class IoReceiver implements Runnable throw new RuntimeException("Error creating IOReceiver thread",e); } receiverThread.setDaemon(true); - receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + receiverThread.setName(String.format("IoReceiver-%s", socket.getRemoteSocketAddress())); } public void initiate() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index e06782c58a..79e99287c4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -88,7 +88,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress)); + senderThread.setName(String.format("IoSender-%s", _remoteSocketAddress)); } public void initiate() @@ -316,18 +316,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - public void setIdleTimeout(int i) - { - try - { - socket.setSoTimeout(i); - } - catch (Exception e) - { - throw new SenderException(e); - } - } - public void setReceiver(IoReceiver receiver) { _receiver = receiver; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java new file mode 100644 index 0000000000..92cea345ca --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java @@ -0,0 +1,134 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.security.Principal; +import java.util.Set; + +import javax.net.ssl.SSLContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportEncryption; + +public class NonBlockingConnection implements NetworkConnection +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); + private final SocketChannel _socket; + private final long _timeout; + private final NonBlockingSenderReceiver _nonBlockingSenderReceiver; + private int _maxReadIdle; + private int _maxWriteIdle; + private Principal _principal; + private boolean _principalChecked; + private final Object _lock = new Object(); + + public NonBlockingConnection(SocketChannel socket, + ServerProtocolEngine delegate, + int sendBufferSize, + int receiveBufferSize, + long timeout, + Ticker ticker, + final Set<TransportEncryption> encryptionSet, + final SSLContext sslContext, + final boolean wantClientAuth, + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) + { + _socket = socket; + _timeout = timeout; + + _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); + + } + + public void start() + { + _nonBlockingSenderReceiver.initiate(); + } + + public Sender<ByteBuffer> getSender() + { + return _nonBlockingSenderReceiver; + } + + public void close() + { + _nonBlockingSenderReceiver.close(); + } + + public SocketAddress getRemoteAddress() + { + return _socket.socket().getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socket.socket().getLocalSocketAddress(); + } + + public void setMaxWriteIdle(int sec) + { + _maxWriteIdle = sec; + } + + public void setMaxReadIdle(int sec) + { + _maxReadIdle = sec; + } + + @Override + public Principal getPeerPrincipal() + { + synchronized (_lock) + { + if(!_principalChecked) + { + + _principal = _nonBlockingSenderReceiver.getPeerPrincipal(); + + _principalChecked = true; + } + + return _principal; + } + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java new file mode 100644 index 0000000000..80ba7a0221 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -0,0 +1,268 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.StandardSocketOptions; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Set; + +import javax.net.ssl.SSLContext; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.CommonProperties; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.TransportEncryption; + +public class NonBlockingNetworkTransport implements IncomingNetworkTransport +{ + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, + CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private AcceptingThread _acceptor; + + protected NonBlockingConnection createNetworkConnection(final SocketChannel socket, + final ServerProtocolEngine engine, + final Integer sendBufferSize, + final Integer receiveBufferSize, + final int timeout, + final IdleTimeoutTicker ticker, + final Set<TransportEncryption> encryptionSet, + final SSLContext sslContext, + final boolean wantClientAuth, + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) + { + return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); + } + + public void close() + { + if(_acceptor != null) + { + _acceptor.close(); + } + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext, + final Set<TransportEncryption> encryptionSet) + { + try + { + _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet); + _acceptor.setDaemon(false); + _acceptor.start(); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + } + + public int getAcceptingPort() + { + return _acceptor == null ? -1 : _acceptor.getPort(); + } + + private class AcceptingThread extends Thread + { + private final Set<TransportEncryption> _encryptionSet; + private volatile boolean _closed = false; + private final NetworkTransportConfiguration _config; + private final ProtocolEngineFactory _factory; + private final SSLContext _sslContext; + private final ServerSocketChannel _serverSocket; + private int _timeout; + + private AcceptingThread(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext, + final Set<TransportEncryption> encryptionSet) throws IOException + { + _config = config; + _factory = factory; + _sslContext = sslContext; + _timeout = TIMEOUT; + + InetSocketAddress address = config.getAddress(); + + _serverSocket = ServerSocketChannel.open(); + + _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); + _serverSocket.bind(address); + _encryptionSet = encryptionSet; + } + + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() + { + LOGGER.debug("Shutting down the Acceptor"); + _closed = true; + + if (!_serverSocket.socket().isClosed()) + { + try + { + _serverSocket.close(); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + + private int getPort() + { + return _serverSocket.socket().getLocalPort(); + } + + @Override + public void run() + { + try + { + while (!_closed) + { + SocketChannel socket = null; + try + { + socket = _serverSocket.accept(); + + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); + + if(engine != null) + { + socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socket.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socket.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NetworkConnection connection = + createNetworkConnection(socket, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + new Runnable() + { + + @Override + public void run() + { + engine.encryptedTransport(); + } + }); + + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + connection.start(); + } + else + { + socket.close(); + } + } + catch(RuntimeException e) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket.socket()); + } + catch(IOException e) + { + if(!_closed) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket.socket()); + try + { + //Delay to avoid tight spinning the loop during issues such as too many open files + Thread.sleep(1000); + } + catch (InterruptedException ie) + { + LOGGER.debug("Stopping acceptor due to interrupt request"); + _closed = true; + } + } + } + } + } + finally + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + + _config.getAddress()); + } + } + } + + private void closeSocketIfNecessary(final Socket socket) + { + if(socket != null) + { + try + { + socket.close(); + } + catch (IOException e) + { + LOGGER.debug("Exception while closing socket", e); + } + } + } + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java new file mode 100644 index 0000000000..acc2b89881 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLPeerUnverifiedException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderClosedException; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; + +public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); + public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; + + private final SocketChannel _socketChannel; + private final Selector _selector; + + private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); + private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); + + private final Thread _ioThread; + private final String _remoteSocketAddress; + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final ServerProtocolEngine _receiver; + private final int _receiveBufSize; + private final Ticker _ticker; + private final Set<TransportEncryption> _encryptionSet; + private final SSLContext _sslContext; + private final Runnable _onTransportEncryptionAction; + private ByteBuffer _netInputBuffer; + private SSLEngine _sslEngine; + + private ByteBuffer _currentBuffer; + + private TransportEncryption _transportEncryption; + private SSLEngineResult _status; + + + public NonBlockingSenderReceiver(final SocketChannel socketChannel, + ServerProtocolEngine receiver, + int receiveBufSize, + Ticker ticker, + final Set<TransportEncryption> encryptionSet, + final SSLContext sslContext, + final boolean wantClientAuth, + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) + { + _socketChannel = socketChannel; + _receiver = receiver; + _receiveBufSize = receiveBufSize; + _ticker = ticker; + _encryptionSet = encryptionSet; + _sslContext = sslContext; + _onTransportEncryptionAction = onTransportEncryptionAction; + + if(encryptionSet.size() == 1) + { + _transportEncryption = _encryptionSet.iterator().next(); + if (_transportEncryption == TransportEncryption.TLS) + { + onTransportEncryptionAction.run(); + } + } + + if(encryptionSet.contains(TransportEncryption.TLS)) + { + _sslEngine = _sslContext.createSSLEngine(); + _sslEngine.setUseClientMode(false); + SSLUtil.removeSSLv3Support(_sslEngine); + if(needClientAuth) + { + _sslEngine.setNeedClientAuth(true); + } + else if(wantClientAuth) + { + _sslEngine.setWantClientAuth(true); + } + _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); + } + + try + { + _remoteSocketAddress = socketChannel.getRemoteAddress().toString(); + _socketChannel.configureBlocking(false); + _selector = Selector.open(); + _socketChannel.register(_selector, SelectionKey.OP_READ); + } + catch (IOException e) + { + throw new SenderException("Unable to prepare the channel for non-blocking IO", e); + } + try + { + //Create but deliberately don't start the thread. + _ioThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new SenderException("Error creating NonBlockingSenderReceiver thread for " + _remoteSocketAddress, e); + } + + _ioThread.setDaemon(true); + _ioThread.setName(String.format("NonBlockingSenderReceiver-%s", _remoteSocketAddress)); + + } + + public void initiate() + { + _ioThread.start(); + } + + @Override + public void send(final ByteBuffer msg) + { + if (_closed.get()) + { + throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); + } + // append to list and do selector wakeup + _buffers.add(msg); + _selector.wakeup(); + } + + @Override + public void run() + { + LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started"); + + + while (!_closed.get()) + { + + try + { + long currentTime = System.currentTimeMillis(); + int tick = _ticker.getTimeToNextTick(currentTime); + if(tick <= 0) + { + tick = _ticker.tick(currentTime); + } + + _selector.select(tick <= 0 ? 1 : tick); + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + selectionKeys.clear(); + + _receiver.setTransportBlockedForWriting(!doWrite()); + doRead(); + boolean fullyWritten = doWrite(); + _receiver.setTransportBlockedForWriting(!fullyWritten); + + _socketChannel.register(_selector, + fullyWritten + ? SelectionKey.OP_READ + : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); + + } + catch (IOException e) + { + LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); + close(); + } + } + + try(Selector selector = _selector; SocketChannel channel = _socketChannel) + { + while(!doWrite()) + { + } + + _receiver.closed(); + } + catch (IOException e) + { + LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); + } + finally + { + LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress); + } + } + + @Override + public void flush() + { + _selector.wakeup(); + } + + @Override + public void close() + { + LOGGER.debug("Closing " + _remoteSocketAddress); + + _closed.set(true); + _selector.wakeup(); + + } + + private boolean doWrite() throws IOException + { + + ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; + Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); + for (int i = 0; i < bufArray.length; i++) + { + bufArray[i] = bufferIterator.next(); + } + + int byteBuffersWritten = 0; + + if(_transportEncryption == TransportEncryption.NONE) + { + + + long written = _socketChannel.write(bufArray); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " bytes"); + } + + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + + + return bufArray.length == byteBuffersWritten; + } + else if(_transportEncryption == TransportEncryption.TLS) + { + int remaining = 0; + + do + { + if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) + { + final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); + _status = _sslEngine.wrap(bufArray, netBuffer); + runSSLEngineTasks(_status); + + netBuffer.flip(); + remaining = netBuffer.remaining(); + if (remaining != 0) + { + _encryptedOutput.add(netBuffer); + } + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + } + + } + while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); + + ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); + long written = _socketChannel.write(encryptedBuffers); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " encrypted bytes"); + } + + ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); + while(iter.hasNext()) + { + ByteBuffer buf = iter.next(); + if(buf.remaining() == 0) + { + iter.remove(); + } + else + { + break; + } + } + + return bufArray.length == byteBuffersWritten; + + } + else + { + return true; + } + } + + private void doRead() throws IOException + { + + if(_transportEncryption == TransportEncryption.NONE) + { + int remaining = 0; + while (remaining == 0 && !_closed.get()) + { + if (_currentBuffer == null || _currentBuffer.remaining() == 0) + { + _currentBuffer = ByteBuffer.allocate(_receiveBufSize); + } + int read = _socketChannel.read(_currentBuffer); + if (read == -1) + { + _closed.set(true); + } + remaining = _currentBuffer.remaining(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " byte(s)"); + } + ByteBuffer dup = _currentBuffer.duplicate(); + dup.flip(); + _currentBuffer = _currentBuffer.slice(); + _receiver.received(dup); + } + } + else if(_transportEncryption == TransportEncryption.TLS) + { + int read = 1; + while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) + { + read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer); + } + _netInputBuffer.flip(); + + + int unwrapped = 0; + do + { + ByteBuffer appInputBuffer = + ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); + + _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); + runSSLEngineTasks(_status); + + appInputBuffer.flip(); + unwrapped = appInputBuffer.remaining(); + _receiver.received(appInputBuffer); + } + while(unwrapped > 0); + + _netInputBuffer.compact(); + + } + } + else + { + int read = 1; + while (!_closed.get() && read > 0) + { + + read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); + } + + if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK) + { + _netInputBuffer.flip(); + final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK]; + ByteBuffer dup = _netInputBuffer.duplicate(); + dup.get(headerBytes); + + _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE; + LOGGER.debug("Identified transport encryption as " + _transportEncryption); + + if (_transportEncryption == TransportEncryption.NONE) + { + _receiver.received(_netInputBuffer); + } + else + { + _onTransportEncryptionAction.run(); + _netInputBuffer.compact(); + doRead(); + } + break; + } + } + } + } + + private void runSSLEngineTasks(final SSLEngineResult status) + { + if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) + { + Runnable task; + while((task = _sslEngine.getDelegatedTask()) != null) + { + task.run(); + } + } + } + + private boolean looksLikeSSL(byte[] headerBytes) + { + return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); + } + + private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) + { + return headerBytes[0] == 22 && // SSL Handshake + (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[2] == 0 || // SSL 3.0 + headerBytes[2] == 1 || // TLS 1.0 + headerBytes[2] == 2 || // TLS 1.1 + headerBytes[2] == 3)) && // TLS1.2 + (headerBytes[5] == 1); // client_hello + } + + private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) + { + return headerBytes[0] == -128 && + headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[4] == 0 || // SSL 3.0 + headerBytes[4] == 1 || // TLS 1.0 + headerBytes[4] == 2 || // TLS 1.1 + headerBytes[4] == 3); + } + + public Principal getPeerPrincipal() + { + + if (_sslEngine != null) + { + try + { + return _sslEngine.getSession().getPeerPrincipal(); + } + catch (SSLPeerUnverifiedException e) + { + return null; + } + } + + return null; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index 098f2fb20c..fa1801cb65 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -103,11 +103,6 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { } } - public void setIdleTimeout(int i) - { - delegate.setIdleTimeout(i); - } - public void securityLayerEstablished() { appData = new byte[getSendBuffSize()]; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java index 24f95d7798..e69de29bb2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.security.ssl; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLEngineResult.HandshakeStatus; -import javax.net.ssl.SSLEngineResult.Status; -import javax.net.ssl.SSLException; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; - -public class SSLBufferingSender implements Sender<ByteBuffer> -{ - private static final Logger log = Logger.get(SSLBufferingSender.class); - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - - private final Sender<ByteBuffer> delegate; - private final SSLEngine engine; - private final int sslBufSize; - private final ByteBuffer netData; - private final SSLStatus _sslStatus; - - private String _hostname; - - private final AtomicBoolean closed = new AtomicBoolean(false); - private ByteBuffer _appData = EMPTY_BYTE_BUFFER; - - - public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) - { - this.engine = engine; - this.delegate = delegate; - sslBufSize = engine.getSession().getPacketBufferSize(); - netData = ByteBuffer.allocate(sslBufSize); - _sslStatus = sslStatus; - } - - public void setHostname(String hostname) - { - _hostname = hostname; - } - - public void close() - { - if (!closed.getAndSet(true)) - { - if (engine.isOutboundDone()) - { - return; - } - log.debug("Closing SSL connection"); - doSend(); - engine.closeOutbound(); - try - { - tearDownSSLConnection(); - } - catch(Exception e) - { - throw new SenderException("Error closing SSL connection",e); - } - - - synchronized(_sslStatus.getSslLock()) - { - while (!engine.isOutboundDone()) - { - try - { - _sslStatus.getSslLock().wait(); - } - catch(InterruptedException e) - { - // pass - } - - } - } - delegate.close(); - } - } - - private void tearDownSSLConnection() throws Exception - { - SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData); - Status status = result.getStatus(); - int read = result.bytesProduced(); - while (status != Status.CLOSED) - { - if (status == Status.BUFFER_OVERFLOW) - { - netData.clear(); - } - if(read > 0) - { - int limit = netData.limit(); - netData.limit(netData.position()); - netData.position(netData.position() - read); - - ByteBuffer data = netData.slice(); - - netData.limit(limit); - netData.position(netData.position() + read); - - delegate.send(data); - flush(); - } - result = engine.wrap(ByteBuffer.allocate(0), netData); - status = result.getStatus(); - read = result.bytesProduced(); - } - } - - public void flush() - { - delegate.flush(); - } - - public void send() - { - if(!closed.get()) - { - doSend(); - } - } - - public synchronized void send(ByteBuffer appData) - { - boolean buffered; - if(buffered = _appData.hasRemaining()) - { - ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining()); - newBuf.put(_appData); - newBuf.put(appData); - newBuf.flip(); - _appData = newBuf; - } - if (closed.get()) - { - throw new SenderException("SSL Sender is closed"); - } - doSend(); - if(!appData.hasRemaining()) - { - _appData = EMPTY_BYTE_BUFFER; - } - else if(!buffered) - { - _appData = ByteBuffer.allocate(appData.remaining()); - _appData.put(appData); - _appData.flip(); - } - } - - private synchronized void doSend() - { - - HandshakeStatus handshakeStatus; - Status status; - - while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) - && !_sslStatus.getSslErrorFlag()) - { - int read = 0; - try - { - SSLEngineResult result = engine.wrap(_appData, netData); - read = result.bytesProduced(); - status = result.getStatus(); - handshakeStatus = result.getHandshakeStatus(); - } - catch(SSLException e) - { - // Should this set _sslError?? - throw new SenderException("SSL, Error occurred while encrypting data",e); - } - - if(read > 0) - { - int limit = netData.limit(); - netData.limit(netData.position()); - netData.position(netData.position() - read); - - ByteBuffer data = netData.slice(); - - netData.limit(limit); - netData.position(netData.position() + read); - - delegate.send(data); - } - - switch(status) - { - case CLOSED: - throw new SenderException("SSLEngine is closed"); - - case BUFFER_OVERFLOW: - netData.clear(); - continue; - - case OK: - break; // do nothing - - default: - throw new IllegalStateException("SSLReceiver: Invalid State " + status); - } - - switch (handshakeStatus) - { - case NEED_WRAP: - if (netData.hasRemaining()) - { - continue; - } - - case NEED_TASK: - doTasks(); - break; - - case NEED_UNWRAP: - flush(); - return; - - case FINISHED: - if (_hostname != null) - { - SSLUtil.verifyHostname(engine, _hostname); - } - - case NOT_HANDSHAKING: - break; //do nothing - - default: - throw new IllegalStateException("SSLSender: Invalid State " + status); - } - - } - } - - private void doTasks() - { - Runnable runnable; - while ((runnable = engine.getDelegatedTask()) != null) { - runnable.run(); - } - } - - public void setIdleTimeout(int i) - { - delegate.setIdleTimeout(i); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java index 7c61136b42..7d64012fea 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java @@ -264,8 +264,4 @@ public class SSLSender implements Sender<ByteBuffer> } } - public void setIdleTimeout(int i) - { - delegate.setIdleTimeout(i); - } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java index 3071594be7..865a3603f5 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java @@ -22,6 +22,7 @@ package org.apache.qpid.transport.network; import java.nio.ByteBuffer; +import java.util.Set; import javax.net.ssl.SSLContext; @@ -150,7 +151,9 @@ public class TransportTest extends QpidTestCase } public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, SSLContext sslContext) + ProtocolEngineFactory factory, + SSLContext sslContext, + final Set<TransportEncryption> encryptionSet) { throw new UnsupportedOperationException(); } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java index 3025414e4a..cdd5e13870 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java @@ -39,7 +39,7 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase private Session session; private AMQQueue queue; private MessageConsumer consumer; - private int numMessages; + private int _numMessages; private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class); @@ -86,7 +86,7 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase { super.setUp(); - numMessages = isBrokerStorePersistent() ? 300 : 1000; + _numMessages = isBrokerStorePersistent() ? 300 : 1000; _logger.info("Create Connection"); con = getConnection(); @@ -106,30 +106,33 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase // Setup initial messages _logger.info("Creating first producer thread"); - producerThread = new ASyncProducer(queue, 0, numMessages / 2); + producerThread = new ASyncProducer(queue, 0, _numMessages / 2); producerThread.start(); // Wait for them to be done producerThread.join(); // Setup second set of messages to produce while we consume _logger.info("Creating second producer thread"); - producerThread = new ASyncProducer(queue, numMessages / 2, numMessages); + producerThread = new ASyncProducer(queue, _numMessages / 2, _numMessages); producerThread.start(); // Start consuming and checking they're in order _logger.info("Consuming messages"); - for (int i = 0; i < numMessages; i++) + for (int i = 0; i < _numMessages; i++) { Message msg = consumer.receive(3000); + + _logger.debug("KWDEBUG got " + msg); + assertNotNull("Message " + i + " should not be null", msg); assertTrue("Message " + i + " should be a text message", msg instanceof TextMessage); - assertEquals("Message content " + i + "does not match expected", Integer.toString(i), ((TextMessage) msg).getText()); + assertEquals("Message content " + i + " does not match expected", Integer.toString(i), ((TextMessage) msg).getText()); } } protected void tearDown() throws Exception { - _logger.info("Interuptting producer thread"); + _logger.info("Interrupting producer thread"); producerThread.interrupt(); _logger.info("Closing connection"); con.close(); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 007772e8be..9a8d021828 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -161,7 +161,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l); MultiVersionProtocolEngineFactory factory = - new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port, + new MultiVersionProtocolEngineFactory(_broker, protocols, null, port, org.apache.qpid.server.model.Transport.TCP); //create a dummy to retrieve the 'current' ID number @@ -215,7 +215,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase try { - new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, Protocol.AMQP_0_9, null, + new MultiVersionProtocolEngineFactory(_broker, versions, Protocol.AMQP_0_9, null, org.apache.qpid.server.model.Transport.TCP); fail("should not have been allowed to create the factory"); } @@ -236,10 +236,6 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase { _sender = new Sender<ByteBuffer>() { - public void setIdleTimeout(int i) - { - } - public void send(ByteBuffer msg) { } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 3ffa73b9b7..347bf933ad 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -114,11 +114,6 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase _sender = new Sender<ByteBuffer>() { - public void setIdleTimeout(int i) - { - - } - public void send(ByteBuffer msg) { diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java index f76203887c..5928a41025 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java @@ -313,14 +313,10 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase _maxFrameSize, actualHeartbeatInterval); - int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); conn.setMaxFrameSize(_maxFrameSize); - - conn.setIdleTimeout(idleTimeout); - int channelMax = tune.getChannelMax(); conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax); diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes index cafb1d573a..b52987f369 100644 --- a/qpid/java/test-profiles/JavaExcludes +++ b/qpid/java/test-profiles/JavaExcludes @@ -26,3 +26,4 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy //QPID-4153 Messages causing a runtime selector error should be dead-lettered (or something similar) org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError + |
