summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-09-20 11:05:15 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-09-20 11:05:15 +0000
commit0e5a82d6b80c0bb430e6d2b988dda329c9783e80 (patch)
treecfab10f272fcb8eab445573be2630d804a0b3d70
parent10de9e9821856cbc4d679e35a3747e9ebd3690d8 (diff)
downloadqpid-python-0e5a82d6b80c0bb430e6d2b988dda329c9783e80.tar.gz
QPID-2864: Add producer configurable transaction timeouts
Adds a configurable open or idle transaction timeout, which either warns (using operational logging) or closes the connection, depending on configuration. Default behaviour is no action. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@998887 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java67
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java31
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java86
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java76
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java340
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java253
17 files changed, 962 insertions, 30 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 48e2a968d0..82d1bbf881 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -31,9 +31,11 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.Exchange;
@@ -120,6 +122,7 @@ public class AMQChannel
private LogActor _actor;
private LogSubject _logSubject;
+ private long _txnUpdateTime;
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -166,6 +169,8 @@ public class AMQChannel
_currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session);
_currentMessage.setMessageStore(_messageStore);
_currentMessage.setExchange(e);
+
+ updateTransactionalActivity();
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -770,6 +775,15 @@ public class AMQChannel
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
_unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
+ updateTransactionalActivity();
+ }
+
+ public void updateTransactionalActivity()
+ {
+ if (isTransactional())
+ {
+ _txnUpdateTime = System.currentTimeMillis();
+ }
}
/**
@@ -1016,4 +1030,57 @@ public class AMQChannel
{
return _blocking.get();
}
+
+ /**
+ * This method is called from the housekeeping thread to check the status of
+ * transactions on this channel and react appropriately.
+ *
+ * If a transaction is open for too long or idle for too long then a warning
+ * is logged or the connection is closed, depending on the configuration. An open
+ * transaction is one that has recent activity. The transaction age is counted
+ * from the time the transaction was started. An idle transaction is one that
+ * has had no activity, such as publishing or acknowledgeing messages.
+ *
+ * @param openWarn time in milliseconds before alerting on open transaction
+ * @param openClose time in milliseconds before closing connection with open transaction
+ * @param idleWarn time in milliseconds before alerting on idle transaction
+ * @param idleClose time in milliseconds before closing connection with idle transaction
+ */
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ if (isTransactional() && _txnContext.inTransaction())
+ {
+ long currentTime = System.currentTimeMillis();
+ long openTime = currentTime - _txnContext.getTransactionStartTime();
+ long idleTime = currentTime - _txnUpdateTime;
+
+ // Log a warning on idle or open transactions
+ if (idleWarn > 0L && idleTime > idleWarn)
+ {
+ _actor.message(ChannelMessages.CHN_IDLE_TXN(idleTime));
+ }
+ else if (openWarn > 0L && openTime > openWarn)
+ {
+ _actor.message(ChannelMessages.CHN_OPEN_TXN(openTime));
+ }
+
+ // Close connection for idle or open transactions that have timed out
+ if (idleClose > 0L && idleTime > idleClose)
+ {
+ _session.closeConnection(_channelId, new AMQConnectionException(AMQConstant.REQUEST_TIMEOUT,
+ "Idle transaction timed out", 0, 0,
+ _session.getProtocolOutputConverter().getProtocolMajorVersion(),
+ _session.getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null), true);
+ }
+ else if (openClose > 0L && openTime > openClose)
+ {
+ _session.closeConnection(_channelId, new AMQConnectionException(AMQConstant.REQUEST_TIMEOUT,
+ "Open transaction timed out", 0, 0,
+ _session.getProtocolOutputConverter().getProtocolMajorVersion(),
+ _session.getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null), true);
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index b7c629ea22..ed4ccded89 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -176,4 +176,23 @@ public class VirtualHostConfiguration
return _config.getLong("queues.flowResumeCapacity", getCapacity());
}
+ public long getTransactionTimeoutOpenWarn()
+ {
+ return _config.getLong("transactionTimeout.openWarn", 0L);
+ }
+
+ public long getTransactionTimeoutOpenClose()
+ {
+ return _config.getLong("transactionTimeout.openClose", 0L);
+ }
+
+ public long getTransactionTimeoutIdleWarn()
+ {
+ return _config.getLong("transactionTimeout.idleWarn", 0L);
+ }
+
+ public long getTransactionTimeoutIdleClose()
+ {
+ return _config.getLong("transactionTimeout.idleClose", 0L);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index d287595e2d..189aa8f520 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -28,6 +28,8 @@ import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQConstant;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
public class ConnectionRegistry implements IConnectionRegistry
@@ -70,4 +72,9 @@ public class ConnectionRegistry implements IConnectionRegistry
{
_registry.remove(connnection);
}
+
+ public Collection<AMQProtocolSession> getConnections()
+ {
+ return new ArrayList<AMQProtocolSession>(_registry);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index d64fde1c20..bbbfa4114c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.connection;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.util.Collection;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
public interface IConnectionRegistry
{
-
public void initialise();
public void close() throws AMQException;
@@ -35,4 +35,5 @@ public interface IConnectionRegistry
public void deregisterConnection(AMQProtocolSession connnection);
+ public Collection<AMQProtocolSession> getConnections();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
index f2a12a1cac..7f6ec704ce 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
@@ -274,6 +274,9 @@ CHN_CLOSE = CHN-1003 : Close
CHN_PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number}
CHN_FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0})
CHN_FLOW_REMOVED = CHN-1006 : Flow Control Removed
+#Channel Transactions
+CHN_OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
+CHN_IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
#Queue
# 0 - owner
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
index b3bf038f2b..7b9f0e6060 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -37,7 +37,7 @@ public class ChannelLogSubject extends AbstractLogSubject
* 4 - Channel ID
*/
public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT
- + "/ch:{4}";
+ + "/ch:{4}";
public ChannelLogSubject(AMQChannel channel)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 2d5de24fd3..bc5e3e5ff9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -20,6 +20,21 @@
*/
package org.apache.qpid.server.protocol;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
@@ -52,12 +67,12 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -69,20 +84,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -103,7 +104,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private VirtualHost _virtualHost;
- private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<Integer, AMQChannel>();
private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 59cc8a0cf2..6838a16182 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.Principal;
+import java.util.Collection;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
@@ -113,6 +114,13 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
* than one session but this is not validated.
*/
void addChannel(AMQChannel channel) throws AMQException;
+
+ /**
+ * Return all channels associated with this session.
+ *
+ * @return a collection containing this sessions's channels
+ */
+ Collection<AMQChannel> getChannels();
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 648755a495..cc75e2c1ee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -510,6 +510,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
_deliveredMessages.incrementAndGet();
+
if (_logger.isDebugEnabled())
{
_logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 450852cef7..c20ce138f3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -56,6 +56,7 @@ public class LocalTransactionalContext implements TransactionalContext
private boolean _messageDelivered = false;
private final AMQChannel _channel;
+ private long _txnStartTime;
private abstract class DeliveryAction
{
@@ -114,6 +115,7 @@ public class LocalTransactionalContext implements TransactionalContext
public LocalTransactionalContext(final AMQChannel channel)
{
_channel = channel;
+ _txnStartTime = System.currentTimeMillis();
}
public StoreContext getStoreContext()
@@ -135,6 +137,7 @@ public class LocalTransactionalContext implements TransactionalContext
public void rollback() throws AMQException
{
_txnBuffer.rollback(getStoreContext());
+
// Hack to deal with uncommitted non-transactional writes
if (getMessageStore().inTran(getStoreContext()))
{
@@ -142,6 +145,7 @@ public class LocalTransactionalContext implements TransactionalContext
_inTran = false;
}
+ _txnStartTime = System.currentTimeMillis();
_postCommitDeliveryList.clear();
}
@@ -215,6 +219,17 @@ public class LocalTransactionalContext implements TransactionalContext
{
// Not required in this transactional context
}
+
+ public boolean inTransaction()
+ {
+ return _inTran;
+ }
+
+ public long getTransactionStartTime()
+ {
+ return _txnStartTime;
+ }
+
public void beginTranIfNecessary() throws AMQException
{
@@ -226,6 +241,7 @@ public class LocalTransactionalContext implements TransactionalContext
}
getMessageStore().beginTran(getStoreContext());
+ _txnStartTime = System.currentTimeMillis();
_inTran = true;
}
}
@@ -259,6 +275,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
_messageDelivered = false;
_inTran = getMessageStore().inTran(getStoreContext());
+ _txnStartTime = System.currentTimeMillis();
}
try
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 10d6021d27..fdbda006d6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -77,6 +77,16 @@ public class NonTransactionalContext implements TransactionalContext
_inTran = true;
}
}
+
+ public boolean inTransaction()
+ {
+ return false;
+ }
+
+ public long getTransactionStartTime()
+ {
+ return 0L;
+ }
public void commit() throws AMQException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
index 647ba66fb4..a38afb135a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
@@ -91,6 +91,20 @@ public interface TransactionalContext
* @throws AMQException If the transaction cannot be started for any reason.
*/
void beginTranIfNecessary() throws AMQException;
+
+ /**
+ * Returns whether or not a transaction is currently in progress.
+ *
+ * @return true if a transaction is currently in progress
+ */
+ boolean inTransaction();
+
+ /**
+ * Return the time the current transaction started.
+ *
+ * @return the time this transaction started or 0 if not in a transaction
+ */
+ long getTransactionStartTime();
/**
* Makes all pending operations on the transaction permanent and visible.
@@ -118,13 +132,13 @@ public interface TransactionalContext
void deliver(final AMQQueue queue, AMQMessage message) throws AMQException;
/**
- * Requeues the specified message entry (message queue pair)
- *
- *
- * @param queueEntry The message,queue pair
- *
- * @throws AMQException If the message cannot be delivered for any reason.
- */
+ * Requeues the specified message entry (message queue pair)
+ *
+ *
+ * @param queueEntry The message,queue pair
+ *
+ * @throws AMQException If the message cannot be delivered for any reason.
+ */
void requeue(QueueEntry queueEntry) throws AMQException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index cc80eb9c0c..71fcfb964d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -27,6 +27,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
@@ -41,6 +43,7 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -230,7 +233,7 @@ public class VirtualHost implements Accessable
private void initialiseHouseKeeping(long period)
{
/* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
- if (period != 0L)
+ if (period > 0L)
{
class HouseKeepingTask extends TimerTask
{
@@ -238,6 +241,12 @@ public class VirtualHost implements Accessable
public void run()
{
+ CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+ public String getLogMessage()
+ {
+ return "[" + Thread.currentThread().getName() + "]";
+ }
+ });
_hkLogger.info("Starting the houseKeeping job");
for (AMQQueue q : _queueRegistry.getQueues())
{
@@ -253,7 +262,27 @@ public class VirtualHost implements Accessable
// house keeping task from running.
}
}
+ for (AMQProtocolSession conn : _connectionRegistry.getConnections())
+ {
+ _hkLogger.debug("Checking for long running open transactions on connection " + conn);
+ for (AMQChannel ch : conn.getChannels())
+ {
+ try
+ {
+ ch.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
+ _configuration.getTransactionTimeoutOpenClose(),
+ _configuration.getTransactionTimeoutIdleWarn(),
+ _configuration.getTransactionTimeoutIdleClose());
+ }
+ catch (Exception e)
+ {
+ _hkLogger.error("Exception in housekeeping for connection: " + conn.toString(), e);
+ }
+ }
+ }
+
_hkLogger.info("HouseKeeping job completed.");
+ CurrentActor.remove();
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java
new file mode 100644
index 0000000000..ded2fe8d76
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This verifies that changing the {@code transactionTimeout} configuration will alter
+ * the behaviour of the transaction open and idle logging, and that when the connection
+ * will be closed.
+ */
+public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase
+{
+ @Override
+ protected void configure() throws Exception
+ {
+ // Setup housekeeping every second
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000");
+
+ // Set transaction timout properties.
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "2000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "10000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "1000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "5000");
+ }
+
+ public void testProducerIdleCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 0);
+
+ sleep(20);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(5, 0);
+
+ check(IDLE);
+ }
+
+ public void testProducerOpenCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 3);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(6, 3);
+
+ check(OPEN);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
new file mode 100644
index 0000000000..3e852a166e
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This verifies that the default behaviour is not to time out transactions.
+ */
+public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase
+{
+ @Override
+ protected void configure() throws Exception
+ {
+ // Setup housekeeping every second
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000");
+ }
+
+ public void testProducerIdleCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 0);
+
+ sleep(20);
+
+ _psession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+
+ public void testProducerOpenCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 3);
+
+ _psession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+
+ monitor(0, 0);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
new file mode 100644
index 0000000000..40c47614cd
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
@@ -0,0 +1,340 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
+ * is set for a virtual host.
+ *
+ * A producer that is idle for too long or open for too long will have its connection closed and
+ * any further operations will fail with a 408 resource timeout exception. Consumers will not
+ * be affected by the transaction timeout configuration.
+ */
+public class TransactionTimeoutTest extends TransactionTimeoutTestCase
+{
+ public void testProducerIdleCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 0);
+
+ sleep(20);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(10, 0);
+
+ check(IDLE);
+ }
+
+ public void testProducerOpenCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(6, 5);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(0, 10);
+
+ check(OPEN);
+ }
+
+ public void testProducerIdleCommitTwice() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 0);
+
+ sleep(10);
+
+ _psession.commit();
+
+ send(5, 0);
+
+ sleep(20);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(15, 0);
+
+ check(IDLE);
+ }
+
+ public void testProducerOpenCommitTwice() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 0);
+
+ sleep(10);
+
+ _psession.commit();
+
+ send(6, 5);
+
+ _psession.commit();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(5, 10);
+
+ check(OPEN);
+ }
+
+ public void testProducerIdleRollback() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 0);
+
+ sleep(20);
+
+ _psession.rollback();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(10, 0);
+
+ check(IDLE);
+ }
+
+ public void testProducerIdleRollbackTwice() throws Exception
+ {
+ try
+ {
+ producer();
+
+ send(5, 0);
+
+ sleep(10);
+
+ _psession.rollback();
+
+ send(5, 0);
+
+ sleep(20);
+
+ _psession.rollback();
+ fail("should fail");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ }
+
+ monitor(15, 0);
+
+ check(IDLE);
+ }
+
+ public void testConsumerCommitClose() throws Exception
+ {
+ try
+ {
+ producer();
+
+ consumer();
+
+ send(1, 0);
+
+ _psession.commit();
+
+ expect(1, 0);
+
+ _csession.commit();
+
+ sleep(30);
+
+ _csession.close();
+ }
+ catch (Exception e)
+ {
+ fail("should have succeeded: " + e.getMessage());
+ }
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerIdleReceiveCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ consumer();
+
+ send(1, 0);
+
+ _psession.commit();
+
+ sleep(20);
+
+ expect(1, 0);
+
+ sleep(20);
+
+ _csession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerIdleCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ consumer();
+
+ send(1, 0);
+
+ _psession.commit();
+
+ expect(1, 0);
+
+ sleep(20);
+
+ _csession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerIdleRollback() throws Exception
+ {
+ try
+ {
+ producer();
+
+ consumer();
+
+ send(1, 0);
+
+ _psession.commit();
+
+ expect(1, 0);
+
+ sleep(20);
+
+ _csession.rollback();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerOpenCommit() throws Exception
+ {
+ try
+ {
+ producer();
+
+ consumer();
+
+ send(1, 0);
+
+ _psession.commit();
+
+ sleep(30);
+
+ _csession.commit();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ monitor(0, 0);
+ }
+
+ public void testConsumerOpenRollback() throws Exception
+ {
+ try
+ {
+ producer();
+
+ consumer();
+
+ send(1, 0);
+
+ _psession.commit();
+
+ sleep(30);
+
+ _csession.rollback();
+ }
+ catch (Exception e)
+ {
+ fail("Should have succeeded");
+ }
+
+ monitor(0, 0);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
new file mode 100644
index 0000000000..7d91bb4049
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
@@ -0,0 +1,253 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.LogMonitor;
+
+/**
+ * The {@link TestCase} for transaction timeout testing.
+ */
+public class TransactionTimeoutTestCase extends QpidTestCase implements ExceptionListener
+{
+ public static final String VIRTUALHOST = "test";
+ public static final String TEXT = "0123456789abcdefghiforgettherest";
+ public static final String CHN_OPEN_TXN = "CHN-1007";
+ public static final String CHN_IDLE_TXN = "CHN-1008";
+ public static final String IDLE = "Idle";
+ public static final String OPEN = "Open";
+
+ protected LogMonitor _monitor;
+ protected AMQConnection _con;
+ protected Session _psession, _csession;
+ protected Queue _queue;
+ protected MessageConsumer _consumer;
+ protected MessageProducer _producer;
+ protected CountDownLatch _caught = new CountDownLatch(1);
+ protected String _message;
+ protected Exception _exception;
+ protected AMQConstant _code;
+
+ protected void configure() throws Exception
+ {
+ // Setup housekeeping every second
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000");
+
+ /*
+ * Set transaction timout properties. The XML in the virtualhosts configuration is as follows:
+ *
+ * <transactionTimeout>
+ * <openWarn>10000</openWarn>
+ * <openClose>20000</openClose>
+ * <idleWarn>5000</idleWarn>
+ * <idleClose>15000</idleClose>
+ * </transactionTimeout>
+ */
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "10000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "20000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "5000");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "15000");
+ }
+
+ protected void setUp() throws Exception
+ {
+ // Configure timeouts
+ configure();
+
+ // Monitor log file
+ _monitor = new LogMonitor(_outputFile);
+
+ // Start broker
+ super.setUp();
+
+ // Connect to broker
+ String broker = _broker.equals(VM) ? ("vm://:" + DEFAULT_VM_PORT) : ("tcp://localhost:" + DEFAULT_PORT);
+ ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'");
+ _con = (AMQConnection) getConnection(url);
+ _con.setExceptionListener(this);
+ _con.start();
+
+ // Create queue
+ Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED);
+ AMQShortString queueName = new AMQShortString("test");
+ _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true);
+ qsession.close();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _con.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ /**
+ * Create a transacted persistent message producer session.
+ */
+ protected void producer() throws Exception
+ {
+ _psession = _con.createSession(true, Session.SESSION_TRANSACTED);
+ _producer = _psession.createProducer(_queue);
+ _producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+
+ /**
+ * Create a transacted message consumer session.
+ */
+ protected void consumer() throws Exception
+ {
+ _csession = _con.createSession(true, Session.SESSION_TRANSACTED);
+ _consumer = _csession.createConsumer(_queue);
+ }
+
+ /**
+ * Send a number of messages to the queue, optionally pausing after each.
+ */
+ protected void send(int count, int delay) throws Exception
+ {
+ for (int i = 0; i < count; i++)
+ {
+ sleep(delay);
+ Message msg = _psession.createTextMessage(TEXT);
+ msg.setIntProperty("i", i);
+ _producer.send(msg);
+ }
+ }
+
+ /**
+ * Sleep for an integral number of seconds.
+ */
+ protected void sleep(int seconds) throws Exception
+ {
+ try
+ {
+ Thread.sleep(seconds * 1000L);
+ }
+ catch (InterruptedException ie)
+ {
+ throw new RuntimeException("Interrupted");
+ }
+ }
+
+ /**
+ * Check for idle and open messages.
+ *
+ * Either exactly zero messages, or +-2 error accepted around the specified number.
+ */
+ protected void monitor(int idle, int open) throws Exception
+ {
+ List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN);
+ List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN);
+
+ String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages";
+ String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages";
+
+ if (idle == 0)
+ {
+ assertTrue(idleErr, idleMsgs.isEmpty());
+ }
+ else
+ {
+ assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2);
+ }
+
+ if (open == 0)
+ {
+ assertTrue(openErr, openMsgs.isEmpty());
+ }
+ else
+ {
+ assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2);
+ }
+ }
+
+ /**
+ * Receive a number of messages, optionally pausing after each.
+ */
+ protected void expect(int count, int delay) throws Exception
+ {
+ for (int i = 0; i < count; i++)
+ {
+ sleep(delay);
+ Message msg = _consumer.receive(1000);
+ assertNotNull("Message should not be null", msg);
+ assertTrue("Message should be a text message", msg instanceof TextMessage);
+ assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText());
+ assertEquals("Message order is incorrect", i, msg.getIntProperty("i"));
+ }
+ }
+
+ /**
+ * Checks that the correct exception was thrown and was received
+ * by the listener with a 408 error code.
+ */
+ protected void check(String reason)throws InterruptedException
+ {
+ assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS));
+ assertNotNull("Should have thrown exception to client", _exception);
+ assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out"));
+ assertNotNull("Exception should have an error code", _code);
+ assertEquals("Error code should be 408", AMQConstant.REQUEST_TIMEOUT, _code);
+ }
+
+ /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */
+ public void onException(JMSException jmse)
+ {
+ _caught.countDown();
+ _message = jmse.getLinkedException().getMessage();
+ if (jmse.getLinkedException() instanceof AMQException)
+ {
+ _code = ((AMQException) jmse.getLinkedException()).getErrorCode();
+ }
+ }
+}