summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-29 12:54:06 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-29 12:54:06 +0000
commit5f303af110e3696845372b5cb657c8d26a688f8e (patch)
treeef7f825a49a3bd68a96b05f400bed125425387b9 /java
parent7000de985ebcde725045642ab7e7fa57038170f7 (diff)
downloadqpid-python-5f303af110e3696845372b5cb657c8d26a688f8e.tar.gz
Temp fix out of order issue with async(sub)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@661324 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java19
3 files changed, 16 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 847c8b8459..a302a5b503 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -831,7 +831,7 @@ public class AMQChannel
// may need to deliver queued messages
for (Subscription s : _tag2SubscriptionMap.values())
{
- s.getQueue().deliverAsync(s);
+ s.getQueue().deliverAsync();
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 570bd97a28..f3e4e7c28b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -152,6 +152,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void deliverAsync(final Subscription sub);
+ void deliverAsync();
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 95cfa8d36c..efa9e9180d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -245,12 +245,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new ExistingExclusiveSubscription();
}
- if(exclusive && getConsumerCount() != 0)
+ if(exclusive)
{
- throw new ExistingSubscriptionPreventsExclusive();
+ if(getConsumerCount() != 0)
+ {
+ throw new ExistingSubscriptionPreventsExclusive();
+ }
+ else
+ {
+ _exclusiveSubscriber = subscription;
+
+ }
}
- setExclusiveSubscriber(subscription);
_activeSubscriberCount.incrementAndGet();
subscription.setStateListener(this);
@@ -271,7 +278,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- deliverAsync(subscription);
+ deliverAsync();
}
@@ -765,7 +772,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync(sub);
+ deliverAsync();
}
}
@@ -1655,7 +1662,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
- deliverAsync(_sub);
+ deliverAsync();
}
}
} \ No newline at end of file