From 8bce3305776bbfbe5b80fdf1a6120c30a89b1552 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 21 Jan 2012 00:06:40 +0000 Subject: QPID-3774 : allow out of order completion of persistent enqueues / dequeues (0-9 codepath) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1234215 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 90 ++++++++++++++++++---- .../qpid/server/handler/AccessRequestHandler.java | 8 +- .../server/handler/BasicCancelMethodHandler.java | 1 + .../server/handler/BasicConsumeMethodHandler.java | 1 + .../qpid/server/handler/BasicGetMethodHandler.java | 1 + .../qpid/server/handler/BasicQosHandler.java | 2 +- .../server/handler/BasicRecoverMethodHandler.java | 1 + .../handler/BasicRecoverSyncMethodHandler.java | 2 +- .../qpid/server/handler/ChannelCloseHandler.java | 1 + .../qpid/server/handler/ChannelFlowHandler.java | 1 + .../qpid/server/handler/ExchangeBoundHandler.java | 8 +- .../server/handler/ExchangeDeclareHandler.java | 7 ++ .../qpid/server/handler/ExchangeDeleteHandler.java | 8 +- .../qpid/server/handler/QueueBindHandler.java | 13 ++-- .../qpid/server/handler/QueueDeclareHandler.java | 1 + .../qpid/server/handler/QueueDeleteHandler.java | 2 +- .../qpid/server/handler/QueuePurgeHandler.java | 2 +- .../qpid/server/handler/QueueUnbindHandler.java | 1 + .../server/txn/AsyncAutoCommitTransaction.java | 31 +++++--- .../management/jmx/ManagedConnectionMBeanTest.java | 4 +- .../qpid/server/logging/ExchangeLoggingTest.java | 3 +- 21 files changed, 148 insertions(+), 40 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3f5c204f43..8a707dc906 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -73,27 +73,20 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -public class AMQChannel implements SessionConfig, AMQSessionModel +public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -132,6 +125,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final MessageStore _messageStore; + private final LinkedList _unfinishedCommandsQueue = new LinkedList(); + + private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); // Set of messages being acknoweledged in the current transaction @@ -184,7 +181,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _messageStore = messageStore; // by default the session is non-transactional - _transaction = new AutoCommitTransaction(_messageStore); + _transaction = new AsyncAutoCommitTransaction(_messageStore, this); _clientDeliveryMethod = session.createDeliveryMethod(_channelId); } @@ -203,14 +200,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public boolean isTransactional() { - // this does not look great but there should only be one "non-transactional" - // transactional context, while there could be several transactional ones in - // theory - return !(_transaction instanceof AutoCommitTransaction); + return _transaction.isTransactional(); } public void receivedComplete() { + sync(); } @@ -1562,4 +1557,69 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } } + + public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); + } + + public void completeAsyncCommands() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) + { + cmd.complete(); + _unfinishedCommandsQueue.poll(); + } + while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) + { + cmd = _unfinishedCommandsQueue.poll(); + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public void sync() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.poll()) != null) + { + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + private static class AsyncCommand + { + private final MessageStore.StoreFuture _future; + private ServerTransaction.Action _action; + + public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _future = future; + _action = action; + } + + void awaitReadyForCompletion() + { + _future.waitForCompletion(); + } + + void complete() + { + if(!_future.isComplete()) + { + _future.waitForCompletion(); + } + _action.postCommit(); + _action = null; + } + + boolean isReadyForCompletion() + { + return _future.isComplete(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java index d587ef0c16..1b0168df56 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java @@ -24,6 +24,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -52,6 +53,11 @@ public class AccessRequestHandler implements StateAwareMethodListener { throw body.getChannelNotFoundException(channelId); } - + channel.sync(); channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index c7842cd643..429217321c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -65,6 +65,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener queueEntries, Action postTransactionAction) @@ -265,17 +275,20 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public void commit(final Runnable immediatePostTransactionAction) { - addFuture(MessageStore.IMMEDIATE_FUTURE, new Action() + if(immediatePostTransactionAction != null) { - public void postCommit() + addFuture(MessageStore.IMMEDIATE_FUTURE, new Action() { - immediatePostTransactionAction.run(); - } + public void postCommit() + { + immediatePostTransactionAction.run(); + } - public void onRollback() - { - } - }); + public void onRollback() + { + } + }); + } } public void commit() diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java index 7f578a6389..a258e4267d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java @@ -224,13 +224,13 @@ public class ManagedConnectionMBeanTest extends QpidBrokerTestCase mBean.rollbackTransactions(channelId.intValue()); Message m = consumer.receive(1000l); - assertNull("Unexpected message received", m); + assertNull("Unexpected message received: " + String.valueOf(m), m); producerSession.commit(); _connection.start(); m = consumer.receive(1000l); - assertNull("Unexpected message received", m); + assertNull("Unexpected message received after commit " + String.valueOf(m), m); } public void testAuthorisedId() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java index ec96f778f6..07faf1ef3e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java @@ -31,6 +31,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; @@ -195,7 +196,7 @@ public class ExchangeLoggingTest extends AbstractTestLogging ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new AMQShortString(_name), false, true); - AMQFrame exchangeDeclare = body.generateFrame(0); + AMQFrame exchangeDeclare = body.generateFrame(((AMQSession)_session).getChannelId()); ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeleteOkBody.class); } -- cgit v1.2.1