diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-14 08:21:37 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-14 08:21:37 +0000 |
| commit | c5b44d59395ba7ebe8c84ce6461c4e39a0e5b99a (patch) | |
| tree | ecd23c5206e43d23936b62da6a790fc66d0c69c1 /qpid/java/client | |
| parent | b4b42b55d78674660d31609c40022988ddb8e318 (diff) | |
| download | qpid-python-c5b44d59395ba7ebe8c84ce6461c4e39a0e5b99a.tar.gz | |
QPID-346 Message loss after rollback/recover
Messages were still occasionally being sent twice.
AMQChannel - added trace level logging that will show an error if the same message is attempted to be sent to the same client.
AMQMessage - Remove logic that says the same subscriber can take always 'take' the message.
SubscriptionImpl - Release message when it is put back on to the resendQueue this will allow it to be re-'taken'
AMQSession - Added method to Dispatcher to clean up incomming _queue to try and prevent messages arriving for closed consumers.
BasicMessageConsumer - added comments
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@507433 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 66 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 5 |
2 files changed, 69 insertions, 2 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e475270ecd..783678f67c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -47,6 +47,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; +import java.util.LinkedList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -289,6 +290,61 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + /** + * The dispatcher should be stopped when calling this. + * + * @param consumerTag + */ + public void removePending(String consumerTag) + { + + synchronized (_lock) + { + boolean stopped = connectionStopped(); + + _dispatcher.setConnectionStopped(false); + + LinkedList<UnprocessedMessage> tmpList = new LinkedList<UnprocessedMessage>(); + + while (_queue.size() != 0) + { + UnprocessedMessage message = null; + try + { + message = (UnprocessedMessage) _queue.take(); + + if (!message.deliverBody.consumerTag.equals(consumerTag)) + { + tmpList.add(message); + } + else + { + _logger.error("Pruned pending message for consumer:" + consumerTag); + } + } + catch (InterruptedException e) + { + _logger.error("Interrupted whilst taking message"); + } + } + + if (!tmpList.isEmpty()) + { + _logger.error("Tmp list is not empty"); + } + + for (UnprocessedMessage msg : tmpList) + { + _queue.add(msg); + } + + if (stopped) + { + _dispatcher.setConnectionStopped(stopped); + } + } + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, @@ -599,8 +655,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { - // we pass null since this is not an error case - closeProducersAndConsumers(null); try { @@ -618,6 +672,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully + // we pass null since this is not an error case + closeProducersAndConsumers(null); + } catch (AMQException e) { @@ -1784,7 +1841,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void deregisterConsumer(BasicMessageConsumer consumer) { + //need to clear pending messages from session _queue that the dispatcher will handle + // or we will get + // _dispatcher.removePending(consumer.getConsumerTag()); + _consumers.remove(consumer.getConsumerTag()); + String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 058afab605..1607326e47 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -481,8 +481,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + + //this will remove consumer from _consumers map deregisterConsumer(); + + // clears unacks from this consumer _unacknowledgedDeliveryTags.clear(); + if (_messageListener != null && _receiving.get()) { _logger.info("Interrupting thread: " + _receivingThread); |
