diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
| commit | f5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch) | |
| tree | 25544486642cc770061489663dba650d85769404 /qpid/java/broker-core | |
| parent | 085486ebe5ff21133b9caf1c31625ac6ea356568 (diff) | |
| download | qpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz | |
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
11 files changed, 212 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 0421a66abf..cad7b71fdd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -23,12 +23,14 @@ package org.apache.qpid.server.consumer; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget @@ -41,6 +43,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicInteger _stateActivates = new AtomicInteger(); + private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue(); protected AbstractConsumerTarget(final State initialState) @@ -48,6 +51,22 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget _state = new AtomicReference<State>(initialState); } + @Override + public void processPendingMessages() + { + while(hasMessagesToSend()) + { + sendNextMessage(); + } + } + + @Override + public final boolean isSuspended() + { + return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended(); + } + + protected abstract boolean doIsSuspended(); public final State getState() { @@ -136,4 +155,42 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget _stateChangeLock.unlock(); } + @Override + public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + { + _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch)); + + getSessionModel().getConnectionModel().flushBatched(); + return entry.getMessage().getSize(); + } + + protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + + @Override + public boolean hasMessagesToSend() + { + return !_queue.isEmpty(); + } + + @Override + public void sendNextMessage() + { + + ConsumerMessageInstancePair consumerMessage = _queue.peek(); + if (consumerMessage != null) + { + _queue.poll(); + + ConsumerImpl consumer = consumerMessage.getConsumer(); + MessageInstance entry = consumerMessage.getEntry(); + boolean batch = consumerMessage.isBatch(); + doSend(consumer, entry, batch); + + if (consumer.acquires()) + { + entry.unlockAcquisition(); + } + } + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index b15b01ede5..3b196df902 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -31,6 +31,8 @@ public interface ConsumerImpl void externalStateChange(); + ConsumerTarget getTarget(); + enum Option { ACQUIRES, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java new file mode 100644 index 0000000000..aa5e419ce2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.consumer; + +import org.apache.qpid.server.message.MessageInstance; + +public class ConsumerMessageInstancePair +{ + private final ConsumerImpl _consumer; + private final MessageInstance _entry; + private final boolean _batch; + + public ConsumerMessageInstancePair(final ConsumerImpl consumer, final MessageInstance entry, final boolean batch) + { + _consumer = consumer; + _entry = entry; + _batch = batch; + + } + + public ConsumerImpl getConsumer() + { + return _consumer; + } + + public MessageInstance getEntry() + { + return _entry; + } + + public boolean isBatch() + { + return _batch; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 5aef922da5..b2e8cec315 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -33,6 +33,8 @@ public interface ConsumerTarget void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener); + void processPendingMessages(); + enum State { ACTIVE, SUSPENDED, CLOSED @@ -54,6 +56,10 @@ public interface ConsumerTarget long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + boolean hasMessagesToSend(); + + void sendNextMessage(); + void flushBatched(); void queueDeleted(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 26e8271d14..96900d9a5a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -107,4 +107,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends void removeSessionListener(SessionModelListener listener); + void flushBatched(); + + boolean isMessageAssignmentSuspended(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 40aa1bbafd..d488ccc138 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -115,4 +115,6 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo long getTransactionUpdateTime(); void transportStateChanged(); + + void processPendingMessages(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index eb7599c5cc..946992cbb6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -84,6 +84,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _onCloseTask = onCloseTask; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + _delegate.setMessageAssignmentSuspended(value); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _delegate.isMessageAssignmentSuspended(); + } + public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); @@ -198,10 +210,33 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getLastWriteTime(); } - + @Override + public void processPendingMessages() + { + _delegate.processPendingMessages(); + } private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { + + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPendingMessages() + { + + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -318,6 +353,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return 0; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPendingMessages() + { + + } + public void received(ByteBuffer msg) { _lastReadTime = System.currentTimeMillis(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 26cbf379a0..a545ce6e10 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1148,10 +1148,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, entry, false); - if(sub.acquires()) - { - entry.unlockAcquisition(); - } } } } @@ -1978,10 +1974,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, node, batch); - if(sub.acquires()) - { - node.unlockAcquisition(); - } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index a2c275e797..c459737c46 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -52,5 +52,4 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, QueueContext getQueueContext(); - ConsumerTarget getTarget(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index c85e4058a1..4fb89575aa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -507,6 +507,7 @@ class QueueConsumerImpl return _selector; } + @Override public String toLogString() { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 47ed224133..8b424d2c9e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -178,6 +178,18 @@ public class MockConsumer implements ConsumerTarget return size; } + @Override + public boolean hasMessagesToSend() + { + return false; + } + + @Override + public void sendNextMessage() + { + + } + public void flushBatched() { @@ -230,6 +242,12 @@ public class MockConsumer implements ConsumerTarget } } + @Override + public void processPendingMessages() + { + + } + public ArrayList<MessageInstance> getMessages() { return messages; @@ -462,6 +480,12 @@ public class MockConsumer implements ConsumerTarget { } + + @Override + public void processPendingMessages() + { + + } } private static class MockConnectionModel implements AMQConnectionModel @@ -594,6 +618,18 @@ public class MockConsumer implements ConsumerTarget } @Override + public void flushBatched() + { + + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override public String getClientVersion() { return null; |
