summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-11 16:54:29 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-11 16:54:29 +0000
commit5701b4ba67b8e475326acfc9f28735aead8d9dfc (patch)
tree9eb4212be8b423d1b756c75ff24a57a98c79c966 /qpid/java
parentf75a0292a343f2d07b1b413486121999540fd64a (diff)
downloadqpid-python-5701b4ba67b8e475326acfc9f28735aead8d9dfc.tar.gz
more refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659004 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java5
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java10
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java16
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java20
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java17
9 files changed, 89 insertions, 12 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 6ccdec3436..e32613d700 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -30,12 +30,14 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
-
+ private static final Logger LOGGER = Logger.getLogger(AbstractConsumerTarget.class);
private final AtomicReference<State> _state;
private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -176,7 +178,6 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
@Override
public void sendNextMessage()
{
-
ConsumerMessageInstancePair consumerMessage = _queue.peek();
if (consumerMessage != null)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 5c575006cf..6f55e59067 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -36,6 +37,7 @@ import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -57,6 +59,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private final Protocol _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
public MultiVersionProtocolEngine(final Broker<?> broker,
final Set<Protocol> supported,
@@ -228,6 +231,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ _delegate.setWorkListener(listener);
+ }
+
+ @Override
public void clearWork()
{
_delegate.clearWork();
@@ -267,6 +277,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+
+ }
+
+ @Override
public void clearWork()
{
@@ -418,6 +434,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+
+ }
+
+ @Override
public void clearWork()
{
@@ -512,7 +534,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
else
{
_delegate = newDelegate;
-
+ _delegate.setWorkListener(_workListener.get());
_header.flip();
_delegate.received(_header);
if(msg.hasRemaining())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
index 3a2abd45e7..7a0f43d74d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol;
import javax.security.auth.Subject;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.util.Action;
public interface ServerProtocolEngine extends ProtocolEngine
{
@@ -48,4 +49,6 @@ public interface ServerProtocolEngine extends ProtocolEngine
void clearWork();
void notifyWork();
+
+ void setWorkListener(Action<ServerProtocolEngine> listener);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index dfa6b49e9b..97df05419a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
@@ -114,6 +115,15 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
_sslContext = sslContext;
_onTransportEncryptionAction = onTransportEncryptionAction;
+ delegate.setWorkListener(new Action<ServerProtocolEngine>()
+ {
+ @Override
+ public void performAction(final ServerProtocolEngine object)
+ {
+ _selector.wakeup();
+ }
+ });
+
if(encryptionSet.size() == 1)
{
_transportEncryption = _encryptionSet.iterator().next();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index e521754edf..5f99ddf56c 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -36,6 +37,7 @@ import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.network.Assembler;
@@ -61,6 +63,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private volatile boolean _messageAssignmentSuspended;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
@@ -302,6 +306,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void notifyWork()
{
_stateChanged.set(true);
+
+ final Action<ServerProtocolEngine> listener = _workListener.get();
+ if(listener != null)
+ {
+ listener.performAction(this);
+ }
}
@Override
@@ -309,4 +319,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
_stateChanged.set(false);
}
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index dce656e7fd..caa8b90485 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -690,9 +690,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
public void notifyWork()
{
_serverProtocolEngine.notifyWork();
-
- // TODO
- getSender().flush();
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index b1547f13e2..f8ca3077b4 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -98,6 +99,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
ServerMethodProcessor<ServerChannelMethodProcessor>
{
+
+
enum ConnectionState
{
INIT,
@@ -120,6 +123,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private final AmqpPort<?> _port;
private final long _creationTime;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
private AMQShortString _contextKey;
@@ -218,7 +222,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
_messageAssignmentSuspended = messageAssignmentSuspended;
-
if(!messageAssignmentSuspended)
{
for(AMQSessionModel<?,?> session : getSessionModels())
@@ -2095,8 +2098,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
_stateChanged.set(true);
- // TODO
- _sender.flush();
+ final Action<ServerProtocolEngine> listener = _workListener.get();
+ _logger.info("Work lister is null? " + (listener == null));
+ if(listener != null)
+ {
+
+ listener.performAction(this);
+ }
}
@Override
@@ -2104,4 +2112,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
_stateChanged.set(false);
}
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 09250ea6ac..097abe9d8b 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -503,9 +503,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
public void notifyWork()
{
_protocolEngine.notifyWork();
-
- // TODO
- _protocolEngine.flushBatched();
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 856aa14947..3d7712d1ad 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
@@ -62,6 +63,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
@@ -84,6 +86,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private ConnectionEndpoint _endpoint;
private long _connectionId;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
+
private static final ByteBuffer HEADER =
ByteBuffer.wrap(new byte[]
@@ -633,6 +637,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
public void notifyWork()
{
_stateChanged.set(true);
+
+ final Action<ServerProtocolEngine> listener = _workListener.get();
+ if(listener != null)
+ {
+ listener.performAction(this);
+ }
}
@Override
@@ -640,4 +650,11 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
_stateChanged.set(false);
}
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
+
}