From d9f4fdcab179600c43d86fd07949d1c5ecbb1767 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 26 Mar 2015 23:44:39 +0000 Subject: QPID-6466: [Java Client] Avoid possibilty that the dispatcher and IO thread can reject/release the same message during consumer close git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1669472 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 68 ++++++++++------------ .../client/util/FlowControllingBlockingQueue.java | 59 +++++++++++-------- 2 files changed, 68 insertions(+), 59 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 6c43396481..0ca3505053 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -890,16 +890,17 @@ public abstract class AMQSession messages = _queue.iterator(); if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" - + requeue); + _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ")"); if (messages.hasNext()) { @@ -3104,21 +3110,23 @@ public abstract class AMQSession return _queue.isEmpty(); } + public interface ThresholdListener { void aboveThreshold(int currentValue); @@ -104,14 +105,7 @@ public class FlowControllingBlockingQueue if (o != null && !disableFlowControl && _listener != null) { - synchronized (_listener) - { - if (_count-- == _flowControlLowThreshold) - { - _listener.underThreshold(_count); - } - } - + reportBelowIfNecessary(); } return o; @@ -132,14 +126,7 @@ public class FlowControllingBlockingQueue } if (!disableFlowControl && _listener != null) { - synchronized (_listener) - { - if (_count-- == _flowControlLowThreshold) - { - _listener.underThreshold(_count); - } - } - + reportBelowIfNecessary(); } return o; @@ -155,18 +142,44 @@ public class FlowControllingBlockingQueue } if (!disableFlowControl && _listener != null) { - synchronized (_listener) - { - if (++_count == _flowControlHighThreshold) - { - _listener.aboveThreshold(_count); - } - } + reportAboveIfNecessary(); } } + public boolean remove(final T o) + { + final boolean removed = _queue.remove(o); + if (removed && !disableFlowControl && _listener != null) + { + reportBelowIfNecessary(); + } + return removed; + } + public Iterator iterator() { return _queue.iterator(); } + + private void reportAboveIfNecessary() + { + synchronized (_listener) + { + if (++_count == _flowControlHighThreshold) + { + _listener.aboveThreshold(_count); + } + } + } + + private void reportBelowIfNecessary() + { + synchronized (_listener) + { + if (_count-- == _flowControlLowThreshold) + { + _listener.underThreshold(_count); + } + } + } } -- cgit v1.2.1