diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 16:54:29 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 16:54:29 +0000 |
| commit | 5701b4ba67b8e475326acfc9f28735aead8d9dfc (patch) | |
| tree | 9eb4212be8b423d1b756c75ff24a57a98c79c966 /qpid/java | |
| parent | f75a0292a343f2d07b1b413486121999540fd64a (diff) | |
| download | qpid-python-5701b4ba67b8e475326acfc9f28735aead8d9dfc.tar.gz | |
more refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659004 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
9 files changed, 89 insertions, 12 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 6ccdec3436..e32613d700 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -30,12 +30,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.log4j.Logger; + import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget { - + private static final Logger LOGGER = Logger.getLogger(AbstractConsumerTarget.class); private final AtomicReference<State> _state; private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new @@ -176,7 +178,6 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget @Override public void sendNextMessage() { - ConsumerMessageInstancePair consumerMessage = _queue.peek(); if (consumerMessage != null) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 5c575006cf..6f55e59067 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -25,6 +25,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -36,6 +37,7 @@ import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.server.util.Action; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; @@ -57,6 +59,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private final Protocol _defaultSupportedReply; private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); public MultiVersionProtocolEngine(final Broker<?> broker, final Set<Protocol> supported, @@ -228,6 +231,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + _delegate.setWorkListener(listener); + } + + @Override public void clearWork() { _delegate.clearWork(); @@ -267,6 +277,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + + } + + @Override public void clearWork() { @@ -418,6 +434,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + + } + + @Override public void clearWork() { @@ -512,7 +534,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine else { _delegate = newDelegate; - + _delegate.setWorkListener(_workListener.get()); _header.flip(); _delegate.received(_header); if(msg.hasRemaining()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java index 3a2abd45e7..7a0f43d74d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol; import javax.security.auth.Subject; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.util.Action; public interface ServerProtocolEngine extends ProtocolEngine { @@ -48,4 +49,6 @@ public interface ServerProtocolEngine extends ProtocolEngine void clearWork(); void notifyWork(); + + void setWorkListener(Action<ServerProtocolEngine> listener); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index dfa6b49e9b..97df05419a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -43,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.server.util.Action; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; @@ -114,6 +115,15 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende _sslContext = sslContext; _onTransportEncryptionAction = onTransportEncryptionAction; + delegate.setWorkListener(new Action<ServerProtocolEngine>() + { + @Override + public void performAction(final ServerProtocolEngine object) + { + _selector.wakeup(); + } + }); + if(encryptionSet.size() == 1) { _transportEncryption = _encryptionSet.iterator().next(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index e521754edf..5f99ddf56c 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -36,6 +37,7 @@ import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.Action; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.network.Assembler; @@ -61,6 +63,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private volatile boolean _messageAssignmentSuspended; private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); + public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) @@ -302,6 +306,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void notifyWork() { _stateChanged.set(true); + + final Action<ServerProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + listener.performAction(this); + } } @Override @@ -309,4 +319,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { _stateChanged.set(false); } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index dce656e7fd..caa8b90485 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -690,9 +690,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S public void notifyWork() { _serverProtocolEngine.notifyWork(); - - // TODO - getSender().flush(); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index b1547f13e2..f8ca3077b4 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -98,6 +99,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, ServerMethodProcessor<ServerChannelMethodProcessor> { + + enum ConnectionState { INIT, @@ -120,6 +123,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private final AmqpPort<?> _port; private final long _creationTime; private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); private AMQShortString _contextKey; @@ -218,7 +222,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) { _messageAssignmentSuspended = messageAssignmentSuspended; - if(!messageAssignmentSuspended) { for(AMQSessionModel<?,?> session : getSessionModels()) @@ -2095,8 +2098,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { _stateChanged.set(true); - // TODO - _sender.flush(); + final Action<ServerProtocolEngine> listener = _workListener.get(); + _logger.info("Work lister is null? " + (listener == null)); + if(listener != null) + { + + listener.performAction(this); + } } @Override @@ -2104,4 +2112,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { _stateChanged.set(false); } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 09250ea6ac..097abe9d8b 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -503,9 +503,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod public void notifyWork() { _protocolEngine.notifyWork(); - - // TODO - _protocolEngine.flushBatched(); } @Override diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 856aa14947..3d7712d1ad 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -62,6 +63,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.TransportException; @@ -84,6 +86,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private ConnectionEndpoint _endpoint; private long _connectionId; private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); + private static final ByteBuffer HEADER = ByteBuffer.wrap(new byte[] @@ -633,6 +637,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut public void notifyWork() { _stateChanged.set(true); + + final Action<ServerProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + listener.performAction(this); + } } @Override @@ -640,4 +650,11 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { _stateChanged.set(false); } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } + } |
