summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-10 18:10:16 +0000
committerKeith Wall <kwall@apache.org>2015-02-10 18:10:16 +0000
commitf5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch)
tree25544486642cc770061489663dba650d85769404 /qpid/java/broker-core
parent085486ebe5ff21133b9caf1c31625ac6ea356568 (diff)
downloadqpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java57
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java52
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java54
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java36
11 files changed, 212 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 0421a66abf..cad7b71fdd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -23,12 +23,14 @@ package org.apache.qpid.server.consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
@@ -41,6 +43,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicInteger _stateActivates = new AtomicInteger();
+ private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
protected AbstractConsumerTarget(final State initialState)
@@ -48,6 +51,22 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
_state = new AtomicReference<State>(initialState);
}
+ @Override
+ public void processPendingMessages()
+ {
+ while(hasMessagesToSend())
+ {
+ sendNextMessage();
+ }
+ }
+
+ @Override
+ public final boolean isSuspended()
+ {
+ return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+ }
+
+ protected abstract boolean doIsSuspended();
public final State getState()
{
@@ -136,4 +155,42 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
_stateChangeLock.unlock();
}
+ @Override
+ public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ {
+ _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+
+ getSessionModel().getConnectionModel().flushBatched();
+ return entry.getMessage().getSize();
+ }
+
+ protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+
+ @Override
+ public boolean hasMessagesToSend()
+ {
+ return !_queue.isEmpty();
+ }
+
+ @Override
+ public void sendNextMessage()
+ {
+
+ ConsumerMessageInstancePair consumerMessage = _queue.peek();
+ if (consumerMessage != null)
+ {
+ _queue.poll();
+
+ ConsumerImpl consumer = consumerMessage.getConsumer();
+ MessageInstance entry = consumerMessage.getEntry();
+ boolean batch = consumerMessage.isBatch();
+ doSend(consumer, entry, batch);
+
+ if (consumer.acquires())
+ {
+ entry.unlockAcquisition();
+ }
+ }
+
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index b15b01ede5..3b196df902 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -31,6 +31,8 @@ public interface ConsumerImpl
void externalStateChange();
+ ConsumerTarget getTarget();
+
enum Option
{
ACQUIRES,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
new file mode 100644
index 0000000000..aa5e419ce2
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.consumer;
+
+import org.apache.qpid.server.message.MessageInstance;
+
+public class ConsumerMessageInstancePair
+{
+ private final ConsumerImpl _consumer;
+ private final MessageInstance _entry;
+ private final boolean _batch;
+
+ public ConsumerMessageInstancePair(final ConsumerImpl consumer, final MessageInstance entry, final boolean batch)
+ {
+ _consumer = consumer;
+ _entry = entry;
+ _batch = batch;
+
+ }
+
+ public ConsumerImpl getConsumer()
+ {
+ return _consumer;
+ }
+
+ public MessageInstance getEntry()
+ {
+ return _entry;
+ }
+
+ public boolean isBatch()
+ {
+ return _batch;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 5aef922da5..b2e8cec315 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -33,6 +33,8 @@ public interface ConsumerTarget
void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
+ void processPendingMessages();
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
@@ -54,6 +56,10 @@ public interface ConsumerTarget
long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+ boolean hasMessagesToSend();
+
+ void sendNextMessage();
+
void flushBatched();
void queueDeleted();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 26e8271d14..96900d9a5a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -107,4 +107,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
void removeSessionListener(SessionModelListener listener);
+ void flushBatched();
+
+ boolean isMessageAssignmentSuspended();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 40aa1bbafd..d488ccc138 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -115,4 +115,6 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo
long getTransactionUpdateTime();
void transportStateChanged();
+
+ void processPendingMessages();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index eb7599c5cc..946992cbb6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -84,6 +84,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_onCloseTask = onCloseTask;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ _delegate.setMessageAssignmentSuspended(value);
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _delegate.isMessageAssignmentSuspended();
+ }
+
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
@@ -198,10 +210,33 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getLastWriteTime();
}
-
+ @Override
+ public void processPendingMessages()
+ {
+ _delegate.processPendingMessages();
+ }
private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPendingMessages()
+ {
+
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -318,6 +353,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return 0;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPendingMessages()
+ {
+
+ }
+
public void received(ByteBuffer msg)
{
_lastReadTime = System.currentTimeMillis();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 26cbf379a0..a545ce6e10 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1148,10 +1148,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, entry, false);
- if(sub.acquires())
- {
- entry.unlockAcquisition();
- }
}
}
}
@@ -1978,10 +1974,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, node, batch);
- if(sub.acquires())
- {
- node.unlockAcquisition();
- }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index a2c275e797..c459737c46 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -52,5 +52,4 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
QueueContext getQueueContext();
- ConsumerTarget getTarget();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index c85e4058a1..4fb89575aa 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -507,6 +507,7 @@ class QueueConsumerImpl
return _selector;
}
+
@Override
public String toLogString()
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 47ed224133..8b424d2c9e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -178,6 +178,18 @@ public class MockConsumer implements ConsumerTarget
return size;
}
+ @Override
+ public boolean hasMessagesToSend()
+ {
+ return false;
+ }
+
+ @Override
+ public void sendNextMessage()
+ {
+
+ }
+
public void flushBatched()
{
@@ -230,6 +242,12 @@ public class MockConsumer implements ConsumerTarget
}
}
+ @Override
+ public void processPendingMessages()
+ {
+
+ }
+
public ArrayList<MessageInstance> getMessages()
{
return messages;
@@ -462,6 +480,12 @@ public class MockConsumer implements ConsumerTarget
{
}
+
+ @Override
+ public void processPendingMessages()
+ {
+
+ }
}
private static class MockConnectionModel implements AMQConnectionModel
@@ -594,6 +618,18 @@ public class MockConsumer implements ConsumerTarget
}
@Override
+ public void flushBatched()
+ {
+
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
public String getClientVersion()
{
return null;