summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-06-01 14:33:07 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-06-01 14:33:07 +0000
commit3b5d4734b777b54b52ce2710f404143aca8c5c6e (patch)
treed436e7a5239ec6be725852c12e7ccae975892745 /java/client/src
parent566e08caa331629a15bedca1d8cfc896886b0497 (diff)
downloadqpid-python-3b5d4734b777b54b52ce2710f404143aca8c5c6e.tar.gz
QPID-402: FailoverException falling through to client. All blocking operations now wrapped in failover support wrappers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@543496 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java149
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java3499
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java162
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java135
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java30
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java130
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java81
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java327
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java144
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java136
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java74
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java31
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java28
18 files changed, 2861 insertions, 2186 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 347f5728e2..674f205af6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,34 +20,15 @@
*/
package org.apache.qpid.client;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
@@ -67,6 +48,27 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
@@ -96,8 +98,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>();
-
+ private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
private String _clientName;
@@ -486,72 +487,72 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
final int prefetchHigh, final int prefetchLow) throws JMSException
{
checkNotClosed();
+
if (channelLimitReached())
{
throw new ChannelLimitReachedException(_maximumChannelCount);
}
- else
- {
- return (org.apache.qpid.jms.Session) new FailoverSupport()
+
+ return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
+ new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
+ {
+ public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
{
- public Object operation() throws JMSException
+ int channelId = _idFactory.incrementAndGet();
+
+ if (_logger.isDebugEnabled())
{
- int channelId = _idFactory.incrementAndGet();
+ _logger.debug("Write channel open frame for channel id " + channelId);
+ }
+
+ // We must create the session and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AMQSession session =
+ new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+ prefetchLow);
+ // _protocolHandler.addSessionByChannel(channelId, session);
+ registerSession(channelId, session);
- if (_logger.isDebugEnabled())
+ boolean success = false;
+ try
+ {
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ success = true;
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error creating session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ finally
+ {
+ if (!success)
{
- _logger.debug("Write channel open frame for channel id " + channelId);
+ deregisterSession(channelId);
}
+ }
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session =
- new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
- //_protocolHandler.addSessionByChannel(channelId, session);
- registerSession(channelId, session);
-
- boolean success = false;
+ if (_started)
+ {
try
{
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
+ session.start();
}
catch (AMQException e)
{
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
- {
- deregisterSession(channelId);
- }
+ throw new JMSAMQException(e);
}
-
- if (_started)
- {
- try
- {
- session.start();
- }
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
- }
-
- return session;
}
- }.execute(this);
- }
+
+ return session;
+ }
+ }, this).execute();
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException
+ throws AMQException, FailoverException
{
// TODO: Be aware of possible changes to parameter order as versions change.
@@ -581,7 +582,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException
+ private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+ throws AMQException, FailoverException
{
try
{
@@ -1128,14 +1130,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
*/
- public void resubscribeSessions() throws JMSException, AMQException
+ public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
ArrayList sessions = new ArrayList(_sessions.values());
_logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
- //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
}
@@ -1223,7 +1225,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_taskPool.execute(task);
}
-
public AMQSession getSession(int channelId)
{
return _sessions.get(channelId);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b7615c5b7b..25c2d94377 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,47 +20,16 @@
*/
package org.apache.qpid.client;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverNoopSupport;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.message.JMSMapMessage;
@@ -70,21 +39,20 @@ import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AccessRequestBody;
-import org.apache.qpid.framing.AccessRequestOkBody;
import org.apache.qpid.framing.BasicAckBody;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
import org.apache.qpid.framing.ExchangeBoundBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
@@ -92,358 +60,242 @@ import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ */
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
+ /** Used for debugging. */
private static final Logger _logger = Logger.getLogger(AMQSession.class);
+ /** Used for debugging in the dispatcher. */
+ private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+
+ /** The default maximum number of prefetched message at which to suspend the channel. */
public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+
+ /** The default minimum number of prefetched messages at which to resume the channel. */
public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
+ /**
+ * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
+ * not need to be attached to a queue.
+ */
+ protected static final boolean DEFAULT_IMMEDIATE = false;
+
+ /**
+ * The default value for mandatory flag used by producers created by this session is true. That is, server will not
+ * silently drop messages where no queue is connected to the exchange for the message.
+ */
+ protected static final boolean DEFAULT_MANDATORY = true;
+
+ /** System property to enable strict AMQP compliance. */
+ public static final String STRICT_AMQP = "STRICT_AMQP";
+
+ /** Strict AMQP default setting. */
+ public static final String STRICT_AMQP_DEFAULT = "false";
+
+ /** System property to enable failure if strict AMQP compliance is violated. */
+ public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+
+ /** Strickt AMQP failure default. */
+ public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+ /** System property to enable immediate message prefetching. */
+ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+
+ /** Immediate message prefetch default. */
+ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
+ /** The connection to which this session belongs. */
private AMQConnection _connection;
+ /** Used to indicate whether or not this is a transactional session. */
private boolean _transacted;
+ /** Holds the sessions acknowledgement mode. */
private int _acknowledgeMode;
+ /** Holds this session unique identifier, used to distinguish it from other sessions. */
private int _channelId;
+ /** @todo This does not appear to be set? */
private int _ticket;
+ /** Holds the high mark for prefetched message, at which the session is suspended. */
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+
+ /** Holds the low mark for prefetched messages, below which the session is resumed. */
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+ /** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
+ /** Used to indicate that this session has been started at least once. */
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only
- * keeps a record of subscriptions which have been created in the current instance. It does not remember
- * subscriptions between executions of the client
+ * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only
+ * keeps a record of subscriptions which have been created in the current instance. It does not remember
+ * subscriptions between executions of the client.
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
- new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
- private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
- /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */
- private int _nextTag = 1;
+ /**
+ * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
+ * up in the {@link #_subscriptions} map.
+ */
+ private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
+ new ConcurrentHashMap<BasicMessageConsumer, String>();
- /** This queue is bounded and is used to store messages before being dispatched to the consumer */
+ /**
+ * Used to hold incoming messages.
+ *
+ * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
+ */
private final FlowControllingBlockingQueue _queue;
+ /** Holds the dispatcher thread for this session. */
private Dispatcher _dispatcher;
+ /** Holds the message factory factory for this session. */
private MessageFactoryRegistry _messageFactoryRegistry;
- /** Set of all producers created by this session */
- private Map _producers = new ConcurrentHashMap();
-
- /** Maps from consumer tag (String) to JMSMessageConsumer instance */
- private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
-
- /** Maps from destination to count of JMSMessageConsumers */
- private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
- new ConcurrentHashMap<Destination, AtomicInteger>();
+ /** Holds all of the producers created by this session, keyed by their unique identifiers. */
+ private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
/**
- * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
- * need to be attached to a queue
+ * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods.
*/
- protected static final boolean DEFAULT_IMMEDIATE = false;
+ private int _nextTag = 1;
/**
- * Default value for mandatory flag used by producers created by this sessio is true, i.e. server will not silently
- * drop messages where no queue is connected to the exchange for the message
+ * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
+ * consumer.
*/
- protected static final boolean DEFAULT_MANDATORY = true;
+ private Map<AMQShortString, BasicMessageConsumer> _consumers =
+ new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+
+ /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
+ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
+ new ConcurrentHashMap<Destination, AtomicInteger>();
/**
- * The counter of the next producer id. This id is generated by the session and used only to allow the producer to
- * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be
- * synchronized since according to the JMS specification only one thread of control is allowed to create producers
- * for any given session instance.
+ * Used as a source of unique identifiers for producers within the session.
+ *
+ * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one
+ * thread of control is allowed to create producers for any given session instance.
*/
private long _nextProducerId;
-
/**
* Set when recover is called. This is to handle the case where recover() is called by application code during
- * onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+ * onMessage() processing to enure that an auto ack is not sent.
*/
private boolean _inRecovery;
+ /** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
+ /** Used to indicate that this session has a message listener attached to it. */
private boolean _hasMessageListeners;
+ /** Used to indicate that this session has been suspended. */
private boolean _suspended;
+ /**
+ * Used to protect the suspension of this session, so that critical code can be executed during suspension,
+ * without the session being resumed by other threads.
+ */
private final Object _suspensionLock = new Object();
- /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */
+ /**
+ * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel.
+ *
+ * @todo This is accessed only within a synchronized method, so does not need to be atomic.
+ */
private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
- /** System property to enable strickt AMQP compliance */
- public static final String STRICT_AMQP = "STRICT_AMQP";
- /** Strickt AMQP default */
- public static final String STRICT_AMQP_DEFAULT = "false";
+ /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
+ private final boolean _immediatePrefetch;
+ /** Indicates that warnings should be generated on violations of the strict AMQP. */
private final boolean _strictAMQP;
- /** System property to enable strickt AMQP compliance */
- public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
- /** Strickt AMQP default */
- public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
-
+ /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
private final boolean _strictAMQPFATAL;
- /** System property to enable immediate message prefetching */
- public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
- /** Immediate message prefetch default */
- public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
-
- private final boolean _immediatePrefetch;
-
- private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
-
- /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
- private class Dispatcher extends Thread
- {
-
- /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
- private final AtomicBoolean _closed = new AtomicBoolean(false);
-
- private final Object _lock = new Object();
-
- public Dispatcher()
- {
- super("Dispatcher-Channel-" + _channelId);
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " created");
- }
- }
-
- public void run()
- {
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " started");
- }
-
- UnprocessedMessage message;
-
- // Allow disptacher to start stopped
- synchronized (_lock)
- {
- while (connectionStopped())
- {
- try
- {
- _lock.wait();
- }
- catch (InterruptedException e)
- {
- // ignore
- }
- }
- }
-
- try
- {
- while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
- {
- synchronized (_lock)
- {
-
- while (connectionStopped())
- {
- _lock.wait();
- }
-
- dispatchMessage(message);
-
- while (connectionStopped())
- {
- _lock.wait();
- }
-
- }
-
- }
- }
- catch (InterruptedException e)
- {
- //ignore
- }
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
- }
- }
-
- // only call while holding lock
- final boolean connectionStopped()
- {
- return _connectionStopped;
- }
-
- boolean setConnectionStopped(boolean connectionStopped)
- {
- boolean currently;
- synchronized (_lock)
- {
- currently = _connectionStopped;
- _connectionStopped = connectionStopped;
- _lock.notify();
-
- if (_dispatcherLogger.isDebugEnabled())
- {
- _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") +
- ": Currently " + (currently ? "Stopped" : "Started"));
- }
- }
- return currently;
- }
-
- private void dispatchMessage(UnprocessedMessage message)
- {
- if (message.getDeliverBody() != null)
- {
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
-
- if (consumer == null || consumer.isClosed())
- {
- if (_dispatcherLogger.isInfoEnabled())
- {
- if (consumer == null)
- {
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
- "[" + message.getDeliverBody().deliveryTag + "] from queue " +
- message.getDeliverBody().consumerTag +
- " )without a handler - rejecting(requeue)...");
- }
- else
- {
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
- "[" + message.getDeliverBody().deliveryTag + "] from queue " +
- " consumer(" + consumer.debugIdentity() +
- ") is closed rejecting(requeue)...");
- }
- }
- // Don't reject if we're already closing
- if (!_closed.get())
- {
- rejectMessage(message, true);
- }
- }
- else
- {
- consumer.notifyMessage(message, _channelId);
- }
- }
- }
-
- public void close()
- {
- _closed.set(true);
- interrupt();
-
- //fixme awaitTermination
-
- }
-
- public void rollback()
- {
-
- synchronized (_lock)
- {
- boolean isStopped = connectionStopped();
-
- if (!isStopped)
- {
- setConnectionStopped(true);
- }
-
- rejectAllMessages(true);
-
- _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
-
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- if (!consumer.isNoConsume())
- {
- consumer.rollback();
- }
- else
- {
- // should perhaps clear the _SQ here.
- //consumer._synchronousQueue.clear();
- consumer.clearReceiveQueue();
- }
-
-
- }
-
- setConnectionStopped(isStopped);
- }
-
- }
-
- public void rejectPending(BasicMessageConsumer consumer)
- {
- synchronized (_lock)
- {
- boolean stopped = _dispatcher.connectionStopped();
-
- if (!stopped)
- {
- _dispatcher.setConnectionStopped(true);
- }
-
- // Reject messages on pre-receive queue
- consumer.rollback();
-
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
-
- // closeConsumer
- consumer.markClosed();
-
- _dispatcher.setConnectionStopped(stopped);
-
- }
- }
- }
-
-
-
+ /**
+ * Creates a new session on a connection.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param messageFactoryRegistry The message factory factory for the session.
+ * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
+ */
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
_strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
- _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
- _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+ _strictAMQPFATAL =
+ Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
+ _immediatePrefetch =
+ _strictAMQP
+ || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
_connection = con;
_transacted = transacted;
@@ -455,6 +307,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_acknowledgeMode = acknowledgeMode;
}
+
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
_defaultPrefetchHighMark = defaultPrefetchHighMark;
@@ -462,27 +315,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
- new FlowControllingBlockingQueue.ThresholdListener()
- {
- public void aboveThreshold(int currentValue)
- {
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
- }
- }
-
- public void underThreshold(int currentValue)
- {
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
- }
- }
- });
+ _queue =
+ new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue.ThresholdListener()
+ {
+ public void aboveThreshold(int currentValue)
+ {
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ new Thread(new SuspenderRunner(true)).start();
+ }
+ }
+
+ public void underThreshold(int currentValue)
+ {
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _logger.debug(
+ "Below threshold(" + _defaultPrefetchLowMark
+ + ") so unsuspending channel. Current value is " + currentValue);
+ new Thread(new SuspenderRunner(false)).start();
+ }
+ }
+ });
}
else
{
@@ -490,183 +348,146 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
-
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
- }
-
- public AMQConnection getAMQConnection()
- {
- return _connection;
- }
-
- public BytesMessage createBytesMessage() throws JMSException
+ /**
+ * Creates a new session on a connection with the default message factory factory.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ */
+ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+ int defaultPrefetchLow)
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
- return new JMSBytesMessage();
- }
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+ defaultPrefetchLow);
}
- public MapMessage createMapMessage() throws JMSException
+ /**
+ * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
+ *
+ * @throws IllegalStateException If the session is closed.
+ */
+ public void acknowledge() throws IllegalStateException
{
- synchronized (_connection.getFailoverMutex())
+ if (isClosed())
{
- checkNotClosed();
- return new JMSMapMessage();
+ throw new IllegalStateException("Session is already closed");
}
- }
-
- public javax.jms.Message createMessage() throws JMSException
- {
- return createBytesMessage();
- }
- public ObjectMessage createObjectMessage() throws JMSException
- {
- synchronized (_connection.getFailoverMutex())
+ for (BasicMessageConsumer consumer : _consumers.values())
{
- checkNotClosed();
- return (ObjectMessage) new JMSObjectMessage();
+ consumer.acknowledge();
}
}
- public ObjectMessage createObjectMessage(Serializable object) throws JMSException
- {
- ObjectMessage msg = createObjectMessage();
- msg.setObject(object);
- return msg;
- }
-
- public StreamMessage createStreamMessage() throws JMSException
+ /**
+ * Acknowledge one or many messages.
+ *
+ * @param deliveryTag The tag of the last message to be acknowledged.
+ * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the
+ * delivery tag, <tt>false</tt> to just acknowledge that message.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
+ final AMQFrame ackFrame =
+ BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+ multiple);
- return new JMSStreamMessage();
- }
- }
-
- public TextMessage createTextMessage() throws JMSException
- {
- synchronized (_connection.getFailoverMutex())
+ if (_logger.isDebugEnabled())
{
- checkNotClosed();
-
- return new JMSTextMessage();
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
}
- }
- public TextMessage createTextMessage(String text) throws JMSException
- {
-
- TextMessage msg = createTextMessage();
- msg.setText(text);
- return msg;
- }
-
- public boolean getTransacted() throws JMSException
- {
- checkNotClosed();
- return _transacted;
- }
-
- public int getAcknowledgeMode() throws JMSException
- {
- checkNotClosed();
- return _acknowledgeMode;
- }
-
- public void commit() throws JMSException
- {
- checkTransacted();
- try
- {
- // Acknowledge up to message last delivered (if any) for each consumer.
- //need to send ack for messages delivered to consumers so far
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- //Sends acknowledgement to server
- i.next().acknowledgeLastDelivered();
- }
-
- // Commits outstanding messages sent and outstanding acknowledgements.
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQProtocolHandler handler = getProtocolHandler();
-
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion()),
- TxCommitOkBody.class);
- }
- catch (AMQException e)
- {
- JMSException exception = new JMSException("Failed to commit: " + e.getMessage());
- exception.setLinkedException(e);
- throw exception;
- }
+ getProtocolHandler().writeFrame(ackFrame);
}
-
- public void rollback() throws JMSException
+ /**
+ * Binds the named queue, with the specified routing key, to the named exchange.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param queueName The name of the queue to bind.
+ * @param routingKey The routing key to bind the queue with.
+ * @param arguments Additional arguments.
+ * @param exchangeName The exchange to bind the queue on.
+ *
+ * @throws AMQException If the queue cannot be bound for any reason.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ *
+ * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
+ */
+ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName) throws AMQException
{
- synchronized (_suspensionLock)
- {
- checkTransacted();
- try
+ /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
-
- boolean isSuspended = isSuspended();
-
- if (!isSuspended)
- {
- suspendChannel(true);
- }
-
- if (_dispatcher != null)
+ public Object execute() throws AMQException, FailoverException
{
- _dispatcher.rollback();
+ AMQFrame queueBind =
+ QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ arguments, // arguments
+ exchangeName, // exchange
+ false, // nowait
+ queueName, // queue
+ routingKey, // routingKey
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+
+ return null;
}
-
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
-
-
- if (!isSuspended)
- {
- suspendChannel(false);
- }
- }
- catch (AMQException e)
- {
- throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
- }
- }
+ }, _connection).execute();
}
+ /**
+ * Closes the session with no timeout.
+ *
+ * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+ */
public void close() throws JMSException
{
close(-1);
}
+ /**
+ * Closes the session.
+ *
+ * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
+ * the channel. This is because the channel is marked as closed before the request to close it is made, so the
+ * fail-over should not re-open it.
+ *
+ * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
+ *
+ * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ *
+ * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
+ * re-opened. May need to examine this more carefully.
+ *
+ * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
+ * because the failover process sends the failover event before acquiring the mutex itself.
+ */
public void close(long timeout) throws JMSException
{
if (_logger.isInfoEnabled())
{
- _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.info("Closing session: " + this + ":"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
// We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session
+ // that can be called from a different thread of control from the one controlling the session.
synchronized (_connection.getFailoverMutex())
{
- //Ensure we only try and close an open session.
+ // Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
// we pass null since this is not an error case
@@ -676,18 +497,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
getProtocolHandler().closeSession(this);
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
+
+ final AMQFrame frame =
+ ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client closing channel")); // replyText
getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully.
}
catch (AMQException e)
{
@@ -695,6 +516,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
jmse.setLinkedException(e);
throw jmse;
}
+ // This is ignored because the channel is already marked as closed so the fail-over process will
+ // not re-open it.
+ catch (FailoverException e)
+ {
+ _logger.debug(
+ "Got FailoverException during channel close, ignored as channel already marked as closed.");
+ }
finally
{
_connection.deregisterSession(_channelId);
@@ -703,65 +531,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- private AMQProtocolHandler getProtocolHandler()
- {
- return _connection.getProtocolHandler();
- }
-
-
- private byte getProtocolMinorVersion()
- {
- return getProtocolHandler().getProtocolMinorVersion();
- }
-
- private byte getProtocolMajorVersion()
- {
- return getProtocolHandler().getProtocolMajorVersion();
- }
-
-
- /**
- * Close all producers or consumers. This is called either in the error case or when closing the session normally.
- *
- * @param amqe the exception, may be null to indicate no error has occurred
- */
- private void closeProducersAndConsumers(AMQException amqe) throws JMSException
- {
- JMSException jmse = null;
- try
- {
- closeProducers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- jmse = e;
- }
- try
- {
- closeConsumers(amqe);
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- if (jmse == null)
- {
- jmse = e;
- }
- }
- if (jmse != null)
- {
- throw jmse;
- }
- }
-
-
- public boolean isSuspended()
- {
- return _suspended;
- }
-
-
/**
* Called when the server initiates the closure of the session unilaterally.
*
@@ -783,738 +552,342 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
amqe = new AMQException("Closing session forcibly", e);
}
+
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
}
}
/**
- * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
- * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
+ * Commits all messages done in this transaction and releases any locks currently held.
+ *
+ * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the
+ * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
+ * The client will be unable to determine whether or not the commit actually happened on the broker in this case.
+ *
+ * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
+ * not mean that the commit is known to have failed, merely that it is not known whether it
+ * failed or not.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
*/
- void markClosed()
+ public void commit() throws JMSException
{
- _closed.set(true);
- _connection.deregisterSession(_channelId);
- markClosedProducersAndConsumers();
-
- }
+ checkTransacted();
- private void markClosedProducersAndConsumers()
- {
- try
- {
- // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
- closeProducers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- }
try
{
- markClosedConsumers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- }
- }
+ // Acknowledge up to message last delivered (if any) for each consumer.
+ // need to send ack for messages delivered to consumers so far
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+ // Sends acknowledgement to server
+ i.next().acknowledgeLastDelivered();
+ }
- /**
- * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
- * currently no way of propagating errors to message producers (this is a JMS limitation).
- */
- private void closeProducers() throws JMSException
- {
- // we need to clone the list of producers since the close() method updates the _producers collection
- // which would result in a concurrent modification exception
- final ArrayList clonedProducers = new ArrayList(_producers.values());
+ // Commits outstanding messages sent and outstanding acknowledgements.
+ final AMQProtocolHandler handler = getProtocolHandler();
- final Iterator it = clonedProducers.iterator();
- while (it.hasNext())
- {
- final BasicMessageProducer prod = (BasicMessageProducer) it.next();
- prod.close();
+ handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
+ TxCommitOkBody.class);
}
- // at this point the _producers map is empty
- }
-
- /**
- * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
- *
- * @param error not null if this is a result of an error occurring at the connection level
- */
- private void closeConsumers(Throwable error) throws JMSException
- {
- if (_dispatcher != null)
+ catch (AMQException e)
{
- _dispatcher.close();
- _dispatcher = null;
+ throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
}
- // we need to clone the list of consumers since the close() method updates the _consumers collection
- // which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList(_consumers.values());
-
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
- while (it.hasNext())
+ catch (FailoverException e)
{
- final BasicMessageConsumer con = it.next();
- if (error != null)
- {
- con.notifyError(error);
- }
- else
- {
- con.close();
- }
+ throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
- // at this point the _consumers map will be empty
}
- private void markClosedConsumers() throws JMSException
+ public void confirmConsumerCancelled(AMQShortString consumerTag)
{
- if (_dispatcher != null)
- {
- _dispatcher.close();
- _dispatcher = null;
- }
- // we need to clone the list of consumers since the close() method updates the _consumers collection
- // which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
- while (it.hasNext())
- {
- final BasicMessageConsumer con = it.next();
- con.markClosed();
- }
- // at this point the _consumers map will be empty
- }
-
- /**
- * Asks the broker to resend all unacknowledged messages for the session.
- *
- * @throws JMSException
- */
- public void recover() throws JMSException
- {
- checkNotClosed();
- checkNotTransacted(); // throws IllegalStateException if a transacted session
- // this is set only here, and the before the consumer's onMessage is called it is set to false
- _inRecovery = true;
- try
+ // Remove the consumer from the map
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ if (consumer != null)
{
-
- boolean isSuspended = isSuspended();
-
- if (!isSuspended)
+ // fixme this isn't right.. needs to check if _queue contains data for this consumer
+ if (consumer.isAutoClose()) // && _queue.isEmpty())
{
- suspendChannel(true);
+ consumer.closeWhenNoMessages(true);
}
- for (BasicMessageConsumer consumer : _consumers.values())
+ if (!consumer.isNoConsume())
{
- consumer.clearUnackedMessages();
- }
+ // Clean the Maps up first
+ // Flush any pending messages for this consumerTag
+ if (_dispatcher != null)
+ {
+ _logger.info("Dispatcher is not null");
+ }
+ else
+ {
+ _logger.info("Dispatcher is null so created stopped dispatcher");
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
+ startDistpatcherIfNecessary(true);
+ }
- if (isStrictAMQP())
- {
- // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- false)); // requeue
- _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+ _dispatcher.rejectPending(consumer);
}
else
{
+ // Just close the consumer
+ // fixme the CancelOK is being processed before the arriving messages..
+ // The dispatcher is still to process them so the server sent in order but the client
+ // has yet to receive before the close comes in.
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- false) // requeue
- , BasicRecoverOkBody.class);
- }
- if (!isSuspended)
- {
- suspendChannel(false);
+ // consumer.markClosed();
}
}
- catch (AMQException e)
+ else
{
- throw new JMSAMQException(e);
+ _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
}
- }
-
- boolean isInRecovery()
- {
- return _inRecovery;
- }
- void setInRecovery(boolean inRecovery)
- {
- _inRecovery = inRecovery;
}
- public void acknowledge() throws JMSException
+ public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- if (isClosed())
- {
- throw new IllegalStateException("Session is already closed");
- }
- for (BasicMessageConsumer consumer : _consumers.values())
+ if (isStrictAMQP())
{
- consumer.acknowledge();
+ throw new UnsupportedOperationException();
}
-
- }
-
-
- public MessageListener getMessageListener() throws JMSException
- {
-// checkNotClosed();
- return _messageListener;
- }
-
- public void setMessageListener(MessageListener listener) throws JMSException
- {
-// checkNotClosed();
-//
-// if (_dispatcher != null && !_dispatcher.connectionStopped())
-// {
-// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
-// }
-//
-// // We are stopped
-// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-// {
-// BasicMessageConsumer consumer = i.next();
-//
-// if (consumer.isReceiving())
-// {
-// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
-// }
-// }
-//
-// _messageListener = listener;
-//
-// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-// {
-// i.next().setMessageListener(_messageListener);
-// }
-
- }
-
- public void run()
- {
- throw new java.lang.UnsupportedOperationException();
+ return createBrowser(queue, null);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory,
- boolean immediate, boolean waitUntilSent)
- throws JMSException
+ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
- }
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
- throws JMSException
- {
- return createProducerImpl(destination, mandatory, immediate);
- }
+ checkNotClosed();
+ checkValidQueue(queue);
- public BasicMessageProducer createProducer(Destination destination, boolean immediate)
- throws JMSException
- {
- return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+ return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
- public BasicMessageProducer createProducer(Destination destination) throws JMSException
+ public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
{
- return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
- }
+ checkValidDestination(destination);
- private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory,
- boolean immediate)
- throws JMSException
- {
- return createProducerImpl(destination, mandatory, immediate, false);
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ messageSelector, null, true, true);
}
- private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent)
- throws JMSException
+ public BytesMessage createBytesMessage() throws JMSException
{
- return (BasicMessageProducer) new FailoverSupport()
+ synchronized (_connection.getFailoverMutex())
{
- public Object operation() throws JMSException
- {
- checkNotClosed();
- long producerId = getNextProducerId();
- BasicMessageProducer producer = new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
- AMQSession.this, getProtocolHandler(),
- producerId, immediate, mandatory, waitUntilSent);
- registerProducer(producerId, producer);
- return producer;
- }
- }.execute(_connection);
- }
-
- /**
- * Creates a QueueReceiver
- *
- * @param destination
- *
- * @return QueueReceiver - a wrapper around our MessageConsumer
- *
- * @throws JMSException
- */
- public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
- {
- checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
- return new QueueReceiverAdaptor(dest, consumer);
- }
+ checkNotClosed();
- /**
- * Creates a QueueReceiver using a message selector
- *
- * @param destination
- * @param messageSelector
- *
- * @return QueueReceiver - a wrapper around our MessageConsumer
- *
- * @throws JMSException
- */
- public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
- {
- checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer)
- createConsumer(destination, messageSelector);
- return new QueueReceiverAdaptor(dest, consumer);
+ return new JMSBytesMessage();
+ }
}
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination,
- _defaultPrefetchHighMark,
- _defaultPrefetchLowMark,
- false,
- false,
- null,
- null,
- false,
- false);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null,
+ false, false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination,
- _defaultPrefetchHighMark,
- _defaultPrefetchLowMark,
- false,
- false,
- messageSelector,
- null,
- false,
- false);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false,
+ messageSelector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
- {
- checkValidDestination(destination);
- return createConsumerImpl(destination,
- _defaultPrefetchHighMark,
- _defaultPrefetchLowMark,
- noLocal,
- false,
- messageSelector,
- null,
- false,
- false);
- }
-
- public MessageConsumer createBrowserConsumer(Destination destination,
- String messageSelector,
- boolean noLocal)
- throws JMSException
+ throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination,
- _defaultPrefetchHighMark,
- _defaultPrefetchLowMark,
- noLocal,
- false,
- messageSelector,
- null,
- true,
- true);
- }
-
- public MessageConsumer createConsumer(Destination destination,
- int prefetch,
- boolean noLocal,
- boolean exclusive,
- String selector) throws JMSException
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ messageSelector, null, false, false);
+ }
+
+ public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
+ String selector) throws JMSException
{
checkValidDestination(destination);
+
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
}
-
- public MessageConsumer createConsumer(Destination destination,
- int prefetchHigh,
- int prefetchLow,
- boolean noLocal,
- boolean exclusive,
- String selector) throws JMSException
+ public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
+ boolean exclusive, String selector) throws JMSException
{
checkValidDestination(destination);
+
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
}
- public MessageConsumer createConsumer(Destination destination,
- int prefetch,
- boolean noLocal,
- boolean exclusive,
- String selector,
- FieldTable rawSelector) throws JMSException
+ public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
+ String selector, FieldTable rawSelector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
- selector, rawSelector, false, false);
+
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false);
}
- public MessageConsumer createConsumer(Destination destination,
- int prefetchHigh,
- int prefetchLow,
- boolean noLocal,
- boolean exclusive,
- String selector,
- FieldTable rawSelector) throws JMSException
+ public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
+ boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
- selector, rawSelector, false, false);
+
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
+ false);
}
- protected MessageConsumer createConsumerImpl(final Destination destination,
- final int prefetchHigh,
- final int prefetchLow,
- final boolean noLocal,
- final boolean exclusive,
- String selector,
- final FieldTable rawSelector,
- final boolean noConsume,
- final boolean autoClose) throws JMSException
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
- checkTemporaryDestination(destination);
- final String messageSelector;
-
- if (_strictAMQP && !(selector == null || selector.equals("")))
+ checkNotClosed();
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
{
- if (_strictAMQPFATAL)
+ if (subscriber.getTopic().equals(topic))
{
- throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+ + name);
}
else
{
- messageSelector = null;
+ unsubscribe(name);
}
}
else
{
- messageSelector = selector;
- }
-
- return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
- {
- public Object operation() throws JMSException
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
{
- checkNotClosed();
-
- AMQDestination amqd = (AMQDestination) destination;
+ topicName = ((AMQTopic) topic).getDestinationName();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
- // TODO: Define selectors in AMQP
- // TODO: construct the rawSelector from the selector string if rawSelector == null
- final FieldTable ft = FieldTableFactory.newFieldTable();
- //if (rawSelector != null)
- // ft.put("headers", rawSelector.getDataAsBytes());
- if (rawSelector != null)
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
{
- ft.addAll(rawSelector);
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
}
-
- BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
- _messageFactoryRegistry, AMQSession.this,
- protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
- _acknowledgeMode, noConsume, autoClose);
-
- if (_messageListener != null)
+ else
{
- consumer.setMessageListener(_messageListener);
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
}
- try
- {
- registerConsumer(consumer, false);
- }
- catch (AMQInvalidArgumentException ise)
- {
- JMSException ex = new InvalidSelectorException(ise.getMessage());
- ex.setLinkedException(ise);
- throw ex;
- }
- catch (AMQInvalidRoutingKeyException e)
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
{
- JMSException ide = new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
- ide.setLinkedException(e);
- throw ide;
+ deleteQueue(dest.getAMQQueueName());
}
- catch (AMQException e)
- {
- JMSException ex = new JMSException("Error registering consumer: " + e);
+ }
+ }
- if (_logger.isDebugEnabled())
- {
- e.printStackTrace();
- }
- ex.setLinkedException(e);
- throw ex;
- }
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
- synchronized (destination)
- {
- _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
- _destinationConsumerCount.get(destination).incrementAndGet();
- }
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
- return consumer;
- }
- }.execute(_connection);
+ return subscriber;
}
- private void checkTemporaryDestination(Destination destination)
- throws JMSException
+ /** Note, currently this does not handle reuse of the same name with different topics correctly. */
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws JMSException
{
- if ((destination instanceof TemporaryDestination))
- {
- _logger.debug("destination is temporary");
- final TemporaryDestination tempDest = (TemporaryDestination) destination;
- if (tempDest.getSession() != this)
- {
- _logger.debug("destination is on different session");
- throw new JMSException("Cannot consume from a temporary destination created onanother session");
- }
- if (tempDest.isDeleted())
- {
- _logger.debug("destination is deleted");
- throw new JMSException("Cannot consume from a deleted destination");
- }
- }
- }
+ checkNotClosed();
+ checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+ return subscriber;
+ }
- public boolean hasConsumer(Destination destination)
+ public MapMessage createMapMessage() throws JMSException
{
- AtomicInteger counter = _destinationConsumerCount.get(destination);
+ synchronized (_connection.getFailoverMutex())
+ {
+ checkNotClosed();
- return (counter != null) && (counter.get() != 0);
+ return new JMSMapMessage();
+ }
}
- public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
+ public javax.jms.Message createMessage() throws JMSException
{
- declareExchange(name, type, getProtocolHandler(), nowait);
+ return createBytesMessage();
}
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
+ public ObjectMessage createObjectMessage() throws JMSException
{
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
+ synchronized (_connection.getFailoverMutex())
+ {
+ checkNotClosed();
+
+ return (ObjectMessage) new JMSObjectMessage();
+ }
}
- private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
+ public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- nowait, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
+ ObjectMessage msg = createObjectMessage();
+ msg.setObject(object);
- protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ return msg;
}
-
- public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException
+ public BasicMessageProducer createProducer(Destination destination) throws JMSException
{
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- autoDelete, // autoDelete
- durable, // durable
- exclusive, // exclusive
- false, // nowait
- false, // passive
- name, // queue
- getTicket()); // ticket
-
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
-
+ return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
-
- public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException
+ public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- arguments, // arguments
- exchangeName, // exchange
- false, // nowait
- queueName, // queue
- routingKey, // routingKey
- getTicket()); // ticket
-
-
- getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+ return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
}
- /**
- * Declare the queue.
- *
- * @param amqd
- * @param protocolHandler
- *
- * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
- *
- * @throws AMQException
- */
- private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+ public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+ throws JMSException
{
- // For queues (but not topics) we generate the name in the client rather than the
- // server. This allows the name to be reused on failover if required. In general,
- // the destination indicates whether it wants a name generated or not.
- if (amqd.isNameRequired())
- {
- amqd.setQueueName(protocolHandler.generateQueueName());
- }
-
- //TODO verify the destiation is valid. else throw
-
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- amqd.isAutoDelete(), // autoDelete
- amqd.isDurable(), // durable
- amqd.isExclusive(), // exclusive
- false, // nowait
- false, // passive
- amqd.getAMQQueueName(), // queue
- getTicket()); // ticket
-
- protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
- return amqd.getAMQQueueName();
+ return createProducerImpl(destination, mandatory, immediate);
}
- private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+ public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+ boolean waitUntilSent) throws JMSException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- ft, // arguments
- amqd.getExchangeName(), // exchange
- false, // nowait
- queueName, // queue
- amqd.getRoutingKey(), // routingKey
- getTicket()); // ticket
-
-
- protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
+ return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
}
- /**
- * Register to consume from the queue.
- *
- * @param queueName
- *
- * @return the consumer tag generated by the broker
- */
- private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector) throws AMQException
+ public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- //need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
-
- FieldTable arguments = FieldTableFactory.newFieldTable();
- if (messageSelector != null && !messageSelector.equals(""))
- {
- arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
- }
- if (consumer.isAutoClose())
- {
- arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
- }
- if (consumer.isNoConsume())
- {
- arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
- }
-
- consumer.setConsumerTag(tag);
- // we must register the consumer in the map before we actually start listening
- _consumers.put(tag, consumer);
+ checkNotClosed();
- try
- {
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- getTicket()); // ticket
- if (nowait)
- {
- protocolHandler.writeFrame(jmsConsume);
- }
- else
- {
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
- }
- }
- catch (AMQException e)
- {
- // clean-up the map in the event of an error
- _consumers.remove(tag);
- throw e;
- }
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1540,9 +913,80 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- public AMQShortString getDefaultQueueExchangeName()
+ /**
+ * Declares the named queue.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param name The name of the queue to declare.
+ * @param autoDelete
+ * @param durable Flag to indicate that the queue is durable.
+ * @param exclusive Flag to indicate that the queue is exclusive to this client.
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+ final boolean exclusive) throws AMQException
{
- return _connection.getDefaultQueueExchangeName();
+ new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ AMQFrame queueDeclare =
+ QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ null, // arguments
+ autoDelete, // autoDelete
+ durable, // durable
+ exclusive, // exclusive
+ false, // nowait
+ false, // passive
+ name, // queue
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+
+ return null;
+ }
+ }, _connection).execute();
+ }
+
+ /**
+ * Creates a QueueReceiver
+ *
+ * @param destination
+ *
+ * @return QueueReceiver - a wrapper around our MessageConsumer
+ *
+ * @throws JMSException
+ */
+ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
+ {
+ checkValidDestination(destination);
+ AMQQueue dest = (AMQQueue) destination;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+
+ return new QueueReceiverAdaptor(dest, consumer);
+ }
+
+ /**
+ * Creates a QueueReceiver using a message selector
+ *
+ * @param destination
+ * @param messageSelector
+ *
+ * @return QueueReceiver - a wrapper around our MessageConsumer
+ *
+ * @throws JMSException
+ */
+ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
+ {
+ checkValidDestination(destination);
+ AMQQueue dest = (AMQQueue) destination;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+
+ return new QueueReceiverAdaptor(dest, consumer);
}
/**
@@ -1559,6 +1003,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
+
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1576,47 +1021,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer)
- createConsumer(dest, messageSelector);
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
+
return new QueueReceiverAdaptor(dest, consumer);
}
public QueueSender createSender(Queue queue) throws JMSException
{
checkNotClosed();
- //return (QueueSender) createProducer(queue);
+
+ // return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
- public Topic createTopic(String topicName) throws JMSException
+ public StreamMessage createStreamMessage() throws JMSException
{
- checkNotClosed();
-
- if (topicName.indexOf('/') == -1)
- {
- return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
- }
- else
+ synchronized (_connection.getFailoverMutex())
{
- try
- {
- return new AMQTopic(new AMQBindingURL(topicName));
- }
- catch (URLSyntaxException urlse)
- {
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
+ checkNotClosed();
- throw jmse;
- }
+ return new JMSStreamMessage();
}
}
- public AMQShortString getDefaultTopicExchangeName()
- {
- return _connection.getDefaultTopicExchangeName();
- }
-
/**
* Creates a non-durable subscriber
*
@@ -1630,7 +1057,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
AMQTopic dest = checkValidTopic(topic);
- //AMQTopic dest = new AMQTopic(topic.getTopicName());
+
+ // AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1649,150 +1077,401 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
AMQTopic dest = checkValidTopic(topic);
- //AMQTopic dest = new AMQTopic(topic.getTopicName());
+
+ // AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ public TemporaryQueue createTemporaryQueue() throws JMSException
{
+ checkNotClosed();
+ return new AMQTemporaryQueue(this);
+ }
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
- if (subscriber != null)
+
+ return new AMQTemporaryTopic(this);
+ }
+
+ public TextMessage createTextMessage() throws JMSException
+ {
+ synchronized (_connection.getFailoverMutex())
{
- if (subscriber.getTopic().equals(topic))
+ checkNotClosed();
+
+ return new JMSTextMessage();
+ }
+ }
+
+ public TextMessage createTextMessage(String text) throws JMSException
+ {
+
+ TextMessage msg = createTextMessage();
+ msg.setText(text);
+
+ return msg;
+ }
+
+ public Topic createTopic(String topicName) throws JMSException
+ {
+ checkNotClosed();
+
+ if (topicName.indexOf('/') == -1)
+ {
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
+ }
+ else
+ {
+ try
{
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " +
- name);
+ return new AMQTopic(new AMQBindingURL(topicName));
}
- else
+ catch (URLSyntaxException urlse)
{
- unsubscribe(name);
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
+
+ throw jmse;
}
}
+ }
+
+ public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
+ {
+ declareExchange(name, type, getProtocolHandler(), nowait);
+ }
+
+ public int getAcknowledgeMode() throws JMSException
+ {
+ checkNotClosed();
+
+ return _acknowledgeMode;
+ }
+
+ public AMQConnection getAMQConnection()
+ {
+ return _connection;
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public int getDefaultPrefetch()
+ {
+ return _defaultPrefetchHighMark;
+ }
+
+ public int getDefaultPrefetchHigh()
+ {
+ return _defaultPrefetchHighMark;
+ }
+
+ public int getDefaultPrefetchLow()
+ {
+ return _defaultPrefetchLowMark;
+ }
+
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _connection.getDefaultQueueExchangeName();
+ }
+
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _connection.getDefaultTopicExchangeName();
+ }
+
+ public MessageListener getMessageListener() throws JMSException
+ {
+ // checkNotClosed();
+ return _messageListener;
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _connection.getTemporaryQueueExchangeName();
+ }
+
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _connection.getTemporaryTopicExchangeName();
+ }
+
+ public int getTicket()
+ {
+ return _ticket;
+ }
+
+ public boolean getTransacted() throws JMSException
+ {
+ checkNotClosed();
+
+ return _transacted;
+ }
+
+ public boolean hasConsumer(Destination destination)
+ {
+ AtomicInteger counter = _destinationConsumerCount.get(destination);
+
+ return (counter != null) && (counter.get() != 0);
+ }
+
+ public boolean isStrictAMQP()
+ {
+ return _strictAMQP;
+ }
+
+ public boolean isSuspended()
+ {
+ return _suspended;
+ }
+
+ /**
+ * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
+ * the queue read by the dispatcher.
+ *
+ * @param message the message that has been received
+ */
+ public void messageReceived(UnprocessedMessage message)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message["
+ + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
+ + "] received in session with channel id " + _channelId);
+ }
+
+ if (message.getDeliverBody() == null)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage(message);
+ }
else
{
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
+ _queue.add(message);
+ }
+ }
+
+ /**
+ * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
+ *
+ * <p/>All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all
+ * messages that have been delivered to the client.
+ *
+ * <p/>Restarting a session causes it to take the following actions:
+ *
+ * <ul>
+ * <li>Stop message delivery.</li>
+ * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
+ * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
+ * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
+ * </ul>
+ *
+ * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
+ * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible
+ * for the client to determine whether the broker is going to recover the session or not.
+ *
+ * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
+ * Not that this does not necessarily mean that the recovery has failed, but simply that it
+ * is not possible to tell if it has or not.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void recover() throws JMSException
+ {
+ // Ensure that the session is open.
+ checkNotClosed();
+
+ // Ensure that the session is not transacted.
+ checkNotTransacted();
+
+ // this is set only here, and the before the consumer's onMessage is called it is set to false
+ _inRecovery = true;
+ try
+ {
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
{
- topicName = ((AMQTopic) topic).getDestinationName();
+ suspendChannel(true);
}
- else
+
+ for (BasicMessageConsumer consumer : _consumers.values())
{
- topicName = new AMQShortString(topic.getTopicName());
+ consumer.clearUnackedMessages();
}
- if (_strictAMQP)
+ if (_dispatcher != null)
{
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
+ _dispatcher.rollback();
+ }
- deleteQueue(dest.getAMQQueueName());
+ if (isStrictAMQP())
+ {
+ // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+ _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+ _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
else
{
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
- !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
- return subscriber;
- }
+ _connection.getProtocolHandler().syncWrite(
+ BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
+ , BasicRecoverOkBody.class);
+ }
- void deleteQueue(AMQShortString queueName) throws JMSException
- {
- try
- {
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false, // ifEmpty
- false, // ifUnused
- true, // nowait
- queueName, // queue
- getTicket()); // ticket
- getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ if (!isSuspended)
+ {
+ suspendChannel(false);
+ }
}
catch (AMQException e)
{
- throw new JMSAMQException(e);
+ throw new JMSAMQException("Recover failed: " + e.getMessage(), e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
}
}
- /** Note, currently this does not handle reuse of the same name with different topics correctly. */
- public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
- throws JMSException
+ public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
- checkNotClosed();
- checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
- return subscriber;
- }
- public TopicPublisher createPublisher(Topic topic) throws JMSException
- {
- checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ }
+
+ rejectMessage(message.getDeliverBody().deliveryTag, requeue);
}
- public QueueBrowser createBrowser(Queue queue) throws JMSException
+ public void rejectMessage(AbstractJMSMessage message, boolean requeue)
{
- if (isStrictAMQP())
+ if (_logger.isTraceEnabled())
{
- throw new UnsupportedOperationException();
+ _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
}
- return createBrowser(queue, null);
+ rejectMessage(message.getDeliveryTag(), requeue);
+
}
- public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
+ public void rejectMessage(long deliveryTag, boolean requeue)
{
- if (isStrictAMQP())
+ if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
{
- throw new UnsupportedOperationException();
- }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting delivery tag:" + deliveryTag);
+ }
- checkNotClosed();
- checkValidQueue(queue);
- return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
+ AMQFrame basicRejectBody =
+ BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+ requeue);
+
+ _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ }
}
- public TemporaryQueue createTemporaryQueue() throws JMSException
+ /**
+ * Commits all messages done in this transaction and releases any locks currently held.
+ *
+ * <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the
+ * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
+ * The client will be unable to determine whether or not the rollback actually happened on the broker in this case.
+ *
+ * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
+ * not mean that the rollback is known to have failed, merely that it is not known whether it
+ * failed or not.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void rollback() throws JMSException
{
- checkNotClosed();
- return new AMQTemporaryQueue(this);
+ synchronized (_suspensionLock)
+ {
+ checkTransacted();
+
+ try
+ {
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+ suspendChannel(true);
+ }
+
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+
+ if (!isSuspended)
+ {
+ suspendChannel(false);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Failed to rollback: " + e, e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
+ }
+ }
}
- public TemporaryTopic createTemporaryTopic() throws JMSException
+ public void run()
{
- checkNotClosed();
- return new AMQTemporaryTopic(this);
+ throw new java.lang.UnsupportedOperationException();
}
+ public void setMessageListener(MessageListener listener) throws JMSException
+ {
+ // checkNotClosed();
+ //
+ // if (_dispatcher != null && !_dispatcher.connectionStopped())
+ // {
+ // throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+ // }
+ //
+ // // We are stopped
+ // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ // {
+ // BasicMessageConsumer consumer = i.next();
+ //
+ // if (consumer.isReceiving())
+ // {
+ // throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+ // }
+ // }
+ //
+ // _messageListener = listener;
+ //
+ // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ // {
+ // i.next().setMessageListener(_messageListener);
+ // }
+
+ }
+
+ /*public void setTicket(int ticket)
+ {
+ _ticket = ticket;
+ }*/
+
public void unsubscribe(String name) throws JMSException
{
checkNotClosed();
@@ -1815,7 +1494,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
else
{
_logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
- + " Requesting queue deletion regardless.");
+ + " Requesting queue deletion regardless.");
}
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
@@ -1835,189 +1514,248 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
+ protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+ final boolean noConsume, final boolean autoClose) throws JMSException
{
- return isQueueBound(exchangeName, queueName, null);
- }
+ checkTemporaryDestination(destination);
- boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
- {
+ final String messageSelector;
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- exchangeName, // exchange
- queueName, // queue
- routingKey); // routingKey
- AMQMethodEvent response = null;
- try
+ if (_strictAMQP && !((selector == null) || selector.equals("")))
{
- response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
+ }
+ else
+ {
+ messageSelector = null;
+ }
}
- catch (AMQException e)
+ else
{
- throw new JMSAMQException(e);
+ messageSelector = selector;
}
- ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
- return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency
- }
- private void checkTransacted() throws JMSException
- {
- if (!getTransacted())
- {
- throw new IllegalStateException("Session is not transacted");
- }
- }
+ return new FailoverRetrySupport<MessageConsumer, JMSException>(
+ new FailoverProtectedOperation<MessageConsumer, JMSException>()
+ {
+ public MessageConsumer execute() throws JMSException, FailoverException
+ {
+ checkNotClosed();
- private void checkNotTransacted() throws JMSException
- {
- if (getTransacted())
- {
- throw new IllegalStateException("Session is transacted");
- }
+ AMQDestination amqd = (AMQDestination) destination;
+
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+ // TODO: Define selectors in AMQP
+ // TODO: construct the rawSelector from the selector string if rawSelector == null
+ final FieldTable ft = FieldTableFactory.newFieldTable();
+ // if (rawSelector != null)
+ // ft.put("headers", rawSelector.getDataAsBytes());
+ if (rawSelector != null)
+ {
+ ft.addAll(rawSelector);
+ }
+
+ BasicMessageConsumer consumer =
+ new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
+ _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
+ exclusive, _acknowledgeMode, noConsume, autoClose);
+
+ if (_messageListener != null)
+ {
+ consumer.setMessageListener(_messageListener);
+ }
+
+ try
+ {
+ registerConsumer(consumer, false);
+ }
+ catch (AMQInvalidArgumentException ise)
+ {
+ JMSException ex = new InvalidSelectorException(ise.getMessage());
+ ex.setLinkedException(ise);
+ throw ex;
+ }
+ catch (AMQInvalidRoutingKeyException e)
+ {
+ JMSException ide =
+ new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
+ ide.setLinkedException(e);
+ throw ide;
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Error registering consumer: " + e);
+
+ if (_logger.isDebugEnabled())
+ {
+ e.printStackTrace();
+ }
+
+ ex.setLinkedException(e);
+ throw ex;
+ }
+
+ synchronized (destination)
+ {
+ _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
+ _destinationConsumerCount.get(destination).incrementAndGet();
+ }
+
+ return consumer;
+ }
+ }, _connection).execute();
}
/**
- * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
- * the queue read by the dispatcher.
+ * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
+ * instance.
*
- * @param message the message that has been received
+ * @param consumer the consum
*/
- public void messageReceived(UnprocessedMessage message)
+ void deregisterConsumer(BasicMessageConsumer consumer)
{
- if (_logger.isDebugEnabled())
+ if (_consumers.remove(consumer.getConsumerTag()) != null)
{
- _logger.debug("Message[" + (message.getDeliverBody() == null ?
- "B:" + message.getBounceBody() : "D:" + message.getDeliverBody())
- + "] received in session with channel id " + _channelId);
- }
+ String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+ if (subscriptionName != null)
+ {
+ _subscriptions.remove(subscriptionName);
+ }
- if (message.getDeliverBody() == null)
- {
- // Return of the bounced message.
- returnBouncedMessage(message);
- }
- else
- {
- _queue.add(message);
+ Destination dest = consumer.getDestination();
+ synchronized (dest)
+ {
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ {
+ _destinationConsumerCount.remove(dest);
+ }
+ }
}
}
- private void returnBouncedMessage(final UnprocessedMessage message)
+ void deregisterProducer(long producerId)
{
- _connection.performConnectionTask(
- new Runnable()
- {
- public void run()
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
- false,
- message.getBounceBody().exchange,
- message.getBounceBody().routingKey,
- message.getContentHeader(),
- message.getBodies());
-
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
+ _producers.remove(new Long(producerId));
+ }
- }
- catch (Exception e)
- {
- _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
- }
- }
- });
+ boolean isInRecovery()
+ {
+ return _inRecovery;
+ }
+
+ boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
+ {
+ return isQueueBound(exchangeName, queueName, null);
}
/**
- * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a
- * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar.
+ * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key.
*
- * @param deliveryTag the tag of the last message to be acknowledged
- * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery
- * tag
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param exchangeName The exchange name to test for binding against.
+ * @param queueName The queue name to check if bound.
+ * @param routingKey The routing key to check if the queue is bound under.
+ *
+ * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
+ *
+ * @throws JMSException If the query fails for any reason.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- deliveryTag, // deliveryTag
- multiple); // multiple
- if (_logger.isDebugEnabled())
+ try
{
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ AMQMethodEvent response =
+ new FailoverRetrySupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ AMQFrame boundFrame =
+ ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
+ getProtocolMinorVersion(), exchangeName, // exchange
+ queueName, // queue
+ routingKey); // routingKey
+
+ return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+
+ }
+ }, _connection).execute();
+
+ // Extract and return the response code from the query.
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+ return (responseBody.replyCode == 0);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
}
- getProtocolHandler().writeFrame(ackFrame);
}
- public int getDefaultPrefetch()
+ /**
+ * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
+ * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
+ */
+ void markClosed()
{
- return _defaultPrefetchHighMark;
+ _closed.set(true);
+ _connection.deregisterSession(_channelId);
+ markClosedProducersAndConsumers();
+
}
- public int getDefaultPrefetchHigh()
+ /**
+ * Resubscribes all producers and consumers. This is called when performing failover.
+ *
+ * @throws AMQException
+ */
+ void resubscribe() throws AMQException
{
- return _defaultPrefetchHighMark;
+ resubscribeProducers();
+ resubscribeConsumers();
}
- public int getDefaultPrefetchLow()
+ void setHasMessageListeners()
{
- return _defaultPrefetchLowMark;
+ _hasMessageListeners = true;
}
- public int getChannelId()
+ void setInRecovery(boolean inRecovery)
{
- return _channelId;
+ _inRecovery = inRecovery;
}
+ /**
+ * Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
+ *
+ * @throws AMQException If the session cannot be started for any reason.
+ *
+ * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
+ * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
+ * for each subsequent call to flow.. only need to do this if we have called stop.
+ */
void start() throws AMQException
{
- //fixme This should be controlled by _stopped as it pairs with the stop method
- //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
- //will result in sending Flow messages for each subsequent call to flow.. only need to do this
- // if we have called stop.
+ // Check if the session has perviously been started and suspended, in which case it must be unsuspended.
if (_startedAtLeastOnce.getAndSet(true))
{
- //then we stopped this and are restarting, so signal server to resume delivery
suspendChannel(false);
}
+ // If the event dispatcher is not running then start it too.
if (hasMessageListeners())
{
startDistpatcherIfNecessary();
}
}
- private boolean hasMessageListeners()
- {
- return _hasMessageListeners;
- }
-
- void setHasMessageListeners()
- {
- _hasMessageListeners = true;
- }
-
synchronized void startDistpatcherIfNecessary()
{
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
@@ -2032,7 +1770,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (AMQException e)
{
- _logger.info("Suspending channel threw an exception:" + e);
+ _logger.info("Unsuspending channel threw an exception:" + e);
}
}
}
@@ -2057,7 +1795,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void stop() throws AMQException
{
- //stop the server delivering messages to this session
+ // Stop the server delivering messages to this session.
suspendChannel(true);
if (_dispatcher != null)
@@ -2066,320 +1804,556 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- /**
- * Callers must hold the failover mutex before calling this method.
+ /*
+ * Binds the named queue, with the specified routing key, to the named exchange.
*
- * @param consumer
+ * <p/>Note that this operation automatically retries in the event of fail-over.
*
- * @throws AMQException
+ * @param queueName The name of the queue to bind.
+ * @param routingKey The routing key to bind the queue with.
+ * @param arguments Additional arguments.
+ * @param exchangeName The exchange to bind the queue on.
+ *
+ * @throws AMQException If the queue cannot be bound for any reason.
*/
- void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException
+ /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft)
+ throws AMQException, FailoverException
{
- AMQDestination amqd = consumer.getDestination();
+ AMQFrame queueBind =
+ QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments
+ amqd.getExchangeName(), // exchange
+ false, // nowait
+ queueName, // queue
+ amqd.getRoutingKey(), // routingKey
+ getTicket()); // ticket
- AMQProtocolHandler protocolHandler = getProtocolHandler();
-
- declareExchange(amqd, protocolHandler, false);
-
- AMQShortString queueName = declareQueue(amqd, protocolHandler);
-
- bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
-
- // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
- if (!_immediatePrefetch)
- {
- // The dispatcher will be null if we have just created this session
- // so suspend the channel before we register our consumer so that we don't
- // start prefetching until a receive/mListener is set.
- if (_dispatcher == null)
- {
- if (!isSuspended())
- {
- try
- {
- suspendChannel(true);
- _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
- }
- catch (AMQException e)
- {
- _logger.info("Suspending channel threw an exception:" + e);
- }
- }
- }
- }
- else
- {
- _logger.info("Immediately prefetching existing messages to new consumer.");
- }
+ protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
+ }*/
- try
- {
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
- }
- catch (JMSException e) //thrown by getMessageSelector
+ private void checkNotTransacted() throws JMSException
+ {
+ if (getTransacted())
{
- throw new AMQException(e.getMessage(), e);
+ throw new IllegalStateException("Session is transacted");
}
}
- /**
- * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
- * instance.
- *
- * @param consumer the consum
- */
- void deregisterConsumer(BasicMessageConsumer consumer)
+ private void checkTemporaryDestination(Destination destination) throws JMSException
{
- if (_consumers.remove(consumer.getConsumerTag()) != null)
+ if ((destination instanceof TemporaryDestination))
{
- String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if (subscriptionName != null)
+ _logger.debug("destination is temporary");
+ final TemporaryDestination tempDest = (TemporaryDestination) destination;
+ if (tempDest.getSession() != this)
{
- _subscriptions.remove(subscriptionName);
+ _logger.debug("destination is on different session");
+ throw new JMSException("Cannot consume from a temporary destination created onanother session");
}
- Destination dest = consumer.getDestination();
- synchronized (dest)
+ if (tempDest.isDeleted())
{
- if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
- {
- _destinationConsumerCount.remove(dest);
- }
+ _logger.debug("destination is deleted");
+ throw new JMSException("Cannot consume from a deleted destination");
}
}
}
- private void registerProducer(long producerId, MessageProducer producer)
+ private void checkTransacted() throws JMSException
{
- _producers.put(new Long(producerId), producer);
+ if (!getTransacted())
+ {
+ throw new IllegalStateException("Session is not transacted");
+ }
}
- void deregisterProducer(long producerId)
+ private void checkValidDestination(Destination destination) throws InvalidDestinationException
{
- _producers.remove(new Long(producerId));
+ if (destination == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
}
- private long getNextProducerId()
+ private void checkValidQueue(Queue queue) throws InvalidDestinationException
{
- return ++_nextProducerId;
+ if (queue == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
}
- /**
- * Resubscribes all producers and consumers. This is called when performing failover.
- *
- * @throws AMQException
+ /*
+ * I could have combined the last 3 methods, but this way it improves readability
*/
- void resubscribe() throws AMQException
+ private AMQTopic checkValidTopic(Topic topic) throws JMSException
{
- resubscribeProducers();
- resubscribeConsumers();
+ if (topic == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Topic");
+ }
+
+ if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this))
+ {
+ throw new javax.jms.InvalidDestinationException(
+ "Cannot create a subscription on a temporary topic created in another session");
+ }
+
+ if (!(topic instanceof AMQTopic))
+ {
+ throw new javax.jms.InvalidDestinationException(
+ "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
+ + topic.getClass().getName());
+ }
+
+ return (AMQTopic) topic;
}
- private void resubscribeProducers() throws AMQException
+ /**
+ * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
+ *
+ * @param error not null if this is a result of an error occurring at the connection level
+ */
+ private void closeConsumers(Throwable error) throws JMSException
{
- ArrayList producers = new ArrayList(_producers.values());
- _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
- for (Iterator it = producers.iterator(); it.hasNext();)
+ if (_dispatcher != null)
{
- BasicMessageProducer producer = (BasicMessageProducer) it.next();
- producer.resubscribe();
+ _dispatcher.close();
+ _dispatcher = null;
+ }
+ // we need to clone the list of consumers since the close() method updates the _consumers collection
+ // which would result in a concurrent modification exception
+ final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+
+ final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ while (it.hasNext())
+ {
+ final BasicMessageConsumer con = it.next();
+ if (error != null)
+ {
+ con.notifyError(error);
+ }
+ else
+ {
+ con.close();
+ }
}
+ // at this point the _consumers map will be empty
}
- private void resubscribeConsumers() throws AMQException
+ /**
+ * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
+ * currently no way of propagating errors to message producers (this is a JMS limitation).
+ */
+ private void closeProducers() throws JMSException
{
- ArrayList consumers = new ArrayList(_consumers.values());
- _consumers.clear();
+ // we need to clone the list of producers since the close() method updates the _producers collection
+ // which would result in a concurrent modification exception
+ final ArrayList clonedProducers = new ArrayList(_producers.values());
- for (Iterator it = consumers.iterator(); it.hasNext();)
+ final Iterator it = clonedProducers.iterator();
+ while (it.hasNext())
{
- BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
- registerConsumer(consumer, true);
+ final BasicMessageProducer prod = (BasicMessageProducer) it.next();
+ prod.close();
}
+ // at this point the _producers map is empty
}
- private void suspendChannel(boolean suspend) throws AMQException
+ /**
+ * Close all producers or consumers. This is called either in the error case or when closing the session normally.
+ *
+ * @param amqe the exception, may be null to indicate no error has occurred
+ */
+ private void closeProducersAndConsumers(AMQException amqe) throws JMSException
{
- synchronized (_suspensionLock)
+ JMSException jmse = null;
+ try
{
- if (_logger.isDebugEnabled())
+ closeProducers();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing session: " + e, e);
+ jmse = e;
+ }
+
+ try
+ {
+ closeConsumers(amqe);
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing session: " + e, e);
+ if (jmse == null)
{
- _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
+ jmse = e;
}
+ }
- _suspended = suspend;
-
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- !suspend); // active
-
- _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ if (jmse != null)
+ {
+ throw jmse;
}
}
-
- public void confirmConsumerCancelled(AMQShortString consumerTag)
+ /**
+ * Register to consume from the queue.
+ *
+ * @param queueName
+ */
+ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
+ AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
+ // need to generate a consumer tag on the client so we can exploit the nowait flag
+ AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
- // Remove the consumer from the map
- BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
- if (consumer != null)
+ FieldTable arguments = FieldTableFactory.newFieldTable();
+ if ((messageSelector != null) && !messageSelector.equals(""))
{
-// fixme this isn't right.. needs to check if _queue contains data for this consumer
- if (consumer.isAutoClose())// && _queue.isEmpty())
- {
- consumer.closeWhenNoMessages(true);
- }
+ arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+ }
- if (!consumer.isNoConsume())
- {
- //Clean the Maps up first
- //Flush any pending messages for this consumerTag
- if (_dispatcher != null)
- {
- _logger.info("Dispatcher is not null");
- }
- else
- {
- _logger.info("Dispatcher is null so created stopped dispatcher");
+ if (consumer.isAutoClose())
+ {
+ arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
- startDistpatcherIfNecessary(true);
- }
+ if (consumer.isNoConsume())
+ {
+ arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
- _dispatcher.rejectPending(consumer);
+ consumer.setConsumerTag(tag);
+ // we must register the consumer in the map before we actually start listening
+ _consumers.put(tag, consumer);
+
+ try
+ {
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ AMQFrame jmsConsume =
+ BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ getTicket()); // ticket
+
+ if (nowait)
+ {
+ protocolHandler.writeFrame(jmsConsume);
}
else
{
- //Just close the consumer
- //fixme the CancelOK is being processed before the arriving messages..
- // The dispatcher is still to process them so the server sent in order but the client
- // has yet to receive before the close comes in.
-
-// consumer.markClosed();
+ protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
}
}
- else
+ catch (AMQException e)
{
- _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
+ // clean-up the map in the event of an error
+ _consumers.remove(tag);
+ throw e;
}
+ }
+ private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+ throws JMSException
+ {
+ return createProducerImpl(destination, mandatory, immediate, false);
+ }
+ private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+ final boolean immediate, final boolean waitUntilSent) throws JMSException
+ {
+ return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
+ new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
+ {
+ public BasicMessageProducer execute() throws JMSException, FailoverException
+ {
+ checkNotClosed();
+ long producerId = getNextProducerId();
+ BasicMessageProducer producer =
+ new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
+ AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ registerProducer(producerId, producer);
+
+ return producer;
+ }
+ }, _connection).execute();
}
- /*
- * I could have combined the last 3 methods, but this way it improves readability
- */
- private AMQTopic checkValidTopic(Topic topic) throws JMSException
+ private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
- if (topic == null)
- {
- throw new javax.jms.InvalidDestinationException("Invalid Topic");
- }
- if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
- {
- throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
- }
- if (!(topic instanceof AMQTopic))
- {
- throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
- }
- return (AMQTopic) topic;
+ declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
}
- private void checkValidQueue(Queue queue) throws InvalidDestinationException
+ /**
+ * Declares the named exchange and type of exchange.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param name The name of the exchange to declare.
+ * @param type The type of the exchange to declare.
+ * @param protocolHandler The protocol handler to process the communication through.
+ * @param nowait
+ *
+ * @throws AMQException If the exchange cannot be declared for any reason.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ private void declareExchange(final AMQShortString name, final AMQShortString type,
+ final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
{
- if (queue == null)
- {
- throw new javax.jms.InvalidDestinationException("Invalid Queue");
- }
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ AMQFrame exchangeDeclare =
+ ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ nowait, // nowait
+ false, // passive
+ getTicket(), // ticket
+ type); // type
+
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+
+ return null;
+ }
+ }, _connection).execute();
}
- private void checkValidDestination(Destination destination) throws InvalidDestinationException
+ /**
+ * Declares a queue for a JMS destination.
+ *
+ * <p/>Note that for queues but not topics the name is generated in the client rather than the server. This allows
+ * the name to be reused on failover if required. In general, the destination indicates whether it wants a name
+ * generated or not.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param amqd The destination to declare as a queue.
+ * @param protocolHandler The protocol handler to communicate through.
+ *
+ * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
+ * the client.
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ *
+ * @todo Verify the destiation is valid or throw an exception.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+ throws AMQException
{
- if (destination == null)
+ /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+ return new FailoverNoopSupport<AMQShortString, AMQException>(
+ new FailoverProtectedOperation<AMQShortString, AMQException>()
+ {
+ public AMQShortString execute() throws AMQException, FailoverException
+ {
+ // Generate the queue name if the destination indicates that a client generated name is to be used.
+ if (amqd.isNameRequired())
+ {
+ amqd.setQueueName(protocolHandler.generateQueueName());
+ }
+
+ AMQFrame queueDeclare =
+ QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ false, // nowait
+ false, // passive
+ amqd.getAMQQueueName(), // queue
+ getTicket()); // ticket
+
+ protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+
+ return amqd.getAMQQueueName();
+ }
+ }, _connection).execute();
+ }
+
+ /**
+ * Undeclares the specified queue.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param queueName The name of the queue to delete.
+ *
+ * @throws JMSException If the queue could not be deleted for any reason.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ private void deleteQueue(final AMQShortString queueName) throws JMSException
+ {
+ try
{
- throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ AMQFrame queueDeleteFrame =
+ QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+
+ return null;
+ }
+ }, _connection).execute();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("The queue deletion failed: " + e.getMessage(), e);
}
}
-
- public AMQShortString getTemporaryTopicExchangeName()
+ private long getNextProducerId()
{
- return _connection.getTemporaryTopicExchangeName();
+ return ++_nextProducerId;
}
- public AMQShortString getTemporaryQueueExchangeName()
+ private AMQProtocolHandler getProtocolHandler()
{
- return _connection.getTemporaryQueueExchangeName();
+ return _connection.getProtocolHandler();
}
-
- public int getTicket()
+ private byte getProtocolMajorVersion()
{
- return _ticket;
+ return getProtocolHandler().getProtocolMajorVersion();
}
- public void setTicket(int ticket)
+ private byte getProtocolMinorVersion()
{
- _ticket = ticket;
+ return getProtocolHandler().getProtocolMinorVersion();
}
+ private boolean hasMessageListeners()
+ {
+ return _hasMessageListeners;
+ }
- public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException
+ private void markClosedConsumers() throws JMSException
{
- getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- active,
- exclusive,
- passive,
- read,
- realm,
- write),
- new BlockingMethodFrameListener(_channelId)
- {
-
- public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
- {
- if (frame instanceof AccessRequestOkBody)
- {
- setTicket(((AccessRequestOkBody) frame).getTicket());
- return true;
- }
- else
- {
- return false;
- }
- }
- });
+ if (_dispatcher != null)
+ {
+ _dispatcher.close();
+ _dispatcher = null;
+ }
+ // we need to clone the list of consumers since the close() method updates the _consumers collection
+ // which would result in a concurrent modification exception
+ final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ while (it.hasNext())
+ {
+ final BasicMessageConsumer con = it.next();
+ con.markClosed();
+ }
+ // at this point the _consumers map will be empty
}
- private class SuspenderRunner implements Runnable
+ private void markClosedProducersAndConsumers()
{
- private boolean _suspend;
+ try
+ {
+ // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
+ closeProducers();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing session: " + e, e);
+ }
- public SuspenderRunner(boolean suspend)
+ try
{
- _suspend = suspend;
+ markClosedConsumers();
}
+ catch (JMSException e)
+ {
+ _logger.error("Error closing session: " + e, e);
+ }
+ }
- public void run()
+ /**
+ * Callers must hold the failover mutex before calling this method.
+ *
+ * @param consumer
+ *
+ * @throws AMQException
+ */
+ private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException
+ {
+ AMQDestination amqd = consumer.getDestination();
+
+ AMQProtocolHandler protocolHandler = getProtocolHandler();
+
+ declareExchange(amqd, protocolHandler, false);
+
+ AMQShortString queueName = declareQueue(amqd, protocolHandler);
+
+ // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName());
+
+ // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
+ if (!_immediatePrefetch)
{
- try
- {
- suspendChannel(_suspend);
- }
- catch (AMQException e)
+ // The dispatcher will be null if we have just created this session
+ // so suspend the channel before we register our consumer so that we don't
+ // start prefetching until a receive/mListener is set.
+ if (_dispatcher == null)
{
- _logger.warn("Unable to suspend channel");
+ if (!isSuspended())
+ {
+ try
+ {
+ suspendChannel(true);
+ _logger.info(
+ "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
+ }
}
}
+ else
+ {
+ _logger.info("Immediately prefetching existing messages to new consumer.");
+ }
+
+ try
+ {
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+ }
+ catch (JMSException e) // thrown by getMessageSelector
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
+ catch (FailoverException e)
+ {
+ throw new AMQException("Fail-over exception interrupted basic consume.", e);
+ }
}
+ private void registerProducer(long producerId, MessageProducer producer)
+ {
+ _producers.put(new Long(producerId), producer);
+ }
private void rejectAllMessages(boolean requeue)
{
@@ -2396,8 +2370,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
Iterator messages = _queue.iterator();
if (_logger.isInfoEnabled())
{
- _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag +
- ") (PDispatchQ) requeue:" + requeue);
+ _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
+ + requeue);
if (messages.hasNext())
{
@@ -2412,12 +2386,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag))
+ if ((consumerTag == null) || message.getDeliverBody().consumerTag.equals(consumerTag))
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Removing message(" + System.identityHashCode(message) +
- ") from _queue DT:" + message.getDeliverBody().deliveryTag);
+ _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
+ + message.getDeliverBody().deliveryTag);
}
messages.remove();
@@ -2432,50 +2406,361 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
-
- public void rejectMessage(UnprocessedMessage message, boolean requeue)
+ private void resubscribeConsumers() throws AMQException
{
+ ArrayList consumers = new ArrayList(_consumers.values());
+ _consumers.clear();
- if (_logger.isTraceEnabled())
+ for (Iterator it = consumers.iterator(); it.hasNext();)
{
- _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ registerConsumer(consumer, true);
}
-
- rejectMessage(message.getDeliverBody().deliveryTag, requeue);
}
- public void rejectMessage(AbstractJMSMessage message, boolean requeue)
+ private void resubscribeProducers() throws AMQException
{
- if (_logger.isTraceEnabled())
+ ArrayList producers = new ArrayList(_producers.values());
+ _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
+ for (Iterator it = producers.iterator(); it.hasNext();)
{
- _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
+ BasicMessageProducer producer = (BasicMessageProducer) it.next();
+ producer.resubscribe();
}
- rejectMessage(message.getDeliveryTag(), requeue);
+ }
+
+ private void returnBouncedMessage(final UnprocessedMessage message)
+ {
+ _connection.performConnectionTask(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
+ _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange,
+ message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
+
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+ AMQShortString reason = message.getBounceBody().replyText;
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
+ else
+ {
+ _connection.exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(
+ "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+ e);
+ }
+ }
+ });
}
- public void rejectMessage(long deliveryTag, boolean requeue)
+ /**
+ * Suspends or unsuspends this session.
+ *
+ * @param suspend <tt>true</tt> indicates that the session should be suspended, <tt>false<tt> indicates that it
+ * should be unsuspended.
+ *
+ * @throws AMQException If the session cannot be suspended for any reason.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ private void suspendChannel(boolean suspend) throws AMQException // , FailoverException
{
- if (_acknowledgeMode == CLIENT_ACKNOWLEDGE ||
- _acknowledgeMode == SESSION_TRANSACTED)
+ synchronized (_suspensionLock)
{
- if (_logger.isDebugEnabled())
+ try
{
- _logger.debug("Rejecting delivery tag:" + deliveryTag);
- }
- AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- deliveryTag,
- requeue);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
+ }
- _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ _suspended = suspend;
+
+ AMQFrame channelFlowFrame =
+ ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ !suspend);
+
+ _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ }
+ catch (FailoverException e)
+ {
+ throw new AMQException("Fail-over interrupted suspend/unsuspend channel.", e);
+ }
}
}
- public boolean isStrictAMQP()
+ /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
+ private class Dispatcher extends Thread
{
- return _strictAMQP;
+
+ /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private final Object _lock = new Object();
+
+ public Dispatcher()
+ {
+ super("Dispatcher-Channel-" + _channelId);
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " created");
+ }
+ }
+
+ public void close()
+ {
+ _closed.set(true);
+ interrupt();
+
+ // fixme awaitTermination
+
+ }
+
+ public void rejectPending(BasicMessageConsumer consumer)
+ {
+ synchronized (_lock)
+ {
+ boolean stopped = _dispatcher.connectionStopped();
+
+ if (!stopped)
+ {
+ _dispatcher.setConnectionStopped(true);
+ }
+
+ // Reject messages on pre-receive queue
+ consumer.rollback();
+
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+
+ // closeConsumer
+ consumer.markClosed();
+
+ _dispatcher.setConnectionStopped(stopped);
+
+ }
+ }
+
+ public void rollback()
+ {
+
+ synchronized (_lock)
+ {
+ boolean isStopped = connectionStopped();
+
+ if (!isStopped)
+ {
+ setConnectionStopped(true);
+ }
+
+ rejectAllMessages(true);
+
+ _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
+
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ if (!consumer.isNoConsume())
+ {
+ consumer.rollback();
+ }
+ else
+ {
+ // should perhaps clear the _SQ here.
+ // consumer._synchronousQueue.clear();
+ consumer.clearReceiveQueue();
+ }
+
+ }
+
+ setConnectionStopped(isStopped);
+ }
+
+ }
+
+ public void run()
+ {
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " started");
+ }
+
+ UnprocessedMessage message;
+
+ // Allow disptacher to start stopped
+ synchronized (_lock)
+ {
+ while (connectionStopped())
+ {
+ try
+ {
+ _lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ }
+ }
+
+ try
+ {
+ while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
+ {
+ synchronized (_lock)
+ {
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ dispatchMessage(message);
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ }
+
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
+ }
+ }
+
+ // only call while holding lock
+ final boolean connectionStopped()
+ {
+ return _connectionStopped;
+ }
+
+ boolean setConnectionStopped(boolean connectionStopped)
+ {
+ boolean currently;
+ synchronized (_lock)
+ {
+ currently = _connectionStopped;
+ _connectionStopped = connectionStopped;
+ _lock.notify();
+
+ if (_dispatcherLogger.isDebugEnabled())
+ {
+ _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started")
+ + ": Currently " + (currently ? "Stopped" : "Started"));
+ }
+ }
+
+ return currently;
+ }
+
+ private void dispatchMessage(UnprocessedMessage message)
+ {
+ if (message.getDeliverBody() != null)
+ {
+ final BasicMessageConsumer consumer =
+ (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+
+ if ((consumer == null) || consumer.isClosed())
+ {
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ if (consumer == null)
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ + message.getDeliverBody().deliveryTag + "] from queue "
+ + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
+ }
+ else
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
+ + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
+ }
+ }
+ // Don't reject if we're already closing
+ if (!_closed.get())
+ {
+ rejectMessage(message, true);
+ }
+ }
+ else
+ {
+ consumer.notifyMessage(message, _channelId);
+ }
+ }
+ }
}
+ /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
+ boolean read) throws AMQException
+ {
+ getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
+ getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write),
+ new BlockingMethodFrameListener(_channelId)
+ {
+
+ public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException
+ {
+ if (frame instanceof AccessRequestOkBody)
+ {
+ setTicket(((AccessRequestOkBody) frame).getTicket());
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ });
+ }*/
+
+ private class SuspenderRunner implements Runnable
+ {
+ private boolean _suspend;
+
+ public SuspenderRunner(boolean suspend)
+ {
+ _suspend = suspend;
+ }
+
+ public void run()
+ {
+ try
+ {
+ suspendChannel(_suspend);
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to suspend channel");
+ }
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 1c3cdbcb65..3a31eda754 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,22 +20,10 @@
*/
package org.apache.qpid.client;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
@@ -48,6 +36,19 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
@@ -140,9 +141,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -219,7 +220,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
- + _destination);
+ + _destination);
}
}
else
@@ -468,7 +469,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closedStack != null)
{
_logger.trace(_consumerTag + " close():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -481,9 +482,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
// TODO: Be aware of possible changes to parameter order as versions change.
final AMQFrame cancelFrame =
- BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
- false); // nowait
+ BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+ false); // nowait
try
{
@@ -497,10 +498,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (AMQException e)
{
- // _logger.error("Error closing consumer: " + e, e);
- JMSException jmse = new JMSException("Error closing consumer: " + e);
- jmse.setLinkedException(e);
- throw jmse;
+ throw new JMSAMQException("Error closing consumer: " + e, e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
}
else
@@ -540,7 +542,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closedStack != null)
{
_logger.trace(_consumerTag + " markClosed():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -572,9 +574,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
+ _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+ messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
+ messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
if (debug)
{
@@ -659,15 +661,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
- case Session.PRE_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- break;
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
- case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
- break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
}
}
@@ -677,55 +679,55 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
- if (_outstanding <= _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
- if (_dups_ok_acknowledge_send)
+ if (_dups_ok_acknowledge_send)
+ {
+ if (!_session.isInRecovery())
{
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
- }
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
}
+ }
- break;
+ break;
- case Session.AUTO_ACKNOWLEDGE:
- // we do not auto ack a message if the application code called recover()
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.AUTO_ACKNOWLEDGE:
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- }
+ case Session.SESSION_TRANSACTED:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ }
- break;
+ break;
}
}
@@ -757,7 +759,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closedStack != null)
{
_logger.trace(_consumerTag + " notifyError():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously" + _closedStack.toString());
}
else
@@ -817,7 +819,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- public void acknowledge() throws JMSException
+ public void acknowledge() // throws JMSException
{
if (!isClosed())
{
@@ -877,7 +879,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
- + "for consumer with tag:" + _consumerTag);
+ + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -907,7 +909,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
- + "for consumer with tag:" + _consumerTag);
+ + "for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
@@ -931,7 +933,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
else
{
_logger.error("Queue contained a :" + o.getClass()
- + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
index d1237cff49..0927ca3625 100644
--- a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
@@ -18,29 +18,12 @@
* under the License.
*
*/
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
package org.apache.qpid.client;
-import javax.jms.JMSException;
-
import org.apache.qpid.AMQException;
+import javax.jms.JMSException;
+
/**
* JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old
* Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions
@@ -50,8 +33,6 @@ import org.apache.qpid.AMQException;
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Accept wrapped exceptions as a JMSException.
* </table>
- *
- * @author Apache Software Foundation
*/
public class JMSAMQException extends JMSException
{
@@ -71,6 +52,11 @@ public class JMSAMQException extends JMSException
}
}
+ /**
+ * @param s The underlying exception.
+ *
+ * @deprecated Use the other constructor and write a helpfull message. This one will be deleted.
+ */
public JMSAMQException(AMQException s)
{
super(s.getMessage(), String.valueOf(s.getErrorCode()));
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java
index 49377fdc19..037b0dc2d1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,10 +21,26 @@
package org.apache.qpid.client.failover;
/**
- * This exception is thrown when failover is taking place and we need to let other
- * parts of the client know about this.
+ * FailoverException is used to indicate that a synchronous request has failed to receive the reply that it is waiting
+ * for because the fail-over process has been started whilst it was waiting for its reply. Synchronous methods generally
+ * raise this exception to indicate that they must be re-tried once the fail-over process has completed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Used to indicate failure of a synchronous request due to fail-over.
+ * </table>
+ *
+ * @todo This exception is created and passed as an argument to a method, rather than thrown. The exception is being
+ * used to represent an event, passed out to other threads. Use of exceptions as arguments rather than as
+ * exceptions is extremly confusing. Ideally use a condition or set a flag and check it instead.
+ * This exceptions-as-events pattern seems to be in a similar style to Mina code, which is not pretty, but
+ * potentially acceptable for that reason. We have the option of extending the mina model to add more events
+ * to it, that is, anything that is interested in handling failover as an event occurs below the main
+ * amq event handler, which knows the specific interface of the qpid handlers, which can pass this down as
+ * an explicit event, without it being an exception. Add failover method to BlockingMethodFrameListener,
+ * have it set a flag or interrupt the waiting thread, which then creates and raises this exception.
*/
-public class FailoverException extends RuntimeException
+public class FailoverException extends Exception
{
public FailoverException(String message)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 844ecbe743..dbbceff523 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,59 +20,108 @@
*/
package org.apache.qpid.client.failover;
-import java.util.concurrent.CountDownLatch;
-
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IoSession;
+
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
+import java.util.concurrent.CountDownLatch;
+
/**
- * When failover is required, we need a separate thread to handle the establishment of the new connection and
- * the transfer of subscriptions.
- * </p>
- * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor
- * thread. One significant task is the connection setup which involves a protocol exchange until a particular state
- * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means
- * the IO processor is not able to do anything at all.
+ * FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the
+ * class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport
+ * connections, failing over to a new connection if the transport connection fails. The procedure to establish a new
+ * connection is expressed as a continuation, in order that it may be run in a seperate thread to the i/o thread that
+ * detected the failure and is used to handle the communication to establish a new connection.
+ *
+ * </p>The reason this needs to be a separate thread is because this work cannot be done inside the i/o processor
+ * thread. The significant task is the connection setup which involves a protocol exchange until a particular state
+ * is achieved. This procedure waits until the state is achieved which would prevent the i/o thread doing the work
+ * it needs to do to achieve the new state.
+ *
+ * <p/>The failover procedure does the following:
+ *
+ * <ol>
+ * <li>Sets the failing over condition to true.</li>
+ * <li>Creates a {@link FailoverException} and gets the protocol connection handler to propagate this event to all
+ * interested parties.</li>
+ * <li>Takes the failover mutex on the protocol connection handler.</li>
+ * <li>Abandons the fail over if any of the interested parties vetoes it. The mutex is released and the condition
+ * reset.</li>
+ * <li>Creates a new {@link AMQStateManager} and re-established the connection through it.</li>
+ * <li>Informs the AMQConnection if the connection cannot be re-established.</li>
+ * <li>Recreates all sessions from the old connection to the new.</li>
+ * <li>Resets the failing over condition and releases the mutex.</li>
+ * </ol>
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Update fail-over state <td> {@link AMQProtocolHandler}
+ * </table>
+ *
+ * @todo The failover latch and mutex are used like a lock and condition. If the retrotranlator supports lock/condition
+ * then could change over to using them. 1.4 support still needed.
+ *
+ * @todo If the condition is set to null on a vetoes fail-over and there are already other threads waiting on the
+ * condition, they will never be released. It might be an idea to reset the condition in a finally block.
+ *
+ * @todo Creates a {@link AMQDisconnectedException} and passes it to the AMQConnection. No need to use an
+ * exception-as-argument here, could just as easily call a specific method for this purpose on AMQConnection.
+ *
+ * @todo Creates a {@link FailoverException} and propagates it to the MethodHandlers. No need to use an
+ * exception-as-argument here, could just as easily call a specific method for this purpose on
+ * {@link org.apache.qpid.protocol.AMQMethodListener}.
*/
public class FailoverHandler implements Runnable
{
+ /** Used for debugging. */
private static final Logger _logger = Logger.getLogger(FailoverHandler.class);
+ /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
private final IoSession _session;
+
+ /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */
private AMQProtocolHandler _amqProtocolHandler;
- /**
- * Used where forcing the failover host
- */
+ /** Used to hold the host to fail over to. This is optional and if not set a reconnect to the previous host is tried. */
private String _host;
- /**
- * Used where forcing the failover port
- */
+ /** Used to hold the port to fail over to. */
private int _port;
+ /**
+ * Creates a failover handler on a protocol session, for a particular MINA session (network connection).
+ *
+ * @param amqProtocolHandler The protocol handler that spans the failover.
+ * @param session The MINA session, for the failing connection.
+ */
public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
{
_amqProtocolHandler = amqProtocolHandler;
_session = session;
}
+ /**
+ * Performs the failover procedure. See the class level comment, {@link FailoverHandler}, for a description of the
+ * failover procedure.
+ */
public void run()
{
if (Thread.currentThread().isDaemon())
{
throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
}
- //Thread.currentThread().setName("Failover Thread");
+ // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
+ // the fail over.
_amqProtocolHandler.setFailoverLatch(new CountDownLatch(1));
// We wake up listeners. If they can handle failover, they will extend the
- // FailoverSupport class and will in turn block on the latch until failover
- // has completed before retrying the operation
+ // FailoverRetrySupport class and will in turn block on the latch until failover
+ // has completed before retrying the operation.
_amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start"));
// Since failover impacts several structures we protect them all with a single mutex. These structures
@@ -93,14 +142,18 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setStateManager(existingStateManager);
if (_host != null)
{
- _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client"));
+ _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException(
+ "Redirect was vetoed by client"));
}
else
{
- _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client"));
+ _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException(
+ "Failover was vetoed by client"));
}
+
_amqProtocolHandler.getFailoverLatch().countDown();
_amqProtocolHandler.setFailoverLatch(null);
+
return;
}
@@ -119,12 +172,12 @@ public class FailoverHandler implements Runnable
{
failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection();
}
+
if (!failoverSucceeded)
{
_amqProtocolHandler.setStateManager(existingStateManager);
- _amqProtocolHandler.getConnection().exceptionReceived(
- new AMQDisconnectedException("Server closed connection and no failover " +
- "was successful"));
+ _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException(
+ "Server closed connection and no failover " + "was successful"));
}
else
{
@@ -140,6 +193,7 @@ public class FailoverHandler implements Runnable
{
_logger.info("Client vetoed automatic resubscription");
}
+
_amqProtocolHandler.getConnection().fireFailoverComplete();
_amqProtocolHandler.setFailoverState(FailoverState.NOT_STARTED);
_logger.info("Connection failover completed successfully");
@@ -148,35 +202,36 @@ public class FailoverHandler implements Runnable
{
_logger.info("Failover process failed - exception being propagated by protocol handler");
_amqProtocolHandler.setFailoverState(FailoverState.FAILED);
- try
- {
- _amqProtocolHandler.exceptionCaught(_session, e);
- }
+ /*try
+ {*/
+ _amqProtocolHandler.exceptionCaught(_session, e);
+ /*}
catch (Exception ex)
{
_logger.error("Error notifying protocol session of error: " + ex, ex);
- }
+ }*/
}
}
}
- _amqProtocolHandler.getFailoverLatch().countDown();
- }
- public String getHost()
- {
- return _host;
+ _amqProtocolHandler.getFailoverLatch().countDown();
}
+ /**
+ * Sets the host name to fail over to. This is optional and if not set a reconnect to the previous host is tried.
+ *
+ * @param host The host name to fail over to.
+ */
public void setHost(String host)
{
_host = host;
}
- public int getPort()
- {
- return _port;
- }
-
+ /**
+ * Sets the port to fail over to.
+ *
+ * @param port The port to fail over to.
+ */
public void setPort(int port)
{
_port = port;
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
new file mode 100644
index 0000000000..dece1b6c3f
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
@@ -0,0 +1,54 @@
+package org.apache.qpid.client.failover;
+
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support
+ * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this
+ * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be
+ * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception,
+ * for example, because the caller already holds locks preventing that condition from arising.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Perform a fail-over protected operation with no handling of fail-over conditions.
+ * </table>
+ */
+public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E>
+{
+ /** The protected operation that is to be retried in the event of fail-over. */
+ FailoverProtectedOperation<T, E> operation;
+
+ /** The connection on which the fail-over protected operation is to be performed. */
+ AMQConnection connection;
+
+ /**
+ * Creates an automatic retrying fail-over handler for the specified operation.
+ *
+ * @param operation The fail-over protected operation to wrap in this handler.
+ */
+ public FailoverNoopSupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
+ {
+ this.operation = operation;
+ this.connection = con;
+ }
+
+ /**
+ * Delegates to another continuation which is to be provided with fail-over handling.
+ *
+ * @return The return value from the delegated to continuation.
+ * @throws E Any exception that the delegated to continuation may raise.
+ */
+ public T execute() throws E
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ throw new IllegalStateException("Fail-over interupted no-op failover support. "
+ + "No-op support should only be used where the caller is certaing fail-over cannot occur.", e);
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java
new file mode 100644
index 0000000000..efb7bf8aed
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java
@@ -0,0 +1,30 @@
+package org.apache.qpid.client.failover;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because
+ * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers
+ * for failover protected operations, in order to provide different handling schemes when failovers occurr.
+ *
+ * <p/>The type of checked exception that the operation may perform has been generified, in order that fail over
+ * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not
+ * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes
+ * will mask the exception.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Perform an operation that may be interrupted by fail-over.
+ * </table>
+ */
+public interface FailoverProtectedOperation<T, E extends Exception>
+{
+ /**
+ * Performs the continuations work.
+ *
+ * @return Provdes scope for the continuation to return an arbitrary value.
+ *
+ * @throws FailoverException If the operation is interrupted by a fail-over notification.
+ */
+ public abstract T execute() throws E, FailoverException;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
new file mode 100644
index 0000000000..1e4908976b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+
+import javax.jms.JMSException;
+
+/**
+ * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified
+ * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due
+ * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request,
+ * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This
+ * automatic retrying is continued until the continuation succeeds, or throws an exception (different to
+ * FailoverException, which is used to signal the failure of the original condition).
+ *
+ * <p/>The blocking condition used is that the connection is not currently failing over, and the mutex used is the
+ * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods.
+ * These are used like a lock and condition variable.
+ *
+ * <p/>The wrapped operation may throw a FailoverException, this is an exception that can be raised by a
+ * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a
+ * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are
+ * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous
+ * methods that should not be attempted when a fail-over is in progress.
+ *
+ * <p/>Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be
+ * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want
+ * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation
+ * until it succeeds. Synchronous methods are usually coordinated with a
+ * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants
+ * to start and throws a FailoverException in response to this.
+ *
+ * <p/>Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be
+ * started during fail-over, but be delayed until any current fail-over has completed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a continuation synchronized on a fail-over lock and condition.
+ * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
+ * </table>
+ *
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
+ * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
+ * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
+ * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
+ * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
+ *
+ * @todo InterruptedException not handled well.
+ */
+public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E>
+{
+ /** Used for debugging. */
+ private static final Logger _log = Logger.getLogger(FailoverRetrySupport.class);
+
+ /** The protected operation that is to be retried in the event of fail-over. */
+ FailoverProtectedOperation<T, E> operation;
+
+ /** The connection on which the fail-over protected operation is to be performed. */
+ AMQConnection connection;
+
+ /**
+ * Creates an automatic retrying fail-over handler for the specified operation.
+ *
+ * @param operation The fail-over protected operation to wrap in this handler.
+ */
+ public FailoverRetrySupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
+ {
+ this.operation = operation;
+ this.connection = con;
+ }
+
+ /**
+ * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats
+ * until the operation throws AMQException or succeeds without being interrupted by fail-over.
+ *
+ * @return The result of executing the continuation.
+ *
+ * @throws AMQException Any underlying exception is allowed to fall through.
+ */
+ public T execute() throws E
+ {
+ while (true)
+ {
+ try
+ {
+ connection.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted: " + e, e);
+
+ return null;
+ }
+
+ synchronized (connection.getFailoverMutex())
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ _log.debug("Failover exception caught during operation: " + e, e);
+ }
+ }
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
index a005bc5fdf..41bac34a34 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
@@ -1,65 +1,28 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.client.failover;
-import javax.jms.JMSException;
-
-import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-public abstract class FailoverSupport
+/**
+ * FailoverSupport defines an interface for different types of fail-over handlers, that provide different types of
+ * behaviour for handling fail-overs during operations that can be interrupted by the fail-over process. For example,
+ * the support could automatically retry once the fail-over process completes, could prevent an operation from being
+ * started whilst fail-over is running, or could quietly abandon the operation or raise an exception, and so on.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Perform a fail-over protected operation with handling for fail-over conditions.
+ * </table>
+ *
+ * @todo Continuation, extend some sort of re-usable Continuation interface, which might look very like this one.
+ */
+public interface FailoverSupport<T, E extends Exception>
{
- private static final Logger _log = Logger.getLogger(FailoverSupport.class);
-
- public Object execute(AMQConnection con) throws JMSException
- {
- // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding.
- // Any method that can potentially block for any reason should use this class so that deadlock will not
- // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners)
- // that might be causing a block. When that happens, the exception is caught here and the mutex is released
- // before waiting for the failover to complete (either successfully or unsuccessfully).
- while (true)
- {
- try
- {
- con.blockUntilNotFailingOver();
- }
- catch (InterruptedException e)
- {
- _log.info("Interrupted: " + e, e);
- return null;
- }
- synchronized (con.getFailoverMutex())
- {
- try
- {
- return operation();
- }
- catch (FailoverException e)
- {
- _log.info("Failover exception caught during operation: " + e, e);
- }
- }
- }
- }
-
- protected abstract Object operation() throws JMSException;
+ /**
+ * Delegates to another continuation which is to be provided with fail-over handling.
+ *
+ * @return The return value from the delegated to continuation.
+ *
+ * @throws E Any exception that the delegated to continuation may raise.
+ */
+ public T execute() throws E;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index addef94215..5bf7bffc63 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,24 +20,22 @@
*/
package org.apache.qpid.client.protocol;
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
-import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.SSLConfiguration;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.state.AMQState;
@@ -60,22 +58,86 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+/**
+ * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
+ * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
+ * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
+ * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
+ * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
+ * terms of "message received" and so on.
+ *
+ * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
+ * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
+ * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
+ * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
+ *
+ * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
+ * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
+ * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
+ * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
+ * in the event of failover. See below for more information about this.
+ *
+ * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
+ * attributes. A more convenient, type-safe, container for session data is provided in the form of
+ * {@link AMQProtocolSession}.
+ *
+ * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
+ * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
+ * as described above). This event handler is different, because dealing with failover complicates things. To the
+ * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
+ * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot
+ * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
+ * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
+ * and the protocol session data is held outside of the MINA IOSession.
+ *
+ * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through.
+ * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
+ * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
+ * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create the filter chain to filter this handlers events.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
+ *
+ * <tr><td> Maintain fail-over state.
+ * <tr><td>
+ * </table>
+ *
+ * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
+ *
+ * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
+ */
public class AMQProtocolHandler extends IoHandlerAdapter
{
+ /** Used for debugging. */
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
/**
- * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
- * and protocol handler instances.
+ * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
+ * instances and protocol handler instances.
*/
private AMQConnection _connection;
/** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
private volatile AMQProtocolSession _protocolSession;
+ /** Holds the state of the protocol session. */
private AMQStateManager _stateManager = new AMQStateManager();
+ /** Holds the method listeners, */
private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
/**
@@ -91,15 +153,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
private FailoverState _failoverState = FailoverState.NOT_STARTED;
+ /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
+ /** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+ /**
+ * Creates a new protocol handler, associated with the specified client connection instance.
+ *
+ * @param con The client connection that this is the event handler for.
+ */
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
}
+ /**
+ * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
+ * session, which filters the events handled by this handler. The filter chain consists of, handing off events
+ * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
+ *
+ * @param session The MINA session.
+ *
+ * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
+ */
public void sessionCreated(IoSession session) throws Exception
{
_logger.debug("Protocol session created for session " + System.identityHashCode(session));
@@ -119,16 +197,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
if (_connection.getSSLConfiguration() != null)
{
SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ SSLContextFactory sslFactory =
+ new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
sslFilter.setUseClientMode(true);
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
-
try
{
-
ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
@@ -142,35 +219,38 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_protocolSession.init();
}
- public void sessionOpened(IoSession session) throws Exception
- {
- //System.setProperty("foo", "bar");
- }
-
/**
- * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
- * sessionClosed() depending on whether we were trying to send data at the time of failure.
+ * Called when the network connection is closed. This can happen, either because the client explicitly requested
+ * that the connection be closed, in which case nothing is done, or because the connection died. In the case
+ * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
+ * process will be started, provided that it is the clients policy to allow failover, and provided that a failover
+ * has not already been started or failed.
+ *
+ * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught}
+ * may be called first followed by this method. This depends on whether the client was trying to send data at the
+ * time of the failure.
*
- * @param session
+ * @param session The MINA session.
*
- * @throws Exception
+ * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
+ * not otherwise? The above comment doesn't make that clear.
*/
- public void sessionClosed(IoSession session) throws Exception
+ public void sessionClosed(IoSession session)
{
if (_connection.isClosed())
{
- _logger.info("Session closed called by client");
+ _logger.debug("Session closed called by client");
}
else
{
- _logger.info("Session closed called with failover state currently " + _failoverState);
+ _logger.debug("Session closed called with failover state currently " + _failoverState);
- //reconnetablility was introduced here so as not to disturb the client as they have made their intentions
+ // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
// known through the policy settings.
if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
{
- _logger.info("FAILOVER STARTING");
+ _logger.debug("FAILOVER STARTING");
if (_failoverState == FailoverState.NOT_STARTED)
{
_failoverState = FailoverState.IN_PROGRESS;
@@ -178,12 +258,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
else
{
- _logger.info("Not starting failover as state currently " + _failoverState);
+ _logger.debug("Not starting failover as state currently " + _failoverState);
}
}
else
{
- _logger.info("Failover not allowed by policy.");
+ _logger.debug("Failover not allowed by policy."); // or already in progress?
if (_logger.isDebugEnabled())
{
@@ -192,19 +272,18 @@ public class AMQProtocolHandler extends IoHandlerAdapter
if (_failoverState != FailoverState.IN_PROGRESS)
{
- _logger.info("sessionClose() not allowed to failover");
- _connection.exceptionReceived(
- new AMQDisconnectedException("Server closed connection and reconnection " +
- "not permitted."));
+ _logger.debug("sessionClose() not allowed to failover");
+ _connection.exceptionReceived(new AMQDisconnectedException(
+ "Server closed connection and reconnection " + "not permitted."));
}
else
{
- _logger.info("sessionClose() failover in progress");
+ _logger.debug("sessionClose() failover in progress");
}
}
}
- _logger.info("Protocol Session [" + this + "] closed");
+ _logger.debug("Protocol Session [" + this + "] closed");
}
/** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -223,25 +302,32 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
if (IdleStatus.WRITER_IDLE.equals(status))
{
- //write heartbeat frame:
+ // write heartbeat frame:
_logger.debug("Sent heartbeat");
session.write(HeartbeatBody.FRAME);
HeartbeatDiagnostics.sent();
}
else if (IdleStatus.READER_IDLE.equals(status))
{
- //failover:
+ // failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
session.close();
}
}
- public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ /**
+ * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
+ * IOException, MINA will close the connection automatically.
+ *
+ * @param session The MINA session.
+ * @param cause The exception that triggered this event.
+ */
+ public void exceptionCaught(IoSession session, Throwable cause)
{
if (_failoverState == FailoverState.NOT_STARTED)
{
- //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
+ // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
if (cause instanceof AMQConnectionClosedException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
@@ -250,8 +336,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
sessionClosed(session);
}
- //FIXME Need to correctly handle other exceptions. Things like ...
-// if (cause instanceof AMQChannelClosedException)
+ // FIXME Need to correctly handle other exceptions. Things like ...
+ // if (cause instanceof AMQChannelClosedException)
// which will cause the JMSSession to end due to a channel close and so that Session needs
// to be removed from the map so we can correctly still call close without an exception when trying to close
// the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception
@@ -261,6 +347,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
else if (_failoverState == FailoverState.FAILED)
{
_logger.error("Exception caught by protocol handler: " + cause, cause);
+
// we notify the state manager of the error in case we have any clients waiting on a state
// change. Those "waiters" will be interrupted and can handle the exception
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
@@ -297,7 +384,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
final boolean debug = _logger.isDebugEnabled();
final long msgNumber = ++_messageReceivedCount;
- if (debug && (msgNumber % 1000 == 0))
+ if (debug && ((msgNumber % 1000) == 0))
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
@@ -310,72 +397,77 @@ public class AMQProtocolHandler extends IoHandlerAdapter
switch (bodyFrame.getFrameType())
{
- case AMQMethodBody.TYPE:
+ case AMQMethodBody.TYPE:
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
+ if (debug)
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+ }
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
- try
- {
+ try
+ {
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
- if (!wasAnyoneInterested)
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
}
- catch (AMQException e)
+
+ if (!wasAnyoneInterested)
{
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners);
+ }
+ }
+ catch (AMQException e)
+ {
+ getStateManager().error(e);
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
}
- exceptionCaught(session, e);
}
- break;
- case ContentHeaderBody.TYPE:
+ exceptionCaught(session, e);
+ }
- _protocolSession.messageContentHeaderReceived(frame.getChannel(),
- (ContentHeaderBody) bodyFrame);
- break;
+ break;
- case ContentBody.TYPE:
+ case ContentHeaderBody.TYPE:
- _protocolSession.messageContentBodyReceived(frame.getChannel(),
- (ContentBody) bodyFrame);
- break;
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
+ break;
- case HeartbeatBody.TYPE:
+ case ContentBody.TYPE:
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
- break;
+ _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
+ break;
+
+ case HeartbeatBody.TYPE:
+
+ if (debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
+
+ break;
- default:
+ default:
}
+
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -387,10 +479,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter
final boolean debug = _logger.isDebugEnabled();
- if (debug && (sentMessages % 1000 == 0))
+ if (debug && ((sentMessages % 1000) == 0))
{
_logger.debug("Sent " + _messagesOut + " protocol messages");
}
+
_connection.bytesSent(session.getWrittenBytes());
if (debug)
{
@@ -408,7 +501,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_frameListeners.remove(listener);
}
- */
+ */
public void attainState(AMQState s) throws AMQException
{
getStateManager().attainState(s);
@@ -437,9 +530,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener)
- throws AMQException
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
+ throws AMQException, FailoverException
{
return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
}
@@ -451,9 +543,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener, long timeout)
- throws AMQException
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
+ long timeout) throws AMQException, FailoverException
{
try
{
@@ -461,9 +552,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_protocolSession.writeFrame(frame);
AMQMethodEvent e = listener.blockForFrame(timeout);
+
return e;
- // When control resumes before this line, a reply will have been received
- // that matches the criteria defined in the blocking listener
+ // When control resumes before this line, a reply will have been received
+ // that matches the criteria defined in the blocking listener
}
catch (AMQException e)
{
@@ -478,25 +570,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
+ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException
{
return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
}
/** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
+ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException
{
- return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
+ return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
+ timeout);
}
-
-
public void closeSession(AMQSession session) throws AMQException
{
_protocolSession.closeSession(session);
}
+ /**
+ * Closes the connection.
+ *
+ * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
+ * anyway.
+ *
+ * @param timeout The timeout to wait for an acknowledgement to the close request.
+ *
+ * @throws AMQException If the close fails for any reason.
+ */
public void closeConnection(long timeout) throws AMQException
{
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
@@ -504,13 +604,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
- _protocolSession.getProtocolMajorVersion(),
- _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection.")); // replyText
+ final AMQFrame frame =
+ ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
+ _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection.")); // replyText
try
{
@@ -521,8 +621,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_protocolSession.closeProtocolSession(false);
}
-
-
+ catch (FailoverException e)
+ {
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ }
}
/** @return the number of bytes read from this protocol session */
@@ -604,7 +706,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
return _protocolSession.getProtocolMajorVersion();
}
-
public byte getProtocolMinorVersion()
{
return _protocolSession.getProtocolMinorVersion();
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 85f98eab69..86db9d5859 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,71 +27,137 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+/**
+ * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of
+ * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or
+ * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
+ * differs from a 'rendezvous' in that sense.
+ *
+ * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response.
+ * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register
+ * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
+ * have been completed.
+ *
+ * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to
+ * this listeners that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to
+ * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The
+ * channel id must be passed to the constructor.
+ *
+ * <p/>Errors from the producer are rethrown to the consumer.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent}
+ * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody}
+ * <tr><td> Block until a method is handled by the delegated to handler.
+ * <tr><td> Propagate the most recent exception to the consumer.
+ * </table>
+ *
+ * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
+ * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
+ * seem to use it. So wrapping the listeners is possible.
+ *
+ * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener,
+ * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot
+ * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for
+ * method has been received.
+ *
+ * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
+ * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
+ * when this happens. At the very least, restore the interrupted status flag.
+ *
+ * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
+ * check that SynchronousQueue has a non-blocking put method available.
+ */
public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
+ /** This flag is used to indicate that the blocked for method has been received. */
private volatile boolean _ready = false;
- public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException;
-
+ /** Used to protect the shared event and ready flag between the producer and consumer. */
private final Object _lock = new Object();
- /**
- * This is set if there is an exception thrown from processCommandFrame and the
- * exception is rethrown to the caller of blockForFrame()
- */
+ /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
private volatile Exception _error;
+ /** Holds the channel id for the channel upon which this listener is waiting for a response. */
protected int _channelId;
+ /** Holds the incoming method. */
protected AMQMethodEvent _doneEvt = null;
+ /**
+ * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
+ *
+ * @param channelId The channel id to filter incoming methods with.
+ */
public BlockingMethodFrameListener(int channelId)
{
_channelId = channelId;
}
/**
- * This method is called by the MINA dispatching thread. Note that it could
- * be called before blockForFrame() has been called.
+ * Delegates any additional handling of the incoming methods to another handler.
*
- * @param evt the frame event
- * @return true if the listener has dealt with this frame
- * @throws AMQException
+ * @param channelId The channel id of the incoming method.
+ * @param frame The method body.
+ *
+ * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
*/
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException;
+
+ /**
+ * Informs this listener that an AMQP method has been received.
+ *
+ * @param evt The AMQP method.
+ *
+ * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
+ */
+ public boolean methodReceived(AMQMethodEvent evt) // throws AMQException
{
AMQMethodBody method = evt.getMethod();
- try
+ /*try
+ {*/
+ boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+
+ if (ready)
{
- boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
- if (ready)
+ // we only update the flag from inside the synchronized block
+ // so that the blockForFrame method cannot "miss" an update - it
+ // will only ever read the flag from within the synchronized block
+ synchronized (_lock)
{
- // we only update the flag from inside the synchronized block
- // so that the blockForFrame method cannot "miss" an update - it
- // will only ever read the flag from within the synchronized block
- synchronized (_lock)
- {
- _doneEvt = evt;
- _ready = ready;
- _lock.notify();
- }
+ _doneEvt = evt;
+ _ready = ready;
+ _lock.notify();
}
- return ready;
}
+
+ return ready;
+
+ /*}
catch (AMQException e)
{
error(e);
// we rethrow the error here, and the code in the frame dispatcher will go round
// each listener informing them that an exception has been thrown
throw e;
- }
+ }*/
}
/**
- * This method is called by the thread that wants to wait for a frame.
+ * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout
+ * has passed.
+ *
+ * @param timeout The timeout in milliseconds.
+ *
+ * @return The AMQP method that was received.
+ *
+ * @throws AMQException
+ * @throws FailoverException
*/
- public AMQMethodEvent blockForFrame(long timeout) throws AMQException
+ public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
{
synchronized (_lock)
{
@@ -117,24 +183,25 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
catch (InterruptedException e)
{
// IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
-// if (!_ready && timeout != -1)
-// {
-// _error = new AMQException("Server did not respond timely");
-// _ready = true;
-// }
+ // if (!_ready && timeout != -1)
+ // {
+ // _error = new AMQException("Server did not respond timely");
+ // _ready = true;
+ // }
}
}
}
+
if (_error != null)
{
if (_error instanceof AMQException)
{
- throw(AMQException) _error;
+ throw (AMQException) _error;
}
else if (_error instanceof FailoverException)
{
- // This should ensure that FailoverException is not wrapped and can be caught.
- throw(FailoverException) _error; // needed to expose FailoverException.
+ // This should ensure that FailoverException is not wrapped and can be caught.
+ throw (FailoverException) _error; // needed to expose FailoverException.
}
else
{
@@ -156,6 +223,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
// set the error so that the thread that is blocking (against blockForFrame())
// can pick up the exception and rethrow to the caller
_error = e;
+
synchronized (_lock)
{
_ready = true;
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
index 1c70ded62a..623591e0b6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
@@ -34,7 +34,7 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener
_expectedClass = expectedClass;
}
- public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
+ public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
{
return _expectedClass.isInstance(frame);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 642b928d81..0fc39a9318 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.client.util;
-
+import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.Iterator;
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
* caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single
* thread adding items and a single (different) thread removing items.
+ *
+ * @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
*/
public class FlowControllingBlockingQueue
{
@@ -81,6 +82,7 @@ public class FlowControllingBlockingQueue
}
}
}
+
return o;
}
@@ -104,4 +106,3 @@ public class FlowControllingBlockingQueue
return _queue.iterator();
}
}
-
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
index 51bbe7d0e6..c201e88104 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -14,42 +14,43 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.test.unit.client.channelclose;
import junit.framework.TestCase;
-import javax.jms.Connection;
-import javax.jms.Session;
-
-import javax.jms.JMSException;
-import javax.jms.ExceptionListener;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-import javax.jms.Queue;
+import org.apache.log4j.Logger;
-import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQTimeoutException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.jms.ConnectionListener;
-import org.apache.log4j.Logger;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener
{
@@ -73,15 +74,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
TransportConnection.killAllVMBrokers();
}
-
/*
close channel, use chanel with same id ensure error.
- */
- public void testReusingChannelAfterFullClosure()
+ */
+ public void testReusingChannelAfterFullClosure() throws Exception
{
_connection = newConnection();
- //Create Producer
+ // Create Producer
try
{
_connection.start();
@@ -113,6 +113,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
{
_logger.info("Exception occured was:" + e.getErrorCode());
}
+
assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
_connection = newConnection();
@@ -134,29 +135,27 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
/*
close channel and send guff then send ok no errors
*/
- public void testSendingMethodsAfterClose()
+ public void testSendingMethodsAfterClose() throws Exception
{
try
{
- _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='"
- + _brokerlist + "'");
+ _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
((AMQConnection) _connection).setConnectionListener(this);
-
_connection.setExceptionListener(this);
- //Change the StateManager for one that doesn't respond with Close-OKs
+ // Change the StateManager for one that doesn't respond with Close-OKs
AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager();
_session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
_connection.start();
- //Test connection
+ // Test connection
checkSendingMessage();
- //Set StateManager to manager that ignores Close-oks
+ // Set StateManager to manager that ignores Close-oks
AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession);
newStateManager.changeState(oldStateManager.getCurrentState());
@@ -214,7 +213,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
createChannelAndTest(TEST_CHANNEL);
- //Test connection is still ok
+ // Test connection is still ok
checkSendingMessage();
@@ -248,9 +247,9 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
}
- private void createChannelAndTest(int channel)
+ private void createChannelAndTest(int channel) throws FailoverException
{
- //Create A channel
+ // Create A channel
try
{
createChannel(channel);
@@ -274,14 +273,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
private void sendClose(int channel)
{
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+ AMQFrame frame =
+ ChannelCloseOkBody.createAMQFrame(channel,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
}
-
private void checkSendingMessage() throws JMSException
{
TEST++;
@@ -307,8 +306,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
AMQConnection connection = null;
try
{
- connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='"
- + _brokerlist + "'");
+ connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
connection.setConnectionListener(this);
@@ -330,24 +328,24 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
fail("Creating new connection when:" + e.getMessage());
}
-
return connection;
}
- private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException
+ private void declareExchange(int channelId, String _type, String _name, boolean nowait)
+ throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
- null, // arguments
- false, // autoDelete
- false, // durable
- new AMQShortString(_name), // exchange
- false, // internal
- nowait, // nowait
- true, // passive
- 0, // ticket
- new AMQShortString(_type)); // type
+ AMQFrame exchangeDeclare =
+ ExchangeDeclareBody.createAMQFrame(channelId,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+ new AMQShortString(_name), // exchange
+ false, // internal
+ nowait, // nowait
+ true, // passive
+ 0, // ticket
+ new AMQShortString(_type)); // type
if (nowait)
{
@@ -355,36 +353,31 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
else
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class,
+ SYNC_TIMEOUT);
}
}
- private void createChannel(int channelId) throws AMQException
+ private void createChannel(int channelId) throws AMQException, FailoverException
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
- null), // outOfBand
- ChannelOpenOkBody.class);
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId,
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand
+ ChannelOpenOkBody.class);
}
-
public void onException(JMSException jmsException)
{
- //_logger.info("CCT" + jmsException);
+ // _logger.info("CCT" + jmsException);
fail(jmsException.getMessage());
}
public void bytesSent(long count)
- {
- }
+ { }
public void bytesReceived(long count)
- {
-
- }
+ { }
public boolean preFailover(boolean redirect)
{
@@ -397,6 +390,5 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
public void failoverComplete()
- {
- }
+ { }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index d52707d965..58ac8294f2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -22,29 +22,29 @@ package org.apache.qpid.test.unit.close;
import junit.framework.TestCase;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.ExceptionListener;
-import javax.jms.Session;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
import javax.jms.Message;
-import javax.jms.TextMessage;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.testutil.QpidClientConnection;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
+import java.util.concurrent.atomic.AtomicInteger;
public class MessageRequeueTest extends TestCase
{
@@ -86,7 +86,7 @@ public class MessageRequeueTest extends TestCase
{
super.tearDown();
- if (!passed) // clean up
+ if (!passed) // clean up
{
QpidClientConnection conn = new QpidClientConnection(BROKER);
@@ -96,6 +96,7 @@ public class MessageRequeueTest extends TestCase
conn.disconnect();
}
+
TransportConnection.killVMBroker(1);
}
@@ -117,7 +118,7 @@ public class MessageRequeueTest extends TestCase
final MessageConsumer consumer = conn.getSession().createConsumer(q);
int messagesReceived = 0;
- long messageLog[] = new long[numTestMessages + 1];
+ long[] messageLog = new long[numTestMessages + 1];
_logger.info("consuming...");
Message msg = consumer.receive(1000);
@@ -130,15 +131,13 @@ public class MessageRequeueTest extends TestCase
int msgindex = msg.getIntProperty("index");
if (messageLog[msgindex] != 0)
{
- _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() +
- ") more than once.");
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag()
+ + ") more than once.");
}
if (_logger.isInfoEnabled())
{
- _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
- "DT:" + dt +
- "IN:" + msgindex);
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex);
}
if (dt == 0)
@@ -148,7 +147,7 @@ public class MessageRequeueTest extends TestCase
messageLog[msgindex] = dt;
- //get Next message
+ // get Next message
msg = consumer.receive(1000);
}
@@ -163,7 +162,7 @@ public class MessageRequeueTest extends TestCase
for (long b : messageLog)
{
- if (b == 0 && index != 0) //delivery tag of zero shouldn't exist
+ if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist
{
_logger.error("Index: " + index + " was not received.");
list.append(" ");
@@ -175,6 +174,7 @@ public class MessageRequeueTest extends TestCase
index++;
}
+
assertEquals(list.toString(), 0, failed);
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
@@ -199,7 +199,7 @@ public class MessageRequeueTest extends TestCase
t1.start();
t2.start();
t3.start();
-// t4.start();
+ // t4.start();
try
{
@@ -228,7 +228,7 @@ public class MessageRequeueTest extends TestCase
for (long b : receieved)
{
- if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0)
+ if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0)
{
_logger.error("Index: " + index + " was not received.");
list.append(" ");
@@ -237,8 +237,10 @@ public class MessageRequeueTest extends TestCase
list.append(b);
failed++;
}
+
index++;
}
+
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
passed = true;
@@ -278,15 +280,14 @@ public class MessageRequeueTest extends TestCase
int msgindex = result.getIntProperty("index");
if (receieved[msgindex] != 0)
{
- _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() +
- ") more than once.");
+ _logger.error("Received Message(" + msgindex + ":"
+ + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once.");
}
if (_logger.isInfoEnabled())
{
- _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
- "DT:" + dt +
- "IN:" + msgindex);
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt
+ + "IN:" + msgindex);
}
if (dt == 0)
@@ -297,9 +298,8 @@ public class MessageRequeueTest extends TestCase
receieved[msgindex] = dt;
}
-
count++;
- if (count % 100 == 0)
+ if ((count % 100) == 0)
{
_logger.info("consumer-" + id + ": got " + result + ", new count is " + count);
}
@@ -328,11 +328,10 @@ public class MessageRequeueTest extends TestCase
}
}
-
public void testRequeue() throws JMSException, AMQException, URLSyntaxException
{
int run = 0;
-// while (run < 10)
+ // while (run < 10)
{
run++;
@@ -359,7 +358,6 @@ public class MessageRequeueTest extends TestCase
assertNotNull("Message should not be null", msg);
-
// As we have not ack'd message will be requeued.
_logger.debug("Close Consumer");
consumer.close();
@@ -369,4 +367,4 @@ public class MessageRequeueTest extends TestCase
}
}
-} \ No newline at end of file
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 0e718da19b..8d96977df2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -21,18 +21,20 @@
package org.apache.qpid.test.unit.transacted;
import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.AMQException;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
-import javax.jms.Session;
-import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.TextMessage;
/**
@@ -62,10 +64,10 @@ public class CommitRollbackTest extends TestCase
{
TransportConnection.createVMBroker(1);
}
+
testMethod++;
queue += testMethod;
-
newConnection();
}
@@ -106,7 +108,6 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
-
_logger.info("sending test message");
String MESSAGE_TEXT = "testPutThenDisconnect";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
@@ -119,7 +120,7 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
Message result = _consumer.receive(1000);
- //commit to ensure message is removed from queue
+ // commit to ensure message is removed from queue
_session.commit();
assertNull("test message was put and disconnected before commit, but is still present", result);
@@ -135,7 +136,6 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
-
_logger.info("sending test message");
String MESSAGE_TEXT = "testPutThenDisconnect";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
@@ -151,7 +151,7 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
Message result = _consumer.receive(1000);
- //commit to ensure message is removed from queue
+ // commit to ensure message is removed from queue
_session.commit();
assertNull("test message was put and disconnected before commit, but is still present", result);
@@ -168,7 +168,6 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
-
_logger.info("sending test message");
String MESSAGE_TEXT = "testPutThenRollback";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
@@ -335,13 +334,12 @@ public class CommitRollbackTest extends TestCase
assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
}
-
/**
* Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order
*
* @throws Exception On error
*/
- public void testSend2ThenRollback() throws Exception
+ /*public void testSend2ThenRollback() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -391,7 +389,7 @@ public class CommitRollbackTest extends TestCase
}
assertNull("test message should be null", result);
- }
+ }*/
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
@@ -428,7 +426,7 @@ public class CommitRollbackTest extends TestCase
{
assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
}
- else // or it will be msg 2 arriving the first time due to latency.
+ else // or it will be msg 2 arriving the first time due to latency.
{
_logger.info("Message 2 wasn't prefetched so wasn't rejected");
assertEquals("2", ((TextMessage) result).getText());
@@ -445,7 +443,6 @@ public class CommitRollbackTest extends TestCase
}
-
public void testPutThenRollbackThenGet() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
index 195ed79dab..d52da06f76 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -1,25 +1,25 @@
package org.apache.qpid.testutil;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
-import javax.jms.ExceptionListener;
-import javax.jms.Session;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.TextMessage;
public class QpidClientConnection implements ExceptionListener
{
-
private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
private boolean transacted = true;
@@ -40,17 +40,16 @@ public class QpidClientConnection implements ExceptionListener
setPrefetch(5000);
}
-
public void connect() throws JMSException
{
if (!connected)
{
/*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
try
{
@@ -63,7 +62,6 @@ public class QpidClientConnection implements ExceptionListener
session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
_logger.info("starting connection");
connection.start();
@@ -124,7 +122,6 @@ public class QpidClientConnection implements ExceptionListener
this.prefetch = prefetch;
}
-
/** override as necessary */
public void onException(JMSException exception)
{
@@ -266,4 +263,3 @@ public class QpidClientConnection implements ExceptionListener
_logger.info("consumed: " + messagesReceived);
}
}
-