diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-09-20 11:05:15 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-09-20 11:05:15 +0000 |
| commit | 0e5a82d6b80c0bb430e6d2b988dda329c9783e80 (patch) | |
| tree | cfab10f272fcb8eab445573be2630d804a0b3d70 | |
| parent | 10de9e9821856cbc4d679e35a3747e9ebd3690d8 (diff) | |
| download | qpid-python-0e5a82d6b80c0bb430e6d2b988dda329c9783e80.tar.gz | |
QPID-2864: Add producer configurable transaction timeouts
Adds a configurable open or idle transaction timeout, which either
warns (using operational logging) or closes the connection, depending
on configuration. Default behaviour is no action.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@998887 13f79535-47bb-0310-9956-ffa450edef68
17 files changed, 962 insertions, 30 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 48e2a968d0..82d1bbf881 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -31,9 +31,11 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.Exchange; @@ -120,6 +122,7 @@ public class AMQChannel private LogActor _actor; private LogSubject _logSubject; + private long _txnUpdateTime; public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException @@ -166,6 +169,8 @@ public class AMQChannel _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session); _currentMessage.setMessageStore(_messageStore); _currentMessage.setExchange(e); + + updateTransactionalActivity(); } public void publishContentHeader(ContentHeaderBody contentHeaderBody) @@ -770,6 +775,15 @@ public class AMQChannel public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); + updateTransactionalActivity(); + } + + public void updateTransactionalActivity() + { + if (isTransactional()) + { + _txnUpdateTime = System.currentTimeMillis(); + } } /** @@ -1016,4 +1030,57 @@ public class AMQChannel { return _blocking.get(); } + + /** + * This method is called from the housekeeping thread to check the status of + * transactions on this channel and react appropriately. + * + * If a transaction is open for too long or idle for too long then a warning + * is logged or the connection is closed, depending on the configuration. An open + * transaction is one that has recent activity. The transaction age is counted + * from the time the transaction was started. An idle transaction is one that + * has had no activity, such as publishing or acknowledgeing messages. + * + * @param openWarn time in milliseconds before alerting on open transaction + * @param openClose time in milliseconds before closing connection with open transaction + * @param idleWarn time in milliseconds before alerting on idle transaction + * @param idleClose time in milliseconds before closing connection with idle transaction + */ + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + if (isTransactional() && _txnContext.inTransaction()) + { + long currentTime = System.currentTimeMillis(); + long openTime = currentTime - _txnContext.getTransactionStartTime(); + long idleTime = currentTime - _txnUpdateTime; + + // Log a warning on idle or open transactions + if (idleWarn > 0L && idleTime > idleWarn) + { + _actor.message(ChannelMessages.CHN_IDLE_TXN(idleTime)); + } + else if (openWarn > 0L && openTime > openWarn) + { + _actor.message(ChannelMessages.CHN_OPEN_TXN(openTime)); + } + + // Close connection for idle or open transactions that have timed out + if (idleClose > 0L && idleTime > idleClose) + { + _session.closeConnection(_channelId, new AMQConnectionException(AMQConstant.REQUEST_TIMEOUT, + "Idle transaction timed out", 0, 0, + _session.getProtocolOutputConverter().getProtocolMajorVersion(), + _session.getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + else if (openClose > 0L && openTime > openClose) + { + _session.closeConnection(_channelId, new AMQConnectionException(AMQConstant.REQUEST_TIMEOUT, + "Open transaction timed out", 0, 0, + _session.getProtocolOutputConverter().getProtocolMajorVersion(), + _session.getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index b7c629ea22..ed4ccded89 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -176,4 +176,23 @@ public class VirtualHostConfiguration return _config.getLong("queues.flowResumeCapacity", getCapacity()); } + public long getTransactionTimeoutOpenWarn() + { + return _config.getLong("transactionTimeout.openWarn", 0L); + } + + public long getTransactionTimeoutOpenClose() + { + return _config.getLong("transactionTimeout.openClose", 0L); + } + + public long getTransactionTimeoutIdleWarn() + { + return _config.getLong("transactionTimeout.idleWarn", 0L); + } + + public long getTransactionTimeoutIdleClose() + { + return _config.getLong("transactionTimeout.idleClose", 0L); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index d287595e2d..189aa8f520 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -28,6 +28,8 @@ import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQConstant; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; public class ConnectionRegistry implements IConnectionRegistry @@ -70,4 +72,9 @@ public class ConnectionRegistry implements IConnectionRegistry { _registry.remove(connnection); } + + public Collection<AMQProtocolSession> getConnections() + { + return new ArrayList<AMQProtocolSession>(_registry); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index d64fde1c20..bbbfa4114c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.connection; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import java.util.Collection; + import org.apache.qpid.AMQException; +import org.apache.qpid.server.protocol.AMQProtocolSession; public interface IConnectionRegistry { - public void initialise(); public void close() throws AMQException; @@ -35,4 +35,5 @@ public interface IConnectionRegistry public void deregisterConnection(AMQProtocolSession connnection); + public Collection<AMQProtocolSession> getConnections(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties index f2a12a1cac..7f6ec704ce 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties @@ -274,6 +274,9 @@ CHN_CLOSE = CHN-1003 : Close CHN_PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number} CHN_FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0}) CHN_FLOW_REMOVED = CHN-1006 : Flow Control Removed +#Channel Transactions +CHN_OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms +CHN_IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms #Queue # 0 - owner diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java index b3bf038f2b..7b9f0e6060 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -37,7 +37,7 @@ public class ChannelLogSubject extends AbstractLogSubject * 4 - Channel ID */ public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT - + "/ch:{4}"; + + "/ch:{4}"; public ChannelLogSubject(AMQChannel channel) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 2d5de24fd3..bc5e3e5ff9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -20,6 +20,21 @@ */ package org.apache.qpid.server.protocol; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.Principal; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.JMException; +import javax.security.sasl.SaslServer; + import org.apache.log4j.Logger; import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IdleStatus; @@ -52,12 +67,12 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -69,20 +84,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.transport.Sender; -import javax.management.JMException; -import javax.security.sasl.SaslServer; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.security.Principal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicBoolean; - public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -103,7 +104,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private VirtualHost _virtualHost; - private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); + private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<Integer, AMQChannel>(); private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 59cc8a0cf2..6838a16182 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.Principal; +import java.util.Collection; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession @@ -113,6 +114,13 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession * than one session but this is not validated. */ void addChannel(AMQChannel channel) throws AMQException; + + /** + * Return all channels associated with this session. + * + * @return a collection containing this sessions's channels + */ + Collection<AMQChannel> getChannels(); /** * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 648755a495..cc75e2c1ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -510,6 +510,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { _deliveredMessages.incrementAndGet(); + if (_logger.isDebugEnabled()) { _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 450852cef7..c20ce138f3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -56,6 +56,7 @@ public class LocalTransactionalContext implements TransactionalContext private boolean _messageDelivered = false; private final AMQChannel _channel; + private long _txnStartTime; private abstract class DeliveryAction { @@ -114,6 +115,7 @@ public class LocalTransactionalContext implements TransactionalContext public LocalTransactionalContext(final AMQChannel channel) { _channel = channel; + _txnStartTime = System.currentTimeMillis(); } public StoreContext getStoreContext() @@ -135,6 +137,7 @@ public class LocalTransactionalContext implements TransactionalContext public void rollback() throws AMQException { _txnBuffer.rollback(getStoreContext()); + // Hack to deal with uncommitted non-transactional writes if (getMessageStore().inTran(getStoreContext())) { @@ -142,6 +145,7 @@ public class LocalTransactionalContext implements TransactionalContext _inTran = false; } + _txnStartTime = System.currentTimeMillis(); _postCommitDeliveryList.clear(); } @@ -215,6 +219,17 @@ public class LocalTransactionalContext implements TransactionalContext { // Not required in this transactional context } + + public boolean inTransaction() + { + return _inTran; + } + + public long getTransactionStartTime() + { + return _txnStartTime; + } + public void beginTranIfNecessary() throws AMQException { @@ -226,6 +241,7 @@ public class LocalTransactionalContext implements TransactionalContext } getMessageStore().beginTran(getStoreContext()); + _txnStartTime = System.currentTimeMillis(); _inTran = true; } } @@ -259,6 +275,7 @@ public class LocalTransactionalContext implements TransactionalContext { _messageDelivered = false; _inTran = getMessageStore().inTran(getStoreContext()); + _txnStartTime = System.currentTimeMillis(); } try diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 10d6021d27..fdbda006d6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -77,6 +77,16 @@ public class NonTransactionalContext implements TransactionalContext _inTran = true; } } + + public boolean inTransaction() + { + return false; + } + + public long getTransactionStartTime() + { + return 0L; + } public void commit() throws AMQException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java index 647ba66fb4..a38afb135a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java @@ -91,6 +91,20 @@ public interface TransactionalContext * @throws AMQException If the transaction cannot be started for any reason. */ void beginTranIfNecessary() throws AMQException; + + /** + * Returns whether or not a transaction is currently in progress. + * + * @return true if a transaction is currently in progress + */ + boolean inTransaction(); + + /** + * Return the time the current transaction started. + * + * @return the time this transaction started or 0 if not in a transaction + */ + long getTransactionStartTime(); /** * Makes all pending operations on the transaction permanent and visible. @@ -118,13 +132,13 @@ public interface TransactionalContext void deliver(final AMQQueue queue, AMQMessage message) throws AMQException; /** - * Requeues the specified message entry (message queue pair) - * - * - * @param queueEntry The message,queue pair - * - * @throws AMQException If the message cannot be delivered for any reason. - */ + * Requeues the specified message entry (message queue pair) + * + * + * @param queueEntry The message,queue pair + * + * @throws AMQException If the message cannot be delivered for any reason. + */ void requeue(QueueEntry queueEntry) throws AMQException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index cc80eb9c0c..71fcfb964d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -27,6 +27,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.configuration.ExchangeConfiguration; @@ -41,6 +43,7 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; @@ -230,7 +233,7 @@ public class VirtualHost implements Accessable private void initialiseHouseKeeping(long period) { /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ - if (period != 0L) + if (period > 0L) { class HouseKeepingTask extends TimerTask { @@ -238,6 +241,12 @@ public class VirtualHost implements Accessable public void run() { + CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { + public String getLogMessage() + { + return "[" + Thread.currentThread().getName() + "]"; + } + }); _hkLogger.info("Starting the houseKeeping job"); for (AMQQueue q : _queueRegistry.getQueues()) { @@ -253,7 +262,27 @@ public class VirtualHost implements Accessable // house keeping task from running. } } + for (AMQProtocolSession conn : _connectionRegistry.getConnections()) + { + _hkLogger.debug("Checking for long running open transactions on connection " + conn); + for (AMQChannel ch : conn.getChannels()) + { + try + { + ch.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), + _configuration.getTransactionTimeoutOpenClose(), + _configuration.getTransactionTimeoutIdleWarn(), + _configuration.getTransactionTimeoutIdleClose()); + } + catch (Exception e) + { + _hkLogger.error("Exception in housekeeping for connection: " + conn.toString(), e); + } + } + } + _hkLogger.info("HouseKeeping job completed."); + CurrentActor.remove(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java new file mode 100644 index 0000000000..ded2fe8d76 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This verifies that changing the {@code transactionTimeout} configuration will alter + * the behaviour of the transaction open and idle logging, and that when the connection + * will be closed. + */ +public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase +{ + @Override + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000"); + + // Set transaction timout properties. + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "2000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "10000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "1000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "5000"); + } + + public void testProducerIdleCommit() throws Exception + { + try + { + producer(); + + send(5, 0); + + sleep(20); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(5, 0); + + check(IDLE); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + producer(); + + send(5, 3); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(6, 3); + + check(OPEN); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java new file mode 100644 index 0000000000..3e852a166e --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This verifies that the default behaviour is not to time out transactions. + */ +public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase +{ + @Override + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000"); + } + + public void testProducerIdleCommit() throws Exception + { + try + { + producer(); + + send(5, 0); + + sleep(20); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + producer(); + + send(5, 3); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertTrue("Listener should not have received exception", _caught.getCount() == 1); + + monitor(0, 0); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java new file mode 100644 index 0000000000..40c47614cd --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -0,0 +1,340 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +/** + * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration + * is set for a virtual host. + * + * A producer that is idle for too long or open for too long will have its connection closed and + * any further operations will fail with a 408 resource timeout exception. Consumers will not + * be affected by the transaction timeout configuration. + */ +public class TransactionTimeoutTest extends TransactionTimeoutTestCase +{ + public void testProducerIdleCommit() throws Exception + { + try + { + producer(); + + send(5, 0); + + sleep(20); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(10, 0); + + check(IDLE); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + producer(); + + send(6, 5); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(0, 10); + + check(OPEN); + } + + public void testProducerIdleCommitTwice() throws Exception + { + try + { + producer(); + + send(5, 0); + + sleep(10); + + _psession.commit(); + + send(5, 0); + + sleep(20); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(15, 0); + + check(IDLE); + } + + public void testProducerOpenCommitTwice() throws Exception + { + try + { + producer(); + + send(5, 0); + + sleep(10); + + _psession.commit(); + + send(6, 5); + + _psession.commit(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(5, 10); + + check(OPEN); + } + + public void testProducerIdleRollback() throws Exception + { + try + { + producer(); + + send(5, 0); + + sleep(20); + + _psession.rollback(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(10, 0); + + check(IDLE); + } + + public void testProducerIdleRollbackTwice() throws Exception + { + try + { + producer(); + + send(5, 0); + + sleep(10); + + _psession.rollback(); + + send(5, 0); + + sleep(20); + + _psession.rollback(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(15, 0); + + check(IDLE); + } + + public void testConsumerCommitClose() throws Exception + { + try + { + producer(); + + consumer(); + + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + _csession.commit(); + + sleep(30); + + _csession.close(); + } + catch (Exception e) + { + fail("should have succeeded: " + e.getMessage()); + } + + monitor(0, 0); + } + + public void testConsumerIdleReceiveCommit() throws Exception + { + try + { + producer(); + + consumer(); + + send(1, 0); + + _psession.commit(); + + sleep(20); + + expect(1, 0); + + sleep(20); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + monitor(0, 0); + } + + public void testConsumerIdleCommit() throws Exception + { + try + { + producer(); + + consumer(); + + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(20); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + monitor(0, 0); + } + + public void testConsumerIdleRollback() throws Exception + { + try + { + producer(); + + consumer(); + + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(20); + + _csession.rollback(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + monitor(0, 0); + } + + public void testConsumerOpenCommit() throws Exception + { + try + { + producer(); + + consumer(); + + send(1, 0); + + _psession.commit(); + + sleep(30); + + _csession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + monitor(0, 0); + } + + public void testConsumerOpenRollback() throws Exception + { + try + { + producer(); + + consumer(); + + send(1, 0); + + _psession.commit(); + + sleep(30); + + _csession.rollback(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + monitor(0, 0); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java new file mode 100644 index 0000000000..7d91bb4049 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -0,0 +1,253 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.LogMonitor; + +/** + * The {@link TestCase} for transaction timeout testing. + */ +public class TransactionTimeoutTestCase extends QpidTestCase implements ExceptionListener +{ + public static final String VIRTUALHOST = "test"; + public static final String TEXT = "0123456789abcdefghiforgettherest"; + public static final String CHN_OPEN_TXN = "CHN-1007"; + public static final String CHN_IDLE_TXN = "CHN-1008"; + public static final String IDLE = "Idle"; + public static final String OPEN = "Open"; + + protected LogMonitor _monitor; + protected AMQConnection _con; + protected Session _psession, _csession; + protected Queue _queue; + protected MessageConsumer _consumer; + protected MessageProducer _producer; + protected CountDownLatch _caught = new CountDownLatch(1); + protected String _message; + protected Exception _exception; + protected AMQConstant _code; + + protected void configure() throws Exception + { + // Setup housekeeping every second + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000"); + + /* + * Set transaction timout properties. The XML in the virtualhosts configuration is as follows: + * + * <transactionTimeout> + * <openWarn>10000</openWarn> + * <openClose>20000</openClose> + * <idleWarn>5000</idleWarn> + * <idleClose>15000</idleClose> + * </transactionTimeout> + */ + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "10000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "20000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "5000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "15000"); + } + + protected void setUp() throws Exception + { + // Configure timeouts + configure(); + + // Monitor log file + _monitor = new LogMonitor(_outputFile); + + // Start broker + super.setUp(); + + // Connect to broker + String broker = _broker.equals(VM) ? ("vm://:" + DEFAULT_VM_PORT) : ("tcp://localhost:" + DEFAULT_PORT); + ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'"); + _con = (AMQConnection) getConnection(url); + _con.setExceptionListener(this); + _con.start(); + + // Create queue + Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED); + AMQShortString queueName = new AMQShortString("test"); + _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true); + qsession.close(); + } + + protected void tearDown() throws Exception + { + try + { + _con.close(); + } + catch (Exception e) + { + e.printStackTrace(); + } + finally + { + super.tearDown(); + } + } + + /** + * Create a transacted persistent message producer session. + */ + protected void producer() throws Exception + { + _psession = _con.createSession(true, Session.SESSION_TRANSACTED); + _producer = _psession.createProducer(_queue); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } + + /** + * Create a transacted message consumer session. + */ + protected void consumer() throws Exception + { + _csession = _con.createSession(true, Session.SESSION_TRANSACTED); + _consumer = _csession.createConsumer(_queue); + } + + /** + * Send a number of messages to the queue, optionally pausing after each. + */ + protected void send(int count, int delay) throws Exception + { + for (int i = 0; i < count; i++) + { + sleep(delay); + Message msg = _psession.createTextMessage(TEXT); + msg.setIntProperty("i", i); + _producer.send(msg); + } + } + + /** + * Sleep for an integral number of seconds. + */ + protected void sleep(int seconds) throws Exception + { + try + { + Thread.sleep(seconds * 1000L); + } + catch (InterruptedException ie) + { + throw new RuntimeException("Interrupted"); + } + } + + /** + * Check for idle and open messages. + * + * Either exactly zero messages, or +-2 error accepted around the specified number. + */ + protected void monitor(int idle, int open) throws Exception + { + List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN); + List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN); + + String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages"; + String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages"; + + if (idle == 0) + { + assertTrue(idleErr, idleMsgs.isEmpty()); + } + else + { + assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2); + } + + if (open == 0) + { + assertTrue(openErr, openMsgs.isEmpty()); + } + else + { + assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2); + } + } + + /** + * Receive a number of messages, optionally pausing after each. + */ + protected void expect(int count, int delay) throws Exception + { + for (int i = 0; i < count; i++) + { + sleep(delay); + Message msg = _consumer.receive(1000); + assertNotNull("Message should not be null", msg); + assertTrue("Message should be a text message", msg instanceof TextMessage); + assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText()); + assertEquals("Message order is incorrect", i, msg.getIntProperty("i")); + } + } + + /** + * Checks that the correct exception was thrown and was received + * by the listener with a 408 error code. + */ + protected void check(String reason)throws InterruptedException + { + assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS)); + assertNotNull("Should have thrown exception to client", _exception); + assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out")); + assertNotNull("Exception should have an error code", _code); + assertEquals("Error code should be 408", AMQConstant.REQUEST_TIMEOUT, _code); + } + + /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ + public void onException(JMSException jmse) + { + _caught.countDown(); + _message = jmse.getLinkedException().getMessage(); + if (jmse.getLinkedException() instanceof AMQException) + { + _code = ((AMQException) jmse.getLinkedException()).getErrorCode(); + } + } +} |
