From 67a9864ed236ed085e263beaea7bae2c52522331 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 19 Feb 2015 17:32:33 +0000 Subject: Make close return a future, wait on Future in broker shutdown git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660949 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/server/Broker.java | 9 +- .../apache/qpid/server/consumer/ConsumerImpl.java | 3 +- .../server/model/AbstractConfiguredObject.java | 138 ++++++++++++++++++++- .../apache/qpid/server/model/ConfiguredObject.java | 3 +- .../server/store/AbstractJDBCMessageStore.java | 11 +- .../qpid/server/store/MemoryMessageStore.java | 5 +- .../org/apache/qpid/server/store/StoreFuture.java | 40 ------ .../org/apache/qpid/server/store/Transaction.java | 5 +- .../server/txn/AsyncAutoCommitTransaction.java | 28 ++--- .../apache/qpid/server/txn/LocalTransaction.java | 24 +++- .../org/apache/qpid/server/util/FutureResult.java | 50 ++++++++ .../server/txn/AsyncAutoCommitTransactionTest.java | 6 +- .../qpid/server/txn/MockStoreTransaction.java | 6 +- 13 files changed, 245 insertions(+), 83 deletions(-) delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java (limited to 'qpid/java/broker-core') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index c18923ffe0..6c50fe7cfd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.Properties; +import java.util.concurrent.TimeoutException; import javax.security.auth.Subject; @@ -49,6 +50,7 @@ import org.apache.qpid.server.plugin.PluggableFactoryLoader; import org.apache.qpid.server.plugin.SystemConfigFactory; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.FutureResult; public class Broker implements BrokerShutdownProvider { @@ -102,11 +104,16 @@ public class Broker implements BrokerShutdownProvider { if(_systemConfig != null) { - _systemConfig.close(); + final FutureResult closeResult = _systemConfig.close(); + closeResult.waitForCompletion(5000l); } _taskExecutor.stop(); } + catch (TimeoutException e) + { + LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately"); + } finally { if (_configuringOwnLogging) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index fd6f3385c6..52fcf07e25 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.FutureResult; public interface ConsumerImpl { @@ -65,7 +66,7 @@ public interface ConsumerImpl boolean seesRequeues(); - void close(); + FutureResult close(); boolean trySendLock(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index baf465f6d1..aef769dc4f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -43,6 +43,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -68,6 +69,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.util.Strings; @@ -457,14 +459,15 @@ public abstract class AbstractConfiguredObject> im } } - protected void closeChildren() + protected FutureResult closeChildren() { + final List childCloseFutures = new ArrayList<>(); applyToChildren(new Action>() { @Override public void performAction(final ConfiguredObject child) { - child.close(); + childCloseFutures.add(child.close()); } }); @@ -483,13 +486,67 @@ public abstract class AbstractConfiguredObject> im childNameMap.clear(); } + + FutureResult futureResult; + if(childCloseFutures.isEmpty()) + { + futureResult = FutureResult.IMMEDIATE_FUTURE; + } + else + { + futureResult = new FutureResult() + { + @Override + public boolean isComplete() + { + for(FutureResult childResult : childCloseFutures) + { + if(!childResult.isComplete()) + { + return false; + } + } + return true; + } + + @Override + public void waitForCompletion() + { + for(FutureResult childResult : childCloseFutures) + { + childResult.waitForCompletion(); + } + } + + + @Override + public void waitForCompletion(long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + for(FutureResult childResult : childCloseFutures) + { + + childResult.waitForCompletion(remaining); + remaining = startTime + timeout - System.currentTimeMillis(); + if(remaining < 0) + { + throw new TimeoutException("Completion did not occur within specified timeout: " + timeout); + } + } + } + }; + } + return futureResult; } @Override - public final void close() + public final FutureResult close() { if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { + final CloseResult closeResult = new CloseResult(); + CloseFuture close = beforeClose(); Runnable closeRunnable = new Runnable() @@ -497,7 +554,8 @@ public abstract class AbstractConfiguredObject> im @Override public void run() { - closeChildren(); + final FutureResult result = closeChildren(); + closeResult.setChildFutureResult(result); onClose(); unregister(false); @@ -514,7 +572,11 @@ public abstract class AbstractConfiguredObject> im } // if future not complete, schedule the remainder to be done once complete. - + return closeResult; + } + else + { + return FutureResult.IMMEDIATE_FUTURE; } } @@ -1899,6 +1961,72 @@ public abstract class AbstractConfiguredObject> im } } + private static class CloseResult implements FutureResult + { + private volatile FutureResult _childFutureResult; + + @Override + public boolean isComplete() + { + return _childFutureResult != null && _childFutureResult.isComplete(); + } + + @Override + public void waitForCompletion() + { + synchronized (this) + { + while (_childFutureResult == null) + { + try + { + wait(); + } + catch (InterruptedException e) + { + + } + } + } + _childFutureResult.waitForCompletion(); + } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + synchronized (this) + { + while (_childFutureResult == null && remaining > 0) + { + try + { + wait(remaining); + } + catch (InterruptedException e) + { + + } + remaining = startTime + timeout - System.currentTimeMillis(); + + if(remaining < 0) + { + throw new TimeoutException("Completion did not occur within given tiemout: " + timeout); + } + } + } + _childFutureResult.waitForCompletion(remaining); + } + + public synchronized void setChildFutureResult(final FutureResult childFutureResult) + { + _childFutureResult = childFutureResult; + notifyAll(); + } + } + private class AttributeGettingHandler implements InvocationHandler { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 7079461a09..89fda6798b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.util.FutureResult; @ManagedObject( creatable = false, category = false ) /** @@ -246,7 +247,7 @@ public interface ConfiguredObject> void open(); - void close(); + FutureResult close(); TaskExecutor getTaskExecutor(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 4dfaa716cf..5868ae61c5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -45,6 +45,7 @@ import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.util.FutureResult; public abstract class AbstractJDBCMessageStore implements MessageStore { @@ -834,10 +835,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } } - private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException + private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException { commitTran(connWrapper); - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } private void abortTran(ConnectionWrapper connWrapper) throws StoreException @@ -1231,14 +1232,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { checkMessageStoreOpen(); doPreCommitActions(); - StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); + FutureResult futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); doPostCommitActions(); - return storeFuture; + return futureResult; } private void doPreCommitActions() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index efe040fbb3..eb887b4ef5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.util.FutureResult; /** A simple message store that stores the messages in a thread-safe structure in memory. */ public class MemoryMessageStore implements MessageStore @@ -58,9 +59,9 @@ public class MemoryMessageStore implements MessageStore private Set _localDistributedTransactionsRemoves = new HashSet(); @Override - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java deleted file mode 100644 index 7d3bf90a75..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java +++ /dev/null @@ -1,40 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store; - -public interface StoreFuture -{ - StoreFuture IMMEDIATE_FUTURE = new StoreFuture() - { - public boolean isComplete() - { - return true; - } - - public void waitForCompletion() - { - } - }; - - boolean isComplete(); - - void waitForCompletion(); -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java index 6f7afccac0..007f3ab796 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.util.FutureResult; public interface Transaction { @@ -53,7 +54,7 @@ public interface Transaction * Commits all operations performed within a given transactional context. * */ - StoreFuture commitTranAsync(); + FutureResult commitTranAsync(); /** * Abandons all operations performed within a given transactional context. @@ -72,4 +73,4 @@ public interface Transaction void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues); -} \ No newline at end of file +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 65064b015c..809c234cc6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public static interface FutureRecorder { - public void recordFuture(StoreFuture future, Action action); + public void recordFuture(FutureResult future, Action action); } @@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction */ public void addPostTransactionAction(final Action immediateAction) { - addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction); + addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction); } @@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - StoreFuture future; + FutureResult future; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - private void addFuture(final StoreFuture future, final Action action) + private void addFuture(final FutureResult future, final Action action) { if(action != null) { @@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent) + private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent) { if(action != null) { @@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - StoreFuture future; + FutureResult future; if(txn != null) { future = txn.commitTranAsync(); @@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - StoreFuture future; + FutureResult future; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; @@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - StoreFuture future; + FutureResult future; if (txn != null) { future = txn.commitTranAsync(); @@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; @@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { if(immediatePostTransactionAction != null) { - addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action() + addFuture(FutureResult.IMMEDIATE_FUTURE, new Action() { public void postCommit() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 349ec793fe..b800556312 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.txn; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -53,7 +54,7 @@ public class LocalTransaction implements ServerTransaction private final MessageStore _transactionLog; private volatile long _txnStartTime = 0L; private volatile long _txnUpdateTime = 0l; - private StoreFuture _asyncTran; + private FutureResult _asyncTran; public LocalTransaction(MessageStore transactionLog) { @@ -271,16 +272,16 @@ public class LocalTransaction implements ServerTransaction } } - public StoreFuture commitAsync(final Runnable deferred) + public FutureResult commitAsync(final Runnable deferred) { sync(); - StoreFuture future = StoreFuture.IMMEDIATE_FUTURE; + FutureResult future = FutureResult.IMMEDIATE_FUTURE; if(_transaction != null) { - future = new StoreFuture() + future = new FutureResult() { private volatile boolean _completed = false; - private StoreFuture _underlying = _transaction.commitTranAsync(); + private FutureResult _underlying = _transaction.commitTranAsync(); @Override public boolean isComplete() @@ -298,6 +299,17 @@ public class LocalTransaction implements ServerTransaction } } + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + if(!_completed) + { + _underlying.waitForCompletion(timeout); + checkUnderlyingCompletion(); + } + } + private synchronized boolean checkUnderlyingCompletion() { if(!_completed && _underlying.isComplete()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java new file mode 100644 index 0000000000..2aab3081ee --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java @@ -0,0 +1,50 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.util; + +import java.util.concurrent.TimeoutException; + +public interface FutureResult +{ + FutureResult IMMEDIATE_FUTURE = new FutureResult() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + } + }; + + boolean isComplete(); + + void waitForCompletion(); + + void waitForCompletion(long timeout) throws TimeoutException; +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java index ec0908efba..a61ac4f5d2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction.FutureRecorder; import org.apache.qpid.server.txn.ServerTransaction.Action; @@ -43,7 +43,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase private MessageStore _messageStore = mock(MessageStore.class); private Transaction _storeTransaction = mock(Transaction.class); private Action _postTransactionAction = mock(Action.class); - private StoreFuture _future = mock(StoreFuture.class); + private FutureResult _future = mock(FutureResult.class); @Override @@ -136,7 +136,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); verifyZeroInteractions(_storeTransaction); - verify(_futureRecorder).recordFuture(StoreFuture.IMMEDIATE_FUTURE, _postTransactionAction); + verify(_futureRecorder).recordFuture(FutureResult.IMMEDIATE_FUTURE, _postTransactionAction); verifyZeroInteractions(_postTransactionAction); } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index da868a01f1..6fcfde0221 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -24,7 +24,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.NullMessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -96,7 +96,7 @@ class MockStoreTransaction implements Transaction _state = TransactionState.COMMITTED; } - public StoreFuture commitTranAsync() + public FutureResult commitTranAsync() { throw new NotImplementedException(); } @@ -126,4 +126,4 @@ class MockStoreTransaction implements Transaction } }; } -} \ No newline at end of file +} -- cgit v1.2.1