summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-09-07 16:15:44 +0000
committerRobert Gemmell <robbie@apache.org>2011-09-07 16:15:44 +0000
commit5d73fc1d377d1884d275d73ffa670eca362c6bce (patch)
tree32bcc544cdf4be0601f105f15e26507d9fca1e08 /qpid/java
parentb45b258b585f68952fa58f9b31ff1ea444eb5574 (diff)
downloadqpid-python-5d73fc1d377d1884d275d73ffa670eca362c6bce.tar.gz
QPID-2901: fixes racing conditions on broker side when TopicDeletePolicy is closing the consumer Session but the broker is trying to deliver next message to the Subscription of this closing Session.
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1166246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
3 files changed, 16 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 8a5f9ffef5..c5d6bc203c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -194,7 +194,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public boolean isSuspended()
{
- return !isActive() || _deleted.get(); // TODO check for Session suspension
+ return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
}
public boolean hasInterest(QueueEntry entry)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 65790e2e6f..12ef125b2e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -677,4 +677,18 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
getChannel())
+ "] ";
}
+
+ @Override
+ public void close()
+ {
+ // unregister subscriptions in order to prevent sending of new messages
+ // to subscriptions with closing session
+ final Collection<Subscription_0_10> subscriptions = getSubscriptions();
+ for (Subscription_0_10 subscription_0_10 : subscriptions)
+ {
+ unregister(subscription_0_10);
+ }
+
+ super.close();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index c38188cdc3..d9194a3408 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1183,7 +1183,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (code != null)
{
- je = new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + cause);
+ je = new JMSException("Exception thrown against " + toString() + ": " + cause, Integer.toString(code.getCode()));
}
else
{