summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-21 00:06:40 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-21 00:06:40 +0000
commit24aa816b81ebac14119ed00b1ae463db334ce381 (patch)
treee95eec0e17394210639e03ae2ad24eabe83522ff /java/broker/src/main/java/org/apache
parentbcc3eec81fed2e1f0acd0019e055d7f47303696b (diff)
downloadqpid-python-24aa816b81ebac14119ed00b1ae463db334ce381.tar.gz
QPID-3774 : allow out of order completion of persistent enqueues / dequeues (0-9 codepath)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1234215 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java90
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java1
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java31
19 files changed, 144 insertions, 37 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3f5c204f43..8a707dc906 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -73,27 +73,20 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class AMQChannel implements SessionConfig, AMQSessionModel
+public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -132,6 +125,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final MessageStore _messageStore;
+ private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
+
+ private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
+
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
// Set of messages being acknoweledged in the current transaction
@@ -184,7 +181,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_messageStore = messageStore;
// by default the session is non-transactional
- _transaction = new AutoCommitTransaction(_messageStore);
+ _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
_clientDeliveryMethod = session.createDeliveryMethod(_channelId);
}
@@ -203,14 +200,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
public boolean isTransactional()
{
- // this does not look great but there should only be one "non-transactional"
- // transactional context, while there could be several transactional ones in
- // theory
- return !(_transaction instanceof AutoCommitTransaction);
+ return _transaction.isTransactional();
}
public void receivedComplete()
{
+ sync();
}
@@ -1562,4 +1557,69 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
}
+
+ public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+ }
+
+ public void completeAsyncCommands()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
+ {
+ cmd.complete();
+ _unfinishedCommandsQueue.poll();
+ }
+ while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+ {
+ cmd = _unfinishedCommandsQueue.poll();
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public void sync()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.poll()) != null)
+ {
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+ private static class AsyncCommand
+ {
+ private final MessageStore.StoreFuture _future;
+ private ServerTransaction.Action _action;
+
+ public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _future = future;
+ _action = action;
+ }
+
+ void awaitReadyForCompletion()
+ {
+ _future.waitForCompletion();
+ }
+
+ void complete()
+ {
+ if(!_future.isComplete())
+ {
+ _future.waitForCompletion();
+ }
+ _action.postCommit();
+ _action = null;
+ }
+
+ boolean isReadyForCompletion()
+ {
+ return _future.isComplete();
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
index d587ef0c16..1b0168df56 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,6 +53,11 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ
public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -71,7 +77,7 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ
throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
}
-
+ channel.sync();
session.writeFrame(response.generateFrame(channelId));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 29054f55c1..bc2a2dca04 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -68,6 +68,7 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
{
MethodRegistry methodRegistry = session.getMethodRegistry();
BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
+ channel.sync();
session.writeFrame(cancelOkBody.generateFrame(channelId));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 8875f21d0b..a1cfb14753 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -60,6 +60,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
+ channel.sync();
if (_logger.isDebugEnabled())
{
_logger.debug("BasicConsume: from '" + body.getQueue() +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index bbb009003c..2073299467 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -75,6 +75,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
+ channel.sync();
AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
if (queue == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index dd3281c65f..2cf043dd26 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -46,7 +46,7 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index c7842cd643..429217321c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -65,6 +65,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
{
MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody();
+ channel.sync();
session.writeFrame(recoverOk.generateFrame(channelId));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
index f9feada6fe..1e2a83b922 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
@@ -59,7 +59,7 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
channel.resend(body.getRequeue());
// Qpid 0-8 hacks a synchronous -ok onto recover.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 32aa99534b..ecffd1b9cb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -65,6 +65,7 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
}
+ channel.sync();
session.closeChannel(channelId);
// Client requested closure so we don't wait for ok we send it
stateManager.getProtocolSession().closeChannelOk(channelId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index 5ccaa49de8..365c8bd9c6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -55,6 +55,7 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
{
throw body.getChannelNotFoundException(channelId);
}
+ channel.sync();
channel.setSuspended(!body.getActive());
_logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 21aea1510b..53835f381f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -69,7 +70,12 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MethodRegistry methodRegistry = session.getMethodRegistry();
-
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ channel.sync();
AMQShortString exchangeName = body.getExchange();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 98a0d33487..69cf0c9e20 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -55,6 +56,11 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
if (_logger.isDebugEnabled())
{
@@ -102,6 +108,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
MethodRegistry methodRegistry = session.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ channel.sync();
session.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 586aaf9336..339085691f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -49,7 +50,12 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
-
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ channel.sync();
try
{
if(exchangeRegistry.getExchange(body.getExchange()) == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 0eb69e4b16..bb979d5441 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -64,18 +64,18 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
final AMQQueue queue;
final AMQShortString routingKey;
if (body.getQueue() == null)
{
- AMQChannel channel = protocolConnection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
queue = channel.getDefaultQueue();
@@ -150,6 +150,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
if (!body.getNowait())
{
+ channel.sync();
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
protocolConnection.writeFrame(responseBody.generateFrame(channelId));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 693b316607..32cd1c2d08 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -197,6 +197,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (!body.getNowait())
{
+ channel.sync();
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 902e3ade85..107e485275 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -71,7 +71,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
AMQQueue queue;
if (body.getQueue() == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 6c3e11be5b..7d609f9064 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -109,7 +109,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
if(!body.getNowait())
{
-
+ channel.sync();
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
protocolConnection.writeFrame(responseBody.generateFrame(channelId));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
index 3849c5af19..9915627a94 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
@@ -132,6 +132,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
// 0-8 does not support QueueUnbind
throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null);
}
+ channel.sync();
session.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 11439b3c8a..7e238aeadc 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -114,7 +114,17 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
private void addFuture(final MessageStore.StoreFuture future, final Action action)
{
- _futureRecorder.recordFuture(future, action);
+ if(action != null)
+ {
+ if(future.isComplete())
+ {
+ action.postCommit();
+ }
+ else
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+ }
}
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
@@ -265,17 +275,20 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
public void commit(final Runnable immediatePostTransactionAction)
{
- addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+ if(immediatePostTransactionAction != null)
{
- public void postCommit()
+ addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
{
- immediatePostTransactionAction.run();
- }
+ public void postCommit()
+ {
+ immediatePostTransactionAction.run();
+ }
- public void onRollback()
- {
- }
- });
+ public void onRollback()
+ {
+ }
+ });
+ }
}
public void commit()