summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-08-08 22:56:17 +0000
committerRobert Gemmell <robbie@apache.org>2011-08-08 22:56:17 +0000
commit1fee7faf7d77814cf95b757d8d78591e287eef08 (patch)
treea1e8a5d58c964b31255d398ffb95a4d1b6bca657
parentfafbea7f0e3109edf8a25a3ac40d57c7c52d4c73 (diff)
downloadqpid-python-1fee7faf7d77814cf95b757d8d78591e287eef08.tar.gz
QPID-3387: use the subscription ID to track rejection rather than the subscription itself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1155138 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java12
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java34
7 files changed, 45 insertions, 33 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 88349586c3..be29245901 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -202,9 +202,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
void reject();
- void reject(Subscription subscription);
-
- boolean isRejectedBy(Subscription subscription);
+ boolean isRejectedBy(long subscriptionId);
void dequeue();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index e951b3dfd4..5b57e40a82 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -51,7 +51,7 @@ public class QueueEntryImpl implements QueueEntry
private MessageReference _message;
- private Set<Subscription> _rejectedBy = null;
+ private Set<Long> _rejectedBy = null;
private volatile EntryState _state = AVAILABLE_STATE;
@@ -325,19 +325,16 @@ public class QueueEntryImpl implements QueueEntry
public void reject()
{
- reject(getDeliveredSubscription());
- }
+ Subscription subscription = getDeliveredSubscription();
- public void reject(Subscription subscription)
- {
if (subscription != null)
{
if (_rejectedBy == null)
{
- _rejectedBy = new HashSet<Subscription>();
+ _rejectedBy = new HashSet<Long>();
}
- _rejectedBy.add(subscription);
+ _rejectedBy.add(subscription.getSubscriptionID());
}
else
{
@@ -345,12 +342,12 @@ public class QueueEntryImpl implements QueueEntry
}
}
- public boolean isRejectedBy(Subscription subscription)
+ public boolean isRejectedBy(long subscriptionId)
{
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
- return _rejectedBy.contains(subscription);
+ return _rejectedBy.contains(subscriptionId);
}
else // This messasge hasn't been rejected yet.
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index b791596054..d6a256e2e1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -475,7 +475,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
//check that the message hasn't been rejected
- if (entry.isRejectedBy(this))
+ if (entry.isRejectedBy(getSubscriptionID()))
{
if (_logger.isDebugEnabled())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 309879c17c..1bfc2205f2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -203,7 +203,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
//check that the message hasn't been rejected
- if (entry.isRejectedBy(this))
+ if (entry.isRejectedBy(getSubscriptionID()))
{
return false;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 44da761353..6db1560fb7 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -428,21 +428,11 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void reject(Subscription subscription)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isRejectedBy(Subscription subscription)
+ public boolean isRejectedBy(long subscriptionId)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(Subscription subscription)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
public void dequeue()
{
//To change body of implemented methods use File | Settings | File Templates.
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index a3f8f4c0d3..ab8850c18c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -139,7 +139,7 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean isRejectedBy(Subscription subscription)
+ public boolean isRejectedBy(long subscriptionId)
{
return false;
@@ -153,13 +153,6 @@ public class MockQueueEntry implements QueueEntry
}
- public void reject(Subscription subscription)
- {
-
-
- }
-
-
public void release()
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
index 0899f25cc5..d8afd8d829 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.EntryState;
import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.subscription.Subscription;
/**
* Tests for {@link QueueEntryImpl}
@@ -210,4 +211,37 @@ public class QueueEntryImplTest extends TestCase
}
return state;
}
+
+ /**
+ * Tests rejecting a queue entry records the Subscription ID
+ * for later verification by isRejectedBy(subscriptionId).
+ */
+ public void testRejectAndRejectedBy()
+ {
+ Subscription sub = new MockSubscription();
+ long subId = sub.getSubscriptionID();
+
+ assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+ assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
+
+ //acquire, reject, and release the message using the subscription
+ assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
+ _queueEntry.reject();
+ _queueEntry.release();
+
+ //verify the rejection is recorded
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+
+ //repeat rejection using a second subscription
+ Subscription sub2 = new MockSubscription();
+ long sub2Id = sub2.getSubscriptionID();
+
+ assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+ assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
+ _queueEntry.reject();
+
+ //verify it still records being rejected by both subscriptions
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+ }
}