diff options
| author | Robert Greig <rgreig@apache.org> | 2007-09-22 22:05:30 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2007-09-22 22:05:30 +0000 |
| commit | 277f7b4c50c2152f10751cb1a240199fefdde594 (patch) | |
| tree | f3fe639277d19e47c273f078261a8731deb042d4 /java | |
| parent | 12ee62df0feac9aa67b82ed69034a38c2fcaaa95 (diff) | |
| download | qpid-python-277f7b4c50c2152f10751cb1a240199fefdde594.tar.gz | |
QPID-609 : dispatcher thread was being restarted by the code that closed the consumer due to the receipt of a basic.cancel frame. Move the dispatcher shutdown to the end of the consumer close process. Also rename the dispatcher _closed field since it clashes with a field in the container class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@578509 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 21 |
1 files changed, 10 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d47ed47702..a53bd3384e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -673,11 +673,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { _logger.info("Dispatcher is null so created stopped dispatcher"); - startDistpatcherIfNecessary(true); } - _dispatcher.rejectPending(consumer); + _dispatcher.rejectPending(consumer); } else { @@ -1954,11 +1953,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private void closeConsumers(Throwable error) throws JMSException { - if (_dispatcher != null) - { - _dispatcher.close(); - _dispatcher = null; - } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); @@ -1977,6 +1971,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } // at this point the _consumers map will be empty + if (_dispatcher != null) + { + _dispatcher.close(); + _dispatcher = null; + } } /** @@ -2567,7 +2566,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ - private final AtomicBoolean _closed = new AtomicBoolean(false); + private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false); private final Object _lock = new Object(); private final AtomicLong _rollbackMark = new AtomicLong(-1); @@ -2583,7 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void close() { - _closed.set(true); + _dispatcherClosed.set(true); interrupt(); // fixme awaitTermination @@ -2678,7 +2677,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - while (!_closed.get()) + while (!_dispatcherClosed.get()) { message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS); if (message != null) @@ -2768,7 +2767,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } // Don't reject if we're already closing - if (!_closed.get()) + if (!_dispatcherClosed.get()) { rejectMessage(message, true); } |
