diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-04-17 16:19:59 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-17 16:19:59 +0000 |
| commit | d34ea820d8617b44f4118c7d3bceedf41280819f (patch) | |
| tree | 6ceee8983b33e8895a61845071d9b79eb405dcb0 /java/client/src/main | |
| parent | 032eec1e2a1a73fce899c2def66a5280882f1194 (diff) | |
| download | qpid-python-d34ea820d8617b44f4118c7d3bceedf41280819f.tar.gz | |
QPID-455 Prefetched messages can cause problems with client tools.
AMQSession - suspend channel at startup until start() and recieve/setMessageListener are called.
BasicMessageConsumer - mainly style sheet changes
MessageListenerMultiConsumerTest - removed one test case as we cannot ensure round-robin effect at start up .. added test case for only c2 consuming when c1 does nothing.
MessageListenerTest - added new test that can demonstrate a further bug of message 'loss' when a receive is called only once before a message listener is set. Prefetched message end up on _SynchronousQueue regression of QPID-293 as of r501004.
MessageRequeueTest - Was missing a conn.start()
DurableSubscriptionTest - Removed blocking receives() so we don't block on failure
CommitRollbackTest - Text message was wrong on testGetThenDisconnect tests so adjusted
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@529666 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 42 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 125 |
2 files changed, 101 insertions, 66 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d8d15d22c5..118b13cdba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -202,6 +202,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + private AtomicBoolean _firstDispatcher = new AtomicBoolean(true); private class Dispatcher extends Thread { @@ -328,7 +329,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } // Don't reject if we're already closing - if(!_closed.get()) + if (!_closed.get()) { rejectMessage(message, true); } @@ -999,7 +1000,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public BasicMessageProducer createProducer(Destination destination, boolean mandatory, - boolean immediate, boolean waitUntilSent) + boolean immediate, boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); @@ -1023,14 +1024,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, - boolean immediate) + boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) + final boolean immediate, final boolean waitUntilSent) throws JMSException { return (BasicMessageProducer) new FailoverSupport() @@ -1947,6 +1948,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _dispatcher.setConnectionStopped(initiallyStopped); } + + if (!AMQSession.this._closed.get() + && AMQSession.this._startedAtLeastOnce.get() + && _firstDispatcher.getAndSet(false)) + { + if (isSuspended()) + { + try + { + suspendChannel(false); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); + } + } + } + } void stop() throws AMQException @@ -1979,6 +1998,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); + if (_dispatcher == null) + { + if (!isSuspended()) + { + try + { + suspendChannel(true); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); + } + } + } + try { consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 38c1cd8205..1fd95cacd6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -140,9 +140,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -219,7 +219,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " - + _destination); + + _destination); } } else @@ -468,7 +468,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " close():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -481,9 +481,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { // TODO: Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = - BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag - false); // nowait + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait try { @@ -498,6 +498,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer catch (AMQException e) { // _logger.error("Error closing consumer: " + e, e); + e.printStackTrace(); JMSException jmse = new JMSException("Error closing consumer: " + e); jmse.setLinkedException(e); throw jmse; @@ -540,7 +541,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " markClosed():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -572,9 +573,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); if (debug) { @@ -659,15 +660,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - break; + case Session.PRE_ACKNOWLEDGE: + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - break; + case Session.CLIENT_ACKNOWLEDGE: + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + break; } } @@ -677,55 +678,55 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } + case Session.DUPS_OK_ACKNOWLEDGE: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } - if (_outstanding <= _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } - if (_dups_ok_acknowledge_send) - { - if (!_session.isInRecovery()) + if (_dups_ok_acknowledge_send) { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), true); + } } - } - break; + break; - case Session.AUTO_ACKNOWLEDGE: - // we do not auto ack a message if the application code called recover() - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.AUTO_ACKNOWLEDGE: + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _receivedDeliveryTags.add(msg.getDeliveryTag()); - } + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _receivedDeliveryTags.add(msg.getDeliveryTag()); + } - break; + break; } } @@ -757,7 +758,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " notifyError():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -877,7 +878,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -907,7 +908,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); @@ -931,7 +932,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer else { _logger.error("Queue contained a :" + o.getClass() - + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); } } |
