diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-21 00:06:40 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-21 00:06:40 +0000 |
commit | 24aa816b81ebac14119ed00b1ae463db334ce381 (patch) | |
tree | e95eec0e17394210639e03ae2ad24eabe83522ff /java/broker/src/main/java/org/apache | |
parent | bcc3eec81fed2e1f0acd0019e055d7f47303696b (diff) | |
download | qpid-python-24aa816b81ebac14119ed00b1ae463db334ce381.tar.gz |
QPID-3774 : allow out of order completion of persistent enqueues / dequeues (0-9 codepath)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1234215 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache')
19 files changed, 144 insertions, 37 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3f5c204f43..8a707dc906 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -73,27 +73,20 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -public class AMQChannel implements SessionConfig, AMQSessionModel +public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -132,6 +125,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final MessageStore _messageStore; + private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); + + private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); // Set of messages being acknoweledged in the current transaction @@ -184,7 +181,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _messageStore = messageStore; // by default the session is non-transactional - _transaction = new AutoCommitTransaction(_messageStore); + _transaction = new AsyncAutoCommitTransaction(_messageStore, this); _clientDeliveryMethod = session.createDeliveryMethod(_channelId); } @@ -203,14 +200,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public boolean isTransactional() { - // this does not look great but there should only be one "non-transactional" - // transactional context, while there could be several transactional ones in - // theory - return !(_transaction instanceof AutoCommitTransaction); + return _transaction.isTransactional(); } public void receivedComplete() { + sync(); } @@ -1562,4 +1557,69 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } } + + public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); + } + + public void completeAsyncCommands() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) + { + cmd.complete(); + _unfinishedCommandsQueue.poll(); + } + while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) + { + cmd = _unfinishedCommandsQueue.poll(); + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public void sync() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.poll()) != null) + { + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + private static class AsyncCommand + { + private final MessageStore.StoreFuture _future; + private ServerTransaction.Action _action; + + public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _future = future; + _action = action; + } + + void awaitReadyForCompletion() + { + _future.waitForCompletion(); + } + + void complete() + { + if(!_future.isComplete()) + { + _future.waitForCompletion(); + } + _action.postCommit(); + _action = null; + } + + boolean isReadyForCompletion() + { + return _future.isComplete(); + } + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java index d587ef0c16..1b0168df56 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java @@ -24,6 +24,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,6 +53,11 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -71,7 +77,7 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
}
-
+ channel.sync();
session.writeFrame(response.generateFrame(channelId));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 29054f55c1..bc2a2dca04 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -68,6 +68,7 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC { MethodRegistry methodRegistry = session.getMethodRegistry(); BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag()); + channel.sync(); session.writeFrame(cancelOkBody.generateFrame(channelId)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 8875f21d0b..a1cfb14753 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -60,6 +60,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } else { + channel.sync(); if (_logger.isDebugEnabled()) { _logger.debug("BasicConsume: from '" + body.getQueue() + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index bbb009003c..2073299467 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -75,6 +75,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } else { + channel.sync(); AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); if (queue == null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index dd3281c65f..2cf043dd26 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -46,7 +46,7 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> { throw body.getChannelNotFoundException(channelId); } - + channel.sync(); channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index c7842cd643..429217321c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -65,6 +65,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic { MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry(); AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody(); + channel.sync(); session.writeFrame(recoverOk.generateFrame(channelId)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java index f9feada6fe..1e2a83b922 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java @@ -59,7 +59,7 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B {
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
channel.resend(body.getRequeue());
// Qpid 0-8 hacks a synchronous -ok onto recover.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 32aa99534b..ecffd1b9cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -65,6 +65,7 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel"); } + channel.sync(); session.closeChannel(channelId); // Client requested closure so we don't wait for ok we send it stateManager.getProtocolSession().closeChannelOk(channelId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index 5ccaa49de8..365c8bd9c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -55,6 +55,7 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB { throw body.getChannelNotFoundException(channelId); } + channel.sync(); channel.setSuspended(!body.getActive()); _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 21aea1510b..53835f381f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; @@ -69,7 +70,12 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); MethodRegistry methodRegistry = session.getMethodRegistry(); - + final AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + channel.sync(); AMQShortString exchangeName = body.getExchange(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 98a0d33487..69cf0c9e20 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -55,6 +56,11 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + final AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } if (_logger.isDebugEnabled()) { @@ -102,6 +108,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { MethodRegistry methodRegistry = session.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + channel.sync(); session.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 586aaf9336..339085691f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -49,7 +50,12 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - + final AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + channel.sync(); try { if(exchangeRegistry.getExchange(body.getExchange()) == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 0eb69e4b16..bb979d5441 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -64,18 +64,18 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> VirtualHost virtualHost = protocolConnection.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + AMQChannel channel = protocolConnection.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } final AMQQueue queue; final AMQShortString routingKey; if (body.getQueue() == null) { - AMQChannel channel = protocolConnection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } queue = channel.getDefaultQueue(); @@ -150,6 +150,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } if (!body.getNowait()) { + channel.sync(); MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); protocolConnection.writeFrame(responseBody.generateFrame(channelId)); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 693b316607..32cd1c2d08 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -197,6 +197,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (!body.getNowait()) { + channel.sync(); MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 902e3ade85..107e485275 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -71,7 +71,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB { throw body.getChannelNotFoundException(channelId); } - + channel.sync(); AMQQueue queue; if (body.getQueue() == null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 6c3e11be5b..7d609f9064 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -109,7 +109,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod if(!body.getNowait()) { - + channel.sync(); MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); protocolConnection.writeFrame(responseBody.generateFrame(channelId)); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java index 3849c5af19..9915627a94 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java @@ -132,6 +132,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB // 0-8 does not support QueueUnbind throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null); } + channel.sync(); session.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 11439b3c8a..7e238aeadc 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -114,7 +114,17 @@ public class AsyncAutoCommitTransaction implements ServerTransaction private void addFuture(final MessageStore.StoreFuture future, final Action action) { - _futureRecorder.recordFuture(future, action); + if(action != null) + { + if(future.isComplete()) + { + action.postCommit(); + } + else + { + _futureRecorder.recordFuture(future, action); + } + } } public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) @@ -265,17 +275,20 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public void commit(final Runnable immediatePostTransactionAction) { - addFuture(MessageStore.IMMEDIATE_FUTURE, new Action() + if(immediatePostTransactionAction != null) { - public void postCommit() + addFuture(MessageStore.IMMEDIATE_FUTURE, new Action() { - immediatePostTransactionAction.run(); - } + public void postCommit() + { + immediatePostTransactionAction.run(); + } - public void onRollback() - { - } - }); + public void onRollback() + { + } + }); + } } public void commit() |