diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-08 13:15:35 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-08 13:15:35 +0000 |
| commit | 8468c9b9b067489aef2227456f662bfdbdc71272 (patch) | |
| tree | 370659fd9ea9784d6e608f049a870433aa821353 /qpid/java/broker-plugins | |
| parent | e7375322dc1083dbfffe49a903d4737a6943907e (diff) | |
| download | qpid-python-8468c9b9b067489aef2227456f662bfdbdc71272.tar.gz | |
QPID-3978 : [Java Broker] Allow for acquired messages to be removed from a queue due to TTL or management actions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
11 files changed, 122 insertions, 57 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index d73d019000..7ab3fbb1f5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC return _stopped.get(); } - public void acknowledge(MessageInstance entry) + public boolean deleteAcquired(MessageInstance entry) { - // TODO Fix Store Context / cleanup if(entry.isAcquiredBy(getConsumer())) { - _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); - _unacknowledgedCount.decrementAndGet(); + acquisitionRemoved(entry); entry.delete(); + return true; } + else + { + return false; + } + } + + @Override + public void acquisitionRemoved(final MessageInstance entry) + { + _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); + _unacknowledgedCount.decrementAndGet(); } public void flush() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 4420709a91..94f04bbae3 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onAccept() { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition()) { _target.getSessionModel().acknowledge(_target, _entry); } else { - _logger.warn("MessageAccept received for message which has not been acquired (likely client error)"); + _logger.info("MessageAccept received for message which is not been acquired - message may have expired or been removed"); } } @@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } else { - _logger.warn("MessageRelease received for message which has not been acquired (likely client error)"); + _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed"); } } @@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } else { - _logger.warn("MessageReject received for message which has not been acquired (likely client error)"); + _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed"); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index cd1146ac0b..7917b7989a 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -29,6 +29,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene private final ConsumerTarget_0_10 _sub; private final MessageInstance _entry; private final ServerSession _session; + private long _messageSize; private boolean _restoreCredit; public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) @@ -38,15 +39,19 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene _entry = entry; _session = session; _restoreCredit = restoreCredit; + if(restoreCredit) + { + _messageSize = entry.getMessage().getSize(); + } } public void onComplete(Method method) { if(_restoreCredit) { - _sub.restoreCredit(_entry.getMessage()); + _sub.getCreditManager().restoreCredit(1l, _messageSize); } - if(_entry.isAcquiredBy(_sub.getConsumer())) + if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition()) { _session.acknowledge(_sub, _entry); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 3fe1515b18..b1c22fe823 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -460,7 +460,7 @@ public class ServerSession extends Session public void postCommit() { - sub.acknowledge(entry); + sub.deleteAcquired(entry); } public void onRollback() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b6e1b7dd6a..7877812d84 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1413,7 +1413,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // explicit rollbacks resend the message after the rollback-ok is sent if(_rollingBack) { - _resendList.addAll(_ackedMessages); + for(MessageInstance entry : _ackedMessages) + { + entry.unlockAcquisition(); + } + _resendList.addAll(_ackedMessages); } else { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 3de89a1d70..7303ef30da 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -20,11 +20,16 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; @@ -34,14 +39,10 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.StateChangeListener; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag * that was given out by the broker and the channel id. <p/> @@ -57,7 +58,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen final MessageInstance.State oldSate, final MessageInstance.State newState) { - if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED)) + if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED) { restoreCredit(entry.getMessage()); } @@ -74,8 +75,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager) throws AMQException + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) throws AMQException { return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -557,6 +558,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen }); } + @Override + public void acquisitionRemoved(final MessageInstance node) + { + } + public long getUnacknowledgedBytes() { return _unacknowledgedBytes.longValue(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java index 8d70e769d3..1bd9ab079e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java @@ -20,31 +20,28 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; - +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; + public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { private final Object _lock = new Object(); - private long _unackedSize; - private Map<Long, MessageInstance> _map; - private long _lastDeliveryTag; - private final int _prefetchLimit; public UnacknowledgedMessageMapImpl(int prefetchLimit) { _prefetchLimit = prefetchLimit; - _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit); + _map = new LinkedHashMap<>(prefetchLimit); } public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs) @@ -81,12 +78,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { MessageInstance message = _map.remove(deliveryTag); - if(message != null) - { - _unackedSize -= message.getMessage().getSize(); - - } - return message; } } @@ -109,8 +100,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.put(deliveryTag, message); - _unackedSize += message.getMessage().getSize(); - _lastDeliveryTag = deliveryTag; } } @@ -119,8 +108,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { Collection<MessageInstance> currentEntries = _map.values(); - _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit); - _unackedSize = 0l; + _map = new LinkedHashMap<>(_prefetchLimit); return currentEntries; } } @@ -138,7 +126,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.clear(); - _unackedSize = 0l; } } @@ -163,6 +150,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>(); collect(deliveryTag, multiple, ackedMessageMap); remove(ackedMessageMap); + List<MessageInstance> acknowledged = new ArrayList<>(); + for(MessageInstance instance : ackedMessageMap.values()) + { + if(instance.lockAcquisition()) + { + acknowledged.add(instance); + } + } return ackedMessageMap.values(); } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index adb2f8ea6a..bceae85896 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.nio.ByteBuffer; +import java.util.List; + import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; @@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import java.nio.ByteBuffer; -import java.util.List; - class ConsumerTarget_1_0 extends AbstractConsumerTarget { private final boolean _acquires; @@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget if(outcome instanceof Accepted) { + _queueEntry.lockAcquisition(); txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), new ServerTransaction.Action() { @@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget modified.setDeliveryFailed(true); _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); _link.getEndpoint().sendFlowConditional(); + _queueEntry.unlockAcquisition(); } } }); @@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } @Override + public void acquisitionRemoved(final MessageInstance node) + { + } + + @Override public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 7a844cbc79..a8fc5387b4 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -636,19 +636,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); if(_consumer.acquires()) { - txn.dequeue(Collections.singleton(queueEntry), - new ServerTransaction.Action() - { - public void postCommit() - { - queueEntry.delete(); - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); + if(queueEntry.acquire() || queueEntry.isAcquired()) + { + txn.dequeue(Collections.singleton(queueEntry), + new ServerTransaction.Action() + { + public void postCommit() + { + queueEntry.delete(); + } + + public void onRollback() + { + } + }); + } } } else if(outcome instanceof Released) diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index e73d177599..34f08615ad 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -1071,6 +1071,12 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer) + { + return false; + } + + @Override public void setRedelivered() { @@ -1119,6 +1125,18 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public boolean lockAcquisition() + { + return false; + } + + @Override + public boolean unlockAcquisition() + { + return false; + } + + @Override public int getMaximumDeliveryCount() { return 0; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index ae2828d392..03e7eab61b 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -90,6 +90,12 @@ class ManagementResponse implements MessageInstance } @Override + public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer) + { + return consumer == _consumer; + } + + @Override public void setRedelivered() { _isRedelivered = true; @@ -138,6 +144,18 @@ class ManagementResponse implements MessageInstance } @Override + public boolean lockAcquisition() + { + return false; + } + + @Override + public boolean unlockAcquisition() + { + return false; + } + + @Override public int getMaximumDeliveryCount() { return 0; @@ -190,7 +208,7 @@ class ManagementResponse implements MessageInstance @Override public void delete() { - // TODO + _isDeleted = true; } @Override |
