diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-08-08 22:56:17 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-08-08 22:56:17 +0000 |
| commit | 1fee7faf7d77814cf95b757d8d78591e287eef08 (patch) | |
| tree | a1e8a5d58c964b31255d398ffb95a4d1b6bca657 | |
| parent | fafbea7f0e3109edf8a25a3ac40d57c7c52d4c73 (diff) | |
| download | qpid-python-1fee7faf7d77814cf95b757d8d78591e287eef08.tar.gz | |
QPID-3387: use the subscription ID to track rejection rather than the subscription itself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1155138 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 45 insertions, 33 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 88349586c3..be29245901 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -202,9 +202,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable void reject(); - void reject(Subscription subscription); - - boolean isRejectedBy(Subscription subscription); + boolean isRejectedBy(long subscriptionId); void dequeue(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index e951b3dfd4..5b57e40a82 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -51,7 +51,7 @@ public class QueueEntryImpl implements QueueEntry private MessageReference _message; - private Set<Subscription> _rejectedBy = null; + private Set<Long> _rejectedBy = null; private volatile EntryState _state = AVAILABLE_STATE; @@ -325,19 +325,16 @@ public class QueueEntryImpl implements QueueEntry public void reject() { - reject(getDeliveredSubscription()); - } + Subscription subscription = getDeliveredSubscription(); - public void reject(Subscription subscription) - { if (subscription != null) { if (_rejectedBy == null) { - _rejectedBy = new HashSet<Subscription>(); + _rejectedBy = new HashSet<Long>(); } - _rejectedBy.add(subscription); + _rejectedBy.add(subscription.getSubscriptionID()); } else { @@ -345,12 +342,12 @@ public class QueueEntryImpl implements QueueEntry } } - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { if (_rejectedBy != null) // We have subscriptions that rejected this message { - return _rejectedBy.contains(subscription); + return _rejectedBy.contains(subscriptionId); } else // This messasge hasn't been rejected yet. { diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index b791596054..d6a256e2e1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -475,7 +475,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage //check that the message hasn't been rejected - if (entry.isRejectedBy(this)) + if (entry.isRejectedBy(getSubscriptionID())) { if (_logger.isDebugEnabled()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 309879c17c..1bfc2205f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -203,7 +203,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr //check that the message hasn't been rejected - if (entry.isRejectedBy(this)) + if (entry.isRejectedBy(getSubscriptionID())) { return false; diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 44da761353..6db1560fb7 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -428,21 +428,11 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase //To change body of implemented methods use File | Settings | File Templates. } - public void reject(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { return false; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - public void dequeue() { //To change body of implemented methods use File | Settings | File Templates. diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index a3f8f4c0d3..ab8850c18c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -139,7 +139,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { return false; @@ -153,13 +153,6 @@ public class MockQueueEntry implements QueueEntry } - public void reject(Subscription subscription) - { - - - } - - public void release() { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java index 0899f25cc5..d8afd8d829 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.queue.QueueEntry.EntryState; import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.subscription.Subscription; /** * Tests for {@link QueueEntryImpl} @@ -210,4 +211,37 @@ public class QueueEntryImplTest extends TestCase } return state; } + + /** + * Tests rejecting a queue entry records the Subscription ID + * for later verification by isRejectedBy(subscriptionId). + */ + public void testRejectAndRejectedBy() + { + Subscription sub = new MockSubscription(); + long subId = sub.getSubscriptionID(); + + assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired()); + + //acquire, reject, and release the message using the subscription + assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub)); + _queueEntry.reject(); + _queueEntry.release(); + + //verify the rejection is recorded + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + + //repeat rejection using a second subscription + Subscription sub2 = new MockSubscription(); + long sub2Id = sub2.getSubscriptionID(); + + assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); + assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2)); + _queueEntry.reject(); + + //verify it still records being rejected by both subscriptions + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); + } } |
