diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2007-06-01 14:33:07 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2007-06-01 14:33:07 +0000 |
| commit | 3b5d4734b777b54b52ce2710f404143aca8c5c6e (patch) | |
| tree | d436e7a5239ec6be725852c12e7ccae975892745 /java/client/src | |
| parent | 566e08caa331629a15bedca1d8cfc896886b0497 (diff) | |
| download | qpid-python-3b5d4734b777b54b52ce2710f404143aca8c5c6e.tar.gz | |
QPID-402: FailoverException falling through to client. All blocking operations now wrapped in failover support wrappers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@543496 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
18 files changed, 2861 insertions, 2186 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 347f5728e2..674f205af6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,34 +20,15 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.client.failover.FailoverSupport; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; @@ -67,6 +48,27 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -96,8 +98,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>(); - + private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>(); private String _clientName; @@ -486,72 +487,72 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect final int prefetchHigh, final int prefetchLow) throws JMSException { checkNotClosed(); + if (channelLimitReached()) { throw new ChannelLimitReachedException(_maximumChannelCount); } - else - { - return (org.apache.qpid.jms.Session) new FailoverSupport() + + return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( + new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() + { + public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - public Object operation() throws JMSException + int channelId = _idFactory.incrementAndGet(); + + if (_logger.isDebugEnabled()) { - int channelId = _idFactory.incrementAndGet(); + _logger.debug("Write channel open frame for channel id " + channelId); + } + + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + // _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); - if (_logger.isDebugEnabled()) + boolean success = false; + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) { - _logger.debug("Write channel open frame for channel id " + channelId); + deregisterSession(channelId); } + } - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = - new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); - //_protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); - - boolean success = false; + if (_started) + { try { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; + session.start(); } catch (AMQException e) { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) - { - deregisterSession(channelId); - } + throw new JMSAMQException(e); } - - if (_started) - { - try - { - session.start(); - } - catch (AMQException e) - { - throw new JMSAMQException(e); - } - } - - return session; } - }.execute(this); - } + + return session; + } + }, this).execute(); } private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException + throws AMQException, FailoverException { // TODO: Be aware of possible changes to parameter order as versions change. @@ -581,7 +582,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException + private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) + throws AMQException, FailoverException { try { @@ -1128,14 +1130,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. */ - public void resubscribeSessions() throws JMSException, AMQException + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { ArrayList sessions = new ArrayList(_sessions.values()); _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); - //_protocolHandler.addSessionByChannel(s.getChannelId(), s); + // _protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); } @@ -1223,7 +1225,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _taskPool.execute(task); } - public AMQSession getSession(int channelId) { return _sessions.get(channelId); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b7615c5b7b..25c2d94377 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,47 +20,16 @@ */ package org.apache.qpid.client; -import java.io.Serializable; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.Arrays; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQInvalidArgumentException; -import org.apache.qpid.client.failover.FailoverSupport; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverNoopSupport; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.message.JMSMapMessage; @@ -70,21 +39,20 @@ import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AccessRequestBody; -import org.apache.qpid.framing.AccessRequestOkBody; import org.apache.qpid.framing.BasicAckBody; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.ExchangeDeclareBody; @@ -92,358 +60,242 @@ import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import java.io.Serializable; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(AMQSession.class); + /** Used for debugging in the dispatcher. */ + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + + /** The default maximum number of prefetched message at which to suspend the channel. */ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; + + /** The default minimum number of prefetched messages at which to resume the channel. */ public static final int DEFAULT_PREFETCH_LOW_MARK = 2500; + /** + * The default value for immediate flag used by producers created by this session is false. That is, a consumer does + * not need to be attached to a queue. + */ + protected static final boolean DEFAULT_IMMEDIATE = false; + + /** + * The default value for mandatory flag used by producers created by this session is true. That is, server will not + * silently drop messages where no queue is connected to the exchange for the message. + */ + protected static final boolean DEFAULT_MANDATORY = true; + + /** System property to enable strict AMQP compliance. */ + public static final String STRICT_AMQP = "STRICT_AMQP"; + + /** Strict AMQP default setting. */ + public static final String STRICT_AMQP_DEFAULT = "false"; + + /** System property to enable failure if strict AMQP compliance is violated. */ + public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; + + /** Strickt AMQP failure default. */ + public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; + + /** System property to enable immediate message prefetching. */ + public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; + + /** Immediate message prefetch default. */ + public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + + /** The connection to which this session belongs. */ private AMQConnection _connection; + /** Used to indicate whether or not this is a transactional session. */ private boolean _transacted; + /** Holds the sessions acknowledgement mode. */ private int _acknowledgeMode; + /** Holds this session unique identifier, used to distinguish it from other sessions. */ private int _channelId; + /** @todo This does not appear to be set? */ private int _ticket; + /** Holds the high mark for prefetched message, at which the session is suspended. */ private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + + /** Holds the low mark for prefetched messages, below which the session is resumed. */ private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; + /** Holds the message listener, if any, which is attached to this session. */ private MessageListener _messageListener = null; + /** Used to indicate that this session has been started at least once. */ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only - * keeps a record of subscriptions which have been created in the current instance. It does not remember - * subscriptions between executions of the client + * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only + * keeps a record of subscriptions which have been created in the current instance. It does not remember + * subscriptions between executions of the client. */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = - new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); - private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); - /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */ - private int _nextTag = 1; + /** + * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked + * up in the {@link #_subscriptions} map. + */ + private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = + new ConcurrentHashMap<BasicMessageConsumer, String>(); - /** This queue is bounded and is used to store messages before being dispatched to the consumer */ + /** + * Used to hold incoming messages. + * + * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. + */ private final FlowControllingBlockingQueue _queue; + /** Holds the dispatcher thread for this session. */ private Dispatcher _dispatcher; + /** Holds the message factory factory for this session. */ private MessageFactoryRegistry _messageFactoryRegistry; - /** Set of all producers created by this session */ - private Map _producers = new ConcurrentHashMap(); - - /** Maps from consumer tag (String) to JMSMessageConsumer instance */ - private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); - - /** Maps from destination to count of JMSMessageConsumers */ - private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = - new ConcurrentHashMap<Destination, AtomicInteger>(); + /** Holds all of the producers created by this session, keyed by their unique identifiers. */ + private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); /** - * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not - * need to be attached to a queue + * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods. */ - protected static final boolean DEFAULT_IMMEDIATE = false; + private int _nextTag = 1; /** - * Default value for mandatory flag used by producers created by this sessio is true, i.e. server will not silently - * drop messages where no queue is connected to the exchange for the message + * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right + * consumer. */ - protected static final boolean DEFAULT_MANDATORY = true; + private Map<AMQShortString, BasicMessageConsumer> _consumers = + new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + + /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ + private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = + new ConcurrentHashMap<Destination, AtomicInteger>(); /** - * The counter of the next producer id. This id is generated by the session and used only to allow the producer to - * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be - * synchronized since according to the JMS specification only one thread of control is allowed to create producers - * for any given session instance. + * Used as a source of unique identifiers for producers within the session. + * + * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one + * thread of control is allowed to create producers for any given session instance. */ private long _nextProducerId; - /** * Set when recover is called. This is to handle the case where recover() is called by application code during - * onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + * onMessage() processing to enure that an auto ack is not sent. */ private boolean _inRecovery; + /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; + /** Used to indicate that this session has a message listener attached to it. */ private boolean _hasMessageListeners; + /** Used to indicate that this session has been suspended. */ private boolean _suspended; + /** + * Used to protect the suspension of this session, so that critical code can be executed during suspension, + * without the session being resumed by other threads. + */ private final Object _suspensionLock = new Object(); - /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */ + /** + * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel. + * + * @todo This is accessed only within a synchronized method, so does not need to be atomic. + */ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); - /** System property to enable strickt AMQP compliance */ - public static final String STRICT_AMQP = "STRICT_AMQP"; - /** Strickt AMQP default */ - public static final String STRICT_AMQP_DEFAULT = "false"; + /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ + private final boolean _immediatePrefetch; + /** Indicates that warnings should be generated on violations of the strict AMQP. */ private final boolean _strictAMQP; - /** System property to enable strickt AMQP compliance */ - public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - /** Strickt AMQP default */ - public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - + /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ private final boolean _strictAMQPFATAL; - /** System property to enable immediate message prefetching */ - public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; - /** Immediate message prefetch default */ - public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - - private final boolean _immediatePrefetch; - - private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); - - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - private class Dispatcher extends Thread - { - - /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ - private final AtomicBoolean _closed = new AtomicBoolean(false); - - private final Object _lock = new Object(); - - public Dispatcher() - { - super("Dispatcher-Channel-" + _channelId); - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " created"); - } - } - - public void run() - { - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " started"); - } - - UnprocessedMessage message; - - // Allow disptacher to start stopped - synchronized (_lock) - { - while (connectionStopped()) - { - try - { - _lock.wait(); - } - catch (InterruptedException e) - { - // ignore - } - } - } - - try - { - while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) - { - synchronized (_lock) - { - - while (connectionStopped()) - { - _lock.wait(); - } - - dispatchMessage(message); - - while (connectionStopped()) - { - _lock.wait(); - } - - } - - } - } - catch (InterruptedException e) - { - //ignore - } - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); - } - } - - // only call while holding lock - final boolean connectionStopped() - { - return _connectionStopped; - } - - boolean setConnectionStopped(boolean connectionStopped) - { - boolean currently; - synchronized (_lock) - { - currently = _connectionStopped; - _connectionStopped = connectionStopped; - _lock.notify(); - - if (_dispatcherLogger.isDebugEnabled()) - { - _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + - ": Currently " + (currently ? "Stopped" : "Started")); - } - } - return currently; - } - - private void dispatchMessage(UnprocessedMessage message) - { - if (message.getDeliverBody() != null) - { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); - - if (consumer == null || consumer.isClosed()) - { - if (_dispatcherLogger.isInfoEnabled()) - { - if (consumer == null) - { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + - "[" + message.getDeliverBody().deliveryTag + "] from queue " + - message.getDeliverBody().consumerTag + - " )without a handler - rejecting(requeue)..."); - } - else - { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + - "[" + message.getDeliverBody().deliveryTag + "] from queue " + - " consumer(" + consumer.debugIdentity() + - ") is closed rejecting(requeue)..."); - } - } - // Don't reject if we're already closing - if (!_closed.get()) - { - rejectMessage(message, true); - } - } - else - { - consumer.notifyMessage(message, _channelId); - } - } - } - - public void close() - { - _closed.set(true); - interrupt(); - - //fixme awaitTermination - - } - - public void rollback() - { - - synchronized (_lock) - { - boolean isStopped = connectionStopped(); - - if (!isStopped) - { - setConnectionStopped(true); - } - - rejectAllMessages(true); - - _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); - - for (BasicMessageConsumer consumer : _consumers.values()) - { - if (!consumer.isNoConsume()) - { - consumer.rollback(); - } - else - { - // should perhaps clear the _SQ here. - //consumer._synchronousQueue.clear(); - consumer.clearReceiveQueue(); - } - - - } - - setConnectionStopped(isStopped); - } - - } - - public void rejectPending(BasicMessageConsumer consumer) - { - synchronized (_lock) - { - boolean stopped = _dispatcher.connectionStopped(); - - if (!stopped) - { - _dispatcher.setConnectionStopped(true); - } - - // Reject messages on pre-receive queue - consumer.rollback(); - - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - - // closeConsumer - consumer.markClosed(); - - _dispatcher.setConnectionStopped(stopped); - - } - } - } - - - + /** + * Creates a new session on a connection. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param messageFactoryRegistry The message factory factory for the session. + * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. + */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); - _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); - _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); + _strictAMQPFATAL = + Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); + _immediatePrefetch = + _strictAMQP + || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); _connection = con; _transacted = transacted; @@ -455,6 +307,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _acknowledgeMode = acknowledgeMode; } + _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; _defaultPrefetchHighMark = defaultPrefetchHighMark; @@ -462,27 +315,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, - new FlowControllingBlockingQueue.ThresholdListener() - { - public void aboveThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(true)).start(); - } - } - - public void underThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(false)).start(); - } - } - }); + _queue = + new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue.ThresholdListener() + { + public void aboveThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Above threshold(" + _defaultPrefetchHighMark + + ") so suspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(true)).start(); + } + } + + public void underThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Below threshold(" + _defaultPrefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(false)).start(); + } + } + }); } else { @@ -490,183 +348,146 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); - } - - public AMQConnection getAMQConnection() - { - return _connection; - } - - public BytesMessage createBytesMessage() throws JMSException + /** + * Creates a new session on a connection with the default message factory factory. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + */ + AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, + int defaultPrefetchLow) { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - return new JMSBytesMessage(); - } + this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, + defaultPrefetchLow); } - public MapMessage createMapMessage() throws JMSException + /** + * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. + * + * @throws IllegalStateException If the session is closed. + */ + public void acknowledge() throws IllegalStateException { - synchronized (_connection.getFailoverMutex()) + if (isClosed()) { - checkNotClosed(); - return new JMSMapMessage(); + throw new IllegalStateException("Session is already closed"); } - } - - public javax.jms.Message createMessage() throws JMSException - { - return createBytesMessage(); - } - public ObjectMessage createObjectMessage() throws JMSException - { - synchronized (_connection.getFailoverMutex()) + for (BasicMessageConsumer consumer : _consumers.values()) { - checkNotClosed(); - return (ObjectMessage) new JMSObjectMessage(); + consumer.acknowledge(); } } - public ObjectMessage createObjectMessage(Serializable object) throws JMSException - { - ObjectMessage msg = createObjectMessage(); - msg.setObject(object); - return msg; - } - - public StreamMessage createStreamMessage() throws JMSException + /** + * Acknowledge one or many messages. + * + * @param deliveryTag The tag of the last message to be acknowledged. + * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the + * delivery tag, <tt>false</tt> to just acknowledge that message. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void acknowledgeMessage(long deliveryTag, boolean multiple) { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); + final AMQFrame ackFrame = + BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + multiple); - return new JMSStreamMessage(); - } - } - - public TextMessage createTextMessage() throws JMSException - { - synchronized (_connection.getFailoverMutex()) + if (_logger.isDebugEnabled()) { - checkNotClosed(); - - return new JMSTextMessage(); + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } - } - public TextMessage createTextMessage(String text) throws JMSException - { - - TextMessage msg = createTextMessage(); - msg.setText(text); - return msg; - } - - public boolean getTransacted() throws JMSException - { - checkNotClosed(); - return _transacted; - } - - public int getAcknowledgeMode() throws JMSException - { - checkNotClosed(); - return _acknowledgeMode; - } - - public void commit() throws JMSException - { - checkTransacted(); - try - { - // Acknowledge up to message last delivered (if any) for each consumer. - //need to send ack for messages delivered to consumers so far - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - //Sends acknowledgement to server - i.next().acknowledgeLastDelivered(); - } - - // Commits outstanding messages sent and outstanding acknowledgements. - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQProtocolHandler handler = getProtocolHandler(); - - handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion()), - TxCommitOkBody.class); - } - catch (AMQException e) - { - JMSException exception = new JMSException("Failed to commit: " + e.getMessage()); - exception.setLinkedException(e); - throw exception; - } + getProtocolHandler().writeFrame(ackFrame); } - - public void rollback() throws JMSException + /** + * Binds the named queue, with the specified routing key, to the named exchange. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param queueName The name of the queue to bind. + * @param routingKey The routing key to bind the queue with. + * @param arguments Additional arguments. + * @param exchangeName The exchange to bind the queue on. + * + * @throws AMQException If the queue cannot be bound for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + * + * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? + */ + public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName) throws AMQException { - synchronized (_suspensionLock) - { - checkTransacted(); - try + /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - - boolean isSuspended = isSuspended(); - - if (!isSuspended) - { - suspendChannel(true); - } - - if (_dispatcher != null) + public Object execute() throws AMQException, FailoverException { - _dispatcher.rollback(); + AMQFrame queueBind = + QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + arguments, // arguments + exchangeName, // exchange + false, // nowait + queueName, // queue + routingKey, // routingKey + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + + return null; } - - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - - - if (!isSuspended) - { - suspendChannel(false); - } - } - catch (AMQException e) - { - throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); - } - } + }, _connection).execute(); } + /** + * Closes the session with no timeout. + * + * @throws JMSException If the JMS provider fails to close the session due to some internal error. + */ public void close() throws JMSException { close(-1); } + /** + * Closes the session. + * + * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close + * the channel. This is because the channel is marked as closed before the request to close it is made, so the + * fail-over should not re-open it. + * + * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. + * + * @throws JMSException If the JMS provider fails to close the session due to some internal error. + * + * @todo Be aware of possible changes to parameter order as versions change. + * + * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be + * re-opened. May need to examine this more carefully. + * + * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, + * because the failover process sends the failover event before acquiring the mutex itself. + */ public void close(long timeout) throws JMSException { if (_logger.isInfoEnabled()) { - _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.info("Closing session: " + this + ":" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session + // that can be called from a different thread of control from the one controlling the session. synchronized (_connection.getFailoverMutex()) { - //Ensure we only try and close an open session. + // Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { // we pass null since this is not an error case @@ -676,18 +497,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { getProtocolHandler().closeSession(this); - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + + final AMQFrame frame = + ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. } catch (AMQException e) { @@ -695,6 +516,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi jmse.setLinkedException(e); throw jmse; } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } finally { _connection.deregisterSession(_channelId); @@ -703,65 +531,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private AMQProtocolHandler getProtocolHandler() - { - return _connection.getProtocolHandler(); - } - - - private byte getProtocolMinorVersion() - { - return getProtocolHandler().getProtocolMinorVersion(); - } - - private byte getProtocolMajorVersion() - { - return getProtocolHandler().getProtocolMajorVersion(); - } - - - /** - * Close all producers or consumers. This is called either in the error case or when closing the session normally. - * - * @param amqe the exception, may be null to indicate no error has occurred - */ - private void closeProducersAndConsumers(AMQException amqe) throws JMSException - { - JMSException jmse = null; - try - { - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - jmse = e; - } - try - { - closeConsumers(amqe); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - if (jmse == null) - { - jmse = e; - } - } - if (jmse != null) - { - throw jmse; - } - } - - - public boolean isSuspended() - { - return _suspended; - } - - /** * Called when the server initiates the closure of the session unilaterally. * @@ -783,738 +552,342 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { amqe = new AMQException("Closing session forcibly", e); } + _connection.deregisterSession(_channelId); closeProducersAndConsumers(amqe); } } /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover - * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. + * Commits all messages done in this transaction and releases any locks currently held. + * + * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the + * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. + * The client will be unable to determine whether or not the commit actually happened on the broker in this case. + * + * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does + * not mean that the commit is known to have failed, merely that it is not known whether it + * failed or not. + * + * @todo Be aware of possible changes to parameter order as versions change. */ - void markClosed() + public void commit() throws JMSException { - _closed.set(true); - _connection.deregisterSession(_channelId); - markClosedProducersAndConsumers(); - - } + checkTransacted(); - private void markClosedProducersAndConsumers() - { - try - { - // no need for a markClosed* method in this case since there is no protocol traffic closing a producer - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } try { - markClosedConsumers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - } + // Acknowledge up to message last delivered (if any) for each consumer. + // need to send ack for messages delivered to consumers so far + for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + { + // Sends acknowledgement to server + i.next().acknowledgeLastDelivered(); + } - /** - * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is - * currently no way of propagating errors to message producers (this is a JMS limitation). - */ - private void closeProducers() throws JMSException - { - // we need to clone the list of producers since the close() method updates the _producers collection - // which would result in a concurrent modification exception - final ArrayList clonedProducers = new ArrayList(_producers.values()); + // Commits outstanding messages sent and outstanding acknowledgements. + final AMQProtocolHandler handler = getProtocolHandler(); - final Iterator it = clonedProducers.iterator(); - while (it.hasNext()) - { - final BasicMessageProducer prod = (BasicMessageProducer) it.next(); - prod.close(); + handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), + TxCommitOkBody.class); } - // at this point the _producers map is empty - } - - /** - * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. - * - * @param error not null if this is a result of an error occurring at the connection level - */ - private void closeConsumers(Throwable error) throws JMSException - { - if (_dispatcher != null) + catch (AMQException e) { - _dispatcher.close(); - _dispatcher = null; + throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList(_consumers.values()); - - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); - while (it.hasNext()) + catch (FailoverException e) { - final BasicMessageConsumer con = it.next(); - if (error != null) - { - con.notifyError(error); - } - else - { - con.close(); - } + throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); } - // at this point the _consumers map will be empty } - private void markClosedConsumers() throws JMSException + public void confirmConsumerCancelled(AMQShortString consumerTag) { - if (_dispatcher != null) - { - _dispatcher.close(); - _dispatcher = null; - } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); - while (it.hasNext()) - { - final BasicMessageConsumer con = it.next(); - con.markClosed(); - } - // at this point the _consumers map will be empty - } - - /** - * Asks the broker to resend all unacknowledged messages for the session. - * - * @throws JMSException - */ - public void recover() throws JMSException - { - checkNotClosed(); - checkNotTransacted(); // throws IllegalStateException if a transacted session - // this is set only here, and the before the consumer's onMessage is called it is set to false - _inRecovery = true; - try + // Remove the consumer from the map + BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + if (consumer != null) { - - boolean isSuspended = isSuspended(); - - if (!isSuspended) + // fixme this isn't right.. needs to check if _queue contains data for this consumer + if (consumer.isAutoClose()) // && _queue.isEmpty()) { - suspendChannel(true); + consumer.closeWhenNoMessages(true); } - for (BasicMessageConsumer consumer : _consumers.values()) + if (!consumer.isNoConsume()) { - consumer.clearUnackedMessages(); - } + // Clean the Maps up first + // Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } + else + { + _logger.info("Dispatcher is null so created stopped dispatcher"); - if (_dispatcher != null) - { - _dispatcher.rollback(); - } + startDistpatcherIfNecessary(true); + } - if (isStrictAMQP()) - { - // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false)); // requeue - _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); + _dispatcher.rejectPending(consumer); } else { + // Just close the consumer + // fixme the CancelOK is being processed before the arriving messages.. + // The dispatcher is still to process them so the server sent in order but the client + // has yet to receive before the close comes in. - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false) // requeue - , BasicRecoverOkBody.class); - } - if (!isSuspended) - { - suspendChannel(false); + // consumer.markClosed(); } } - catch (AMQException e) + else { - throw new JMSAMQException(e); + _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); } - } - - boolean isInRecovery() - { - return _inRecovery; - } - void setInRecovery(boolean inRecovery) - { - _inRecovery = inRecovery; } - public void acknowledge() throws JMSException + public QueueBrowser createBrowser(Queue queue) throws JMSException { - if (isClosed()) - { - throw new IllegalStateException("Session is already closed"); - } - for (BasicMessageConsumer consumer : _consumers.values()) + if (isStrictAMQP()) { - consumer.acknowledge(); + throw new UnsupportedOperationException(); } - - } - - - public MessageListener getMessageListener() throws JMSException - { -// checkNotClosed(); - return _messageListener; - } - - public void setMessageListener(MessageListener listener) throws JMSException - { -// checkNotClosed(); -// -// if (_dispatcher != null && !_dispatcher.connectionStopped()) -// { -// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); -// } -// -// // We are stopped -// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) -// { -// BasicMessageConsumer consumer = i.next(); -// -// if (consumer.isReceiving()) -// { -// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); -// } -// } -// -// _messageListener = listener; -// -// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) -// { -// i.next().setMessageListener(_messageListener); -// } - - } - - public void run() - { - throw new java.lang.UnsupportedOperationException(); + return createBrowser(queue, null); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, - boolean immediate, boolean waitUntilSent) - throws JMSException + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - return createProducerImpl(destination, mandatory, immediate, waitUntilSent); - } + if (isStrictAMQP()) + { + throw new UnsupportedOperationException(); + } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate); - } + checkNotClosed(); + checkValidQueue(queue); - public BasicMessageProducer createProducer(Destination destination, boolean immediate) - throws JMSException - { - return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); + return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } - public BasicMessageProducer createProducer(Destination destination) throws JMSException + public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException { - return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); - } + checkValidDestination(destination); - private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, - boolean immediate) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate, false); + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + messageSelector, null, true, true); } - private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) - throws JMSException + public BytesMessage createBytesMessage() throws JMSException { - return (BasicMessageProducer) new FailoverSupport() + synchronized (_connection.getFailoverMutex()) { - public Object operation() throws JMSException - { - checkNotClosed(); - long producerId = getNextProducerId(); - BasicMessageProducer producer = new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, - AMQSession.this, getProtocolHandler(), - producerId, immediate, mandatory, waitUntilSent); - registerProducer(producerId, producer); - return producer; - } - }.execute(_connection); - } - - /** - * Creates a QueueReceiver - * - * @param destination - * - * @return QueueReceiver - a wrapper around our MessageConsumer - * - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination) throws JMSException - { - checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); - return new QueueReceiverAdaptor(dest, consumer); - } + checkNotClosed(); - /** - * Creates a QueueReceiver using a message selector - * - * @param destination - * @param messageSelector - * - * @return QueueReceiver - a wrapper around our MessageConsumer - * - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException - { - checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) - createConsumer(destination, messageSelector); - return new QueueReceiverAdaptor(dest, consumer); + return new JMSBytesMessage(); + } } public MessageConsumer createConsumer(Destination destination) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - false, - false, - null, - null, - false, - false); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null, + false, false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - false, - false, - messageSelector, - null, - false, - false); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, + messageSelector, null, false, false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - noLocal, - false, - messageSelector, - null, - false, - false); - } - - public MessageConsumer createBrowserConsumer(Destination destination, - String messageSelector, - boolean noLocal) - throws JMSException + throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - noLocal, - false, - messageSelector, - null, - true, - true); - } - - public MessageConsumer createConsumer(Destination destination, - int prefetch, - boolean noLocal, - boolean exclusive, - String selector) throws JMSException + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + messageSelector, null, false, false); + } + + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, + String selector) throws JMSException { checkValidDestination(destination); + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); } - - public MessageConsumer createConsumer(Destination destination, - int prefetchHigh, - int prefetchLow, - boolean noLocal, - boolean exclusive, - String selector) throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, + boolean exclusive, String selector) throws JMSException { checkValidDestination(destination); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); } - public MessageConsumer createConsumer(Destination destination, - int prefetch, - boolean noLocal, - boolean exclusive, - String selector, - FieldTable rawSelector) throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, + String selector, FieldTable rawSelector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, - selector, rawSelector, false, false); + + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false); } - public MessageConsumer createConsumer(Destination destination, - int prefetchHigh, - int prefetchLow, - boolean noLocal, - boolean exclusive, - String selector, - FieldTable rawSelector) throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, + boolean exclusive, String selector, FieldTable rawSelector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, - selector, rawSelector, false, false); + + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false, + false); } - protected MessageConsumer createConsumerImpl(final Destination destination, - final int prefetchHigh, - final int prefetchLow, - final boolean noLocal, - final boolean exclusive, - String selector, - final FieldTable rawSelector, - final boolean noConsume, - final boolean autoClose) throws JMSException + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { - checkTemporaryDestination(destination); - final String messageSelector; - - if (_strictAMQP && !(selector == null || selector.equals(""))) + checkNotClosed(); + AMQTopic origTopic = checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) { - if (_strictAMQPFATAL) + if (subscriber.getTopic().equals(topic)) { - throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); } else { - messageSelector = null; + unsubscribe(name); } } else { - messageSelector = selector; - } - - return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() - { - public Object operation() throws JMSException + AMQShortString topicName; + if (topic instanceof AMQTopic) { - checkNotClosed(); - - AMQDestination amqd = (AMQDestination) destination; + topicName = ((AMQTopic) topic).getDestinationName(); + } + else + { + topicName = new AMQShortString(topic.getTopicName()); + } - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - // TODO: Define selectors in AMQP - // TODO: construct the rawSelector from the selector string if rawSelector == null - final FieldTable ft = FieldTableFactory.newFieldTable(); - //if (rawSelector != null) - // ft.put("headers", rawSelector.getDataAsBytes()); - if (rawSelector != null) + if (_strictAMQP) + { + if (_strictAMQPFATAL) { - ft.addAll(rawSelector); + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); } - - BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, - _messageFactoryRegistry, AMQSession.this, - protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, - _acknowledgeMode, noConsume, autoClose); - - if (_messageListener != null) + else { - consumer.setMessageListener(_messageListener); + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); } - try - { - registerConsumer(consumer, false); - } - catch (AMQInvalidArgumentException ise) - { - JMSException ex = new InvalidSelectorException(ise.getMessage()); - ex.setLinkedException(ise); - throw ex; - } - catch (AMQInvalidRoutingKeyException e) + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) { - JMSException ide = new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); - ide.setLinkedException(e); - throw ide; + deleteQueue(dest.getAMQQueueName()); } - catch (AMQException e) - { - JMSException ex = new JMSException("Error registering consumer: " + e); + } + } - if (_logger.isDebugEnabled()) - { - e.printStackTrace(); - } - ex.setLinkedException(e); - throw ex; - } + subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - synchronized (destination) - { - _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); - _destinationConsumerCount.get(destination).incrementAndGet(); - } + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - return consumer; - } - }.execute(_connection); + return subscriber; } - private void checkTemporaryDestination(Destination destination) - throws JMSException + /** Note, currently this does not handle reuse of the same name with different topics correctly. */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws JMSException { - if ((destination instanceof TemporaryDestination)) - { - _logger.debug("destination is temporary"); - final TemporaryDestination tempDest = (TemporaryDestination) destination; - if (tempDest.getSession() != this) - { - _logger.debug("destination is on different session"); - throw new JMSException("Cannot consume from a temporary destination created onanother session"); - } - if (tempDest.isDeleted()) - { - _logger.debug("destination is deleted"); - throw new JMSException("Cannot consume from a deleted destination"); - } - } - } + checkNotClosed(); + checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + return subscriber; + } - public boolean hasConsumer(Destination destination) + public MapMessage createMapMessage() throws JMSException { - AtomicInteger counter = _destinationConsumerCount.get(destination); + synchronized (_connection.getFailoverMutex()) + { + checkNotClosed(); - return (counter != null) && (counter.get() != 0); + return new JMSMapMessage(); + } } - public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException + public javax.jms.Message createMessage() throws JMSException { - declareExchange(name, type, getProtocolHandler(), nowait); + return createBytesMessage(); } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + public ObjectMessage createObjectMessage() throws JMSException { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); + synchronized (_connection.getFailoverMutex()) + { + checkNotClosed(); + + return (ObjectMessage) new JMSObjectMessage(); + } } - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - nowait, // nowait - false, // passive - getTicket(), // ticket - type); // type + ObjectMessage msg = createObjectMessage(); + msg.setObject(object); - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + return msg; } - - public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException + public BasicMessageProducer createProducer(Destination destination) throws JMSException { - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - autoDelete, // autoDelete - durable, // durable - exclusive, // exclusive - false, // nowait - false, // passive - name, // queue - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); - + return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } - - public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException + public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - exchangeName, // exchange - false, // nowait - queueName, // queue - routingKey, // routingKey - getTicket()); // ticket - - - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); } - /** - * Declare the queue. - * - * @param amqd - * @param protocolHandler - * - * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. - * - * @throws AMQException - */ - private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + throws JMSException { - // For queues (but not topics) we generate the name in the client rather than the - // server. This allows the name to be reused on failover if required. In general, - // the destination indicates whether it wants a name generated or not. - if (amqd.isNameRequired()) - { - amqd.setQueueName(protocolHandler.generateQueueName()); - } - - //TODO verify the destiation is valid. else throw - - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - false, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - getTicket()); // ticket - - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); - return amqd.getAMQQueueName(); + return createProducerImpl(destination, mandatory, immediate); } - private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, + boolean waitUntilSent) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - ft, // arguments - amqd.getExchangeName(), // exchange - false, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - getTicket()); // ticket - - - protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); + return createProducerImpl(destination, mandatory, immediate, waitUntilSent); } - /** - * Register to consume from the queue. - * - * @param queueName - * - * @return the consumer tag generated by the broker - */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector) throws AMQException + public TopicPublisher createPublisher(Topic topic) throws JMSException { - //need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); - - FieldTable arguments = FieldTableFactory.newFieldTable(); - if (messageSelector != null && !messageSelector.equals("")) - { - arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); - } - if (consumer.isAutoClose()) - { - arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); - } - if (consumer.isNoConsume()) - { - arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); - } - - consumer.setConsumerTag(tag); - // we must register the consumer in the map before we actually start listening - _consumers.put(tag, consumer); + checkNotClosed(); - try - { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - getTicket()); // ticket - if (nowait) - { - protocolHandler.writeFrame(jmsConsume); - } - else - { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); - } - } - catch (AMQException e) - { - // clean-up the map in the event of an error - _consumers.remove(tag); - throw e; - } + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1540,9 +913,80 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public AMQShortString getDefaultQueueExchangeName() + /** + * Declares the named queue. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the queue to declare. + * @param autoDelete + * @param durable Flag to indicate that the queue is durable. + * @param exclusive Flag to indicate that the queue is exclusive to this client. + * + * @throws AMQException If the queue cannot be declared for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive) throws AMQException { - return _connection.getDefaultQueueExchangeName(); + new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + AMQFrame queueDeclare = + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + autoDelete, // autoDelete + durable, // durable + exclusive, // exclusive + false, // nowait + false, // passive + name, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + + return null; + } + }, _connection).execute(); + } + + /** + * Creates a QueueReceiver + * + * @param destination + * + * @return QueueReceiver - a wrapper around our MessageConsumer + * + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination) throws JMSException + { + checkValidDestination(destination); + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + + return new QueueReceiverAdaptor(dest, consumer); + } + + /** + * Creates a QueueReceiver using a message selector + * + * @param destination + * @param messageSelector + * + * @return QueueReceiver - a wrapper around our MessageConsumer + * + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException + { + checkValidDestination(destination); + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); + + return new QueueReceiverAdaptor(dest, consumer); } /** @@ -1559,6 +1003,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotClosed(); AMQQueue dest = (AMQQueue) queue; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); + return new QueueReceiverAdaptor(dest, consumer); } @@ -1576,47 +1021,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) - createConsumer(dest, messageSelector); + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); + return new QueueReceiverAdaptor(dest, consumer); } public QueueSender createSender(Queue queue) throws JMSException { checkNotClosed(); - //return (QueueSender) createProducer(queue); + + // return (QueueSender) createProducer(queue); return new QueueSenderAdapter(createProducer(queue), queue); } - public Topic createTopic(String topicName) throws JMSException + public StreamMessage createStreamMessage() throws JMSException { - checkNotClosed(); - - if (topicName.indexOf('/') == -1) - { - return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); - } - else + synchronized (_connection.getFailoverMutex()) { - try - { - return new AMQTopic(new AMQBindingURL(topicName)); - } - catch (URLSyntaxException urlse) - { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); + checkNotClosed(); - throw jmse; - } + return new JMSStreamMessage(); } } - public AMQShortString getDefaultTopicExchangeName() - { - return _connection.getDefaultTopicExchangeName(); - } - /** * Creates a non-durable subscriber * @@ -1630,7 +1057,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); AMQTopic dest = checkValidTopic(topic); - //AMQTopic dest = new AMQTopic(topic.getTopicName()); + + // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1649,150 +1077,401 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); AMQTopic dest = checkValidTopic(topic); - //AMQTopic dest = new AMQTopic(topic.getTopicName()); + + // AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + public TemporaryQueue createTemporaryQueue() throws JMSException { + checkNotClosed(); + return new AMQTemporaryQueue(this); + } + public TemporaryTopic createTemporaryTopic() throws JMSException + { checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); - if (subscriber != null) + + return new AMQTemporaryTopic(this); + } + + public TextMessage createTextMessage() throws JMSException + { + synchronized (_connection.getFailoverMutex()) { - if (subscriber.getTopic().equals(topic)) + checkNotClosed(); + + return new JMSTextMessage(); + } + } + + public TextMessage createTextMessage(String text) throws JMSException + { + + TextMessage msg = createTextMessage(); + msg.setText(text); + + return msg; + } + + public Topic createTopic(String topicName) throws JMSException + { + checkNotClosed(); + + if (topicName.indexOf('/') == -1) + { + return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + } + else + { + try { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + - name); + return new AMQTopic(new AMQBindingURL(topicName)); } - else + catch (URLSyntaxException urlse) { - unsubscribe(name); + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + + throw jmse; } } + } + + public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException + { + declareExchange(name, type, getProtocolHandler(), nowait); + } + + public int getAcknowledgeMode() throws JMSException + { + checkNotClosed(); + + return _acknowledgeMode; + } + + public AMQConnection getAMQConnection() + { + return _connection; + } + + public int getChannelId() + { + return _channelId; + } + + public int getDefaultPrefetch() + { + return _defaultPrefetchHighMark; + } + + public int getDefaultPrefetchHigh() + { + return _defaultPrefetchHighMark; + } + + public int getDefaultPrefetchLow() + { + return _defaultPrefetchLowMark; + } + + public AMQShortString getDefaultQueueExchangeName() + { + return _connection.getDefaultQueueExchangeName(); + } + + public AMQShortString getDefaultTopicExchangeName() + { + return _connection.getDefaultTopicExchangeName(); + } + + public MessageListener getMessageListener() throws JMSException + { + // checkNotClosed(); + return _messageListener; + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _connection.getTemporaryQueueExchangeName(); + } + + public AMQShortString getTemporaryTopicExchangeName() + { + return _connection.getTemporaryTopicExchangeName(); + } + + public int getTicket() + { + return _ticket; + } + + public boolean getTransacted() throws JMSException + { + checkNotClosed(); + + return _transacted; + } + + public boolean hasConsumer(Destination destination) + { + AtomicInteger counter = _destinationConsumerCount.get(destination); + + return (counter != null) && (counter.get() != 0); + } + + public boolean isStrictAMQP() + { + return _strictAMQP; + } + + public boolean isSuspended() + { + return _suspended; + } + + /** + * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto + * the queue read by the dispatcher. + * + * @param message the message that has been received + */ + public void messageReceived(UnprocessedMessage message) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Message[" + + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody())) + + "] received in session with channel id " + _channelId); + } + + if (message.getDeliverBody() == null) + { + // Return of the bounced message. + returnBouncedMessage(message); + } else { - AMQShortString topicName; - if (topic instanceof AMQTopic) + _queue.add(message); + } + } + + /** + * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message. + * + * <p/>All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all + * messages that have been delivered to the client. + * + * <p/>Restarting a session causes it to take the following actions: + * + * <ul> + * <li>Stop message delivery.</li> + * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". + * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. + * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> + * </ul> + * + * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and + * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible + * for the client to determine whether the broker is going to recover the session or not. + * + * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. + * Not that this does not necessarily mean that the recovery has failed, but simply that it + * is not possible to tell if it has or not. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void recover() throws JMSException + { + // Ensure that the session is open. + checkNotClosed(); + + // Ensure that the session is not transacted. + checkNotTransacted(); + + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; + try + { + + boolean isSuspended = isSuspended(); + + if (!isSuspended) { - topicName = ((AMQTopic) topic).getDestinationName(); + suspendChannel(true); } - else + + for (BasicMessageConsumer consumer : _consumers.values()) { - topicName = new AMQShortString(topic.getTopicName()); + consumer.clearUnackedMessages(); } - if (_strictAMQP) + if (_dispatcher != null) { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } + _dispatcher.rollback(); + } - deleteQueue(dest.getAMQQueueName()); + if (isStrictAMQP()) + { + // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. + _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue + _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && - !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - return subscriber; - } + _connection.getProtocolHandler().syncWrite( + BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue + , BasicRecoverOkBody.class); + } - void deleteQueue(AMQShortString queueName) throws JMSException - { - try - { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - getTicket()); // ticket - getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + if (!isSuspended) + { + suspendChannel(false); + } } catch (AMQException e) { - throw new JMSAMQException(e); + throw new JMSAMQException("Recover failed: " + e.getMessage(), e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e); } } - /** Note, currently this does not handle reuse of the same name with different topics correctly. */ - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException + public void rejectMessage(UnprocessedMessage message, boolean requeue) { - checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - return subscriber; - } - public TopicPublisher createPublisher(Topic topic) throws JMSException - { - checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + } + + rejectMessage(message.getDeliverBody().deliveryTag, requeue); } - public QueueBrowser createBrowser(Queue queue) throws JMSException + public void rejectMessage(AbstractJMSMessage message, boolean requeue) { - if (isStrictAMQP()) + if (_logger.isTraceEnabled()) { - throw new UnsupportedOperationException(); + _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); } - return createBrowser(queue, null); + rejectMessage(message.getDeliveryTag(), requeue); + } - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException + public void rejectMessage(long deliveryTag, boolean requeue) { - if (isStrictAMQP()) + if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) { - throw new UnsupportedOperationException(); - } + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting delivery tag:" + deliveryTag); + } - checkNotClosed(); - checkValidQueue(queue); - return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); + AMQFrame basicRejectBody = + BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + requeue); + + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } - public TemporaryQueue createTemporaryQueue() throws JMSException + /** + * Commits all messages done in this transaction and releases any locks currently held. + * + * <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the + * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. + * The client will be unable to determine whether or not the rollback actually happened on the broker in this case. + * + * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does + * not mean that the rollback is known to have failed, merely that it is not known whether it + * failed or not. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void rollback() throws JMSException { - checkNotClosed(); - return new AMQTemporaryQueue(this); + synchronized (_suspensionLock) + { + checkTransacted(); + + try + { + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { + suspendChannel(true); + } + + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + if (!isSuspended) + { + suspendChannel(false); + } + } + catch (AMQException e) + { + throw new JMSAMQException("Failed to rollback: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e); + } + } } - public TemporaryTopic createTemporaryTopic() throws JMSException + public void run() { - checkNotClosed(); - return new AMQTemporaryTopic(this); + throw new java.lang.UnsupportedOperationException(); } + public void setMessageListener(MessageListener listener) throws JMSException + { + // checkNotClosed(); + // + // if (_dispatcher != null && !_dispatcher.connectionStopped()) + // { + // throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); + // } + // + // // We are stopped + // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + // { + // BasicMessageConsumer consumer = i.next(); + // + // if (consumer.isReceiving()) + // { + // throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); + // } + // } + // + // _messageListener = listener; + // + // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + // { + // i.next().setMessageListener(_messageListener); + // } + + } + + /*public void setTicket(int ticket) + { + _ticket = ticket; + }*/ + public void unsubscribe(String name) throws JMSException { checkNotClosed(); @@ -1815,7 +1494,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." - + " Requesting queue deletion regardless."); + + " Requesting queue deletion regardless."); } deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); @@ -1835,189 +1514,248 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException + protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException { - return isQueueBound(exchangeName, queueName, null); - } + checkTemporaryDestination(destination); - boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException - { + final String messageSelector; - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - exchangeName, // exchange - queueName, // queue - routingKey); // routingKey - AMQMethodEvent response = null; - try + if (_strictAMQP && !((selector == null) || selector.equals(""))) { - response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); + } + else + { + messageSelector = null; + } } - catch (AMQException e) + else { - throw new JMSAMQException(e); + messageSelector = selector; } - ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); - return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency - } - private void checkTransacted() throws JMSException - { - if (!getTransacted()) - { - throw new IllegalStateException("Session is not transacted"); - } - } + return new FailoverRetrySupport<MessageConsumer, JMSException>( + new FailoverProtectedOperation<MessageConsumer, JMSException>() + { + public MessageConsumer execute() throws JMSException, FailoverException + { + checkNotClosed(); - private void checkNotTransacted() throws JMSException - { - if (getTransacted()) - { - throw new IllegalStateException("Session is transacted"); - } + AMQDestination amqd = (AMQDestination) destination; + + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + // TODO: Define selectors in AMQP + // TODO: construct the rawSelector from the selector string if rawSelector == null + final FieldTable ft = FieldTableFactory.newFieldTable(); + // if (rawSelector != null) + // ft.put("headers", rawSelector.getDataAsBytes()); + if (rawSelector != null) + { + ft.addAll(rawSelector); + } + + BasicMessageConsumer consumer = + new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, + _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, + exclusive, _acknowledgeMode, noConsume, autoClose); + + if (_messageListener != null) + { + consumer.setMessageListener(_messageListener); + } + + try + { + registerConsumer(consumer, false); + } + catch (AMQInvalidArgumentException ise) + { + JMSException ex = new InvalidSelectorException(ise.getMessage()); + ex.setLinkedException(ise); + throw ex; + } + catch (AMQInvalidRoutingKeyException e) + { + JMSException ide = + new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); + ide.setLinkedException(e); + throw ide; + } + catch (AMQException e) + { + JMSException ex = new JMSException("Error registering consumer: " + e); + + if (_logger.isDebugEnabled()) + { + e.printStackTrace(); + } + + ex.setLinkedException(e); + throw ex; + } + + synchronized (destination) + { + _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); + _destinationConsumerCount.get(destination).incrementAndGet(); + } + + return consumer; + } + }, _connection).execute(); } /** - * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto - * the queue read by the dispatcher. + * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer + * instance. * - * @param message the message that has been received + * @param consumer the consum */ - public void messageReceived(UnprocessedMessage message) + void deregisterConsumer(BasicMessageConsumer consumer) { - if (_logger.isDebugEnabled()) + if (_consumers.remove(consumer.getConsumerTag()) != null) { - _logger.debug("Message[" + (message.getDeliverBody() == null ? - "B:" + message.getBounceBody() : "D:" + message.getDeliverBody()) - + "] received in session with channel id " + _channelId); - } + String subscriptionName = _reverseSubscriptionMap.remove(consumer); + if (subscriptionName != null) + { + _subscriptions.remove(subscriptionName); + } - if (message.getDeliverBody() == null) - { - // Return of the bounced message. - returnBouncedMessage(message); - } - else - { - _queue.add(message); + Destination dest = consumer.getDestination(); + synchronized (dest) + { + if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + { + _destinationConsumerCount.remove(dest); + } + } } } - private void returnBouncedMessage(final UnprocessedMessage message) + void deregisterProducer(long producerId) { - _connection.performConnectionTask( - new Runnable() - { - public void run() - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, - false, - message.getBounceBody().exchange, - message.getBounceBody().routingKey, - message.getContentHeader(), - message.getBodies()); - - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } + _producers.remove(new Long(producerId)); + } - } - catch (Exception e) - { - _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } - } - }); + boolean isInRecovery() + { + return _inRecovery; + } + + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException + { + return isQueueBound(exchangeName, queueName, null); } /** - * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a - * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar. + * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key. * - * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery - * tag + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param exchangeName The exchange name to test for binding against. + * @param queueName The queue name to check if bound. + * @param routingKey The routing key to check if the queue is bound under. + * + * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. + * + * @throws JMSException If the query fails for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) + boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple - if (_logger.isDebugEnabled()) + try { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + AMQMethodEvent response = + new FailoverRetrySupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + AMQFrame boundFrame = + ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), + getProtocolMinorVersion(), exchangeName, // exchange + queueName, // queue + routingKey); // routingKey + + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + + } + }, _connection).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + return (responseBody.replyCode == 0); + } + catch (AMQException e) + { + throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } - getProtocolHandler().writeFrame(ackFrame); } - public int getDefaultPrefetch() + /** + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover + * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. + */ + void markClosed() { - return _defaultPrefetchHighMark; + _closed.set(true); + _connection.deregisterSession(_channelId); + markClosedProducersAndConsumers(); + } - public int getDefaultPrefetchHigh() + /** + * Resubscribes all producers and consumers. This is called when performing failover. + * + * @throws AMQException + */ + void resubscribe() throws AMQException { - return _defaultPrefetchHighMark; + resubscribeProducers(); + resubscribeConsumers(); } - public int getDefaultPrefetchLow() + void setHasMessageListeners() { - return _defaultPrefetchLowMark; + _hasMessageListeners = true; } - public int getChannelId() + void setInRecovery(boolean inRecovery) { - return _channelId; + _inRecovery = inRecovery; } + /** + * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. + * + * @throws AMQException If the session cannot be started for any reason. + * + * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. + */ void start() throws AMQException { - //fixme This should be controlled by _stopped as it pairs with the stop method - //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. - //will result in sending Flow messages for each subsequent call to flow.. only need to do this - // if we have called stop. + // Check if the session has perviously been started and suspended, in which case it must be unsuspended. if (_startedAtLeastOnce.getAndSet(true)) { - //then we stopped this and are restarting, so signal server to resume delivery suspendChannel(false); } + // If the event dispatcher is not running then start it too. if (hasMessageListeners()) { startDistpatcherIfNecessary(); } } - private boolean hasMessageListeners() - { - return _hasMessageListeners; - } - - void setHasMessageListeners() - { - _hasMessageListeners = true; - } - synchronized void startDistpatcherIfNecessary() { // If IMMEDIATE_PREFETCH is not set then we need to start fetching @@ -2032,7 +1770,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - _logger.info("Suspending channel threw an exception:" + e); + _logger.info("Unsuspending channel threw an exception:" + e); } } } @@ -2057,7 +1795,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void stop() throws AMQException { - //stop the server delivering messages to this session + // Stop the server delivering messages to this session. suspendChannel(true); if (_dispatcher != null) @@ -2066,320 +1804,556 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - /** - * Callers must hold the failover mutex before calling this method. + /* + * Binds the named queue, with the specified routing key, to the named exchange. * - * @param consumer + * <p/>Note that this operation automatically retries in the event of fail-over. * - * @throws AMQException + * @param queueName The name of the queue to bind. + * @param routingKey The routing key to bind the queue with. + * @param arguments Additional arguments. + * @param exchangeName The exchange to bind the queue on. + * + * @throws AMQException If the queue cannot be bound for any reason. */ - void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException + /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) + throws AMQException, FailoverException { - AMQDestination amqd = consumer.getDestination(); + AMQFrame queueBind = + QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments + amqd.getExchangeName(), // exchange + false, // nowait + queueName, // queue + amqd.getRoutingKey(), // routingKey + getTicket()); // ticket - AMQProtocolHandler protocolHandler = getProtocolHandler(); - - declareExchange(amqd, protocolHandler, false); - - AMQShortString queueName = declareQueue(amqd, protocolHandler); - - bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - - // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch - if (!_immediatePrefetch) - { - // The dispatcher will be null if we have just created this session - // so suspend the channel before we register our consumer so that we don't - // start prefetching until a receive/mListener is set. - if (_dispatcher == null) - { - if (!isSuspended()) - { - try - { - suspendChannel(true); - _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); - } - catch (AMQException e) - { - _logger.info("Suspending channel threw an exception:" + e); - } - } - } - } - else - { - _logger.info("Immediately prefetching existing messages to new consumer."); - } + protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); + }*/ - try - { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); - } - catch (JMSException e) //thrown by getMessageSelector + private void checkNotTransacted() throws JMSException + { + if (getTransacted()) { - throw new AMQException(e.getMessage(), e); + throw new IllegalStateException("Session is transacted"); } } - /** - * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer - * instance. - * - * @param consumer the consum - */ - void deregisterConsumer(BasicMessageConsumer consumer) + private void checkTemporaryDestination(Destination destination) throws JMSException { - if (_consumers.remove(consumer.getConsumerTag()) != null) + if ((destination instanceof TemporaryDestination)) { - String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if (subscriptionName != null) + _logger.debug("destination is temporary"); + final TemporaryDestination tempDest = (TemporaryDestination) destination; + if (tempDest.getSession() != this) { - _subscriptions.remove(subscriptionName); + _logger.debug("destination is on different session"); + throw new JMSException("Cannot consume from a temporary destination created onanother session"); } - Destination dest = consumer.getDestination(); - synchronized (dest) + if (tempDest.isDeleted()) { - if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) - { - _destinationConsumerCount.remove(dest); - } + _logger.debug("destination is deleted"); + throw new JMSException("Cannot consume from a deleted destination"); } } } - private void registerProducer(long producerId, MessageProducer producer) + private void checkTransacted() throws JMSException { - _producers.put(new Long(producerId), producer); + if (!getTransacted()) + { + throw new IllegalStateException("Session is not transacted"); + } } - void deregisterProducer(long producerId) + private void checkValidDestination(Destination destination) throws InvalidDestinationException { - _producers.remove(new Long(producerId)); + if (destination == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } - private long getNextProducerId() + private void checkValidQueue(Queue queue) throws InvalidDestinationException { - return ++_nextProducerId; + if (queue == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } - /** - * Resubscribes all producers and consumers. This is called when performing failover. - * - * @throws AMQException + /* + * I could have combined the last 3 methods, but this way it improves readability */ - void resubscribe() throws AMQException + private AMQTopic checkValidTopic(Topic topic) throws JMSException { - resubscribeProducers(); - resubscribeConsumers(); + if (topic == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Topic"); + } + + if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this)) + { + throw new javax.jms.InvalidDestinationException( + "Cannot create a subscription on a temporary topic created in another session"); + } + + if (!(topic instanceof AMQTopic)) + { + throw new javax.jms.InvalidDestinationException( + "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + + topic.getClass().getName()); + } + + return (AMQTopic) topic; } - private void resubscribeProducers() throws AMQException + /** + * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. + * + * @param error not null if this is a result of an error occurring at the connection level + */ + private void closeConsumers(Throwable error) throws JMSException { - ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey - for (Iterator it = producers.iterator(); it.hasNext();) + if (_dispatcher != null) { - BasicMessageProducer producer = (BasicMessageProducer) it.next(); - producer.resubscribe(); + _dispatcher.close(); + _dispatcher = null; + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + + final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + while (it.hasNext()) + { + final BasicMessageConsumer con = it.next(); + if (error != null) + { + con.notifyError(error); + } + else + { + con.close(); + } } + // at this point the _consumers map will be empty } - private void resubscribeConsumers() throws AMQException + /** + * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is + * currently no way of propagating errors to message producers (this is a JMS limitation). + */ + private void closeProducers() throws JMSException { - ArrayList consumers = new ArrayList(_consumers.values()); - _consumers.clear(); + // we need to clone the list of producers since the close() method updates the _producers collection + // which would result in a concurrent modification exception + final ArrayList clonedProducers = new ArrayList(_producers.values()); - for (Iterator it = consumers.iterator(); it.hasNext();) + final Iterator it = clonedProducers.iterator(); + while (it.hasNext()) { - BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); - registerConsumer(consumer, true); + final BasicMessageProducer prod = (BasicMessageProducer) it.next(); + prod.close(); } + // at this point the _producers map is empty } - private void suspendChannel(boolean suspend) throws AMQException + /** + * Close all producers or consumers. This is called either in the error case or when closing the session normally. + * + * @param amqe the exception, may be null to indicate no error has occurred + */ + private void closeProducersAndConsumers(AMQException amqe) throws JMSException { - synchronized (_suspensionLock) + JMSException jmse = null; + try { - if (_logger.isDebugEnabled()) + closeProducers(); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + jmse = e; + } + + try + { + closeConsumers(amqe); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + if (jmse == null) { - _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + jmse = e; } + } - _suspended = suspend; - - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - !suspend); // active - - _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + if (jmse != null) + { + throw jmse; } } - - public void confirmConsumerCancelled(AMQShortString consumerTag) + /** + * Register to consume from the queue. + * + * @param queueName + */ + private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { + // need to generate a consumer tag on the client so we can exploit the nowait flag + AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); - // Remove the consumer from the map - BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if (consumer != null) + FieldTable arguments = FieldTableFactory.newFieldTable(); + if ((messageSelector != null) && !messageSelector.equals("")) { -// fixme this isn't right.. needs to check if _queue contains data for this consumer - if (consumer.isAutoClose())// && _queue.isEmpty()) - { - consumer.closeWhenNoMessages(true); - } + arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); + } - if (!consumer.isNoConsume()) - { - //Clean the Maps up first - //Flush any pending messages for this consumerTag - if (_dispatcher != null) - { - _logger.info("Dispatcher is not null"); - } - else - { - _logger.info("Dispatcher is null so created stopped dispatcher"); + if (consumer.isAutoClose()) + { + arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + } - startDistpatcherIfNecessary(true); - } + if (consumer.isNoConsume()) + { + arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + } - _dispatcher.rejectPending(consumer); + consumer.setConsumerTag(tag); + // we must register the consumer in the map before we actually start listening + _consumers.put(tag, consumer); + + try + { + // TODO: Be aware of possible changes to parameter order as versions change. + AMQFrame jmsConsume = + BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + getTicket()); // ticket + + if (nowait) + { + protocolHandler.writeFrame(jmsConsume); } else { - //Just close the consumer - //fixme the CancelOK is being processed before the arriving messages.. - // The dispatcher is still to process them so the server sent in order but the client - // has yet to receive before the close comes in. - -// consumer.markClosed(); + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); } } - else + catch (AMQException e) { - _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); + // clean-up the map in the event of an error + _consumers.remove(tag); + throw e; } + } + private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) + throws JMSException + { + return createProducerImpl(destination, mandatory, immediate, false); + } + private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + final boolean immediate, final boolean waitUntilSent) throws JMSException + { + return new FailoverRetrySupport<BasicMessageProducer, JMSException>( + new FailoverProtectedOperation<BasicMessageProducer, JMSException>() + { + public BasicMessageProducer execute() throws JMSException, FailoverException + { + checkNotClosed(); + long producerId = getNextProducerId(); + BasicMessageProducer producer = + new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, + AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + registerProducer(producerId, producer); + + return producer; + } + }, _connection).execute(); } - /* - * I could have combined the last 3 methods, but this way it improves readability - */ - private AMQTopic checkValidTopic(Topic topic) throws JMSException + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { - if (topic == null) - { - throw new javax.jms.InvalidDestinationException("Invalid Topic"); - } - if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this) - { - throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session"); - } - if (!(topic instanceof AMQTopic)) - { - throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); - } - return (AMQTopic) topic; + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - private void checkValidQueue(Queue queue) throws InvalidDestinationException + /** + * Declares the named exchange and type of exchange. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the exchange to declare. + * @param type The type of the exchange to declare. + * @param protocolHandler The protocol handler to process the communication through. + * @param nowait + * + * @throws AMQException If the exchange cannot be declared for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private void declareExchange(final AMQShortString name, final AMQShortString type, + final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException { - if (queue == null) - { - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + nowait, // nowait + false, // passive + getTicket(), // ticket + type); // type + + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + + return null; + } + }, _connection).execute(); } - private void checkValidDestination(Destination destination) throws InvalidDestinationException + /** + * Declares a queue for a JMS destination. + * + * <p/>Note that for queues but not topics the name is generated in the client rather than the server. This allows + * the name to be reused on failover if required. In general, the destination indicates whether it wants a name + * generated or not. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param amqd The destination to declare as a queue. + * @param protocolHandler The protocol handler to communicate through. + * + * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of + * the client. + * + * @throws AMQException If the queue cannot be declared for any reason. + * + * @todo Verify the destiation is valid or throw an exception. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + throws AMQException { - if (destination == null) + /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + return new FailoverNoopSupport<AMQShortString, AMQException>( + new FailoverProtectedOperation<AMQShortString, AMQException>() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) + { + amqd.setQueueName(protocolHandler.generateQueueName()); + } + + AMQFrame queueDeclare = + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + false, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + getTicket()); // ticket + + protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + + return amqd.getAMQQueueName(); + } + }, _connection).execute(); + } + + /** + * Undeclares the specified queue. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param queueName The name of the queue to delete. + * + * @throws JMSException If the queue could not be deleted for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private void deleteQueue(final AMQShortString queueName) throws JMSException + { + try { - throw new javax.jms.InvalidDestinationException("Invalid Queue"); + new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + AMQFrame queueDeleteFrame = + QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + + return null; + } + }, _connection).execute(); + } + catch (AMQException e) + { + throw new JMSAMQException("The queue deletion failed: " + e.getMessage(), e); } } - - public AMQShortString getTemporaryTopicExchangeName() + private long getNextProducerId() { - return _connection.getTemporaryTopicExchangeName(); + return ++_nextProducerId; } - public AMQShortString getTemporaryQueueExchangeName() + private AMQProtocolHandler getProtocolHandler() { - return _connection.getTemporaryQueueExchangeName(); + return _connection.getProtocolHandler(); } - - public int getTicket() + private byte getProtocolMajorVersion() { - return _ticket; + return getProtocolHandler().getProtocolMajorVersion(); } - public void setTicket(int ticket) + private byte getProtocolMinorVersion() { - _ticket = ticket; + return getProtocolHandler().getProtocolMinorVersion(); } + private boolean hasMessageListeners() + { + return _hasMessageListeners; + } - public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException + private void markClosedConsumers() throws JMSException { - getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), - getProtocolMinorVersion(), - active, - exclusive, - passive, - read, - realm, - write), - new BlockingMethodFrameListener(_channelId) - { - - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException - { - if (frame instanceof AccessRequestOkBody) - { - setTicket(((AccessRequestOkBody) frame).getTicket()); - return true; - } - else - { - return false; - } - } - }); + if (_dispatcher != null) + { + _dispatcher.close(); + _dispatcher = null; + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + while (it.hasNext()) + { + final BasicMessageConsumer con = it.next(); + con.markClosed(); + } + // at this point the _consumers map will be empty } - private class SuspenderRunner implements Runnable + private void markClosedProducersAndConsumers() { - private boolean _suspend; + try + { + // no need for a markClosed* method in this case since there is no protocol traffic closing a producer + closeProducers(); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + } - public SuspenderRunner(boolean suspend) + try { - _suspend = suspend; + markClosedConsumers(); } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + } + } - public void run() + /** + * Callers must hold the failover mutex before calling this method. + * + * @param consumer + * + * @throws AMQException + */ + private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException + { + AMQDestination amqd = consumer.getDestination(); + + AMQProtocolHandler protocolHandler = getProtocolHandler(); + + declareExchange(amqd, protocolHandler, false); + + AMQShortString queueName = declareQueue(amqd, protocolHandler); + + // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName()); + + // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch + if (!_immediatePrefetch) { - try - { - suspendChannel(_suspend); - } - catch (AMQException e) + // The dispatcher will be null if we have just created this session + // so suspend the channel before we register our consumer so that we don't + // start prefetching until a receive/mListener is set. + if (_dispatcher == null) { - _logger.warn("Unable to suspend channel"); + if (!isSuspended()) + { + try + { + suspendChannel(true); + _logger.info( + "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); + } + } } } + else + { + _logger.info("Immediately prefetching existing messages to new consumer."); + } + + try + { + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); + } + catch (JMSException e) // thrown by getMessageSelector + { + throw new AMQException(e.getMessage(), e); + } + catch (FailoverException e) + { + throw new AMQException("Fail-over exception interrupted basic consume.", e); + } } + private void registerProducer(long producerId, MessageProducer producer) + { + _producers.put(new Long(producerId), producer); + } private void rejectAllMessages(boolean requeue) { @@ -2396,8 +2370,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi Iterator messages = _queue.iterator(); if (_logger.isInfoEnabled()) { - _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + - ") (PDispatchQ) requeue:" + requeue); + _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" + + requeue); if (messages.hasNext()) { @@ -2412,12 +2386,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) + if ((consumerTag == null) || message.getDeliverBody().consumerTag.equals(consumerTag)) { if (_logger.isDebugEnabled()) { - _logger.debug("Removing message(" + System.identityHashCode(message) + - ") from _queue DT:" + message.getDeliverBody().deliveryTag); + _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" + + message.getDeliverBody().deliveryTag); } messages.remove(); @@ -2432,50 +2406,361 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - - public void rejectMessage(UnprocessedMessage message, boolean requeue) + private void resubscribeConsumers() throws AMQException { + ArrayList consumers = new ArrayList(_consumers.values()); + _consumers.clear(); - if (_logger.isTraceEnabled()) + for (Iterator it = consumers.iterator(); it.hasNext();) { - _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + registerConsumer(consumer, true); } - - rejectMessage(message.getDeliverBody().deliveryTag, requeue); } - public void rejectMessage(AbstractJMSMessage message, boolean requeue) + private void resubscribeProducers() throws AMQException { - if (_logger.isTraceEnabled()) + ArrayList producers = new ArrayList(_producers.values()); + _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey + for (Iterator it = producers.iterator(); it.hasNext();) { - _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); + BasicMessageProducer producer = (BasicMessageProducer) it.next(); + producer.resubscribe(); } - rejectMessage(message.getDeliveryTag(), requeue); + } + + private void returnBouncedMessage(final UnprocessedMessage message) + { + _connection.performConnectionTask(new Runnable() + { + public void run() + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange, + message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies()); + + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); + AMQShortString reason = message.getBounceBody().replyText; + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } + else + { + _connection.exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } + } + }); } - public void rejectMessage(long deliveryTag, boolean requeue) + /** + * Suspends or unsuspends this session. + * + * @param suspend <tt>true</tt> indicates that the session should be suspended, <tt>false<tt> indicates that it + * should be unsuspended. + * + * @throws AMQException If the session cannot be suspended for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private void suspendChannel(boolean suspend) throws AMQException // , FailoverException { - if (_acknowledgeMode == CLIENT_ACKNOWLEDGE || - _acknowledgeMode == SESSION_TRANSACTED) + synchronized (_suspensionLock) { - if (_logger.isDebugEnabled()) + try { - _logger.debug("Rejecting delivery tag:" + deliveryTag); - } - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } - _connection.getProtocolHandler().writeFrame(basicRejectBody); + _suspended = suspend; + + AMQFrame channelFlowFrame = + ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + !suspend); + + _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + } + catch (FailoverException e) + { + throw new AMQException("Fail-over interrupted suspend/unsuspend channel.", e); + } } } - public boolean isStrictAMQP() + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + private class Dispatcher extends Thread { - return _strictAMQP; + + /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private final Object _lock = new Object(); + + public Dispatcher() + { + super("Dispatcher-Channel-" + _channelId); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " created"); + } + } + + public void close() + { + _closed.set(true); + interrupt(); + + // fixme awaitTermination + + } + + public void rejectPending(BasicMessageConsumer consumer) + { + synchronized (_lock) + { + boolean stopped = _dispatcher.connectionStopped(); + + if (!stopped) + { + _dispatcher.setConnectionStopped(true); + } + + // Reject messages on pre-receive queue + consumer.rollback(); + + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + + // closeConsumer + consumer.markClosed(); + + _dispatcher.setConnectionStopped(stopped); + + } + } + + public void rollback() + { + + synchronized (_lock) + { + boolean isStopped = connectionStopped(); + + if (!isStopped) + { + setConnectionStopped(true); + } + + rejectAllMessages(true); + + _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); + + for (BasicMessageConsumer consumer : _consumers.values()) + { + if (!consumer.isNoConsume()) + { + consumer.rollback(); + } + else + { + // should perhaps clear the _SQ here. + // consumer._synchronousQueue.clear(); + consumer.clearReceiveQueue(); + } + + } + + setConnectionStopped(isStopped); + } + + } + + public void run() + { + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " started"); + } + + UnprocessedMessage message; + + // Allow disptacher to start stopped + synchronized (_lock) + { + while (connectionStopped()) + { + try + { + _lock.wait(); + } + catch (InterruptedException e) + { + // ignore + } + } + } + + try + { + while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null)) + { + synchronized (_lock) + { + + while (connectionStopped()) + { + _lock.wait(); + } + + dispatchMessage(message); + + while (connectionStopped()) + { + _lock.wait(); + } + + } + + } + } + catch (InterruptedException e) + { + // ignore + } + + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); + } + } + + // only call while holding lock + final boolean connectionStopped() + { + return _connectionStopped; + } + + boolean setConnectionStopped(boolean connectionStopped) + { + boolean currently; + synchronized (_lock) + { + currently = _connectionStopped; + _connectionStopped = connectionStopped; + _lock.notify(); + + if (_dispatcherLogger.isDebugEnabled()) + { + _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + + ": Currently " + (currently ? "Stopped" : "Started")); + } + } + + return currently; + } + + private void dispatchMessage(UnprocessedMessage message) + { + if (message.getDeliverBody() != null) + { + final BasicMessageConsumer consumer = + (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); + + if ((consumer == null) || consumer.isClosed()) + { + if (_dispatcherLogger.isInfoEnabled()) + { + if (consumer == null) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); + } + else + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" + + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); + } + } + // Don't reject if we're already closing + if (!_closed.get()) + { + rejectMessage(message, true); + } + } + else + { + consumer.notifyMessage(message, _channelId); + } + } + } } + /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, + boolean read) throws AMQException + { + getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), + getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write), + new BlockingMethodFrameListener(_channelId) + { + + public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException + { + if (frame instanceof AccessRequestOkBody) + { + setTicket(((AccessRequestOkBody) frame).getTicket()); + + return true; + } + else + { + return false; + } + } + }); + }*/ + + private class SuspenderRunner implements Runnable + { + private boolean _suspend; + + public SuspenderRunner(boolean suspend) + { + _suspend = suspend; + } + + public void run() + { + try + { + suspendChannel(_suspend); + } + catch (AMQException e) + { + _logger.warn("Unable to suspend channel"); + } + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 1c3cdbcb65..3a31eda754 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,22 +20,10 @@ */ package org.apache.qpid.client; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; @@ -48,6 +36,19 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + public class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); @@ -140,9 +141,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -219,7 +220,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " - + _destination); + + _destination); } } else @@ -468,7 +469,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " close():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -481,9 +482,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { // TODO: Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = - BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag - false); // nowait + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait try { @@ -497,10 +498,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (AMQException e) { - // _logger.error("Error closing consumer: " + e, e); - JMSException jmse = new JMSException("Error closing consumer: " + e); - jmse.setLinkedException(e); - throw jmse; + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } } else @@ -540,7 +542,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " markClosed():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -572,9 +574,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); if (debug) { @@ -659,15 +661,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - break; + case Session.PRE_ACKNOWLEDGE: + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - break; + case Session.CLIENT_ACKNOWLEDGE: + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + break; } } @@ -677,55 +679,55 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } + case Session.DUPS_OK_ACKNOWLEDGE: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } - if (_outstanding <= _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } - if (_dups_ok_acknowledge_send) + if (_dups_ok_acknowledge_send) + { + if (!_session.isInRecovery()) { - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); - } + _session.acknowledgeMessage(msg.getDeliveryTag(), true); } + } - break; + break; - case Session.AUTO_ACKNOWLEDGE: - // we do not auto ack a message if the application code called recover() - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.AUTO_ACKNOWLEDGE: + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _receivedDeliveryTags.add(msg.getDeliveryTag()); - } + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _receivedDeliveryTags.add(msg.getDeliveryTag()); + } - break; + break; } } @@ -757,7 +759,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " notifyError():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -817,7 +819,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() throws JMSException + public void acknowledge() // throws JMSException { if (!isClosed()) { @@ -877,7 +879,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -907,7 +909,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); @@ -931,7 +933,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer else { _logger.error("Queue contained a :" + o.getClass() - + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java index d1237cff49..0927ca3625 100644 --- a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java +++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -18,29 +18,12 @@ * under the License. * */ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ package org.apache.qpid.client; -import javax.jms.JMSException; - import org.apache.qpid.AMQException; +import javax.jms.JMSException; + /** * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions @@ -50,8 +33,6 @@ import org.apache.qpid.AMQException; * <tr><th> Responsibilities <th> Collaborations * <tr><td> Accept wrapped exceptions as a JMSException. * </table> - * - * @author Apache Software Foundation */ public class JMSAMQException extends JMSException { @@ -71,6 +52,11 @@ public class JMSAMQException extends JMSException } } + /** + * @param s The underlying exception. + * + * @deprecated Use the other constructor and write a helpfull message. This one will be deleted. + */ public JMSAMQException(AMQException s) { super(s.getMessage(), String.valueOf(s.getErrorCode())); diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java index 49377fdc19..037b0dc2d1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,10 +21,26 @@ package org.apache.qpid.client.failover; /** - * This exception is thrown when failover is taking place and we need to let other - * parts of the client know about this. + * FailoverException is used to indicate that a synchronous request has failed to receive the reply that it is waiting + * for because the fail-over process has been started whilst it was waiting for its reply. Synchronous methods generally + * raise this exception to indicate that they must be re-tried once the fail-over process has completed. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Used to indicate failure of a synchronous request due to fail-over. + * </table> + * + * @todo This exception is created and passed as an argument to a method, rather than thrown. The exception is being + * used to represent an event, passed out to other threads. Use of exceptions as arguments rather than as + * exceptions is extremly confusing. Ideally use a condition or set a flag and check it instead. + * This exceptions-as-events pattern seems to be in a similar style to Mina code, which is not pretty, but + * potentially acceptable for that reason. We have the option of extending the mina model to add more events + * to it, that is, anything that is interested in handling failover as an event occurs below the main + * amq event handler, which knows the specific interface of the qpid handlers, which can pass this down as + * an explicit event, without it being an exception. Add failover method to BlockingMethodFrameListener, + * have it set a flag or interrupt the waiting thread, which then creates and raises this exception. */ -public class FailoverException extends RuntimeException +public class FailoverException extends Exception { public FailoverException(String message) { diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 844ecbe743..dbbceff523 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,59 +20,108 @@ */ package org.apache.qpid.client.failover; -import java.util.concurrent.CountDownLatch; - import org.apache.log4j.Logger; + import org.apache.mina.common.IoSession; + import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; +import java.util.concurrent.CountDownLatch; + /** - * When failover is required, we need a separate thread to handle the establishment of the new connection and - * the transfer of subscriptions. - * </p> - * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor - * thread. One significant task is the connection setup which involves a protocol exchange until a particular state - * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means - * the IO processor is not able to do anything at all. + * FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the + * class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport + * connections, failing over to a new connection if the transport connection fails. The procedure to establish a new + * connection is expressed as a continuation, in order that it may be run in a seperate thread to the i/o thread that + * detected the failure and is used to handle the communication to establish a new connection. + * + * </p>The reason this needs to be a separate thread is because this work cannot be done inside the i/o processor + * thread. The significant task is the connection setup which involves a protocol exchange until a particular state + * is achieved. This procedure waits until the state is achieved which would prevent the i/o thread doing the work + * it needs to do to achieve the new state. + * + * <p/>The failover procedure does the following: + * + * <ol> + * <li>Sets the failing over condition to true.</li> + * <li>Creates a {@link FailoverException} and gets the protocol connection handler to propagate this event to all + * interested parties.</li> + * <li>Takes the failover mutex on the protocol connection handler.</li> + * <li>Abandons the fail over if any of the interested parties vetoes it. The mutex is released and the condition + * reset.</li> + * <li>Creates a new {@link AMQStateManager} and re-established the connection through it.</li> + * <li>Informs the AMQConnection if the connection cannot be re-established.</li> + * <li>Recreates all sessions from the old connection to the new.</li> + * <li>Resets the failing over condition and releases the mutex.</li> + * </ol> + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Update fail-over state <td> {@link AMQProtocolHandler} + * </table> + * + * @todo The failover latch and mutex are used like a lock and condition. If the retrotranlator supports lock/condition + * then could change over to using them. 1.4 support still needed. + * + * @todo If the condition is set to null on a vetoes fail-over and there are already other threads waiting on the + * condition, they will never be released. It might be an idea to reset the condition in a finally block. + * + * @todo Creates a {@link AMQDisconnectedException} and passes it to the AMQConnection. No need to use an + * exception-as-argument here, could just as easily call a specific method for this purpose on AMQConnection. + * + * @todo Creates a {@link FailoverException} and propagates it to the MethodHandlers. No need to use an + * exception-as-argument here, could just as easily call a specific method for this purpose on + * {@link org.apache.qpid.protocol.AMQMethodListener}. */ public class FailoverHandler implements Runnable { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(FailoverHandler.class); + /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */ private final IoSession _session; + + /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */ private AMQProtocolHandler _amqProtocolHandler; - /** - * Used where forcing the failover host - */ + /** Used to hold the host to fail over to. This is optional and if not set a reconnect to the previous host is tried. */ private String _host; - /** - * Used where forcing the failover port - */ + /** Used to hold the port to fail over to. */ private int _port; + /** + * Creates a failover handler on a protocol session, for a particular MINA session (network connection). + * + * @param amqProtocolHandler The protocol handler that spans the failover. + * @param session The MINA session, for the failing connection. + */ public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) { _amqProtocolHandler = amqProtocolHandler; _session = session; } + /** + * Performs the failover procedure. See the class level comment, {@link FailoverHandler}, for a description of the + * failover procedure. + */ public void run() { if (Thread.currentThread().isDaemon()) { throw new IllegalStateException("FailoverHandler must run on a non-daemon thread."); } - //Thread.currentThread().setName("Failover Thread"); + // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of + // the fail over. _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1)); // We wake up listeners. If they can handle failover, they will extend the - // FailoverSupport class and will in turn block on the latch until failover - // has completed before retrying the operation + // FailoverRetrySupport class and will in turn block on the latch until failover + // has completed before retrying the operation. _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start")); // Since failover impacts several structures we protect them all with a single mutex. These structures @@ -93,14 +142,18 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setStateManager(existingStateManager); if (_host != null) { - _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Redirect was vetoed by client")); } else { - _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Failover was vetoed by client")); } + _amqProtocolHandler.getFailoverLatch().countDown(); _amqProtocolHandler.setFailoverLatch(null); + return; } @@ -119,12 +172,12 @@ public class FailoverHandler implements Runnable { failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(); } + if (!failoverSucceeded) { _amqProtocolHandler.setStateManager(existingStateManager); - _amqProtocolHandler.getConnection().exceptionReceived( - new AMQDisconnectedException("Server closed connection and no failover " + - "was successful")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Server closed connection and no failover " + "was successful")); } else { @@ -140,6 +193,7 @@ public class FailoverHandler implements Runnable { _logger.info("Client vetoed automatic resubscription"); } + _amqProtocolHandler.getConnection().fireFailoverComplete(); _amqProtocolHandler.setFailoverState(FailoverState.NOT_STARTED); _logger.info("Connection failover completed successfully"); @@ -148,35 +202,36 @@ public class FailoverHandler implements Runnable { _logger.info("Failover process failed - exception being propagated by protocol handler"); _amqProtocolHandler.setFailoverState(FailoverState.FAILED); - try - { - _amqProtocolHandler.exceptionCaught(_session, e); - } + /*try + {*/ + _amqProtocolHandler.exceptionCaught(_session, e); + /*} catch (Exception ex) { _logger.error("Error notifying protocol session of error: " + ex, ex); - } + }*/ } } } - _amqProtocolHandler.getFailoverLatch().countDown(); - } - public String getHost() - { - return _host; + _amqProtocolHandler.getFailoverLatch().countDown(); } + /** + * Sets the host name to fail over to. This is optional and if not set a reconnect to the previous host is tried. + * + * @param host The host name to fail over to. + */ public void setHost(String host) { _host = host; } - public int getPort() - { - return _port; - } - + /** + * Sets the port to fail over to. + * + * @param port The port to fail over to. + */ public void setPort(int port) { _port = port; diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java new file mode 100644 index 0000000000..dece1b6c3f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -0,0 +1,54 @@ +package org.apache.qpid.client.failover;
+
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support
+ * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this
+ * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be
+ * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception,
+ * for example, because the caller already holds locks preventing that condition from arising.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Perform a fail-over protected operation with no handling of fail-over conditions.
+ * </table>
+ */
+public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E>
+{
+ /** The protected operation that is to be retried in the event of fail-over. */
+ FailoverProtectedOperation<T, E> operation;
+
+ /** The connection on which the fail-over protected operation is to be performed. */
+ AMQConnection connection;
+
+ /**
+ * Creates an automatic retrying fail-over handler for the specified operation.
+ *
+ * @param operation The fail-over protected operation to wrap in this handler.
+ */
+ public FailoverNoopSupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
+ {
+ this.operation = operation;
+ this.connection = con;
+ }
+
+ /**
+ * Delegates to another continuation which is to be provided with fail-over handling.
+ *
+ * @return The return value from the delegated to continuation.
+ * @throws E Any exception that the delegated to continuation may raise.
+ */
+ public T execute() throws E
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ throw new IllegalStateException("Fail-over interupted no-op failover support. "
+ + "No-op support should only be used where the caller is certaing fail-over cannot occur.", e);
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java new file mode 100644 index 0000000000..efb7bf8aed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java @@ -0,0 +1,30 @@ +package org.apache.qpid.client.failover;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because
+ * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers
+ * for failover protected operations, in order to provide different handling schemes when failovers occurr.
+ *
+ * <p/>The type of checked exception that the operation may perform has been generified, in order that fail over
+ * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not
+ * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes
+ * will mask the exception.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Perform an operation that may be interrupted by fail-over.
+ * </table>
+ */
+public interface FailoverProtectedOperation<T, E extends Exception>
+{
+ /**
+ * Performs the continuations work.
+ *
+ * @return Provdes scope for the continuation to return an arbitrary value.
+ *
+ * @throws FailoverException If the operation is interrupted by a fail-over notification.
+ */
+ public abstract T execute() throws E, FailoverException;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java new file mode 100644 index 0000000000..1e4908976b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -0,0 +1,130 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+
+import javax.jms.JMSException;
+
+/**
+ * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified
+ * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due
+ * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request,
+ * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This
+ * automatic retrying is continued until the continuation succeeds, or throws an exception (different to
+ * FailoverException, which is used to signal the failure of the original condition).
+ *
+ * <p/>The blocking condition used is that the connection is not currently failing over, and the mutex used is the
+ * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods.
+ * These are used like a lock and condition variable.
+ *
+ * <p/>The wrapped operation may throw a FailoverException, this is an exception that can be raised by a
+ * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a
+ * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are
+ * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous
+ * methods that should not be attempted when a fail-over is in progress.
+ *
+ * <p/>Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be
+ * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want
+ * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation
+ * until it succeeds. Synchronous methods are usually coordinated with a
+ * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants
+ * to start and throws a FailoverException in response to this.
+ *
+ * <p/>Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be
+ * started during fail-over, but be delayed until any current fail-over has completed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a continuation synchronized on a fail-over lock and condition.
+ * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
+ * </table>
+ *
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
+ * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
+ * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
+ * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
+ * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
+ *
+ * @todo InterruptedException not handled well.
+ */
+public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E>
+{
+ /** Used for debugging. */
+ private static final Logger _log = Logger.getLogger(FailoverRetrySupport.class);
+
+ /** The protected operation that is to be retried in the event of fail-over. */
+ FailoverProtectedOperation<T, E> operation;
+
+ /** The connection on which the fail-over protected operation is to be performed. */
+ AMQConnection connection;
+
+ /**
+ * Creates an automatic retrying fail-over handler for the specified operation.
+ *
+ * @param operation The fail-over protected operation to wrap in this handler.
+ */
+ public FailoverRetrySupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
+ {
+ this.operation = operation;
+ this.connection = con;
+ }
+
+ /**
+ * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats
+ * until the operation throws AMQException or succeeds without being interrupted by fail-over.
+ *
+ * @return The result of executing the continuation.
+ *
+ * @throws AMQException Any underlying exception is allowed to fall through.
+ */
+ public T execute() throws E
+ {
+ while (true)
+ {
+ try
+ {
+ connection.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted: " + e, e);
+
+ return null;
+ }
+
+ synchronized (connection.getFailoverMutex())
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ _log.debug("Failover exception caught during operation: " + e, e);
+ }
+ }
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java index a005bc5fdf..41bac34a34 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java @@ -1,65 +1,28 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.client.failover; -import javax.jms.JMSException; - -import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -public abstract class FailoverSupport +/** + * FailoverSupport defines an interface for different types of fail-over handlers, that provide different types of + * behaviour for handling fail-overs during operations that can be interrupted by the fail-over process. For example, + * the support could automatically retry once the fail-over process completes, could prevent an operation from being + * started whilst fail-over is running, or could quietly abandon the operation or raise an exception, and so on. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities + * <tr><td> Perform a fail-over protected operation with handling for fail-over conditions. + * </table> + * + * @todo Continuation, extend some sort of re-usable Continuation interface, which might look very like this one. + */ +public interface FailoverSupport<T, E extends Exception> { - private static final Logger _log = Logger.getLogger(FailoverSupport.class); - - public Object execute(AMQConnection con) throws JMSException - { - // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding. - // Any method that can potentially block for any reason should use this class so that deadlock will not - // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners) - // that might be causing a block. When that happens, the exception is caught here and the mutex is released - // before waiting for the failover to complete (either successfully or unsuccessfully). - while (true) - { - try - { - con.blockUntilNotFailingOver(); - } - catch (InterruptedException e) - { - _log.info("Interrupted: " + e, e); - return null; - } - synchronized (con.getFailoverMutex()) - { - try - { - return operation(); - } - catch (FailoverException e) - { - _log.info("Failover exception caught during operation: " + e, e); - } - } - } - } - - protected abstract Object operation() throws JMSException; + /** + * Delegates to another continuation which is to be provided with fail-over handling. + * + * @return The return value from the delegated to continuation. + * + * @throws E Any exception that the delegated to continuation may raise. + */ + public T execute() throws E; } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index addef94215..5bf7bffc63 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,24 +20,22 @@ */ package org.apache.qpid.client.protocol; -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - import org.apache.log4j.Logger; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; + import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.SSLConfiguration; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; @@ -60,22 +58,86 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; +import java.util.Iterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +/** + * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the + * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the + * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the + * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, + * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in + * terms of "message received" and so on. + * + * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is + * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public + * API calls through which an individual connection can be manipulated. This protocol handler talks to the network + * through MINA, in a behind the scenes role; it is not an exposed part of the client API. + * + * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level, + * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in + * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol + * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions + * in the event of failover. See below for more information about this. + * + * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named + * attributes. A more convenient, type-safe, container for session data is provided in the form of + * {@link AMQProtocolSession}. + * + * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session + * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper + * as described above). This event handler is different, because dealing with failover complicates things. To the + * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but + * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot + * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old + * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection + * and the protocol session data is held outside of the MINA IOSession. + * + * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through. + * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from + * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, + * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create the filter chain to filter this handlers events. + * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. + * + * <tr><td> Maintain fail-over state. + * <tr><td> + * </table> + * + * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the + * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing + * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec + * filter before it mean not doing the read/write asynchronously but in the main filter thread? + * + * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. + */ public class AMQProtocolHandler extends IoHandlerAdapter { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances - * and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection + * instances and protocol handler instances. */ private AMQConnection _connection; /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ private volatile AMQProtocolSession _protocolSession; + /** Holds the state of the protocol session. */ private AMQStateManager _stateManager = new AMQStateManager(); + /** Holds the method listeners, */ private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); /** @@ -91,15 +153,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ private FailoverState _failoverState = FailoverState.NOT_STARTED; + /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; + /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + /** + * Creates a new protocol handler, associated with the specified client connection instance. + * + * @param con The client connection that this is the event handler for. + */ public AMQProtocolHandler(AMQConnection con) { _connection = con; } + /** + * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the + * session, which filters the events handled by this handler. The filter chain consists of, handing off events + * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP. + * + * @param session The MINA session. + * + * @throws Exception Any underlying exceptions are allowed to fall through to MINA. + */ public void sessionCreated(IoSession session) throws Exception { _logger.debug("Protocol session created for session " + System.identityHashCode(session)); @@ -119,16 +197,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_connection.getSSLConfiguration() != null) { SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLContextFactory sslFactory = + new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); sslFilter.setUseClientMode(true); session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); } - try { - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); threadModel.getAsynchronousReadFilter().createNewJobForSession(session); threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); @@ -142,35 +219,38 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.init(); } - public void sessionOpened(IoSession session) throws Exception - { - //System.setProperty("foo", "bar"); - } - /** - * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by - * sessionClosed() depending on whether we were trying to send data at the time of failure. + * Called when the network connection is closed. This can happen, either because the client explicitly requested + * that the connection be closed, in which case nothing is done, or because the connection died. In the case + * where the connection died, an attempt to failover automatically to a new connection may be started. The failover + * process will be started, provided that it is the clients policy to allow failover, and provided that a failover + * has not already been started or failed. + * + * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught} + * may be called first followed by this method. This depends on whether the client was trying to send data at the + * time of the failure. * - * @param session + * @param session The MINA session. * - * @throws Exception + * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and + * not otherwise? The above comment doesn't make that clear. */ - public void sessionClosed(IoSession session) throws Exception + public void sessionClosed(IoSession session) { if (_connection.isClosed()) { - _logger.info("Session closed called by client"); + _logger.debug("Session closed called by client"); } else { - _logger.info("Session closed called with failover state currently " + _failoverState); + _logger.debug("Session closed called with failover state currently " + _failoverState); - //reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // reconnetablility was introduced here so as not to disturb the client as they have made their intentions // known through the policy settings. if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) { - _logger.info("FAILOVER STARTING"); + _logger.debug("FAILOVER STARTING"); if (_failoverState == FailoverState.NOT_STARTED) { _failoverState = FailoverState.IN_PROGRESS; @@ -178,12 +258,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter } else { - _logger.info("Not starting failover as state currently " + _failoverState); + _logger.debug("Not starting failover as state currently " + _failoverState); } } else { - _logger.info("Failover not allowed by policy."); + _logger.debug("Failover not allowed by policy."); // or already in progress? if (_logger.isDebugEnabled()) { @@ -192,19 +272,18 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_failoverState != FailoverState.IN_PROGRESS) { - _logger.info("sessionClose() not allowed to failover"); - _connection.exceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.")); + _logger.debug("sessionClose() not allowed to failover"); + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection " + "not permitted.")); } else { - _logger.info("sessionClose() failover in progress"); + _logger.debug("sessionClose() failover in progress"); } } } - _logger.info("Protocol Session [" + this + "] closed"); + _logger.debug("Protocol Session [" + this + "] closed"); } /** See {@link FailoverHandler} to see rationale for separate thread. */ @@ -223,25 +302,32 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); if (IdleStatus.WRITER_IDLE.equals(status)) { - //write heartbeat frame: + // write heartbeat frame: _logger.debug("Sent heartbeat"); session.write(HeartbeatBody.FRAME); HeartbeatDiagnostics.sent(); } else if (IdleStatus.READER_IDLE.equals(status)) { - //failover: + // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); session.close(); } } - public void exceptionCaught(IoSession session, Throwable cause) throws Exception + /** + * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an + * IOException, MINA will close the connection automatically. + * + * @param session The MINA session. + * @param cause The exception that triggered this event. + */ + public void exceptionCaught(IoSession session, Throwable cause) { if (_failoverState == FailoverState.NOT_STARTED) { - //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) + // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if (cause instanceof AMQConnectionClosedException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); @@ -250,8 +336,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter sessionClosed(session); } - //FIXME Need to correctly handle other exceptions. Things like ... -// if (cause instanceof AMQChannelClosedException) + // FIXME Need to correctly handle other exceptions. Things like ... + // if (cause instanceof AMQChannelClosedException) // which will cause the JMSSession to end due to a channel close and so that Session needs // to be removed from the map so we can correctly still call close without an exception when trying to close // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception @@ -261,6 +347,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter else if (_failoverState == FailoverState.FAILED) { _logger.error("Exception caught by protocol handler: " + cause, cause); + // we notify the state manager of the error in case we have any clients waiting on a state // change. Those "waiters" will be interrupted and can handle the exception AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); @@ -297,7 +384,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter final boolean debug = _logger.isDebugEnabled(); final long msgNumber = ++_messageReceivedCount; - if (debug && (msgNumber % 1000 == 0)) + if (debug && ((msgNumber % 1000) == 0)) { _logger.debug("Received " + _messageReceivedCount + " protocol messages"); } @@ -310,72 +397,77 @@ public class AMQProtocolHandler extends IoHandlerAdapter switch (bodyFrame.getFrameType()) { - case AMQMethodBody.TYPE: + case AMQMethodBody.TYPE: - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); + final AMQMethodEvent<AMQMethodBody> evt = + new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - try - { + try + { - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } - catch (AMQException e) + + if (!wasAnyoneInterested) { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } + } + catch (AMQException e) + { + getStateManager().error(e); + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); } - exceptionCaught(session, e); } - break; - case ContentHeaderBody.TYPE: + exceptionCaught(session, e); + } - _protocolSession.messageContentHeaderReceived(frame.getChannel(), - (ContentHeaderBody) bodyFrame); - break; + break; - case ContentBody.TYPE: + case ContentHeaderBody.TYPE: - _protocolSession.messageContentBodyReceived(frame.getChannel(), - (ContentBody) bodyFrame); - break; + _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); + break; - case HeartbeatBody.TYPE: + case ContentBody.TYPE: - if (debug) - { - _logger.debug("Received heartbeat"); - } - break; + _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); + break; + + case HeartbeatBody.TYPE: + + if (debug) + { + _logger.debug("Received heartbeat"); + } + + break; - default: + default: } + _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -387,10 +479,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter final boolean debug = _logger.isDebugEnabled(); - if (debug && (sentMessages % 1000 == 0)) + if (debug && ((sentMessages % 1000) == 0)) { _logger.debug("Sent " + _messagesOut + " protocol messages"); } + _connection.bytesSent(session.getWrittenBytes()); if (debug) { @@ -408,7 +501,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _frameListeners.remove(listener); } - */ + */ public void attainState(AMQState s) throws AMQException { getStateManager().attainState(s); @@ -437,9 +530,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) - throws AMQException + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) + throws AMQException, FailoverException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); } @@ -451,9 +543,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) - throws AMQException + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, + long timeout) throws AMQException, FailoverException { try { @@ -461,9 +552,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.writeFrame(frame); AMQMethodEvent e = listener.blockForFrame(timeout); + return e; - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener } catch (AMQException e) { @@ -478,25 +570,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException { return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); } /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException { - return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout); + return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass), + timeout); } - - public void closeSession(AMQSession session) throws AMQException { _protocolSession.closeSession(session); } + /** + * Closes the connection. + * + * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed + * anyway. + * + * @param timeout The timeout to wait for an acknowledgement to the close request. + * + * @throws AMQException If the close fails for any reason. + */ public void closeConnection(long timeout) throws AMQException { getStateManager().changeState(AMQState.CONNECTION_CLOSING); @@ -504,13 +604,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, - _protocolSession.getProtocolMajorVersion(), - _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection.")); // replyText + final AMQFrame frame = + ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(), + _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection.")); // replyText try { @@ -521,8 +621,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _protocolSession.closeProtocolSession(false); } - - + catch (FailoverException e) + { + _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + } } /** @return the number of bytes read from this protocol session */ @@ -604,7 +706,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter return _protocolSession.getProtocolMajorVersion(); } - public byte getProtocolMinorVersion() { return _protocolSession.getProtocolMinorVersion(); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 85f98eab69..86db9d5859 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,71 +27,137 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +/** + * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of + * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or + * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this + * differs from a 'rendezvous' in that sense. + * + * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response. + * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register + * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they + * have been completed. + * + * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to + * this listeners that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to + * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The + * channel id must be passed to the constructor. + * + * <p/>Errors from the producer are rethrown to the consumer. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent} + * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody} + * <tr><td> Block until a method is handled by the delegated to handler. + * <tr><td> Propagate the most recent exception to the consumer. + * </table> + * + * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a + * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations + * seem to use it. So wrapping the listeners is possible. + * + * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener, + * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot + * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for + * method has been received. + * + * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull + * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry + * when this happens. At the very least, restore the interrupted status flag. + * + * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to + * check that SynchronousQueue has a non-blocking put method available. + */ public abstract class BlockingMethodFrameListener implements AMQMethodListener { + /** This flag is used to indicate that the blocked for method has been received. */ private volatile boolean _ready = false; - public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException; - + /** Used to protect the shared event and ready flag between the producer and consumer. */ private final Object _lock = new Object(); - /** - * This is set if there is an exception thrown from processCommandFrame and the - * exception is rethrown to the caller of blockForFrame() - */ + /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; + /** Holds the channel id for the channel upon which this listener is waiting for a response. */ protected int _channelId; + /** Holds the incoming method. */ protected AMQMethodEvent _doneEvt = null; + /** + * Creates a new method listener, that filters incoming method to just those that match the specified channel id. + * + * @param channelId The channel id to filter incoming methods with. + */ public BlockingMethodFrameListener(int channelId) { _channelId = channelId; } /** - * This method is called by the MINA dispatching thread. Note that it could - * be called before blockForFrame() has been called. + * Delegates any additional handling of the incoming methods to another handler. * - * @param evt the frame event - * @return true if the listener has dealt with this frame - * @throws AMQException + * @param channelId The channel id of the incoming method. + * @param frame The method body. + * + * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise. */ - public boolean methodReceived(AMQMethodEvent evt) throws AMQException + public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException; + + /** + * Informs this listener that an AMQP method has been received. + * + * @param evt The AMQP method. + * + * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise. + */ + public boolean methodReceived(AMQMethodEvent evt) // throws AMQException { AMQMethodBody method = evt.getMethod(); - try + /*try + {*/ + boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); + + if (ready) { - boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); - if (ready) + // we only update the flag from inside the synchronized block + // so that the blockForFrame method cannot "miss" an update - it + // will only ever read the flag from within the synchronized block + synchronized (_lock) { - // we only update the flag from inside the synchronized block - // so that the blockForFrame method cannot "miss" an update - it - // will only ever read the flag from within the synchronized block - synchronized (_lock) - { - _doneEvt = evt; - _ready = ready; - _lock.notify(); - } + _doneEvt = evt; + _ready = ready; + _lock.notify(); } - return ready; } + + return ready; + + /*} catch (AMQException e) { error(e); // we rethrow the error here, and the code in the frame dispatcher will go round // each listener informing them that an exception has been thrown throw e; - } + }*/ } /** - * This method is called by the thread that wants to wait for a frame. + * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout + * has passed. + * + * @param timeout The timeout in milliseconds. + * + * @return The AMQP method that was received. + * + * @throws AMQException + * @throws FailoverException */ - public AMQMethodEvent blockForFrame(long timeout) throws AMQException + public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { synchronized (_lock) { @@ -117,24 +183,25 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener catch (InterruptedException e) { // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess -// if (!_ready && timeout != -1) -// { -// _error = new AMQException("Server did not respond timely"); -// _ready = true; -// } + // if (!_ready && timeout != -1) + // { + // _error = new AMQException("Server did not respond timely"); + // _ready = true; + // } } } } + if (_error != null) { if (_error instanceof AMQException) { - throw(AMQException) _error; + throw (AMQException) _error; } else if (_error instanceof FailoverException) { - // This should ensure that FailoverException is not wrapped and can be caught. - throw(FailoverException) _error; // needed to expose FailoverException. + // This should ensure that FailoverException is not wrapped and can be caught. + throw (FailoverException) _error; // needed to expose FailoverException. } else { @@ -156,6 +223,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener // set the error so that the thread that is blocking (against blockForFrame()) // can pick up the exception and rethrow to the caller _error = e; + synchronized (_lock) { _ready = true; diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 1c70ded62a..623591e0b6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -34,7 +34,7 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener _expectedClass = expectedClass; } - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException + public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException { return _expectedClass.isInstance(frame); } diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 642b928d81..0fc39a9318 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,17 @@ */ package org.apache.qpid.client.util; - +import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.Iterator; /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single * thread adding items and a single (different) thread removing items. + * + * @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted. */ public class FlowControllingBlockingQueue { @@ -81,6 +82,7 @@ public class FlowControllingBlockingQueue } } } + return o; } @@ -104,4 +106,3 @@ public class FlowControllingBlockingQueue return _queue.iterator(); } } - diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 51bbe7d0e6..c201e88104 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -14,42 +14,43 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.test.unit.client.channelclose; import junit.framework.TestCase; -import javax.jms.Connection; -import javax.jms.Session; - -import javax.jms.JMSException; -import javax.jms.ExceptionListener; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Message; -import javax.jms.TextMessage; -import javax.jms.Queue; +import org.apache.log4j.Logger; -import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.jms.ConnectionListener; -import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener { @@ -73,15 +74,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con TransportConnection.killAllVMBrokers(); } - /* close channel, use chanel with same id ensure error. - */ - public void testReusingChannelAfterFullClosure() + */ + public void testReusingChannelAfterFullClosure() throws Exception { _connection = newConnection(); - //Create Producer + // Create Producer try { _connection.start(); @@ -113,6 +113,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con { _logger.info("Exception occured was:" + e.getErrorCode()); } + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -134,29 +135,27 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con /* close channel and send guff then send ok no errors */ - public void testSendingMethodsAfterClose() + public void testSendingMethodsAfterClose() throws Exception { try { - _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" - + _brokerlist + "'"); + _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); ((AMQConnection) _connection).setConnectionListener(this); - _connection.setExceptionListener(this); - //Change the StateManager for one that doesn't respond with Close-OKs + // Change the StateManager for one that doesn't respond with Close-OKs AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager(); _session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); _connection.start(); - //Test connection + // Test connection checkSendingMessage(); - //Set StateManager to manager that ignores Close-oks + // Set StateManager to manager that ignores Close-oks AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession); newStateManager.changeState(oldStateManager.getCurrentState()); @@ -214,7 +213,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con createChannelAndTest(TEST_CHANNEL); - //Test connection is still ok + // Test connection is still ok checkSendingMessage(); @@ -248,9 +247,9 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } } - private void createChannelAndTest(int channel) + private void createChannelAndTest(int channel) throws FailoverException { - //Create A channel + // Create A channel try { createChannel(channel); @@ -274,14 +273,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con private void sendClose(int channel) { - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + AMQFrame frame = + ChannelCloseOkBody.createAMQFrame(channel, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); } - private void checkSendingMessage() throws JMSException { TEST++; @@ -307,8 +306,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con AMQConnection connection = null; try { - connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" - + _brokerlist + "'"); + connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); connection.setConnectionListener(this); @@ -330,24 +328,24 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con fail("Creating new connection when:" + e.getMessage()); } - return connection; } - private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException + private void declareExchange(int channelId, String _type, String _name, boolean nowait) + throws AMQException, FailoverException { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + new AMQShortString(_name), // exchange + false, // internal + nowait, // nowait + true, // passive + 0, // ticket + new AMQShortString(_type)); // type if (nowait) { @@ -355,36 +353,31 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } else { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, + SYNC_TIMEOUT); } } - private void createChannel(int channelId) throws AMQException + private void createChannel(int channelId) throws AMQException, FailoverException { - ((AMQConnection) _connection).getProtocolHandler().syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); } - public void onException(JMSException jmsException) { - //_logger.info("CCT" + jmsException); + // _logger.info("CCT" + jmsException); fail(jmsException.getMessage()); } public void bytesSent(long count) - { - } + { } public void bytesReceived(long count) - { - - } + { } public boolean preFailover(boolean redirect) { @@ -397,6 +390,5 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } public void failoverComplete() - { - } + { } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index d52707d965..58ac8294f2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -22,29 +22,29 @@ package org.apache.qpid.test.unit.close; import junit.framework.TestCase; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.url.URLSyntaxException; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; -import javax.jms.TextMessage; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.AMQException; -import org.apache.qpid.testutil.QpidClientConnection; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; +import java.util.concurrent.atomic.AtomicInteger; public class MessageRequeueTest extends TestCase { @@ -86,7 +86,7 @@ public class MessageRequeueTest extends TestCase { super.tearDown(); - if (!passed) // clean up + if (!passed) // clean up { QpidClientConnection conn = new QpidClientConnection(BROKER); @@ -96,6 +96,7 @@ public class MessageRequeueTest extends TestCase conn.disconnect(); } + TransportConnection.killVMBroker(1); } @@ -117,7 +118,7 @@ public class MessageRequeueTest extends TestCase final MessageConsumer consumer = conn.getSession().createConsumer(q); int messagesReceived = 0; - long messageLog[] = new long[numTestMessages + 1]; + long[] messageLog = new long[numTestMessages + 1]; _logger.info("consuming..."); Message msg = consumer.receive(1000); @@ -130,15 +131,13 @@ public class MessageRequeueTest extends TestCase int msgindex = msg.getIntProperty("index"); if (messageLog[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex); } if (dt == 0) @@ -148,7 +147,7 @@ public class MessageRequeueTest extends TestCase messageLog[msgindex] = dt; - //get Next message + // get Next message msg = consumer.receive(1000); } @@ -163,7 +162,7 @@ public class MessageRequeueTest extends TestCase for (long b : messageLog) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -175,6 +174,7 @@ public class MessageRequeueTest extends TestCase index++; } + assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); @@ -199,7 +199,7 @@ public class MessageRequeueTest extends TestCase t1.start(); t2.start(); t3.start(); -// t4.start(); + // t4.start(); try { @@ -228,7 +228,7 @@ public class MessageRequeueTest extends TestCase for (long b : receieved) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0) { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -237,8 +237,10 @@ public class MessageRequeueTest extends TestCase list.append(b); failed++; } + index++; } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); passed = true; @@ -278,15 +280,14 @@ public class MessageRequeueTest extends TestCase int msgindex = result.getIntProperty("index"); if (receieved[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + + "IN:" + msgindex); } if (dt == 0) @@ -297,9 +298,8 @@ public class MessageRequeueTest extends TestCase receieved[msgindex] = dt; } - count++; - if (count % 100 == 0) + if ((count % 100) == 0) { _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); } @@ -328,11 +328,10 @@ public class MessageRequeueTest extends TestCase } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException { int run = 0; -// while (run < 10) + // while (run < 10) { run++; @@ -359,7 +358,6 @@ public class MessageRequeueTest extends TestCase assertNotNull("Message should not be null", msg); - // As we have not ack'd message will be requeued. _logger.debug("Close Consumer"); consumer.close(); @@ -369,4 +367,4 @@ public class MessageRequeueTest extends TestCase } } -}
\ No newline at end of file +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 0e718da19b..8d96977df2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -21,18 +21,20 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.AMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.Session; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Queue; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; /** @@ -62,10 +64,10 @@ public class CommitRollbackTest extends TestCase { TransportConnection.createVMBroker(1); } + testMethod++; queue += testMethod; - newConnection(); } @@ -106,7 +108,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -119,7 +120,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -135,7 +136,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -151,7 +151,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -168,7 +168,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenRollback"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -335,13 +334,12 @@ public class CommitRollbackTest extends TestCase assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); } - /** * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order * * @throws Exception On error */ - public void testSend2ThenRollback() throws Exception + /*public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -391,7 +389,7 @@ public class CommitRollbackTest extends TestCase } assertNull("test message should be null", result); - } + }*/ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { @@ -428,7 +426,7 @@ public class CommitRollbackTest extends TestCase { assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); } - else // or it will be msg 2 arriving the first time due to latency. + else // or it will be msg 2 arriving the first time due to latency. { _logger.info("Message 2 wasn't prefetched so wasn't rejected"); assertEquals("2", ((TextMessage) result).getText()); @@ -445,7 +443,6 @@ public class CommitRollbackTest extends TestCase } - public void testPutThenRollbackThenGet() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index 195ed79dab..d52da06f76 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -1,25 +1,25 @@ package org.apache.qpid.testutil; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; public class QpidClientConnection implements ExceptionListener { - private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); private boolean transacted = true; @@ -40,17 +40,16 @@ public class QpidClientConnection implements ExceptionListener setPrefetch(5000); } - public void connect() throws JMSException { if (!connected) { /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; try { @@ -63,7 +62,6 @@ public class QpidClientConnection implements ExceptionListener session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - _logger.info("starting connection"); connection.start(); @@ -124,7 +122,6 @@ public class QpidClientConnection implements ExceptionListener this.prefetch = prefetch; } - /** override as necessary */ public void onException(JMSException exception) { @@ -266,4 +263,3 @@ public class QpidClientConnection implements ExceptionListener _logger.info("consumed: " + messagesReceived); } } - |
