diff options
Diffstat (limited to 'java/client/src/main')
5 files changed, 420 insertions, 146 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 7f17dad8e6..bf812ee302 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -88,6 +89,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; + private final java.util.Queue<MessageConsumerPair> _reprocessQueue; + private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; @@ -135,11 +138,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private volatile AtomicBoolean _stopped = new AtomicBoolean(true); /** + * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer + */ + private final AtomicBoolean _pausing = new AtomicBoolean(false); + + /** + * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer + */ + private final AtomicBoolean _paused = new AtomicBoolean(false); + + /** * Set when recover is called. This is to handle the case where recover() is called by application code * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. */ private boolean _inRecovery; + public void doDispatcherTask(DispatcherCallback dispatcherCallback) + { + synchronized (this) + { + _dispatcher.pause(); + + dispatcherCallback.whilePaused(_reprocessQueue); + + _dispatcher.reprocess(); + } + } /** @@ -147,6 +171,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private class Dispatcher extends Thread { + private final Logger _logger = Logger.getLogger(Dispatcher.class); + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -154,23 +180,105 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { - UnprocessedMessage message; _stopped.set(false); + + while (!_stopped.get()) + { + if (_pausing.get()) + { + try + { + //Wait for unpausing + synchronized (_pausing) + { + synchronized (_paused) + { + _paused.notify(); + } + + _logger.info("dispatcher paused"); + + _pausing.wait(); + _logger.info("dispatcher notified"); + } + + } + catch (InterruptedException e) + { + //do nothing... occurs when a pause request occurs will already + // be here if another pause event is pending + _logger.info("dispacher interrupted"); + } + + doReDispatch(); + + } + else + { + doNormalDispatch(); + } + } + + _logger.info("Dispatcher thread terminating for channel " + _channelId); + } + + private void doNormalDispatch() + { + UnprocessedMessage message; try { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null) { dispatchMessage(message); } } catch (InterruptedException e) { - ; + _logger.info("dispatcher normal dispatch interrupted"); } - _logger.info("Dispatcher thread terminating for channel " + _channelId); } + private void doReDispatch() + { + _logger.info("doRedispatching"); + + MessageConsumerPair messageConsumerPair; + + if (_reprocessQueue != null) + { + _logger.info("Reprocess Queue has size:" + _reprocessQueue.size()); + while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null)) + { + reDispatchMessage(messageConsumerPair); + } + } + + if (_reprocessQueue == null || _reprocessQueue.isEmpty()) + { + _logger.info("Reprocess Queue emptied"); + _pausing.set(false); + } + else + { + _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size()); + } + + } + + private void reDispatchMessage(MessageConsumerPair consumerPair) + { + if (consumerPair.getItem() instanceof AbstractJMSMessage) + { + _logger.info("do renotify:" + consumerPair.getItem()); + consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId); + } + + // BasicMessageConsumer.notifyError(Throwable cause) + // will put the cause in to the list which could come out here... need to watch this. + } + + private void dispatchMessage(UnprocessedMessage message) { if (message.deliverBody != null) @@ -231,6 +339,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _stopped.set(true); interrupt(); } + + public void pause() + { + _logger.info("pausing"); + _pausing.set(true); + + + interrupt(); + + synchronized (_paused) + { + try + { + _paused.wait(); + } + catch (InterruptedException e) + { + //do nothing + } + } + } + + public void reprocess() + { + synchronized (_pausing) + { + _logger.info("reprocessing"); + _pausing.notify(); + } + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, @@ -263,6 +401,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _defaultPrefetchHighMark = defaultPrefetchHighMark; _defaultPrefetchLowMark = defaultPrefetchLowMark; + _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>(); + if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, @@ -315,7 +455,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return new JMSBytesMessage(); @@ -324,7 +464,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return new JMSMapMessage(); @@ -338,7 +478,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); return (ObjectMessage) new JMSObjectMessage(); @@ -354,7 +494,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); @@ -364,7 +504,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); @@ -409,7 +549,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class); + _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxCommitOkBody.class); } catch (AMQException e) { @@ -428,7 +568,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class); + TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class); } catch (AMQException e) { @@ -440,7 +580,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -451,15 +591,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { _connection.getProtocolHandler().closeSession(this); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + (byte) 8, (byte) 0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -512,7 +652,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -653,8 +793,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false)); // requeue + (byte) 8, (byte) 0, // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -825,8 +965,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } public MessageConsumer createBrowserConsumer(Destination destination, - String messageSelector, - boolean noLocal) + String messageSelector, + boolean noLocal) throws JMSException { checkValidDestination(destination); @@ -935,11 +1075,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (AMQException e) { JMSException ex = new JMSException("Error registering consumer: " + e); + + //todo remove + e.printStackTrace(); ex.setLinkedException(e); throw ex; } - synchronized(destination) + synchronized (destination) { _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); _destinationConsumerCount.get(destination).incrementAndGet(); @@ -990,16 +1133,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - 0, // ticket - type); // type + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + false, // nowait + false, // passive + 0, // ticket + type); // type _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); } @@ -1014,16 +1157,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - true, // nowait - false, // passive - 0, // ticket - type); // type + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + true, // nowait + false, // passive + 0, // ticket + type); // type protocolHandler.writeFrame(exchangeDeclare); } @@ -1049,15 +1192,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - true, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + true, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + 0); // ticket protocolHandler.writeFrame(queueDeclare); return amqd.getAMQQueueName(); @@ -1069,13 +1212,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - ft, // arguments - amqd.getExchangeName(), // exchange - true, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + ft, // arguments + amqd.getExchangeName(), // exchange + true, // nowait + queueName, // queue + amqd.getRoutingKey(), // routingKey + 0); // ticket protocolHandler.writeFrame(queueBind); } @@ -1098,11 +1241,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); } - if(consumer.isAutoClose()) + if (consumer.isAutoClose()) { arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); } - if(consumer.isNoConsume()) + if (consumer.isNoConsume()) { arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } @@ -1117,15 +1260,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + 0); // ticket if (nowait) { protocolHandler.writeFrame(jmsConsume); @@ -1282,9 +1425,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { AMQShortString topicName; - if(topic instanceof AMQTopic) + if (topic instanceof AMQTopic) { - topicName = ((AMQTopic)topic).getDestinationName(); + topicName = ((AMQTopic) topic).getDestinationName(); } else { @@ -1315,12 +1458,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - 0); // ticket + (byte) 8, (byte) 0, // AMQP version (major, minor) + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + 0); // ticket _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } catch (AMQException e) @@ -1360,7 +1503,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkValidQueue(queue); - return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector); + return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1410,10 +1553,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange - queueName, // queue - routingKey); // routingKey + (byte) 8, (byte) 0, // AMQP version (major, minor) + ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange + queueName, // queue + routingKey); // routingKey AMQMethodEvent response = null; try { @@ -1474,9 +1617,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple + (byte) 8, (byte) 0, // AMQP version (major, minor) + deliveryTag, // deliveryTag + multiple); // multiple if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); @@ -1521,7 +1664,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //stop the server delivering messages to this session suspendChannel(); -//stop the dispatcher thread + //stop the dispatcher thread _stopped.set(true); } @@ -1574,7 +1717,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } Destination dest = consumer.getDestination(); - synchronized(dest) + synchronized (dest) { if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) { @@ -1639,8 +1782,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false); // active + (byte) 8, (byte) 0, // AMQP version (major, minor) + false); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } @@ -1651,15 +1794,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - true); // active + (byte) 8, (byte) 0, // AMQP version (major, minor) + true); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } public void confirmConsumerCancelled(AMQShortString consumerTag) { BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if((consumer != null) && (consumer.isAutoClose())) + if ((consumer != null) && (consumer.isAutoClose())) { consumer.closeWhenNoMessages(true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index c5e97a27f6..b3ae54f982 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -34,6 +34,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import java.util.Iterator; +import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; @@ -63,7 +64,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Holds an atomic reference to the listener installed. */ - private final AtomicReference _messageListener = new AtomicReference(); + private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); /** * The consumer tag allows us to close the consumer by sending a jmsCancel method to the @@ -78,13 +79,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Used in the blocking receive methods to receive a message from - * the Session thread. Argument true indicates we want strict FIFO semantics + * the Session thread. + * <p/> + * Or to notify of errors + * <p/> + * Argument true indicates we want strict FIFO semantics */ private final ArrayBlockingQueue _synchronousQueue; private MessageFactoryRegistry _messageFactory; - private AMQSession _session; + private final AMQSession _session; private AMQProtocolHandler _protocolHandler; @@ -141,8 +146,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private Thread _receivingThread; /** - * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive - * on the queue. This is used for queue browsing. + * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive + * on the queue. This is used for queue browsing. */ private boolean _autoClose; private boolean _closeWhenNoMessages; @@ -179,14 +184,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String getMessageSelector() throws JMSException { - checkPreConditions(); + checkPreConditions(); return _messageSelector; } public MessageListener getMessageListener() throws JMSException { - checkPreConditions(); - return (MessageListener) _messageListener.get(); + checkPreConditions(); + return _messageListener.get(); } public int getAcknowledgeMode() @@ -199,9 +204,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _messageListener.get() != null; } - public void setMessageListener(MessageListener messageListener) throws JMSException + public void setMessageListener(final MessageListener messageListener) throws JMSException { - checkPreConditions(); + checkPreConditions(); //if the current listener is non-null and the session is not stopped, then //it is an error to call this method. @@ -216,7 +221,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_session.isStopped()) { _messageListener.set(messageListener); - _logger.debug("Message listener set for destination " + _destination); + _logger.debug("Session stopped : Message listener set for destination " + _destination); } else { @@ -228,18 +233,35 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); } + _logger.debug("Message listener set for destination " + _destination); if (messageListener != null) { - //handle case where connection has already been started, and the dispatcher is blocked - //doing a put on the _synchronousQueue - AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); - if (jmsMsg != null) + //handle case where connection has already been started, and the dispatcher has alreaded started + // putting values on the _synchronousQueue + + synchronized (_session) { - preApplicationProcessing(jmsMsg); - messageListener.onMessage(jmsMsg); - postDeliver(jmsMsg); + //Pause Dispatcher + _session.doDispatcherTask(new DispatcherCallback(this) + { + public void whilePaused(Queue<MessageConsumerPair> reprocessQueue) + { + // Prepend messages in _synchronousQueue to dispatcher queue + _logger.debug("ReprocessQueue current size:" + reprocessQueue.size()); + for (Object item : _synchronousQueue) + { + reprocessQueue.offer(new MessageConsumerPair(_consumer, item)); + } + _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size()); + + // Set Message Listener + _logger.debug("Set Message Listener"); + _messageListener.set(messageListener); + } + } + ); } } } @@ -247,7 +269,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName()); @@ -314,13 +336,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - checkPreConditions(); + checkPreConditions(); acquireReceiving(); try { - if(closeOnAutoClose()) + if (closeOnAutoClose()) { return null; } @@ -355,7 +377,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private boolean closeOnAutoClose() throws JMSException { - if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) + if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) { close(false); return true; @@ -368,13 +390,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - checkPreConditions(); + checkPreConditions(); acquireReceiving(); try { - if(closeOnAutoClose()) + if (closeOnAutoClose()) { return null; } @@ -430,19 +452,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { - if(sendClose) + if (sendClose) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - _consumerTag, // consumerTag - false); // nowait + (byte) 8, (byte) 0, // AMQP version (major, minor) + _consumerTag, // consumerTag + false); // nowait try { @@ -499,7 +521,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer messageFrame.contentHeader, messageFrame.bodies); - if(debug) + if (debug) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } @@ -507,6 +529,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer preDeliver(jmsMessage); + notifyMessage(jmsMessage, channelId); + } + catch (Exception e) + { + if (e instanceof InterruptedException) + { + _logger.info("SynchronousQueue.put interupted. Usually result of connection closing"); + } + else + { + _logger.error("Caught exception (dump follows) - ignoring...", e); + } + } + } + + /** + * @param jmsMessage this message has already been processed so can't redo preDeliver + * @param channelId + */ + public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) + { + try + { if (isMessageListenerSet()) { //we do not need a lock around the test above, and the dispatch below as it is invalid @@ -517,6 +562,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { + //This shouldn't be possible. _synchronousQueue.put(jmsMessage); } } @@ -524,11 +570,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (e instanceof InterruptedException) { - _logger.info("SynchronousQueue.put interupted. Usually result of connection closing"); + _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing"); } else { - _logger.error("Caught exception (dump follows) - ignoring...", e); + _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e); } } } @@ -550,7 +596,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private void postDeliver(AbstractJMSMessage msg) throws JMSException { - msg.setJMSDestination(_destination); + msg.setJMSDestination(_destination); switch (_acknowledgeMode) { case Session.CLIENT_ACKNOWLEDGE: @@ -613,6 +659,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closed.set(true); + //QPID-293 can "request redelivery of this error through dispatcher" + // we have no way of propagating the exception to a message listener - a JMS limitation - so we // deal with the case where we have a synchronous receive() waiting for a message to arrive if (!isMessageListenerSet()) @@ -626,13 +674,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer deregisterConsumer(); } + /** * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean * case and in the case of an error occurring. */ private void deregisterConsumer() { - _session.deregisterConsumer(this); + _session.deregisterConsumer(this); } public AMQShortString getConsumerTag() @@ -645,26 +694,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _consumerTag = consumerTag; } - public AMQSession getSession() { - return _session; - } + public AMQSession getSession() + { + return _session; + } - private void checkPreConditions() throws JMSException{ + private void checkPreConditions() throws JMSException + { - this.checkNotClosed(); + this.checkNotClosed(); - if(_session == null || _session.isClosed()){ - throw new javax.jms.IllegalStateException("Invalid Session"); - } - } + if (_session == null || _session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + } public void acknowledge() throws JMSException { - if(!isClosed()) + if (!isClosed()) { Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); - while(tags.hasNext()) + while (tags.hasNext()) { _session.acknowledgeMessage(tags.next(), false); tags.remove(); @@ -699,10 +751,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closeWhenNoMessages = b; - if(_closeWhenNoMessages - && _synchronousQueue.isEmpty() - && _receiving.get() - && _messageListener != null) + if (_closeWhenNoMessages + && _synchronousQueue.isEmpty() + && _receiving.get() + && _messageListener != null) { _receivingThread.interrupt(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java new file mode 100644 index 0000000000..81a55006ed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import java.util.Queue; + +public abstract class DispatcherCallback +{ + BasicMessageConsumer _consumer; + + public DispatcherCallback(BasicMessageConsumer mc) + { + _consumer = mc; + } + + abstract public void whilePaused(Queue<MessageConsumerPair> reprocessQueue); + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java new file mode 100644 index 0000000000..585d6db3fd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +public class MessageConsumerPair +{ + BasicMessageConsumer _consumer; + Object _item; + + public MessageConsumerPair(BasicMessageConsumer consumer, Object item) + { + _consumer = consumer; + _item = item; + } + + public BasicMessageConsumer getConsumer() + { + return _consumer; + } + + public Object getItem() + { + return _item; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 32a25bd915..c6d6731967 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -86,7 +86,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } else { - _logger.warn("No Provider URL specified."); + _logger.info("No Provider URL specified."); } } catch (IOException ioe) |
