diff options
| author | Keith Wall <kwall@apache.org> | 2013-01-07 10:01:25 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2013-01-07 10:01:25 +0000 |
| commit | dd69131565f955480a790adf8d9c084f752de517 (patch) | |
| tree | 6255db3bf3c1da6480436dc7d1becf2b4cb3ebec /qpid/java | |
| parent | d5965424b2b9b9e91f6b161afef996f8ae360060 (diff) | |
| download | qpid-python-dd69131565f955480a790adf8d9c084f752de517.tar.gz | |
QPID-3569: Refactor TransactionTimeout
* Moved the duplicated transactionUpdateTime member from AMQChannel/ServerSession to ServerTransaction.
** LocalTransaction now maintains advances transactionUpdateTime on each enqueue/dequeue operation
** Other non-transactional ServerTransaction impls return transactionUpdateTime of 0 (as they already do for transactionStartTime).
** Changed LocalTransaction so that transaction start time is recorded on first enqueue or dequeue operation (rather than only
first enqueue)
* Moved duplicated logic from AMQChannel/ServerSession#checkTransactionStatus to TransactionTimeoutHelper
* Make TransactionTimeoutTests use a durable queue so it is actually testing with store transactions.
* Removed warnings if operational logging is turned off.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1429726 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
19 files changed, 477 insertions, 229 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3deff080d6..a05b53f7d4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -51,6 +51,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.Exchange; @@ -87,6 +88,7 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; @@ -145,7 +147,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); @@ -185,15 +186,29 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F // by default the session is non-transactional _transaction = new AsyncAutoCommitTransaction(_messageStore, this); - _clientDeliveryMethod = session.createDeliveryMethod(_channelId); + _clientDeliveryMethod = session.createDeliveryMethod(_channelId); - _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() + { + @Override + public void doTimeoutAction(String reason) throws AMQException + { + closeConnection(reason); + } + }); } /** Sets this channel to be part of a local transaction */ public void setLocalTransactional() { - _transaction = new LocalTransaction(_messageStore); + _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor() + { + @Override + public long getActivityTime() + { + return _session.getLastReceivedTime(); + } + }); _txnStarts.incrementAndGet(); } @@ -207,8 +222,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F sync(); } - - private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -229,11 +242,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - public Long getTxnStarts() - { - return _txnStarts.get(); - } - public Long getTxnCommits() { return _txnCommits.get(); @@ -351,9 +359,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } }); - _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime()); + _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues)); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); _currentMessage.getStoredMessage().flushToStore(); } } @@ -839,7 +846,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); - updateTransactionalActivity(); } private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) @@ -959,7 +965,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); - _txnUpdateTime.set(0); } }); } @@ -969,7 +974,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); - _txnUpdateTime.set(0); decrementOutstandingTxnsIfNecessary(); } } @@ -1007,7 +1011,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); - _txnUpdateTime.set(0); decrementOutstandingTxnsIfNecessary(); } @@ -1036,19 +1039,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - - - } - - /** - * Update last transaction activity timestamp - */ - private void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(getProtocolSession().getLastReceivedTime()); - } } public String toString() @@ -1215,11 +1205,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F // TODO throw new RuntimeException(e); } - - - - - } public void onRollback() @@ -1369,7 +1354,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void onRollback() { - //To change body of implemented methods use File | Settings | File Templates. } } @@ -1478,37 +1462,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _createTime; } - public void mgmtClose() throws AMQException - { - _session.mgmtCloseChannel(_channelId); - } - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - final long transactionStartTime = _transaction.getTransactionStartTime(); - final long transactionUpdateTime = _txnUpdateTime.get(); - if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - transactionStartTime; - long idleTime = currentTime - transactionUpdateTime; - - _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), - TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); - if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) - { - closeConnection("Idle transaction timed out"); - return; - } - - _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), - TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT); - if (_transactionTimeoutHelper.isTimedOut(openTime, openClose)) - { - closeConnection("Open transaction timed out"); - return; - } - } + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); } /** @@ -1628,14 +1584,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _action.postCommit(); _action = null; } - - boolean isReadyForCompletion() - { - return _future.isComplete(); - } } - @Override public int getConsumerCount() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java index 0c474cca13..b7007bf768 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java @@ -18,46 +18,85 @@ */ package org.apache.qpid.server; -import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.txn.ServerTransaction; public class TransactionTimeoutHelper { - private static final Logger LOGGER = Logger.getLogger(TransactionTimeoutHelper.class); - - public static final String IDLE_TRANSACTION_ALERT = "IDLE TRANSACTION ALERT"; - public static final String OPEN_TRANSACTION_ALERT = "OPEN TRANSACTION ALERT"; + private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out"; + private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out"; private final LogSubject _logSubject; - public TransactionTimeoutHelper(final LogSubject logSubject) + private final CloseAction _closeAction; + + public TransactionTimeoutHelper(final LogSubject logSubject, final CloseAction closeAction) { _logSubject = logSubject; + _closeAction = closeAction; } - public void logIfNecessary(final long timeSoFar, final long warnTimeout, - final LogMessage message, final String alternateLogPrefix) + public void checkIdleOrOpenTimes(ServerTransaction transaction, long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - if (isTimedOut(timeSoFar, warnTimeout)) + if (transaction.isTransactional()) { - LogActor logActor = CurrentActor.get(); - if(logActor.getRootMessageLogger().isMessageEnabled(logActor, _logSubject, message.getLogHierarchy())) + final long transactionUpdateTime = transaction.getTransactionUpdateTime(); + if(transactionUpdateTime > 0) { - logActor.message(_logSubject, message); + long idleTime = System.currentTimeMillis() - transactionUpdateTime; + boolean closed = logAndCloseIfNecessary(idleTime, idleWarn, idleClose, ChannelMessages.IDLE_TXN(idleTime), IDLE_TRANSACTION_TIMEOUT_ERROR); + if (closed) + { + return; // no point proceeding to check the open time + } } - else + + final long transactionStartTime = transaction.getTransactionStartTime(); + if(transactionStartTime > 0) { - LOGGER.warn(alternateLogPrefix + " " + _logSubject.toLogString() + " " + timeSoFar + " ms"); + long openTime = System.currentTimeMillis() - transactionStartTime; + logAndCloseIfNecessary(openTime, openWarn, openClose, ChannelMessages.OPEN_TXN(openTime), OPEN_TRANSACTION_TIMEOUT_ERROR); } } } - public boolean isTimedOut(long timeSoFar, long timeout) + /** + * @return true iff closeTimeout was exceeded + */ + private boolean logAndCloseIfNecessary(final long timeSoFar, + final long warnTimeout, final long closeTimeout, + final LogMessage warnMessage, final String closeMessage) throws AMQException + { + if (isTimedOut(timeSoFar, warnTimeout)) + { + LogActor logActor = CurrentActor.get(); + logActor.message(_logSubject, warnMessage); + } + + if(isTimedOut(timeSoFar, closeTimeout)) + { + _closeAction.doTimeoutAction(closeMessage); + return true; + } + else + { + return false; + } + } + + private boolean isTimedOut(long timeSoFar, long timeout) { return timeout > 0L && timeSoFar > timeout; } + + public interface CloseAction + { + void doTimeoutAction(String reason) throws AMQException; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index ba1a1ca45c..2cef27267b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -80,7 +80,7 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina { // NO-OP } - }, System.currentTimeMillis()); + }); return ACCEPTED; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 25e771a9cf..9aa8d1da83 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -454,7 +454,7 @@ public abstract class QueueEntryImpl implements QueueEntry { } - }, 0L); + }); txn.dequeue(currentQueue, message, new ServerTransaction.Action() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 6f42822b67..beb566f622 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1398,7 +1398,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { } - }, 0L); + }); txn.dequeue(this, entry.getMessage(), new ServerTransaction.Action() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 85a9433e91..6152ddd228 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -43,6 +43,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; +import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -132,7 +133,6 @@ public class ServerSession extends Session private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); @@ -147,7 +147,14 @@ public class ServerSession extends Session _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this); _logSubject = new ChannelLogSubject(this); - _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() + { + @Override + public void doTimeoutAction(String reason) throws AMQException + { + getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); + } + }); } protected void setState(State state) @@ -186,9 +193,8 @@ public class ServerSession extends Session } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; - _transaction.enqueue(queues,message, postTransactionAction, 0L); + _transaction.enqueue(queues,message, postTransactionAction); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); } @@ -402,7 +408,6 @@ public class ServerSession extends Session entry.release(); } }); - updateTransactionalActivity(); } public Collection<Subscription_0_10> getSubscriptions() @@ -549,7 +554,6 @@ public class ServerSession extends Session _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); - _txnUpdateTime.set(0); decrementOutstandingTxnsIfNecessary(); } @@ -559,7 +563,6 @@ public class ServerSession extends Session _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); - _txnUpdateTime.set(0); decrementOutstandingTxnsIfNecessary(); } @@ -584,22 +587,6 @@ public class ServerSession extends Session } } - /** - * Update last transaction activity timestamp - */ - private void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(System.currentTimeMillis()); - } - } - - public Long getTxnStarts() - { - return _txnStarts.get(); - } - public Long getTxnCommits() { return _txnCommits.get(); @@ -705,30 +692,7 @@ public class ServerSession extends Session public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - final long transactionStartTime = _transaction.getTransactionStartTime(); - final long transactionUpdateTime = _txnUpdateTime.get(); - if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - transactionStartTime; - long idleTime = currentTime - transactionUpdateTime; - - _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), - TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); - if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); - return; - } - - _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), - TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT); - if (_transactionTimeoutHelper.isTimedOut(openTime, openClose)) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); - return; - } - } + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); } public void block(AMQQueue queue) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index efd7850a49..43e60c8e13 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -66,11 +66,18 @@ public class AsyncAutoCommitTransaction implements ServerTransaction _futureRecorder = recorder; } + @Override public long getTransactionStartTime() { return 0L; } + @Override + public long getTransactionUpdateTime() + { + return 0L; + } + /** * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. @@ -241,7 +248,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { Transaction txn = null; try diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index e5a7df6880..8a9479a2d4 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -52,11 +52,18 @@ public class AutoCommitTransaction implements ServerTransaction _messageStore = transactionLog; } + @Override public long getTransactionStartTime() { return 0L; } + @Override + public long getTransactionUpdateTime() + { + return 0L; + } + /** * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. @@ -178,7 +185,7 @@ public class AutoCommitTransaction implements ServerTransaction } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { Transaction txn = null; try @@ -270,4 +277,6 @@ public class AutoCommitTransaction implements ServerTransaction } } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index 05d0110e9b..ab987f0fb9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -26,7 +26,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Xid; @@ -39,10 +38,6 @@ public class DistributedTransaction implements ServerTransaction private final AutoCommitTransaction _autoCommitTransaction; - private volatile Transaction _transaction; - - private long _txnStartTime = 0L; - private DtxBranch _branch; private AMQSessionModel _session; private VirtualHost _vhost; @@ -55,9 +50,16 @@ public class DistributedTransaction implements ServerTransaction _autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore()); } + @Override public long getTransactionStartTime() { - return _txnStartTime; + return 0; + } + + @Override + public long getTransactionUpdateTime() + { + return 0; } public void addPostTransactionAction(Action postTransactionAction) @@ -107,7 +109,7 @@ public class DistributedTransaction implements ServerTransaction { _branch.enqueue(queue, message); _branch.addPostTransactionAcion(postTransactionAction); - enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis()); + enqueue(Collections.singletonList(queue), message, postTransactionAction); } else { @@ -116,7 +118,7 @@ public class DistributedTransaction implements ServerTransaction } public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, - Action postTransactionAction, long currentTime) + Action postTransactionAction) { if(_branch != null) { @@ -128,7 +130,7 @@ public class DistributedTransaction implements ServerTransaction } else { - _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime); + _autoCommitTransaction.enqueue(queues, message, postTransactionAction); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index df95ce46d5..afa7cb0fb4 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -49,25 +49,42 @@ public class LocalTransaction implements ServerTransaction private final List<Action> _postTransactionActions = new ArrayList<Action>(); private volatile Transaction _transaction; - private MessageStore _transactionLog; + private final ActivityTimeAccessor _activityTime; + private final MessageStore _transactionLog; private volatile long _txnStartTime = 0L; + private volatile long _txnUpdateTime = 0l; private StoreFuture _asyncTran; public LocalTransaction(MessageStore transactionLog) { - _transactionLog = transactionLog; + this(transactionLog, new ActivityTimeAccessor() + { + @Override + public long getActivityTime() + { + return System.currentTimeMillis(); + } + }); } - - public boolean inTransaction() + + public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime) { - return _transaction != null; + _transactionLog = transactionLog; + _activityTime = activityTime; } + @Override public long getTransactionStartTime() { return _txnStartTime; } + @Override + public long getTransactionUpdateTime() + { + return _txnUpdateTime; + } + public void addPostTransactionAction(Action postTransactionAction) { sync(); @@ -78,6 +95,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent() && queue.isDurable()) { @@ -104,6 +122,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); try { @@ -180,6 +199,7 @@ public class LocalTransaction implements ServerTransaction { sync(); _postTransactionActions.add(postTransactionAction); + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent() && queue.isDurable()) { @@ -189,7 +209,7 @@ public class LocalTransaction implements ServerTransaction { _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); } - + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message); } @@ -202,15 +222,11 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { sync(); _postTransactionActions.add(postTransactionAction); - - if (_txnStartTime == 0L) - { - _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime; - } + initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(message.isPersistent()) { @@ -224,8 +240,7 @@ public class LocalTransaction implements ServerTransaction { _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() ); } - - + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message); } @@ -378,8 +393,6 @@ public class LocalTransaction implements ServerTransaction } throw new RuntimeException("Failed to commit transaction", e); } - - } private void doPostTransactionActions() @@ -437,16 +450,34 @@ public class LocalTransaction implements ServerTransaction } } + private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime() + { + long currentTime = _activityTime.getActivityTime(); + + if (_txnStartTime == 0) + { + _txnStartTime = currentTime; + } + _txnUpdateTime = currentTime; + } + private void resetDetails() { _asyncTran = null; _transaction = null; - _postTransactionActions.clear(); + _postTransactionActions.clear(); _txnStartTime = 0L; + _txnUpdateTime = 0; } public boolean isTransactional() { return true; } + + public interface ActivityTimeAccessor + { + long getActivityTime(); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index c568ae67aa..8acac00479 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -55,11 +55,18 @@ public interface ServerTransaction /** * Return the time the current transaction started. - * + * * @return the time this transaction started or 0 if not in a transaction */ long getTransactionStartTime(); + /** + * Return the time of the last activity on the current transaction. + * + * @return the time of the last activity or 0 if not in a transaction + */ + long getTransactionUpdateTime(); + /** * Register an Action for execution after transaction commit or rollback. Actions * will be executed in the order in which they are registered. @@ -92,7 +99,7 @@ public interface ServerTransaction * * Store operations will result only for a persistent messages on durable queues. */ - void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime); + void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction); /** * Commit the transaction represented by this object. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java index 9081dc49d6..96078d766c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java @@ -18,67 +18,131 @@ */ package org.apache.qpid.server; -import static org.mockito.Matchers.any; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.qpid.server.logging.messages.ChannelMessages.IDLE_TXN_LOG_HIERARCHY; +import static org.apache.qpid.server.logging.messages.ChannelMessages.OPEN_TXN_LOG_HIERARCHY; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.test.utils.QpidTestCase; +import org.hamcrest.Description; +import org.mockito.ArgumentMatcher; public class TransactionTimeoutHelperTest extends QpidTestCase { - private final LogMessage _logMessage = mock(LogMessage.class); private final LogActor _logActor = mock(LogActor.class); private final LogSubject _logSubject = mock(LogSubject.class); + private final ServerTransaction _transaction = mock(ServerTransaction.class); + private final CloseAction _closeAction = mock(CloseAction.class); private TransactionTimeoutHelper _transactionTimeoutHelper; - private RootMessageLogger _rootMessageLogger; + private long _now; - public void testLogIfNecessary() + public void testNotTransactional() throws Exception { - _transactionTimeoutHelper.logIfNecessary(99, 100, _logMessage, ""); - verifyZeroInteractions(_logActor, _logMessage); + when(_transaction.isTransactional()).thenReturn(false); - _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, ""); - verify(_logActor).message(_logSubject, _logMessage); + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 5, 10, 5, 10); + + verifyZeroInteractions(_logActor, _closeAction); + } + + public void testOpenTransactionProducesWarningOnly() throws Exception + { + final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61); + + configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo); + + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(30), 0, 0, 0); + + verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); + verifyZeroInteractions(_closeAction); + } + + public void testOpenTransactionProducesTimeoutActionOnly() throws Exception + { + final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61); + + configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo); + + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, SECONDS.toMillis(30), 0, 0); + + verify(_closeAction).doTimeoutAction("Open transaction timed out"); + verifyZeroInteractions(_logActor); } - public void testLogIfNecessaryWhenOperationalLoggingDisabled() + public void testOpenTransactionProducesWarningAndTimeoutAction() throws Exception { - //disable the operational logging - when(_rootMessageLogger.isMessageEnabled( - same(_logActor), any(LogSubject.class), any(String.class))) - .thenReturn(false); - - //verify the actor is never asked to log a message - _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, ""); - verify(_logActor, never()).message(any(LogMessage.class)); - verify(_logActor, never()).message(any(LogSubject.class), any(LogMessage.class)); + final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61); + + configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo); + + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(15), SECONDS.toMillis(30), 0, 0); + + verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); + verify(_closeAction).doTimeoutAction("Open transaction timed out"); } - public void testIsTimedOut() + public void testIdleTransactionProducesWarningOnly() throws Exception { - assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(199,200)); - assertTrue("Should have timed out", _transactionTimeoutHelper.isTimedOut(201,200)); + final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61); + final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31); + + configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo); + + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(30), 0); + + verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); + verifyZeroInteractions(_closeAction); } - /** - * If TransactionTimeout is disabled, the timeout will be 0. This test verifies - * that the helper methods respond negatively in this scenario. - */ - public void testTransactionTimeoutDisabled() + public void testIdleTransactionProducesTimeoutActionOnly() throws Exception { - assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(201,0)); + final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61); + final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31); + + configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo); - _transactionTimeoutHelper.logIfNecessary(99, 0, _logMessage, ""); - verifyZeroInteractions(_logActor, _logMessage); + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, 0, SECONDS.toMillis(30)); + + verify(_closeAction).doTimeoutAction("Idle transaction timed out"); + verifyZeroInteractions(_logActor); + } + + public void testIdleTransactionProducesWarningAndTimeoutAction() throws Exception + { + final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61); + final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31); + + configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo); + + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(15), SECONDS.toMillis(30)); + + verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); + verify(_closeAction).doTimeoutAction("Idle transaction timed out"); + } + + public void testIdleAndOpenWarnings() throws Exception + { + final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61); + final long thirtyOneSecondsAgo = _now - SECONDS.toMillis(31); + + configureMockTransaction(sixtyOneSecondsAgo, thirtyOneSecondsAgo); + + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(60), 0, SECONDS.toMillis(30), 0); + + verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); + verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); + verifyZeroInteractions(_closeAction); } @Override @@ -88,14 +152,79 @@ public class TransactionTimeoutHelperTest extends QpidTestCase CurrentActor.set(_logActor); - _rootMessageLogger = mock(RootMessageLogger.class); - when(_logActor.getRootMessageLogger()).thenReturn(_rootMessageLogger); + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, _closeAction); + _now = System.currentTimeMillis(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + CurrentActor.remove(); + } + } - when(_rootMessageLogger.isMessageEnabled( - same(_logActor), any(LogSubject.class), any(String.class))) - .thenReturn(true); + private void configureMockTransaction(final long startTime, final long updateTime) + { + when(_transaction.isTransactional()).thenReturn(true); + when(_transaction.getTransactionStartTime()).thenReturn(startTime); + when(_transaction.getTransactionUpdateTime()).thenReturn(updateTime); + } - _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); + private LogMessage isLogMessage(String expectedlogHierarchy, String expectedText) + { + return argThat(new IsLogMessage(expectedlogHierarchy, expectedText)); } + class IsLogMessage extends ArgumentMatcher<LogMessage> + { + private final String _expectedLogHierarchy; + private final String _expectedLogMessageMatches; + private String _hierarchyMatchesFailure; + private String _logMessageMatchesFailure; + + public IsLogMessage(String expectedlogHierarchy, String expectedLogMessageMatches) + { + _expectedLogHierarchy = expectedlogHierarchy; + _expectedLogMessageMatches = expectedLogMessageMatches; + } + + public boolean matches(Object arg) + { + LogMessage logMessage = (LogMessage)arg; + + boolean hierarchyMatches = logMessage.getLogHierarchy().equals(_expectedLogHierarchy); + boolean logMessageMatches = logMessage.toString().matches(_expectedLogMessageMatches); + + if (!hierarchyMatches) + { + _hierarchyMatchesFailure = "LogHierarchy does not match. Expected " + _expectedLogHierarchy + " actual " + logMessage.getLogHierarchy(); + } + + if (!logMessageMatches) + { + _logMessageMatchesFailure = "LogMessage does not match. Expected " + _expectedLogMessageMatches + " actual " + logMessage.toString(); + } + + return hierarchyMatches && logMessageMatches; + } + + @Override + public void describeTo(Description description) + { + if (_hierarchyMatchesFailure != null) + { + description.appendText(_hierarchyMatchesFailure); + } + if (_logMessageMatchesFailure != null) + { + description.appendText(_logMessageMatchesFailure); + } + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 2cd423d4c9..ea07c14f8a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -659,7 +659,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void onRollback() { } - }, 0L); + }); // Check that it is enqueued AMQQueue data = store.getMessages().get(1L); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index e18269241e..8b7807e999 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -630,7 +630,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase { //To change body of implemented methods use File | Settings | File Templates. } - }, 0L); + }); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java index 1aa91fa98a..5c1012d50b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -82,7 +82,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase AsyncAutoCommitTransaction asyncAutoCommitTransaction = new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); - asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction, System.currentTimeMillis()); + asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction); verify(_storeTransaction).enqueueMessage(_queue, _message); verify(_futureRecorder).recordFuture(_future, _postTransactionAction); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index cd3fe3c473..06b8539eb1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -137,7 +137,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(false); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action, 0L); + _transaction.enqueue(_queues, _message, _action); assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -157,7 +157,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action, 0L); + _transaction.enqueue(_queues, _message, _action); assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -175,7 +175,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); - _transaction.enqueue(_queues, _message, _action, 0L); + _transaction.enqueue(_queues, _message, _action); assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); @@ -198,7 +198,7 @@ public class AutoCommitTransactionTest extends QpidTestCase try { - _transaction.enqueue(_queues, _message, _action, 0L); + _transaction.enqueue(_queues, _message, _action); fail("Exception not thrown"); } catch (RuntimeException re) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index 5992829f37..4904cbc6fb 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -140,7 +140,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(false); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action1, 0L); + _transaction.enqueue(_queues, _message, _action1); assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -156,7 +156,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, false, false}); - _transaction.enqueue(_queues, _message, _action1, 0L); + _transaction.enqueue(_queues, _message, _action1); assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -173,7 +173,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(true); _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); - _transaction.enqueue(_queues, _message, _action1, 0L); + _transaction.enqueue(_queues, _message, _action1); assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); @@ -196,7 +196,7 @@ public class LocalTransactionTest extends QpidTestCase try { - _transaction.enqueue(_queues, _message, _action1, 0L); + _transaction.enqueue(_queues, _message, _action1); fail("Exception not thrown"); } catch (RuntimeException re) @@ -217,7 +217,7 @@ public class LocalTransactionTest extends QpidTestCase { _message = createTestMessage(false); _queue = createTestAMQQueue(false); - + _transaction.dequeue(_queue, _message, _action1); assertEquals("Dequeue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); @@ -465,7 +465,6 @@ public class LocalTransactionTest extends QpidTestCase */ public void testRollbackWorkWithAdditionalPostAction() throws Exception { - _message = createTestMessage(true); _queue = createTestAMQQueue(true); @@ -482,6 +481,122 @@ public class LocalTransactionTest extends QpidTestCase assertTrue("Rollback action2 must be fired", _action1.isRollbackActionFired()); } + public void testFirstEnqueueRecordsTransactionStartAndUpdateTime() throws Exception + { + assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime()); + assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + long startTime = System.currentTimeMillis(); + _transaction.enqueue(_queue, _message, _action1); + + assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime); + assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime()); + } + + public void testSubsequentEnqueueAdvancesTransactionUpdateTimeOnly() throws Exception + { + assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime()); + assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _transaction.enqueue(_queue, _message, _action1); + + final long transactionStartTimeAfterFirstEnqueue = _transaction.getTransactionStartTime(); + final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime(); + + Thread.sleep(1); + _transaction.enqueue(_queue, _message, _action2); + + final long transactionStartTimeAfterSecondEnqueue = _transaction.getTransactionStartTime(); + final long transactionUpdateTimeAfterSecondEnqueue = _transaction.getTransactionUpdateTime(); + + assertEquals("Transaction start time after second enqueue should be unchanged", transactionStartTimeAfterFirstEnqueue, transactionStartTimeAfterSecondEnqueue); + assertTrue("Transaction update time after second enqueue should be greater than first update time", transactionUpdateTimeAfterSecondEnqueue > transactionUpdateTimeAfterFirstEnqueue); + } + + public void testFirstDequeueRecordsTransactionStartAndUpdateTime() throws Exception + { + assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime()); + assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + long startTime = System.currentTimeMillis(); + _transaction.dequeue(_queue, _message, _action1); + + assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime); + assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime()); + } + + public void testMixedEnqueuesAndDequeuesAdvancesTransactionUpdateTimeOnly() throws Exception + { + assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime()); + assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _transaction.enqueue(_queue, _message, _action1); + + final long transactionStartTimeAfterFirstEnqueue = _transaction.getTransactionStartTime(); + final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime(); + + Thread.sleep(1); + _transaction.dequeue(_queue, _message, _action2); + + final long transactionStartTimeAfterFirstDequeue = _transaction.getTransactionStartTime(); + final long transactionUpdateTimeAfterFirstDequeue = _transaction.getTransactionUpdateTime(); + + assertEquals("Transaction start time after first dequeue should be unchanged", transactionStartTimeAfterFirstEnqueue, transactionStartTimeAfterFirstDequeue); + assertTrue("Transaction update time after first dequeue should be greater than first update time", transactionUpdateTimeAfterFirstDequeue > transactionUpdateTimeAfterFirstEnqueue); + } + + public void testCommitResetsTransactionStartAndUpdateTime() throws Exception + { + assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime()); + assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + long startTime = System.currentTimeMillis(); + _transaction.enqueue(_queue, _message, _action1); + + assertTrue(_transaction.getTransactionStartTime() >= startTime); + assertTrue(_transaction.getTransactionUpdateTime() >= startTime); + + _transaction.commit(); + + assertEquals("Transaction start time should be reset after commit", 0, _transaction.getTransactionStartTime()); + assertEquals("Transaction update time should be reset after commit", 0, _transaction.getTransactionUpdateTime()); + } + + public void testRollbackResetsTransactionStartAndUpdateTime() throws Exception + { + assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime()); + assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime()); + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + long startTime = System.currentTimeMillis(); + _transaction.enqueue(_queue, _message, _action1); + + assertTrue(_transaction.getTransactionStartTime() >= startTime); + assertTrue(_transaction.getTransactionUpdateTime() >= startTime); + + _transaction.rollback(); + + assertEquals("Transaction start time should be reset after rollback", 0, _transaction.getTransactionStartTime()); + assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime()); + } + private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags) { Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java index b11df5a2a0..1e8760dc33 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -39,7 +39,7 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase protected void configure() throws Exception { - // Setup housekeeping every second + // Setup housekeeping every 100ms setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); if (getName().contains("ProducerIdle")) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java index e2b0f00ee4..721dc027c6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -23,17 +23,13 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.jms.Session; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.util.LogMonitor; +import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -41,6 +37,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -61,7 +58,7 @@ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase impl public static final String OPEN = "Open"; protected LogMonitor _monitor; - protected AMQConnection _con; + protected Connection _con; protected Session _psession, _csession; protected Queue _queue; protected MessageConsumer _consumer; @@ -89,16 +86,14 @@ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase impl super.setUp(); // Connect to broker - String broker = ("tcp://localhost:" + DEFAULT_PORT); - ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'"); - _con = (AMQConnection) getConnection(url); + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(1)); + _con = getConnection(); _con.setExceptionListener(this); _con.start(); // Create queue Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED); - AMQShortString queueName = new AMQShortString("test"); - _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true); + _queue = qsession.createQueue(getTestQueueName()); qsession.close(); // Create producer and consumer |
