summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-09-22 22:05:30 +0000
committerRobert Greig <rgreig@apache.org>2007-09-22 22:05:30 +0000
commit277f7b4c50c2152f10751cb1a240199fefdde594 (patch)
treef3fe639277d19e47c273f078261a8731deb042d4 /java
parent12ee62df0feac9aa67b82ed69034a38c2fcaaa95 (diff)
downloadqpid-python-277f7b4c50c2152f10751cb1a240199fefdde594.tar.gz
QPID-609 : dispatcher thread was being restarted by the code that closed the consumer due to the receipt of a basic.cancel frame. Move the dispatcher shutdown to the end of the consumer close process. Also rename the dispatcher _closed field since it clashes with a field in the container class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@578509 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java21
1 files changed, 10 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d47ed47702..a53bd3384e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -673,11 +673,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
else
{
_logger.info("Dispatcher is null so created stopped dispatcher");
-
startDistpatcherIfNecessary(true);
}
- _dispatcher.rejectPending(consumer);
+ _dispatcher.rejectPending(consumer);
}
else
{
@@ -1954,11 +1953,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private void closeConsumers(Throwable error) throws JMSException
{
- if (_dispatcher != null)
- {
- _dispatcher.close();
- _dispatcher = null;
- }
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
@@ -1977,6 +1971,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
// at this point the _consumers map will be empty
+ if (_dispatcher != null)
+ {
+ _dispatcher.close();
+ _dispatcher = null;
+ }
}
/**
@@ -2567,7 +2566,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
/** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false);
private final Object _lock = new Object();
private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -2583,7 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void close()
{
- _closed.set(true);
+ _dispatcherClosed.set(true);
interrupt();
// fixme awaitTermination
@@ -2678,7 +2677,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- while (!_closed.get())
+ while (!_dispatcherClosed.get())
{
message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS);
if (message != null)
@@ -2768,7 +2767,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
// Don't reject if we're already closing
- if (!_closed.get())
+ if (!_dispatcherClosed.get())
{
rejectMessage(message, true);
}