diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-08 13:15:35 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-08 13:15:35 +0000 |
| commit | 8468c9b9b067489aef2227456f662bfdbdc71272 (patch) | |
| tree | 370659fd9ea9784d6e608f049a870433aa821353 /qpid/java | |
| parent | e7375322dc1083dbfffe49a903d4737a6943907e (diff) | |
| download | qpid-python-8468c9b9b067489aef2227456f662bfdbdc71272.tar.gz | |
QPID-3978 : [Java Broker] Allow for acquired messages to be removed from a queue due to TTL or management actions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
23 files changed, 389 insertions, 90 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index faf5a724f3..f8585344b0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -29,6 +29,8 @@ public interface ConsumerTarget { + void acquisitionRemoved(MessageInstance node); + enum State { ACTIVE, SUSPENDED, CLOSED diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 4ee47e05e9..1bf451948d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -51,6 +51,8 @@ public interface MessageInstance boolean isAcquiredBy(ConsumerImpl consumer); + boolean removeAcquisitionFromConsumer(ConsumerImpl consumer); + void setRedelivered(); boolean isRedelivered(); @@ -67,6 +69,10 @@ public interface MessageInstance boolean acquire(ConsumerImpl sub); + boolean lockAcquisition(); + + boolean unlockAcquisition(); + int getMaximumDeliveryCount(); int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn); @@ -99,6 +105,7 @@ public interface MessageInstance State currentState = getState(); return currentState == State.DEQUEUED || currentState == State.DELETED; } + } @@ -162,10 +169,12 @@ public interface MessageInstance public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState { private final C _consumer; + private final LockedAcquiredState<C> _lockedState; public ConsumerAcquiredState(C consumer) { _consumer = consumer; + _lockedState = new LockedAcquiredState<>(this); } @@ -183,6 +192,43 @@ public interface MessageInstance { return "{" + getState().name() + " : " + _consumer +"}"; } + + public LockedAcquiredState<C> getLockedState() + { + return _lockedState; + } + + } + + public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState + { + private final ConsumerAcquiredState<C> _acquiredState; + + public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState) + { + _acquiredState = acquiredState; + } + + @Override + public State getState() + { + return State.ACQUIRED; + } + + public C getConsumer() + { + return _acquiredState.getConsumer(); + } + + public String toString() + { + return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}"; + } + + public ConsumerAcquiredState<C> getUnlockedState() + { + return _acquiredState; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 54f3c4de09..8483e35b9e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, entry, false); + if(sub.acquires()) + { + entry.unlockAcquisition(); + } } } } @@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, node, batch); + if(sub.acquires()) + { + node.unlockAcquisition(); + } } } @@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (!node.isDeleted()) { // If the node has expired then acquire it - if (node.expired() && node.acquire()) + if (node.expired()) { - if (_logger.isDebugEnabled()) + boolean acquiredForDequeueing = node.acquire(); + if(!acquiredForDequeueing && node.getDeliveredToConsumer()) + { + QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer(); + acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer); + if(acquiredForDequeueing) + { + consumer.acquisitionRemoved(node); + } + } + + if(acquiredForDequeueing) { - _logger.debug("Dequeuing expired node " + node); + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeuing expired node " + node); + } + // Then dequeue it. + dequeueEntry(node); } - // Then dequeue it. - dequeueEntry(node); } else { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 5ffbc0dbaa..71b7636159 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -39,6 +39,8 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, void send(QueueEntry entry, boolean batch); + void acquisitionRemoved(QueueEntry node); + void queueDeleted(); SubFlushRunner getRunner(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 55782ac095..d80aa92007 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -477,6 +477,13 @@ class QueueConsumerImpl } @Override + public void acquisitionRemoved(final QueueEntry node) + { + _target.acquisitionRemoved(node); + _queue.decrementUnackedMsgCount(node); + } + + @Override public String getDistributionMode() { return _distributionMode; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 49644f8d76..96916a02e2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -210,7 +210,7 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean acquire(ConsumerImpl sub) { - final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState()); + final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState()); if(acquired) { _deliveryCountUpdater.compareAndSet(this,-1,0); @@ -218,17 +218,57 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } + @Override + public boolean lockAcquisition() + { + EntryState state = _state; + if(state instanceof ConsumerAcquiredState) + { + return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState()); + } + return state instanceof LockedAcquiredState; + } + + @Override + public boolean unlockAcquisition() + { + EntryState state = _state; + if(state instanceof LockedAcquiredState) + { + return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState()); + } + return false; + } + public boolean acquiredByConsumer() { - return (_state instanceof ConsumerAcquiredState); + return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState); } + @Override public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; - return state instanceof ConsumerAcquiredState - && ((ConsumerAcquiredState)state).getConsumer() == consumer; + return (state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer) + || (state instanceof LockedAcquiredState + && ((LockedAcquiredState)state).getConsumer() == consumer); + } + + @Override + public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer) + { + EntryState state = _state; + if(state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer) + { + return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE); + } + else + { + return false; + } } public void release() @@ -238,7 +278,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE)) { - if(state instanceof ConsumerAcquiredState) + if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { getQueue().decrementUnackedMsgCount(this); } @@ -268,6 +308,10 @@ public abstract class QueueEntryImpl implements QueueEntry { return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer(); } + else if (state instanceof LockedAcquiredState) + { + return (QueueConsumer) ((LockedAcquiredState) state).getConsumer(); + } else { return null; @@ -312,7 +356,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { - if (state instanceof ConsumerAcquiredState) + if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { getQueue().decrementUnackedMsgCount(this); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index f15f608907..b72d44debf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueConsumer; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; @@ -953,15 +955,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte op.withinTransaction(new Transaction() { - public void dequeue(final MessageInstance entry) + public void dequeue(final MessageInstance messageInstance) { - if(entry.acquire()) + boolean acquired = messageInstance.acquire(); + if(!acquired && messageInstance instanceof QueueEntry) + { + QueueEntry entry = (QueueEntry) messageInstance; + QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer(); + acquired = messageInstance.removeAcquisitionFromConsumer(consumer); + if(acquired) + { + consumer.acquisitionRemoved((QueueEntry)messageInstance); + } + } + if(acquired) { - txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() + txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action() { public void postCommit() { - entry.delete(); + messageInstance.delete(); } public void onRollback() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index e6afbc6e90..c36f87c4ae 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -181,6 +181,12 @@ public class MockConsumer implements ConsumerTarget } + @Override + public void acquisitionRemoved(final MessageInstance node) + { + + } + public State getState() { return _state; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 74a2262265..37c4eeb127 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -65,6 +65,12 @@ public class MockMessageInstance implements MessageInstance return false; } + @Override + public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer) + { + return false; + } + public void delete() { @@ -81,6 +87,18 @@ public class MockMessageInstance implements MessageInstance return false; } + @Override + public boolean lockAcquisition() + { + return false; + } + + @Override + public boolean unlockAcquisition() + { + return false; + } + public boolean isAvailable() { return false; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 3189010284..3a9f990846 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -137,6 +137,40 @@ public abstract class QueueEntryImplTestBase extends TestCase return consumer; } + + public void testLocking() + { + QueueConsumer consumer = newConsumer(); + QueueConsumer consumer2 = newConsumer(); + + _queueEntry.acquire(consumer); + assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", + _queueEntry.isAcquired()); + + assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); + assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition()); + assertFalse("Acquisition should not be able to be removed from the wrong consumer",_queueEntry.removeAcquisitionFromConsumer(consumer2)); + assertTrue("Acquisition should be able to be removed once unlocked",_queueEntry.removeAcquisitionFromConsumer(consumer)); + assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired()); + assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer()); + + _queueEntry.release(); + + assertFalse("Hijacked queue entry should be able to be released", _queueEntry.isAcquired()); + + _queueEntry.acquire(consumer); + assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", + _queueEntry.isAcquired()); + + assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); + assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition()); + assertTrue("Should be able to unlock locked queue entry",_queueEntry.lockAcquisition()); + assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); + + _queueEntry.delete(); + assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted()); + } + /** * A helper method to get entry state * diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index d328e21a94..ce1c95e674 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -342,5 +342,17 @@ public class StandardQueueTest extends AbstractQueueTestBase return super.acquire(sub); } } + + @Override + public boolean lockAcquisition() + { + return true; + } + + @Override + public boolean unlockAcquisition() + { + return true; + } } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index d73d019000..7ab3fbb1f5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC return _stopped.get(); } - public void acknowledge(MessageInstance entry) + public boolean deleteAcquired(MessageInstance entry) { - // TODO Fix Store Context / cleanup if(entry.isAcquiredBy(getConsumer())) { - _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); - _unacknowledgedCount.decrementAndGet(); + acquisitionRemoved(entry); entry.delete(); + return true; } + else + { + return false; + } + } + + @Override + public void acquisitionRemoved(final MessageInstance entry) + { + _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); + _unacknowledgedCount.decrementAndGet(); } public void flush() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 4420709a91..94f04bbae3 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onAccept() { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition()) { _target.getSessionModel().acknowledge(_target, _entry); } else { - _logger.warn("MessageAccept received for message which has not been acquired (likely client error)"); + _logger.info("MessageAccept received for message which is not been acquired - message may have expired or been removed"); } } @@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } else { - _logger.warn("MessageRelease received for message which has not been acquired (likely client error)"); + _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed"); } } @@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } else { - _logger.warn("MessageReject received for message which has not been acquired (likely client error)"); + _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed"); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index cd1146ac0b..7917b7989a 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -29,6 +29,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene private final ConsumerTarget_0_10 _sub; private final MessageInstance _entry; private final ServerSession _session; + private long _messageSize; private boolean _restoreCredit; public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) @@ -38,15 +39,19 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene _entry = entry; _session = session; _restoreCredit = restoreCredit; + if(restoreCredit) + { + _messageSize = entry.getMessage().getSize(); + } } public void onComplete(Method method) { if(_restoreCredit) { - _sub.restoreCredit(_entry.getMessage()); + _sub.getCreditManager().restoreCredit(1l, _messageSize); } - if(_entry.isAcquiredBy(_sub.getConsumer())) + if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition()) { _session.acknowledge(_sub, _entry); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 3fe1515b18..b1c22fe823 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -460,7 +460,7 @@ public class ServerSession extends Session public void postCommit() { - sub.acknowledge(entry); + sub.deleteAcquired(entry); } public void onRollback() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b6e1b7dd6a..7877812d84 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1413,7 +1413,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // explicit rollbacks resend the message after the rollback-ok is sent if(_rollingBack) { - _resendList.addAll(_ackedMessages); + for(MessageInstance entry : _ackedMessages) + { + entry.unlockAcquisition(); + } + _resendList.addAll(_ackedMessages); } else { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 3de89a1d70..7303ef30da 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -20,11 +20,16 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; @@ -34,14 +39,10 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.StateChangeListener; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag * that was given out by the broker and the channel id. <p/> @@ -57,7 +58,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen final MessageInstance.State oldSate, final MessageInstance.State newState) { - if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED)) + if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED) { restoreCredit(entry.getMessage()); } @@ -74,8 +75,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager) throws AMQException + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) throws AMQException { return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -557,6 +558,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen }); } + @Override + public void acquisitionRemoved(final MessageInstance node) + { + } + public long getUnacknowledgedBytes() { return _unacknowledgedBytes.longValue(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java index 8d70e769d3..1bd9ab079e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java @@ -20,31 +20,28 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; - +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; + public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { private final Object _lock = new Object(); - private long _unackedSize; - private Map<Long, MessageInstance> _map; - private long _lastDeliveryTag; - private final int _prefetchLimit; public UnacknowledgedMessageMapImpl(int prefetchLimit) { _prefetchLimit = prefetchLimit; - _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit); + _map = new LinkedHashMap<>(prefetchLimit); } public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs) @@ -81,12 +78,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { MessageInstance message = _map.remove(deliveryTag); - if(message != null) - { - _unackedSize -= message.getMessage().getSize(); - - } - return message; } } @@ -109,8 +100,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.put(deliveryTag, message); - _unackedSize += message.getMessage().getSize(); - _lastDeliveryTag = deliveryTag; } } @@ -119,8 +108,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { Collection<MessageInstance> currentEntries = _map.values(); - _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit); - _unackedSize = 0l; + _map = new LinkedHashMap<>(_prefetchLimit); return currentEntries; } } @@ -138,7 +126,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.clear(); - _unackedSize = 0l; } } @@ -163,6 +150,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>(); collect(deliveryTag, multiple, ackedMessageMap); remove(ackedMessageMap); + List<MessageInstance> acknowledged = new ArrayList<>(); + for(MessageInstance instance : ackedMessageMap.values()) + { + if(instance.lockAcquisition()) + { + acknowledged.add(instance); + } + } return ackedMessageMap.values(); } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index adb2f8ea6a..bceae85896 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.nio.ByteBuffer; +import java.util.List; + import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; @@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import java.nio.ByteBuffer; -import java.util.List; - class ConsumerTarget_1_0 extends AbstractConsumerTarget { private final boolean _acquires; @@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget if(outcome instanceof Accepted) { + _queueEntry.lockAcquisition(); txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), new ServerTransaction.Action() { @@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget modified.setDeliveryFailed(true); _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); _link.getEndpoint().sendFlowConditional(); + _queueEntry.unlockAcquisition(); } } }); @@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } @Override + public void acquisitionRemoved(final MessageInstance node) + { + } + + @Override public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 7a844cbc79..a8fc5387b4 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -636,19 +636,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); if(_consumer.acquires()) { - txn.dequeue(Collections.singleton(queueEntry), - new ServerTransaction.Action() - { - public void postCommit() - { - queueEntry.delete(); - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); + if(queueEntry.acquire() || queueEntry.isAcquired()) + { + txn.dequeue(Collections.singleton(queueEntry), + new ServerTransaction.Action() + { + public void postCommit() + { + queueEntry.delete(); + } + + public void onRollback() + { + } + }); + } } } else if(outcome instanceof Released) diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index e73d177599..34f08615ad 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -1071,6 +1071,12 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer) + { + return false; + } + + @Override public void setRedelivered() { @@ -1119,6 +1125,18 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public boolean lockAcquisition() + { + return false; + } + + @Override + public boolean unlockAcquisition() + { + return false; + } + + @Override public int getMaximumDeliveryCount() { return 0; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index ae2828d392..03e7eab61b 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -90,6 +90,12 @@ class ManagementResponse implements MessageInstance } @Override + public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer) + { + return consumer == _consumer; + } + + @Override public void setRedelivered() { _isRedelivered = true; @@ -138,6 +144,18 @@ class ManagementResponse implements MessageInstance } @Override + public boolean lockAcquisition() + { + return false; + } + + @Override + public boolean unlockAcquisition() + { + return false; + } + + @Override public int getMaximumDeliveryCount() { return 0; @@ -190,7 +208,7 @@ class ManagementResponse implements MessageInstance @Override public void delete() { - // TODO + _isDeleted = true; } @Override diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index fedb88d008..e606df3f7d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -21,15 +21,8 @@ package org.apache.qpid.server.queue; -import org.junit.Assert; -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import javax.jms.Connection; import javax.jms.JMSException; @@ -39,8 +32,17 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TopicSubscriber; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import javax.naming.NamingException; + +import org.apache.log4j.Logger; +import org.junit.Assert; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class TimeToLiveTest extends QpidBrokerTestCase { @@ -53,18 +55,29 @@ public class TimeToLiveTest extends QpidBrokerTestCase private static final int MSG_COUNT = 50; private static final long SERVER_TTL_TIMEOUT = 60000L; + public void testPassiveTTLWithPrefetch() throws Exception + { + doTestPassiveTTL(true); + } + public void testPassiveTTL() throws Exception { + doTestPassiveTTL(false); + + } + + private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException + { //Create Client 1 Connection clientConnection = getConnection(); - + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = clientSession.createQueue(QUEUE); - + Queue queue = clientSession.createQueue(QUEUE); + // Create then close the consumer so the queue is actually created // Closing it then reopening it ensures that the consumer shouldn't get messages // which should have expired and allows a shorter sleep period. See QPID-1418 - + MessageConsumer consumer = clientSession.createConsumer(queue); consumer.close(); @@ -79,6 +92,12 @@ public class TimeToLiveTest extends QpidBrokerTestCase MessageProducer producer = producerSession.createProducer(queue); + consumer = clientSession.createConsumer(queue); + if(prefetchMessages) + { + clientConnection.start(); + } + //Set TTL int msg = 0; producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); @@ -96,7 +115,6 @@ public class TimeToLiveTest extends QpidBrokerTestCase producerSession.commit(); - consumer = clientSession.createConsumer(queue); // Ensure we sleep the required amount of time. ReentrantLock waitLock = new ReentrantLock(); @@ -124,6 +142,16 @@ public class TimeToLiveTest extends QpidBrokerTestCase } + if(prefetchMessages) + { + clientConnection.close(); + clientConnection = getConnection(); + + clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = clientSession.createQueue(QUEUE); + consumer = clientSession.createConsumer(queue); + } + clientConnection.start(); //Receive Message 0 @@ -131,14 +159,14 @@ public class TimeToLiveTest extends QpidBrokerTestCase Message receivedFirst = consumer.receive(5000); Message receivedSecond = consumer.receive(5000); Message receivedThird = consumer.receive(1000); - + // Log the messages to help diagnosis incase of failure _logger.info("First:"+receivedFirst); _logger.info("Second:"+receivedSecond); _logger.info("Third:"+receivedThird); // Only first and last messages sent should survive expiry - Assert.assertNull("More messages received", receivedThird); + Assert.assertNull("More messages received", receivedThird); Assert.assertNotNull("First message not received", receivedFirst); Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); |
