diff options
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 27 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 5 |
2 files changed, 11 insertions, 21 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ce624cb91b..55d3ccb6e7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -3212,28 +3212,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void rejectPending(C consumer) { - synchronized (_lock) - { - boolean stopped = connectionStopped(); + // Reject messages on pre-receive queue + consumer.rollbackPendingMessages(); - if (!stopped) - { - setConnectionStopped(true); - } + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - // Reject messages on pre-receive queue - consumer.rollbackPendingMessages(); + // closeConsumer + consumer.markClosed(); - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - //Let the dispatcher deal with this when it gets to them. - - // closeConsumer - consumer.markClosed(); - - setConnectionStopped(stopped); - - } } public void rollback() @@ -3425,7 +3412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { final C consumer = _consumers.get(message.getConsumerTag()); - if ((consumer == null) || consumer.isClosed()) + if ((consumer == null) || consumer.isClosed() || consumer.isClosing()) { if (_dispatcherLogger.isInfoEnabled()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0d717a3216..0f8b5717d6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -593,7 +593,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa // no point otherwise as the connection will be gone if (!_session.isClosed() || _session.isClosing()) { - sendCancel(); + synchronized(_session.getMessageDeliveryLock()) + { + sendCancel(); + } cleanupQueue(); } } |
