From 8762acf82c2d20042773fab648878860c8854cbd Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Mon, 28 Nov 2011 09:26:58 +0000 Subject: QPID-3642, QPID-3640: Add Dead Letter Queue functionality for 0-10 path. Also, it fixes issue with setting of redelivered flag for pre-fetched messages as DLQ functionality relies on this flag being set correctly. Applied patch from Andrew MacBean and Oleksandr Rudyy git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1207031 13f79535-47bb-0310-9956-ffa450edef68 --- .../ExplicitAcceptDispositionChangeListener.java | 4 +- .../ImplicitAcceptDispositionChangeListener.java | 4 +- .../server/subscription/Subscription_0_10.java | 71 ++++- .../qpid/server/transport/ServerSession.java | 8 +- .../server/transport/ServerSessionDelegate.java | 2 +- .../org/apache/qpid/client/AMQSession_0_10.java | 47 +++- .../qpid/client/BasicMessageConsumer_0_10.java | 2 +- .../qpid/test/unit/transacted/TransactedTest.java | 297 ++++++++++----------- 8 files changed, 252 insertions(+), 183 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java index b49b12fb79..80c5e2866c 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java @@ -53,12 +53,12 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } - public void onRelease() + public void onRelease(boolean setRedelivered) { final Subscription_0_10 subscription = getSubscription(); if(subscription != null && _entry.isAcquiredBy(_sub)) { - subscription.release(_entry); + subscription.release(_entry, setRedelivered); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java index b5bb2014b5..a61b0b4e82 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java @@ -43,11 +43,11 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi _logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)"); } - public void onRelease() + public void onRelease(boolean setRedelivered) { if(_entry.isAcquiredBy(_sub)) { - getSubscription().release(_entry); + getSubscription().release(_entry, setRedelivered); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index d302c9ad15..273bab0ebe 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -24,12 +24,15 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPT import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SubscriptionConfig; import org.apache.qpid.server.configuration.SubscriptionConfigType; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.CreditCreditManager; import org.apache.qpid.server.flow.WindowCreditManager; @@ -37,9 +40,11 @@ import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.AMQMessage; @@ -80,6 +85,7 @@ import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { + private final long _subscriptionID; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); @@ -601,6 +607,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } _session.sendMessage(xfr, _postIdSettingAction); + entry.incrementDeliveryCount(); _deliveredCount.incrementAndGet(); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) { @@ -643,10 +650,68 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } - void release(QueueEntry entry) + void release(QueueEntry entry, boolean setRedelivered) { - entry.setRedelivered(); - entry.release(); + boolean maxDeliveryLimitExceeded = false; + if (setRedelivered) + { + entry.setRedelivered(); + maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry); + } + else + { + entry.decrementDeliveryCount(); + } + + if (maxDeliveryLimitExceeded) + { + sendToDLQOrDiscard(entry); + } + else + { + entry.release(); + } + } + + protected void sendToDLQOrDiscard(QueueEntry entry) + { + final Exchange alternateExchange = entry.getQueue().getAlternateExchange(); + final LogActor logActor = CurrentActor.get(); + final ServerMessage msg = entry.getMessage(); + if (alternateExchange != null) + { + final InboundMessage m = new InboundMessageAdapter(entry); + + final ArrayList destinationQueues = alternateExchange.route(m); + + if (destinationQueues == null || destinationQueues.isEmpty()) + { + entry.discard(); + + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); + } + else + { + entry.routeToAlternate(); + + //output operational logging for each delivery post commit + for (final BaseQueue destinationQueue : destinationQueues) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString())); + } + } + } + else + { + entry.discard(); + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); + } + } + + private boolean isMaxDeliveryLimitExceeded(QueueEntry entry) + { + final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); + return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); } public void queueDeleted(AMQQueue queue) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 7031502e34..ac95750e66 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -93,7 +93,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { public void onAccept(); - public void onRelease(); + public void onRelease(boolean setRedelivered); public void onReject(); @@ -230,13 +230,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } - public void release(RangeSet ranges) + public void release(RangeSet ranges, final boolean setRedelivered) { dispositionChange(ranges, new MessageDispositionAction() { public void performAction(MessageDispositionChangeListener listener) { - listener.onRelease(); + listener.onRelease(setRedelivered); } }); } @@ -350,7 +350,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi _transaction.rollback(); for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { - listener.onRelease(); + listener.onRelease(false); } _messageDispositionListenerMap.clear(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 754a233907..a0dca53ed0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -148,7 +148,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageRelease(Session session, MessageRelease method) { - ((ServerSession)session).release(method.getTransfers()); + ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered()); } @Override diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 7bde470c8e..dde020a750 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,11 +27,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; import javax.jms.JMSException; @@ -402,6 +405,10 @@ public class AMQSession_0_10 extends AMQSession deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + for (Iterator prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + flushProcessed(all, false); + getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + // We need to sync so that we get notify of an error. sync(); } - private RangeSet gatherUnackedRangeSet() + private RangeSet gatherRangeSet(ConcurrentLinkedQueue messageTags) { RangeSet ranges = new RangeSet(); while (true) { - Long tag = _unacknowledgedMessageTags.poll(); + Long tag = messageTags.poll(); if (tag == null) { break; @@ -504,7 +525,14 @@ public class AMQSession_0_10 extends AMQSession 0 ) + if(ranges.size() > 0 ) { - messageAcknowledge(range, true); + messageAcknowledge(ranges, true); getQpidSession().sync(); } } @@ -1343,6 +1371,7 @@ public class AMQSession_0_10 extends AMQSession