summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-17 16:19:59 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-17 16:19:59 +0000
commitd34ea820d8617b44f4118c7d3bceedf41280819f (patch)
tree6ceee8983b33e8895a61845071d9b79eb405dcb0 /java/client/src/main
parent032eec1e2a1a73fce899c2def66a5280882f1194 (diff)
downloadqpid-python-d34ea820d8617b44f4118c7d3bceedf41280819f.tar.gz
QPID-455 Prefetched messages can cause problems with client tools.
AMQSession - suspend channel at startup until start() and recieve/setMessageListener are called. BasicMessageConsumer - mainly style sheet changes MessageListenerMultiConsumerTest - removed one test case as we cannot ensure round-robin effect at start up .. added test case for only c2 consuming when c1 does nothing. MessageListenerTest - added new test that can demonstrate a further bug of message 'loss' when a receive is called only once before a message listener is set. Prefetched message end up on _SynchronousQueue regression of QPID-293 as of r501004. MessageRequeueTest - Was missing a conn.start() DurableSubscriptionTest - Removed blocking receives() so we don't block on failure CommitRollbackTest - Text message was wrong on testGetThenDisconnect tests so adjusted git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@529666 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java42
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java125
2 files changed, 101 insertions, 66 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d8d15d22c5..118b13cdba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -202,6 +202,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+ private AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
private class Dispatcher extends Thread
{
@@ -328,7 +329,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
// Don't reject if we're already closing
- if(!_closed.get())
+ if (!_closed.get())
{
rejectMessage(message, true);
}
@@ -999,7 +1000,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
public BasicMessageProducer createProducer(Destination destination, boolean mandatory,
- boolean immediate, boolean waitUntilSent)
+ boolean immediate, boolean waitUntilSent)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
@@ -1023,14 +1024,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory,
- boolean immediate)
+ boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, false);
}
private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent)
+ final boolean immediate, final boolean waitUntilSent)
throws JMSException
{
return (BasicMessageProducer) new FailoverSupport()
@@ -1947,6 +1948,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_dispatcher.setConnectionStopped(initiallyStopped);
}
+
+ if (!AMQSession.this._closed.get()
+ && AMQSession.this._startedAtLeastOnce.get()
+ && _firstDispatcher.getAndSet(false))
+ {
+ if (isSuspended())
+ {
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
+ }
+ }
+
}
void stop() throws AMQException
@@ -1979,6 +1998,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
+ if (_dispatcher == null)
+ {
+ if (!isSuspended())
+ {
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
+ }
+ }
+
try
{
consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 38c1cd8205..1fd95cacd6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -140,9 +140,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -219,7 +219,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
- + _destination);
+ + _destination);
}
}
else
@@ -468,7 +468,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closedStack != null)
{
_logger.trace(_consumerTag + " close():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -481,9 +481,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
// TODO: Be aware of possible changes to parameter order as versions change.
final AMQFrame cancelFrame =
- BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
- false); // nowait
+ BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+ false); // nowait
try
{
@@ -498,6 +498,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
catch (AMQException e)
{
// _logger.error("Error closing consumer: " + e, e);
+ e.printStackTrace();
JMSException jmse = new JMSException("Error closing consumer: " + e);
jmse.setLinkedException(e);
throw jmse;
@@ -540,7 +541,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closedStack != null)
{
_logger.trace(_consumerTag + " markClosed():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -572,9 +573,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
+ _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+ messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
+ messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
if (debug)
{
@@ -659,15 +660,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
- case Session.PRE_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- break;
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
- case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
- break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
}
}
@@ -677,55 +678,55 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
- if (_outstanding <= _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
- if (_dups_ok_acknowledge_send)
- {
- if (!_session.isInRecovery())
+ if (_dups_ok_acknowledge_send)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ }
}
- }
- break;
+ break;
- case Session.AUTO_ACKNOWLEDGE:
- // we do not auto ack a message if the application code called recover()
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.AUTO_ACKNOWLEDGE:
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- }
+ case Session.SESSION_TRANSACTED:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ }
- break;
+ break;
}
}
@@ -757,7 +758,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closedStack != null)
{
_logger.trace(_consumerTag + " notifyError():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously" + _closedStack.toString());
}
else
@@ -877,7 +878,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
- + "for consumer with tag:" + _consumerTag);
+ + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -907,7 +908,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
- + "for consumer with tag:" + _consumerTag);
+ + "for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
@@ -931,7 +932,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
else
{
_logger.error("Queue contained a :" + o.getClass()
- + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
}
}