diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-29 12:54:06 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-29 12:54:06 +0000 |
| commit | 5f303af110e3696845372b5cb657c8d26a688f8e (patch) | |
| tree | ef7f825a49a3bd68a96b05f400bed125425387b9 /java | |
| parent | 7000de985ebcde725045642ab7e7fa57038170f7 (diff) | |
| download | qpid-python-5f303af110e3696845372b5cb657c8d26a688f8e.tar.gz | |
Temp fix out of order issue with async(sub)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@661324 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 16 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 847c8b8459..a302a5b503 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -831,7 +831,7 @@ public class AMQChannel // may need to deliver queued messages for (Subscription s : _tag2SubscriptionMap.values()) { - s.getQueue().deliverAsync(s); + s.getQueue().deliverAsync(); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 570bd97a28..f3e4e7c28b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -152,6 +152,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void deliverAsync(final Subscription sub); + void deliverAsync(); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 95cfa8d36c..efa9e9180d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -245,12 +245,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new ExistingExclusiveSubscription(); } - if(exclusive && getConsumerCount() != 0) + if(exclusive) { - throw new ExistingSubscriptionPreventsExclusive(); + if(getConsumerCount() != 0) + { + throw new ExistingSubscriptionPreventsExclusive(); + } + else + { + _exclusiveSubscriber = subscription; + + } } - setExclusiveSubscriber(subscription); _activeSubscriberCount.incrementAndGet(); subscription.setStateListener(this); @@ -271,7 +278,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - deliverAsync(subscription); + deliverAsync(); } @@ -765,7 +772,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _activeSubscriberCount.incrementAndGet(); } - deliverAsync(sub); + deliverAsync(); } } @@ -1655,7 +1662,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); - deliverAsync(_sub); + deliverAsync(); } } }
\ No newline at end of file |
