summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-12-28 13:02:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-12-28 13:02:41 +0000
commit55ccbf149980b06c7b7effa36871ffbdf50550fa (patch)
treef5fc6181438968f82af0528c751af32ea8fef64e /qpid/java/broker
parentf085f3b0ce89af428e75bf2ae3b8c65ecdd16ad6 (diff)
downloadqpid-python-55ccbf149980b06c7b7effa36871ffbdf50550fa.tar.gz
QPID-3714 : [Java] Performance Improvements
Persistence: Store message in same transaction as enqueue if possible Memory: Remove unnecessary (un)boxing Reduce unnecessary copying of message data Cache short strings Cache queues for a given routing key on an Exchange (0-9) Use a fixed size buffer for preparing frames to write out Other: Reduce calls to System.currentTimeMillis (0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point (0-10) Special case delivery properties and message properties in headers (0-9) send commit-ok as soon as data committed to store Cache publishing access control queries (0-9) Optimised long and int typed values for FieldTables (0-9) Retain FieldTable encoded form (0-9) Cache queue and topic destinations git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1225178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java33
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java64
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java107
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java29
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java1
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java105
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java11
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java840
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java837
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java837
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java65
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java21
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java149
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java146
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java69
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java79
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java4
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java65
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java144
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java169
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java59
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java61
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java40
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java7
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java3
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java32
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java60
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java18
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java124
-rwxr-xr-xqpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java88
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java16
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java41
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java5
87 files changed, 2660 insertions, 2288 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
index 593c1616fb..b898e85aa2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
@@ -111,6 +111,11 @@ public class ManagementExchange implements Exchange, QMFService.Listener
}
+ public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
+ {
+ enqueue(message);
+ }
+
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
enqueue(message);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
index b98daf7cb1..9ee8b923fc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
@@ -32,6 +32,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.management.common.mbeans.ManagedConnection;
import java.util.ArrayList;
+import java.util.List;
public class QMFBrokerRequestCommand extends QMFCommand
{
@@ -57,7 +58,7 @@ public class QMFBrokerRequestCommand extends QMFCommand
QMFMessage responseMessage = new QMFMessage(queueName, cmd);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
index 26a27cfa19..5d2717a9fb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
public class QMFClassQueryCommand extends QMFCommand
{
@@ -71,7 +72,7 @@ public class QMFClassQueryCommand extends QMFCommand
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
index 8e8cb55a0d..ff927a1de9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
@@ -163,7 +163,7 @@ public class QMFGetQueryCommand extends QMFCommand
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
index 895ff643a2..8208525b08 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
@@ -21,8 +21,11 @@
package org.apache.qpid.qmf;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.message.*;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.transport.codec.BBEncoder;
import java.nio.ByteBuffer;
@@ -59,11 +62,21 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead
return _routingKey;
}
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(_routingKey);
+ }
+
public AMQMessageHeader getMessageHeader()
{
return this;
}
+ public StoredMessage getStoredMessage()
+ {
+ throw new NotImplementedException();
+ }
+
public boolean isPersistent()
{
return false;
@@ -159,9 +172,9 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead
return new QMFMessageReference(this);
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
- return null;
+ return 0l;
}
public long getArrivalTime()
@@ -172,9 +185,9 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead
public int getContent(ByteBuffer buf, int offset)
{
ByteBuffer src = _content.duplicate();
- _content.position(offset);
- _content = _content.slice();
- int len = _content.remaining();
+ src.position(offset);
+ src = src.slice();
+ int len = src.remaining();
if(len > buf.remaining())
{
len = buf.remaining();
@@ -185,6 +198,16 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead
return len;
}
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ ByteBuffer src = _content.duplicate();
+ src.position(offset);
+ src = src.slice();
+ src.limit(size);
+ return src;
+ }
+
private static class QMFMessageReference extends MessageReference<QMFMessage>
{
public QMFMessageReference(QMFMessage message)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
index cf27e4b970..37c16efec5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.AMQException;
+import java.util.List;
import java.util.UUID;
import java.util.ArrayList;
@@ -68,7 +69,7 @@ public class QMFMethodRequestCommand extends QMFCommand
QMFMessage responseMessage = new QMFMessage(queueName, cmd);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
index 6defd088de..ed07457a23 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
public class QMFPackageQueryCommand extends QMFCommand
{
@@ -67,7 +68,7 @@ public class QMFPackageQueryCommand extends QMFCommand
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
index 3141676f10..850ffa8610 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
@@ -31,6 +31,7 @@ import org.apache.qpid.AMQException;
import java.util.Collection;
import java.util.ArrayList;
+import java.util.List;
public class QMFSchemaRequestCommand extends QMFCommand
{
@@ -70,7 +71,7 @@ public class QMFSchemaRequestCommand extends QMFCommand
QMFMessage responseMessage = new QMFMessage(routingKey, cmd);
- ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage);
+ List<? extends BaseQueue> queues = exchange.route(responseMessage);
for(BaseQueue q : queues)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a4fd997568..82ac01cea8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -95,7 +95,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class AMQChannel implements SessionConfig, AMQSessionModel
{
- public static final int DEFAULT_PREFETCH = 5000;
+ public static final int DEFAULT_PREFETCH = 4096;
private static final Logger _logger = Logger.getLogger(AMQChannel.class);
@@ -166,6 +166,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final UUID _id;
private long _createTime = System.currentTimeMillis();
+ private final ClientDeliveryMethod _clientDeliveryMethod;
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -183,6 +185,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
// by default the session is non-transactional
_transaction = new AutoCommitTransaction(_messageStore);
+
+ _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
}
public ConfigStore getConfigStore()
@@ -205,6 +209,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
return !(_transaction instanceof AutoCommitTransaction);
}
+ public void receivedComplete()
+ {
+ }
+
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
@@ -284,7 +293,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_currentMessage.setExpiration();
- MessageMetaData mmd = _currentMessage.headersReceived();
+ MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd);
_currentMessage.setStoredMessage(handle);
@@ -316,8 +325,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
try
{
- _currentMessage.getStoredMessage().flushToStore();
- final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
+ final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
{
@@ -339,11 +347,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
else
{
- _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional()));
+ _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
incrementOutstandingTxnsIfNecessary();
updateTransactionalActivity();
}
}
+ _currentMessage.getStoredMessage().flushToStore();
+
}
finally
{
@@ -857,10 +867,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
{
- Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
- _unacknowledgedMessageMap.collect(deliveryTag, multiple, ackedMessageMap);
- _unacknowledgedMessageMap.remove(ackedMessageMap);
- return ackedMessageMap.values();
+ return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
+
}
/**
@@ -949,12 +957,17 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
public void commit() throws AMQException
{
+ commit(null);
+ }
+ public void commit(Runnable immediateAction) throws AMQException
+ {
+
if (!isTransactional())
{
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _transaction.commit();
+ _transaction.commit(immediateAction);
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
@@ -1033,7 +1046,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
if (isTransactional())
{
- _txnUpdateTime.set(System.currentTimeMillis());
+ _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
}
}
@@ -1079,20 +1092,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
return _messageStore;
}
- private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
- {
-
- public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
- throws AMQException
- {
- _session.registerMessageDelivered(entry.getMessage().getSize());
- getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
- deliveryTag, sub.getConsumerTag());
- entry.incrementDeliveryCount();
- }
-
- };
-
public ClientDeliveryMethod getClientDeliveryMethod()
{
return _clientDeliveryMethod;
@@ -1158,11 +1157,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
- private ArrayList<? extends BaseQueue> _destinationQueues;
+ private List<? extends BaseQueue> _destinationQueues;
public MessageDeliveryAction(IncomingMessage currentMessage,
- ArrayList<? extends BaseQueue> destinationQueues,
- boolean transactional)
+ List<? extends BaseQueue> destinationQueues)
{
_incommingMessage = currentMessage;
_destinationQueues = destinationQueues;
@@ -1177,8 +1175,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
MessageReference ref = amqMessage.newReference();
- for(final BaseQueue queue : _destinationQueues)
+ for(int i = 0; i < _destinationQueues.size(); i++)
{
+ BaseQueue queue = _destinationQueues.get(i);
+
BaseQueue.PostEnqueueAction action;
if(immediate)
@@ -1190,7 +1190,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
action = null;
}
- queue.enqueue(amqMessage, action);
+ queue.enqueue(amqMessage, isTransactional(), action);
if(queue instanceof AMQQueue)
{
@@ -1198,6 +1198,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
}
+
+ _incommingMessage.getStoredMessage().flushToStore();
ref.release();
}
catch (AMQException e)
@@ -1539,7 +1541,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
- final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m);
+ final List<? extends BaseQueue> destinationQueues = altExchange.route(m);
if (destinationQueues == null || destinationQueues.isEmpty())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
index 9da02e0600..9765636c25 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.AMQException;
@@ -39,13 +39,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
private final Map<Long, QueueEntry> _msgToResend;
private final boolean _requeueIfUnabletoResend;
private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
- private final TransactionLog _transactionLog;
+ private final MessageStore _transactionLog;
public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
Map<Long, QueueEntry> msgToRequeue,
Map<Long, QueueEntry> msgToResend,
boolean requeueIfUnabletoResend,
- TransactionLog txnLog)
+ MessageStore txnLog)
{
_unacknowledgedMessageMap = unacknowledgedMessageMap;
_msgToRequeue = msgToRequeue;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index 3bad73d86d..f4b4932744 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -46,10 +46,6 @@ public interface UnacknowledgedMessageMap
void add(long deliveryTag, QueueEntry message);
- void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);
-
- void remove(Map<Long,QueueEntry> msgs);
-
QueueEntry remove(long deliveryTag);
Collection<QueueEntry> cancelAllMessages();
@@ -67,6 +63,8 @@ public interface UnacknowledgedMessageMap
*/
Set<Long> getDeliveryTags();
+ Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index d920d97c1a..6a5d863526 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -157,6 +157,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
+ public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+ {
+ Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+ collect(deliveryTag, multiple, ackedMessageMap);
+ remove(ackedMessageMap);
+ return ackedMessageMap.values();
+ }
+
private void collect(long key, Map<Long, QueueEntry> msgs)
{
synchronized (_lock)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
index 60c9a86b76..48f85d9bc9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
@@ -115,4 +115,9 @@ public class Binding
return result;
}
+ public String toString()
+ {
+ return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+"}";
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index d693c6962b..5ff90b3499 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -351,11 +351,11 @@ public abstract class AbstractExchange implements Exchange, Managable
- public final ArrayList<? extends BaseQueue> route(final InboundMessage message)
+ public final List<? extends BaseQueue> route(final InboundMessage message)
{
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
- final ArrayList<? extends BaseQueue> queues = doRoute(message);
+ final List<? extends BaseQueue> queues = doRoute(message);
if(!queues.isEmpty())
{
_routedMessageCount.incrementAndGet();
@@ -364,7 +364,7 @@ public abstract class AbstractExchange implements Exchange, Managable
return queues;
}
- protected abstract ArrayList<? extends BaseQueue> doRoute(final InboundMessage message);
+ protected abstract List<? extends BaseQueue> doRoute(final InboundMessage message);
public long getMsgReceives()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index cb0d8ecf8f..8c0a5001db 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -34,6 +34,8 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -41,8 +43,52 @@ public class DirectExchange extends AbstractExchange
{
private static final Logger _logger = Logger.getLogger(DirectExchange.class);
- private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
- new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
+ private static final class BindingSet
+ {
+ private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>();
+ private List<BaseQueue> _queues = new ArrayList<BaseQueue>();
+
+ public synchronized void addBinding(Binding binding)
+ {
+ _bindings.add(binding);
+ recalculateQueues();
+ }
+
+
+ public synchronized void removeBinding(Binding binding)
+ {
+ _bindings.remove(binding);
+ recalculateQueues();
+ }
+
+ private void recalculateQueues()
+ {
+ List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
+
+ for(Binding b : _bindings)
+ {
+ if(!queues.contains(b.getQueue()))
+ {
+ queues.add(b.getQueue());
+ }
+ }
+ _queues = queues;
+ }
+
+
+ public List<BaseQueue> getQueues()
+ {
+ return _queues;
+ }
+
+ public CopyOnWriteArraySet<Binding> getBindings()
+ {
+ return _bindings;
+ }
+ }
+
+ private final ConcurrentHashMap<String, BindingSet> _bindingsByKey =
+ new ConcurrentHashMap<String, BindingSet>();
public static final ExchangeType<DirectExchange> TYPE = new ExchangeType<DirectExchange>()
{
@@ -91,33 +137,20 @@ public class DirectExchange extends AbstractExchange
}
- public ArrayList<? extends BaseQueue> doRoute(InboundMessage payload)
+ public List<? extends BaseQueue> doRoute(InboundMessage payload)
{
final String routingKey = payload.getRoutingKey();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
+ BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
if(bindings != null)
{
- final ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(bindings.size());
-
- for(Binding binding : bindings)
- {
- queues.add(binding.getQueue());
- binding.incrementMatches();
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Publishing message to queue " + queues);
- }
-
- return queues;
+ return bindings.getQueues();
}
else
{
- return new ArrayList<BaseQueue>(0);
+ return Collections.emptyList();
}
@@ -132,16 +165,10 @@ public class DirectExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+ BindingSet bindings = _bindingsByKey.get(bindingKey);
if(bindings != null)
{
- for(Binding binding : bindings)
- {
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
- }
+ return bindings.getQueues().contains(queue);
}
return false;
@@ -150,22 +177,20 @@ public class DirectExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey)
{
String bindingKey = (routingKey == null) ? "" : routingKey.toString();
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
- return bindings != null && !bindings.isEmpty();
+ BindingSet bindings = _bindingsByKey.get(bindingKey);
+ return bindings != null && !bindings.getQueues().isEmpty();
}
public boolean isBound(AMQQueue queue)
{
- for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
+ for (BindingSet bindings : _bindingsByKey.values())
{
- for(Binding binding : bindings)
+ if(bindings.getQueues().contains(queue))
{
- if(binding.getQueue().equals(queue))
- {
- return true;
- }
+ return true;
}
+
}
return false;
}
@@ -184,19 +209,19 @@ public class DirectExchange extends AbstractExchange
assert queue != null;
assert routingKey != null;
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+ BindingSet bindings = _bindingsByKey.get(bindingKey);
if(bindings == null)
{
- bindings = new CopyOnWriteArraySet<Binding>();
- CopyOnWriteArraySet<Binding> newBindings;
+ bindings = new BindingSet();
+ BindingSet newBindings;
if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
{
bindings = newBindings;
}
}
- bindings.add(binding);
+ bindings.addBinding(binding);
}
@@ -204,10 +229,10 @@ public class DirectExchange extends AbstractExchange
{
assert binding != null;
- CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey());
+ BindingSet bindings = _bindingsByKey.get(binding.getBindingKey());
if(bindings != null)
{
- bindings.remove(binding);
+ bindings.removeBinding(binding);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 29a3611709..29c354feae 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.configuration.ExchangeConfig;
import javax.management.JMException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
public interface Exchange extends ExchangeReferrer, ExchangeConfig
{
@@ -70,7 +71,7 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig
*
* @return list of queues to which to route the message.
*/
- ArrayList<? extends BaseQueue> route(InboundMessage message);
+ List<? extends BaseQueue> route(InboundMessage message);
/**
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index e523eb24fb..3a8a86e654 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -37,8 +37,11 @@ import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.message.InboundMessage;
import javax.management.JMException;
+import java.sql.Array;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
import java.lang.ref.WeakReference;
public class TopicExchange extends AbstractExchange
@@ -77,8 +80,6 @@ public class TopicExchange extends AbstractExchange
private static final Logger _logger = Logger.getLogger(TopicExchange.class);
-
-
private final TopicParser _parser = new TopicParser();
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
@@ -175,7 +176,6 @@ public class TopicExchange extends AbstractExchange
_bindings.put(binding, args);
}
-
}
private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQInvalidArgumentException
@@ -201,14 +201,23 @@ public class TopicExchange extends AbstractExchange
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
- final AMQShortString routingKey = payload.getRoutingKey() == null
+ final AMQShortString routingKey = payload.getRoutingKeyShortString() == null
? AMQShortString.EMPTY_STRING
- : new AMQShortString(payload.getRoutingKey());
+ : payload.getRoutingKeyShortString();
+
+ final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey);
- // The copy here is unfortunate, but not too bad relevant to the amount of
- // things created and copied in getMatchedQueues
- ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
- queues.addAll(getMatchedQueues(payload, routingKey));
+ ArrayList<BaseQueue> queues;
+
+ if(matchedQueues.getClass() == ArrayList.class)
+ {
+ queues = (ArrayList) matchedQueues;
+ }
+ else
+ {
+ queues = new ArrayList<BaseQueue>();
+ queues.addAll(matchedQueues);
+ }
if(queues == null || queues.isEmpty())
{
@@ -325,25 +334,28 @@ public class TopicExchange extends AbstractExchange
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
- if(results.isEmpty())
+ switch(results.size())
{
- return Collections.EMPTY_SET;
- }
- else
- {
- Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
- for(TopicMatcherResult result : results)
- {
- TopicExchangeResult res = (TopicExchangeResult)result;
-
- for(Binding b : res.getBindings())
+ case 0:
+ return Collections.EMPTY_SET;
+ case 1:
+ TopicMatcherResult[] resultQueues = new TopicMatcherResult[1];
+ results.toArray(resultQueues);
+ return ((TopicExchangeResult)resultQueues[0]).processMessage(message, null);
+ default:
+ Collection<AMQQueue> queues = new HashSet<AMQQueue>();
+ for(TopicMatcherResult result : results)
{
- b.incrementMatches();
+ TopicExchangeResult res = (TopicExchangeResult)result;
+
+ for(Binding b : res.getBindings())
+ {
+ b.incrementMatches();
+ }
+
+ queues = res.processMessage(message, queues);
}
-
- queues = res.processMessage(message, queues);
- }
- return queues;
+ return queues;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
index 41dc0d749a..d8b09a7841 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
@@ -39,6 +39,7 @@ public final class TopicExchangeResult implements TopicMatcherResult
private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
+ private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0);
public void addUnfilteredQueue(AMQQueue queue)
{
@@ -46,6 +47,9 @@ public final class TopicExchangeResult implements TopicMatcherResult
if(instances == null)
{
_unfilteredQueues.put(queue, 1);
+ ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList);
+ newList.add(queue);
+ _unfilteredQueueList = newList;
}
else
{
@@ -59,6 +63,10 @@ public final class TopicExchangeResult implements TopicMatcherResult
if(instances == 1)
{
_unfilteredQueues.remove(queue);
+ ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList);
+ newList.remove(queue);
+ _unfilteredQueueList = newList;
+
}
else
{
@@ -166,7 +174,7 @@ public final class TopicExchangeResult implements TopicMatcherResult
{
if(_filteredQueues.isEmpty())
{
- return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
+ return _unfilteredQueueList;
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
index 36076cf75b..4446536d4c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
@@ -77,7 +77,7 @@ public class TopicMatcherDFAState
}
if(nextState == null)
{
- return Collections.EMPTY_SET;
+ return Collections.EMPTY_LIST;
}
// Shortcut if we are at a looping terminal state
if((nextState == this) && (_nextStateMap.size() == 1) && _nextStateMap.containsKey(TopicWord.ANY_WORD))
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
index 2dff45c326..1c1527aa31 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
@@ -44,27 +44,9 @@ import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageReject;
-import org.apache.qpid.transport.MessageRejectCode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.transport.*;
+
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -365,7 +347,8 @@ public class Bridge implements BridgeConfig
// TODO - deal with exchange not existing
DeliveryProperties delvProps = null;
- if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() &&
+ !delvProps.hasExpiration())
{
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
@@ -377,7 +360,7 @@ public class Bridge implements BridgeConfig
storeMessage.flushToStore();
MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)_session).getReference());
- ArrayList<? extends BaseQueue> queues = exchange.route(message);
+ List<? extends BaseQueue> queues = exchange.route(message);
@@ -391,7 +374,7 @@ public class Bridge implements BridgeConfig
{
if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- RangeSet rejects = new RangeSet();
+ RangeSet rejects = RangeSetFactory.createRangeSet();
rejects.add(xfr.getId());
MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
ssn.invoke(reject);
@@ -428,7 +411,7 @@ public class Bridge implements BridgeConfig
}
- private void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
+ private void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
{
_transaction.enqueue(queues,message, new ServerTransaction.Action()
{
@@ -456,8 +439,7 @@ public class Bridge implements BridgeConfig
{
// NO-OP
}
- });
-
+ }, 0L);
}
public void exception(final Session session, final SessionException exception)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
index cfe5aedd61..a77ed5700a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.flow;
+import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Set;
import java.util.HashSet;
@@ -27,13 +28,16 @@ import java.util.HashSet;
public abstract class AbstractFlowCreditManager implements FlowCreditManager
{
protected final AtomicBoolean _suspended = new AtomicBoolean(false);
- private final Set<FlowCreditManagerListener> _listeners = new HashSet<FlowCreditManagerListener>();
+ private final ArrayList<FlowCreditManagerListener> _listeners = new ArrayList<FlowCreditManagerListener>();
public final void addStateListener(FlowCreditManagerListener listener)
{
synchronized(_listeners)
{
- _listeners.add(listener);
+ if(!_listeners.contains(listener))
+ {
+ _listeners.add(listener);
+ }
}
}
@@ -49,9 +53,10 @@ public abstract class AbstractFlowCreditManager implements FlowCreditManager
{
synchronized(_listeners)
{
- for(FlowCreditManagerListener listener : _listeners)
+ final int size = _listeners.size();
+ for(int i = 0; i<size; i++)
{
- listener.creditStateChanged(!suspended);
+ _listeners.get(i).creditStateChanged(!suspended);
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 765dee2878..8875f21d0b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -52,7 +52,6 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
AMQChannel channel = protocolConnection.getChannel(channelId);
-
VirtualHost vHost = protocolConnection.getVirtualHost();
if (channel == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 9133cce6b7..32aa99534b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -65,7 +65,6 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
}
-
session.closeChannel(channelId);
// Client requested closure so we don't wait for ok we send it
stateManager.getProtocolSession().closeChannelOk(channelId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index 696ca8a63b..5ccaa49de8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -55,7 +55,6 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
{
throw body.getChannelNotFoundException(channelId);
}
-
channel.setSuspended(!body.getActive());
_logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index f8e4eab0b6..6eaba87b79 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -49,7 +49,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
if (_logger.isInfoEnabled())
{
_logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index ccd42204d9..21aea1510b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -65,7 +65,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 0cfed77f2e..693b316607 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -85,6 +85,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
synchronized (queueRegistry)
{
queue = queueRegistry.getQueue(queueName);
@@ -183,12 +190,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
- AMQChannel channel = protocolConnection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
//set this as the default queue on the channel:
channel.setDefaultQueue(queue);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index da52268e52..902e3ade85 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -64,15 +64,17 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
AMQQueue queue;
if (body.getQueue() == null)
{
- AMQChannel channel = protocolConnection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
//get the default queue on the channel:
queue = channel.getDefaultQueue();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 759eec0129..6c3e11be5b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -63,17 +63,14 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = protocolConnection.getChannel(channelId);
-
-
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
AMQQueue queue;
if(body.getQueue() == null)
{
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
-
//get the default queue on the channel:
queue = channel.getDefaultQueue();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
index f2119f7faa..3849c5af19 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
@@ -66,14 +66,15 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
final AMQQueue queue;
final AMQShortString routingKey;
- if (body.getQueue() == null)
+
+ AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
{
- AMQChannel channel = session.getChannel(channelId);
+ throw body.getChannelNotFoundException(channelId);
+ }
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
+ if (body.getQueue() == null)
+ {
queue = channel.getDefaultQueue();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index 32bf8aa17d..b284514186 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage extends AbstractServerMessageImpl
+public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -62,10 +62,6 @@ public class AMQMessage extends AbstractServerMessageImpl
private Object _sessionIdentifier;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
- private final StoredMessage<MessageMetaData> _handle;
-
- WeakReference<AMQChannel> _channelRef;
-
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
this(handle, null);
@@ -75,7 +71,7 @@ public class AMQMessage extends AbstractServerMessageImpl
{
super(handle);
- _handle = handle;
+
final MessageMetaData metaData = handle.getMetaData();
_size = metaData.getContentSize();
final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
@@ -84,8 +80,6 @@ public class AMQMessage extends AbstractServerMessageImpl
{
_flags |= IMMEDIATE;
}
-
- _channelRef = channelRef;
}
public void setExpiration(final long expiration)
@@ -97,7 +91,7 @@ public class AMQMessage extends AbstractServerMessageImpl
public MessageMetaData getMessageMetaData()
{
- return _handle.getMetaData();
+ return getStoredMessage().getMetaData();
}
public ContentHeaderBody getContentHeaderBody() throws AMQException
@@ -107,7 +101,7 @@ public class AMQMessage extends AbstractServerMessageImpl
public Long getMessageId()
{
- return _handle.getMessageNumber();
+ return getStoredMessage().getMessageNumber();
}
/**
@@ -219,9 +213,9 @@ public class AMQMessage extends AbstractServerMessageImpl
return new AMQMessageReference(this);
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
- return getMessageId();
+ return getStoredMessage().getMessageNumber();
}
@@ -248,16 +242,13 @@ public class AMQMessage extends AbstractServerMessageImpl
public int getContent(ByteBuffer buf, int offset)
{
- return _handle.getContent(offset, buf);
+ return getStoredMessage().getContent(offset, buf);
}
- public StoredMessage<MessageMetaData> getStoredMessage()
+
+ public ByteBuffer getContent(int offset, int size)
{
- return _handle;
+ return getStoredMessage().getContent(offset, size);
}
- public SessionConfig getSessionConfig()
- {
- return _channelRef == null ? null : ((SessionConfig) _channelRef.get());
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 80c28332c0..b1d43f0b50 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -21,19 +21,30 @@
package org.apache.qpid.server.message;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-public abstract class AbstractServerMessageImpl implements ServerMessage
+public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T>
{
- private final AtomicInteger _referenceCount = new AtomicInteger(0);
- private final StoredMessage<?> _handle;
- public AbstractServerMessageImpl(StoredMessage<?> handle)
+ private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+
+ private volatile int _referenceCount = 0;
+ private final StoredMessage<T> _handle;
+
+ public AbstractServerMessageImpl(StoredMessage<T> handle)
{
_handle = handle;
}
+ public StoredMessage<T> getStoredMessage()
+ {
+ return _handle;
+ }
+
public boolean incrementReference()
{
return incrementReference(1);
@@ -41,9 +52,9 @@ public abstract class AbstractServerMessageImpl implements ServerMessage
public boolean incrementReference(int count)
{
- if(_referenceCount.addAndGet(count) <= 0)
+ if(_refCountUpdater.addAndGet(this, count) <= 0)
{
- _referenceCount.addAndGet(-count);
+ _refCountUpdater.addAndGet(this, -count);
return false;
}
else
@@ -62,7 +73,7 @@ public abstract class AbstractServerMessageImpl implements ServerMessage
*/
public void decrementReference()
{
- int count = _referenceCount.decrementAndGet();
+ int count = _refCountUpdater.decrementAndGet(this);
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -73,7 +84,7 @@ public abstract class AbstractServerMessageImpl implements ServerMessage
// set the reference count way below 0 so that we can detect that the message has been deleted
// this is to guard against the message being spontaneously recreated (from the mgmt console)
// by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
+ _refCountUpdater.set(this,Integer.MIN_VALUE/2);
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
@@ -99,6 +110,6 @@ public abstract class AbstractServerMessageImpl implements ServerMessage
protected int getReferenceCount()
{
- return _referenceCount.get();
+ return _referenceCount;
}
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java
index c32f80fc5b..7be91ad0ca 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.server.message;
+import org.apache.qpid.server.store.StoredMessage;
+
public interface EnqueableMessage
{
- Long getMessageNumber();
+ long getMessageNumber();
boolean isPersistent();
+ StoredMessage getStoredMessage();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java
index 1b3fdb1870..79d5574a91 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java
@@ -22,10 +22,12 @@ package org.apache.qpid.server.message;
import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.framing.AMQShortString;
public interface InboundMessage extends Filterable
{
String getRoutingKey();
+ AMQShortString getRoutingKeyShortString();
AMQMessageHeader getMessageHeader();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
index 08a09c4a85..44741f57bd 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
public interface MessageContentSource
{
public int getContent(ByteBuffer buf, int offset);
+ public ByteBuffer getContent(int offset, int size);
long getSize();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
index f9863f4945..17ebb6ee07 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
@@ -30,9 +30,12 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.framing.AMQShortString;
import java.nio.ByteBuffer;
import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.List;
public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
{
@@ -42,7 +45,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
private MessageTransferHeader _messageHeader;
private long _arrivalTime;
private int _bodySize;
- private volatile SoftReference<ByteBuffer> _body;
private static final int ENCODER_SIZE = 1 << 10;
@@ -53,21 +55,16 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
public MessageMetaData_0_10(MessageTransfer xfr)
{
- this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis());
+ this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis());
}
private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
{
- this(header, bodySize, null, arrivalTime);
- }
-
- private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime)
- {
_header = header;
if(_header != null)
{
- _deliveryProps = _header.get(DeliveryProperties.class);
- _messageProps = _header.get(MessageProperties.class);
+ _deliveryProps = _header.getDeliveryProperties();
+ _messageProps = _header.getMessageProperties();
}
else
{
@@ -78,21 +75,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
_arrivalTime = arrivalTime;
_bodySize = bodySize;
-
-
- if(xfrBody == null)
- {
- _body = null;
- }
- else
- {
- ByteBuffer body = ByteBuffer.allocate(_bodySize);
- body.put(xfrBody);
- body.flip();
- _body = new SoftReference<ByteBuffer>(body);
- }
-
-
}
@@ -122,16 +104,39 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
encoder.writeInt64(_arrivalTime);
encoder.writeInt32(_bodySize);
- Struct[] headers = _header == null ? new Struct[0] : _header.getStructs();
- encoder.writeInt32(headers.length);
+ int headersLength = 0;
+ if(_header.getDeliveryProperties() != null)
+ {
+ headersLength++;
+ }
+ if(_header.getMessageProperties() != null)
+ {
+ headersLength++;
+ }
+ if(_header.getNonStandardProperties() != null)
+ {
+ headersLength += _header.getNonStandardProperties().size();
+ }
+ encoder.writeInt32(headersLength);
- for(Struct header : headers)
+ if(_header.getDeliveryProperties() != null)
{
- encoder.writeStruct32(header);
-
+ encoder.writeStruct32(_header.getDeliveryProperties());
+ }
+ if(_header.getMessageProperties() != null)
+ {
+ encoder.writeStruct32(_header.getMessageProperties());
}
+ if(_header.getNonStandardProperties() != null)
+ {
+ for(Struct header : _header.getNonStandardProperties())
+ {
+ encoder.writeStruct32(header);
+ }
+
+ }
ByteBuffer buf = encoder.buffer();
return buf;
}
@@ -173,6 +178,11 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
}
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(getRoutingKey());
+ }
+
public AMQMessageHeader getMessageHeader()
{
return _messageHeader;
@@ -210,17 +220,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
return _header;
}
- public ByteBuffer getBody()
- {
- ByteBuffer body = _body == null ? null : _body.get();
- return body;
- }
-
- public void setBody(ByteBuffer body)
- {
- _body = new SoftReference<ByteBuffer>(body);
- }
-
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
{
public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
@@ -232,14 +231,32 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
int bodySize = decoder.readInt32();
int headerCount = decoder.readInt32();
- Struct[] headers = new Struct[headerCount];
+ DeliveryProperties deliveryProperties = null;
+ MessageProperties messageProperties = null;
+ List<Struct> otherProps = null;
for(int i = 0 ; i < headerCount; i++)
{
- headers[i] = decoder.readStruct32();
+ Struct struct = decoder.readStruct32();
+ if(struct instanceof DeliveryProperties && deliveryProperties == null)
+ {
+ deliveryProperties = (DeliveryProperties) struct;
+ }
+ else if(struct instanceof MessageProperties && messageProperties == null)
+ {
+ messageProperties = (MessageProperties) struct;
+ }
+ else
+ {
+ if(otherProps == null)
+ {
+ otherProps = new ArrayList<Struct>();
+
+ }
+ otherProps.add(struct);
+ }
}
-
- Header header = new Header(headers);
+ Header header = new Header(deliveryProperties,messageProperties,otherProps);
return new MessageMetaData_0_10(header, bodySize, arrivalTime);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index 51841e6dd0..30934ae014 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -24,32 +24,35 @@ import org.apache.qpid.transport.*;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.framing.AMQShortString;
import java.nio.ByteBuffer;
-import java.lang.ref.WeakReference;
-public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage
{
- private StoredMessage<MessageMetaData_0_10> _storeMessage;
- private WeakReference<Session> _sessionRef;
- public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
+ private Object _connectionRef;
+
+ public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
{
super(storeMessage);
- _storeMessage = storeMessage;
- _sessionRef = sessionRef;
+ _connectionRef = connectionRef;
}
private MessageMetaData_0_10 getMetaData()
{
- return _storeMessage.getMetaData();
+ return getStoredMessage().getMetaData();
}
public String getRoutingKey()
{
return getMetaData().getRoutingKey();
+ }
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(getRoutingKey());
}
public AMQMessageHeader getMessageHeader()
@@ -91,9 +94,9 @@ public class MessageTransferMessage extends AbstractServerMessageImpl implements
return new TransferMessageReference(this);
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
- return _storeMessage.getMessageNumber();
+ return getStoredMessage().getMessageNumber();
}
public long getArrivalTime()
@@ -103,7 +106,13 @@ public class MessageTransferMessage extends AbstractServerMessageImpl implements
public int getContent(ByteBuffer buf, int offset)
{
- return _storeMessage.getContent(offset, buf);
+ return getStoredMessage().getContent(offset, buf);
+ }
+
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ return getStoredMessage().getContent(offset,size);
}
public Header getHeader()
@@ -113,32 +122,13 @@ public class MessageTransferMessage extends AbstractServerMessageImpl implements
public ByteBuffer getBody()
{
- ByteBuffer body = getMetaData().getBody();
- if(body == null && getSize() != 0l)
- {
- final int size = (int) getSize();
- int pos = 0;
- body = ByteBuffer.allocate(size);
-
- while(pos < size)
- {
- pos += getContent(body, pos);
- }
-
- body.flip();
- getMetaData().setBody(body.duplicate());
- }
- return body;
+ return getContent(0, (int)getSize());
}
- public Session getSession()
+ public Object getConnectionReference()
{
- return _sessionRef == null ? null : _sessionRef.get();
+ return _connectionRef;
}
- public SessionConfig getSessionConfig()
- {
- return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 2f2d39115f..2d25135326 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -23,13 +23,17 @@ package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
-public interface ServerMessage extends EnqueableMessage, MessageContentSource
+public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueableMessage, MessageContentSource
{
String getRoutingKey();
AMQMessageHeader getMessageHeader();
+ public StoredMessage<T> getStoredMessage();
+
boolean isPersistent();
long getSize();
@@ -40,11 +44,12 @@ public interface ServerMessage extends EnqueableMessage, MessageContentSource
MessageReference newReference();
- Long getMessageNumber();
+ long getMessageNumber();
long getArrivalTime();
public int getContent(ByteBuffer buf, int offset);
- SessionConfig getSessionConfig();
+ public ByteBuffer getContent(int offset, int size);
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
index aded3f3d2a..483bca894e 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
@@ -40,8 +40,8 @@ public class HeaderPropertiesConverter
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
Header header = messageTransferMessage.getHeader();
- DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
- MessageProperties messageProps = header.get(MessageProperties.class);
+ DeliveryProperties deliveryProps = header.getDeliveryProperties();
+ MessageProperties messageProps = header.getMessageProperties();
if(deliveryProps != null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 3970e5a2d4..efd904f6aa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -1,420 +1,420 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- * 8-0
- */
-package org.apache.qpid.server.output.amqp0_8;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
+ {
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
+
+ while(writtenSize < bodySize)
+ {
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
+ }
+ }
+ }
+
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
+
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
+
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
+
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
+ }
+
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ BasicGetOkBody getOkBody =
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
+
+ return getOkBody;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ BasicReturnBody basicReturnBody =
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
+
+
+ return basicReturnBody;
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+ writeMessageDelivery(message, header, channelId, returnFrame);
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index aef3483282..010afcb1a9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -1,419 +1,418 @@
-package org.apache.qpid.server.output.amqp0_9;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-
-}
+package org.apache.qpid.server.output.amqp0_9;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
+ {
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
+
+ while(writtenSize < bodySize)
+ {
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
+ }
+ }
+ }
+
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
+
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
+
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
+
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
+ }
+
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ BasicGetOkBody getOkBody =
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
+
+ return getOkBody;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ BasicReturnBody basicReturnBody =
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
+
+
+ return basicReturnBody;
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+ writeMessageDelivery(message, header, channelId, returnFrame);
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
index 10748298bc..5e2b3e4556 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
@@ -1,414 +1,425 @@
-package org.apache.qpid.server.output.amqp0_9_1;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-
+package org.apache.qpid.server.output.amqp0_9_1;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
+ {
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
+
+ while(writtenSize < bodySize)
+ {
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
+ }
+ }
+ }
+
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ ByteBuffer buf = _message.getContent(_offset, _length);
+
+ if(buf.hasArray())
+ {
+ buffer.write(buf.array(), buf.arrayOffset()+buf.position(), buf.remaining());
+ }
+ else
+ {
+
+ byte[] data = new byte[_length];
+
+ buf.get(data);
+
+ buffer.write(data);
+ }
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
+
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
+
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
+
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
+ }
+
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ BasicGetOkBody getOkBody =
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
+
+ return getOkBody;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ BasicReturnBody basicReturnBody =
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
+
+
+ return basicReturnBody;
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+ writeMessageDelivery(message, header, channelId, returnFrame);
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 2a277848ed..b778cf4135 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -111,6 +111,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
+ private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
private AMQShortString _contextKey;
@@ -280,6 +281,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
closeProtocolSession();
}
}
+ receiveComplete();
}
catch (Exception e)
{
@@ -288,6 +290,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
+ private void receiveComplete()
+ {
+ for (AMQChannel channel : _channelMap.values())
+ {
+ channel.receivedComplete();
+ }
+
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -405,35 +416,51 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
+
+ private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+ private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+ private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
private ByteBuffer asByteBuffer(AMQDataBlock block)
{
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+ final int size = (int) block.getSize();
- try
- {
- block.writePayload(new DataOutputStream(new OutputStream()
- {
+ final byte[] data;
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
+ if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ data= new byte[size];
+ }
+ else
+ {
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
+ data = _reusableBytes;
+ }
+ _reusableDataOutput.setBuffer(data);
+
+ try
+ {
+ block.writePayload(_reusableDataOutput);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- buf.flip();
+ final ByteBuffer buf;
+
+ if(size <= REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ buf = _reusableByteBuffer;
+ buf.position(0);
+ }
+ else
+ {
+ buf = ByteBuffer.wrap(data);
+ }
+ buf.limit(_reusableDataOutput.length());
+
return buf;
}
@@ -1425,7 +1452,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_statisticsEnabled = enabled;
}
- @Override
public boolean isSessionNameUnique(byte[] name)
{
// 0-8/0-9/0-9-1 sessions don't have names
@@ -1437,9 +1463,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_deferFlush = deferFlush;
}
-
-
- @Override
public String getUserName()
{
return getAuthorizedPrincipal().getName();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index b4765d6227..143a6ae8ca 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -557,7 +557,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
List<String> list = new ArrayList<String>();
AMQMessageHeader header = msg.getMessageHeader();
- MessageProperties msgProps = msg.getHeader().get(MessageProperties.class);
+ MessageProperties msgProps = msg.getHeader().getMessageProperties();
String appID = null;
String userID = null;
@@ -619,7 +619,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
_queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
txn.commit();
}
@@ -654,7 +654,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
_queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
index 05e0efd9a6..0bd40e8f13 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
@@ -35,6 +35,7 @@ public interface BaseQueue extends TransactionLogResource
void enqueue(ServerMessage message) throws AMQException;
void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
+ void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
boolean isDurable();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index c4762c98c9..ab0a567114 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -100,7 +100,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
{
if(entry.acquire())
{
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
txn.dequeue(entry.getQueue(),entry.getMessage(),
new ServerTransaction.Action()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
index 26112d9f53..31e9725e47 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.framing.AMQShortString;
public class InboundMessageAdapter implements InboundMessage
{
@@ -44,6 +45,11 @@ public class InboundMessageAdapter implements InboundMessage
}
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(_entry.getMessage());
+ }
+
public String getRoutingKey()
{
return _entry.getMessage().getRoutingKey();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index a56f5685b8..19a7a15ad1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -63,7 +63,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private ArrayList<? extends BaseQueue> _destinationQueues;
+ private List<? extends BaseQueue> _destinationQueues;
private long _expiration;
@@ -126,12 +126,18 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public MessageMetaData headersReceived()
{
- _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0);
+
+ return headersReceived(System.currentTimeMillis());
+ }
+
+ public MessageMetaData headersReceived(long currentTime)
+ {
+ _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
return _messageMetaData;
}
- public ArrayList<? extends BaseQueue> getDestinationQueues()
+ public List<? extends BaseQueue> getDestinationQueues()
{
return _destinationQueues;
}
@@ -158,6 +164,11 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _messagePublishInfo.getExchange();
}
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return _messagePublishInfo.getRoutingKey();
+ }
+
public String getRoutingKey()
{
return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
@@ -209,7 +220,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return getContentHeader().bodySize;
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
return _storedMessageHandle.getMessageNumber();
}
@@ -225,7 +236,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
}
- public void enqueue(final ArrayList<? extends BaseQueue> queues)
+ public void enqueue(final List<? extends BaseQueue> queues)
{
_destinationQueues = queues;
}
@@ -288,6 +299,15 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
}
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(buf,offset);
+ buf.flip();
+ return buf;
+ }
+
public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
{
_storedMessageHandle = storedMessageHandle;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 5bb5dc3462..bd349e7512 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import java.util.Collection;
@@ -424,7 +425,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if (rerouteQueues != null && rerouteQueues.size() != 0)
{
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
{
@@ -447,7 +448,8 @@ public abstract class QueueEntryImpl implements QueueEntry
{
}
- });
+ }, 0L);
+
txn.dequeue(currentQueue, message, new ServerTransaction.Action()
{
public void postCommit()
@@ -460,8 +462,10 @@ public abstract class QueueEntryImpl implements QueueEntry
}
});
- }
+
+ txn.commit();
}
+ }
}
public boolean isQueueDeleted()
@@ -549,4 +553,11 @@ public abstract class QueueEntryImpl implements QueueEntry
_deliveryCountUpdater.decrementAndGet(this);
}
+ public String toString()
+ {
+ return "QueueEntryImpl{" +
+ "_entryId=" + _entryId +
+ ", _state=" + _state +
+ '}';
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index 5270f9f740..a6cdb40d72 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -51,7 +51,6 @@ public class QueueRunner implements ReadWriteRunnable
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
- private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
private final AtomicLong _lastRunAgain = new AtomicLong();
@@ -65,8 +64,6 @@ public class QueueRunner implements ReadWriteRunnable
_queue = queue;
}
- private int trouble = 0;
-
public void run()
{
if(_scheduled.compareAndSet(SCHEDULED,RUNNING))
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index cc93cbc2f1..25fc91b998 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -584,8 +584,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
+ enqueue(message, false, action);
+ }
+
+ public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
+ {
- incrementTxnEnqueueStats(message);
+ if(transactional)
+ {
+ incrementTxnEnqueueStats(message);
+ }
incrementQueueCount();
incrementQueueSize(message);
@@ -733,13 +741,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private void incrementTxnEnqueueStats(final ServerMessage message)
{
- SessionConfig session = message.getSessionConfig();
-
- if(session !=null && session.isTransactional())
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
+ _msgTxnEnqueues.incrementAndGet();
+ _byteTxnEnqueues.addAndGet(message.getSize());
}
private void incrementTxnDequeueStats(QueueEntry entry)
@@ -1447,7 +1450,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
}
- });
+ }, 0L);
txn.dequeue(this, entry.getMessage(),
new ServerTransaction.Action()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index abf9e3379d..7d4748bcaa 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -32,11 +32,9 @@ import static org.apache.qpid.server.security.access.Operation.UNBIND;
import java.net.SocketAddress;
import java.security.Principal;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import javax.security.auth.Subject;
@@ -192,6 +190,15 @@ public class SecurityManager
return _logger;
}
+ private static class CachedPropertiesMap extends LinkedHashMap<String, PublishAccessCheck>
+ {
+ @Override
+ protected boolean removeEldestEntry(Entry<String, PublishAccessCheck> eldest)
+ {
+ return size() >= 200;
+ }
+ }
+
private abstract class AccessCheck
{
abstract Result allowed(SecurityPlugin plugin);
@@ -204,56 +211,61 @@ public class SecurityManager
return true;
}
- HashMap<String, SecurityPlugin> remainingPlugins = new HashMap<String, SecurityPlugin>(_globalPlugins);
+ Map<String, SecurityPlugin> remainingPlugins = _globalPlugins.isEmpty()
+ ? Collections.<String, SecurityPlugin>emptyMap()
+ : _hostPlugins.isEmpty() ? _globalPlugins : new HashMap<String, SecurityPlugin>(_globalPlugins);
- for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+ if(!_hostPlugins.isEmpty())
{
- // Create set of global only plugins
- SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
- if (globalPlugin != null)
- {
- remainingPlugins.remove(hostEntry.getKey());
- }
-
- Result host = checker.allowed(hostEntry.getValue());
-
- if (host == Result.DENIED)
- {
- // Something vetoed the access, we're done
- return false;
- }
-
- // host allow overrides global allow, so only check global on abstain or defer
- if (host != Result.ALLOWED)
- {
- if (globalPlugin == null)
- {
- if (host == Result.DEFER)
- {
- host = hostEntry.getValue().getDefault();
- }
- if (host == Result.DENIED)
+ for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+ {
+ // Create set of global only plugins
+ SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
+ if (globalPlugin != null)
+ {
+ remainingPlugins.remove(hostEntry.getKey());
+ }
+
+ Result host = checker.allowed(hostEntry.getValue());
+
+ if (host == Result.DENIED)
+ {
+ // Something vetoed the access, we're done
+ return false;
+ }
+
+ // host allow overrides global allow, so only check global on abstain or defer
+ if (host != Result.ALLOWED)
+ {
+ if (globalPlugin == null)
{
- return false;
+ if (host == Result.DEFER)
+ {
+ host = hostEntry.getValue().getDefault();
+ }
+ if (host == Result.DENIED)
+ {
+ return false;
+ }
}
- }
- else
- {
- Result global = checker.allowed(globalPlugin);
- if (global == Result.DEFER)
- {
- global = globalPlugin.getDefault();
- }
- if (global == Result.ABSTAIN && host == Result.DEFER)
- {
- global = hostEntry.getValue().getDefault();
- }
- if (global == Result.DENIED)
+ else
{
- return false;
+ Result global = checker.allowed(globalPlugin);
+ if (global == Result.DEFER)
+ {
+ global = globalPlugin.getDefault();
+ }
+ if (global == Result.ABSTAIN && host == Result.DEFER)
+ {
+ global = hostEntry.getValue().getDefault();
+ }
+ if (global == Result.DENIED)
+ {
+ return false;
+ }
}
- }
- }
+ }
+ }
}
for (SecurityPlugin plugin : remainingPlugins.values())
@@ -371,15 +383,33 @@ public class SecurityManager
});
}
+
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _immediatePublishPropsCache
+ = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _publishPropsCache
+ = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+
public boolean authorisePublish(final boolean immediate, final String routingKey, final String exchangeName)
{
- return checkAllPlugins(new AccessCheck()
+ PublishAccessCheck check;
+ ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> cache =
+ immediate ? _immediatePublishPropsCache : _publishPropsCache;
+
+ ConcurrentHashMap<String, PublishAccessCheck> exchangeMap = cache.get(exchangeName);
+ if(exchangeMap == null)
{
- Result allowed(SecurityPlugin plugin)
+ cache.putIfAbsent(exchangeName, new ConcurrentHashMap<String, PublishAccessCheck>());
+ exchangeMap = cache.get(exchangeName);
+ }
+
+ check = exchangeMap.get(routingKey);
+ if(check == null)
{
- return plugin.authorise(PUBLISH, EXCHANGE, new ObjectProperties(exchangeName, routingKey, immediate));
+ check = new PublishAccessCheck(new ObjectProperties(exchangeName, routingKey, immediate));
+ exchangeMap.put(routingKey, check);
}
- });
+
+ return checkAllPlugins(check);
}
public boolean authorisePurge(final AMQQueue queue)
@@ -413,4 +443,19 @@ public class SecurityManager
return current;
}
+
+ private class PublishAccessCheck extends AccessCheck
+ {
+ private final ObjectProperties _props;
+
+ public PublishAccessCheck(ObjectProperties props)
+ {
+ _props = props;
+ }
+
+ Result allowed(SecurityPlugin plugin)
+ {
+ return plugin.authorise(PUBLISH, EXCHANGE, _props);
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
index e4bf8df340..8a52d31f97 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
@@ -18,10 +18,7 @@
*/
package org.apache.qpid.server.security.access;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.lang.StringUtils;
import org.apache.qpid.framing.AMQShortString;
@@ -35,7 +32,7 @@ import org.apache.qpid.server.queue.AMQQueue;
* {@link #equals(Object)} and {@link #hashCode()} are intended for use in maps. This is due to the wildcard matching
* described above.
*/
-public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
+public class ObjectProperties
{
/** serialVersionUID */
private static final long serialVersionUID = -1356019341374170495L;
@@ -93,7 +90,9 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
return properties;
}
}
-
+
+ private final EnumMap<Property, String> _properties = new EnumMap<Property, String>(Property.class);
+
public static List<String> getAllPropertyNames()
{
List<String> properties = new ArrayList<String>();
@@ -113,7 +112,7 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
{
super();
- putAll(copy);
+ _properties.putAll(copy._properties);
}
public ObjectProperties(String name)
@@ -231,7 +230,7 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
public List<String> getPropertyNames()
{
List<String> properties = new ArrayList<String>();
- for (Property property : keySet())
+ for (Property property : _properties.keySet())
{
properties.add(property.getName());
}
@@ -240,17 +239,22 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
public Boolean isSet(Property key)
{
- return containsKey(key) && Boolean.valueOf(get(key));
+ return _properties.containsKey(key) && Boolean.valueOf(_properties.get(key));
}
-
+
+ public String get(Property key)
+ {
+ return _properties.get(key);
+ }
+
public String getName()
{
- return get(Property.NAME);
+ return _properties.get(Property.NAME);
}
public void setName(String name)
{
- put(Property.NAME, name);
+ _properties.put(Property.NAME, name);
}
public void setName(AMQShortString name)
@@ -262,39 +266,38 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
{
return put(key, value == null ? "" : value.asString());
}
-
- @Override
+
public String put(Property key, String value)
{
- return super.put(key, value == null ? "" : value.trim());
+ return _properties.put(key, value == null ? "" : value.trim());
}
public void put(Property key, Boolean value)
{
if (value != null)
{
- super.put(key, Boolean.toString(value));
+ _properties.put(key, Boolean.toString(value));
}
}
public boolean matches(ObjectProperties properties)
{
- if (properties.keySet().isEmpty())
+ if (properties._properties.keySet().isEmpty())
{
return true;
}
- if (!keySet().containsAll(properties.keySet()))
+ if (!_properties.keySet().containsAll(properties._properties.keySet()))
{
return false;
}
- for (Map.Entry<Property,String> entry : properties.entrySet())
+ for (Map.Entry<Property,String> entry : properties._properties.entrySet())
{
Property key = entry.getKey();
String ruleValue = entry.getValue();
- String thisValue = get(key);
+ String thisValue = _properties.get(key);
if (!valueMatches(thisValue, ruleValue))
{
@@ -315,4 +318,29 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
&& thisValue.length() > ruleValue.length()
&& thisValue.startsWith(ruleValue.substring(0, ruleValue.length() - 2)));
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ObjectProperties that = (ObjectProperties) o;
+
+ if (_properties != null ? !_properties.equals(that._properties) : that._properties != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _properties != null ? _properties.hashCode() : 0;
+ }
+
+ @Override
+ public String toString()
+ {
+ return _properties.toString();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index d3f46d2e90..d90b3d02ba 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -52,6 +52,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
/**
@@ -60,7 +62,7 @@ import org.apache.qpid.server.queue.AMQQueue;
*
* TODO extract the SQL statements into a generic JDBC store
*/
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -197,12 +199,16 @@ public class DerbyMessageStore implements MessageStore
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-
if(!_configured)
{
_logSubject = logSubject;
+ }
+
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
commonConfiguration(name, storeConfiguration, logSubject);
_configured = true;
@@ -219,6 +225,11 @@ public class DerbyMessageStore implements MessageStore
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
+
+ if(!_configured)
+ {
+ _logSubject = logSubject;
+ }
CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
if(!_configured)
@@ -697,7 +708,7 @@ public class DerbyMessageStore implements MessageStore
if (results == 0)
{
- throw new RuntimeException("Message metadata not found for message id " + messageId);
+ _logger.warn("Message metadata not found for message id " + messageId);
}
if (_logger.isDebugEnabled())
@@ -1678,14 +1689,26 @@ public class DerbyMessageStore implements MessageStore
}
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);
+ if(message.getStoredMessage() instanceof StoredDerbyMessage)
+ {
+ try
+ {
+ ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
+ }
+ }
+
+ DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);
+ DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1709,8 +1732,11 @@ public class DerbyMessageStore implements MessageStore
{
private final long _messageId;
+ private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private Connection _conn;
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
@@ -1721,27 +1747,19 @@ public class DerbyMessageStore implements MessageStore
StoredDerbyMessage(long messageId,
StorableMessageMetaData metaData, boolean persist)
{
- try
- {
- _messageId = messageId;
+ _messageId = messageId;
+
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _conn = newConnection();
- storeMetaData(_conn, messageId, metaData);
- }
- }
- catch (SQLException e)
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ if(persist)
{
- throw new RuntimeException(e);
+ _metaData = metaData;
}
-
}
public StorableMessageMetaData getMetaData()
{
- StorableMessageMetaData metaData = _metaDataRef.get();
+ StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
if(metaData == null)
{
try
@@ -1765,27 +1783,62 @@ public class DerbyMessageStore implements MessageStore
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
- DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src);
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
- return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(offsetInMessage, buf);
+ buf.position(0);
+ return buf;
}
- public StoreFuture flushToStore()
+ public synchronized StoreFuture flushToStore()
{
try
{
- if(_conn != null)
+ if(_metaData != null)
{
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Flushing message " + _messageId + " to store");
- }
+ Connection conn = newConnection();
+
+ store(conn);
- _conn.commit();
- _conn.close();
+ conn.commit();
+ conn.close();
}
}
catch (SQLException e)
@@ -1796,16 +1849,34 @@ public class DerbyMessageStore implements MessageStore
}
throw new RuntimeException(e);
}
- finally
+ return IMMEDIATE_FUTURE;
+ }
+
+ private synchronized void store(final Connection conn) throws SQLException
+ {
+ if(_metaData != null)
{
- _conn = null;
+ try
+ {
+ storeMetaData(conn, _messageId, _metaData);
+ DerbyMessageStore.this.addContent(conn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Storing message " + _messageId + " to store");
}
- return IMMEDIATE_FUTURE;
}
public void remove()
{
- flushToStore();
DerbyMessageStore.this.removeMessage(_messageId);
}
}
@@ -1839,4 +1910,5 @@ public class DerbyMessageStore implements MessageStore
}
}
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index d008d42fa0..005055dbaa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -35,10 +35,12 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -53,11 +55,11 @@ public class MemoryMessageStore implements MessageStore
private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index e2fca2f9c7..88c95ad65e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -20,14 +20,16 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.message.EnqueableMessage;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
*/
-public interface MessageStore extends DurableConfigurationStore, TransactionLog
+public interface MessageStore
{
StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
{
@@ -77,4 +79,69 @@ public interface MessageStore extends DurableConfigurationStore, TransactionLog
boolean isPersistent();
+
+ public static interface Transaction
+ {
+ /**
+ * Places a message onto a specified queue, in a given transactional context.
+ *
+ *
+ *
+ * @param queue The queue to place the message on.
+ * @param message
+ * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
+ */
+ void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+ /**
+ * Extracts a message from a specified queue, in a given transactional context.
+ *
+ * @param queue The queue to place the message on.
+ * @param message The message to dequeue.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void commitTran() throws AMQStoreException;
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ StoreFuture commitTranAsync() throws AMQStoreException;
+
+ /**
+ * Abandons all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void abortTran() throws AMQStoreException;
+
+
+
+ }
+
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception;
+
+ Transaction newTransaction();
+
+
+
+ public static interface StoreFuture
+ {
+ boolean isComplete();
+
+ void waitForCompletion();
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 1f5b027b80..858a850d8c 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -26,15 +26,13 @@ import java.nio.ByteBuffer;
public class StoredMemoryMessage implements StoredMessage
{
private final long _messageNumber;
- private final ByteBuffer _content;
+ private ByteBuffer _content;
private final StorableMessageMetaData _metaData;
public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData)
{
_messageNumber = messageNumber;
_metaData = metaData;
- _content = ByteBuffer.allocate(metaData.getContentSize());
-
}
public long getMessageNumber()
@@ -44,26 +42,79 @@ public class StoredMemoryMessage implements StoredMessage
public void addContent(int offsetInMessage, ByteBuffer src)
{
- src = src.duplicate();
- ByteBuffer dst = _content.duplicate();
- dst.position(offsetInMessage);
- dst.put(src);
+ if(_content == null)
+ {
+ if(offsetInMessage == 0)
+ {
+ _content = src.slice();
+ }
+ else
+ {
+ final int contentSize = _metaData.getContentSize();
+ int size = (contentSize < offsetInMessage + src.remaining())
+ ? offsetInMessage + src.remaining()
+ : contentSize;
+ _content = ByteBuffer.allocate(size);
+ addContent(offsetInMessage, src);
+ }
+ }
+ else
+ {
+ if(_content.limit() >= offsetInMessage + src.remaining())
+ {
+ _content.position(offsetInMessage);
+ _content.put(src);
+ _content.position(0);
+ }
+ else
+ {
+ final int contentSize = _metaData.getContentSize();
+ int size = (contentSize < offsetInMessage + src.remaining())
+ ? offsetInMessage + src.remaining()
+ : contentSize;
+ ByteBuffer oldContent = _content;
+ _content = ByteBuffer.allocate(size);
+ _content.put(oldContent);
+ _content.position(0);
+ addContent(offsetInMessage, src);
+ }
+
+ }
}
public int getContent(int offset, ByteBuffer dst)
{
ByteBuffer src = _content.duplicate();
- src.position(offset);
- src = src.slice();
- if(dst.remaining() < src.limit())
+
+ int oldPosition = src.position();
+
+ src.position(oldPosition + offset);
+
+ int length = dst.remaining() < src.remaining() ? dst.remaining() : src.remaining();
+ src.limit(oldPosition + length);
+
+ dst.put(src);
+
+
+ return length;
+ }
+
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = _content.duplicate();
+
+ if(offsetInMessage != 0)
{
- src.limit(dst.remaining());
+ buf.position(offsetInMessage);
+ buf = buf.slice();
}
- dst.put(src);
- return src.limit();
+
+ buf.limit(size);
+ return buf;
}
- public TransactionLog.StoreFuture flushToStore()
+ public MessageStore.StoreFuture flushToStore()
{
return MessageStore.IMMEDIATE_FUTURE;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 0bc45c6718..d4a0381929 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -32,7 +32,9 @@ public interface StoredMessage<M extends StorableMessageMetaData>
int getContent(int offsetInMessage, ByteBuffer dst);
- TransactionLog.StoreFuture flushToStore();
+ ByteBuffer getContent(int offsetInMessage, int size);
+
+ MessageStore.StoreFuture flushToStore();
void remove();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
index d196a91930..da7f8d18b2 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
@@ -20,72 +20,7 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.AMQStoreException;
-import org.apache.commons.configuration.Configuration;
-
public interface TransactionLog
{
- public static interface Transaction
- {
- /**
- * Places a message onto a specified queue, in a given transactional context.
- *
- * @param queue The queue to place the message on.
- * @param messageId The message to enqueue.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
-
- /**
- * Extracts a message from a specified queue, in a given transactional context.
- *
- * @param queue The queue to place the message on.
- * @param messageId The message to dequeue.
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
-
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void commitTran() throws AMQStoreException;
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- StoreFuture commitTranAsync() throws AMQStoreException;
-
- /**
- * Abandons all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void abortTran() throws AMQStoreException;
-
-
-
- }
-
- public void configureTransactionLog(String name,
- TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception;
-
- Transaction newTransaction();
-
-
-
- public static interface StoreFuture
- {
- boolean isComplete();
-
- void waitForCompletion();
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
index 7781c52df3..802596ed1e 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.store;
public interface TransactionLogRecoveryHandler
{
- QueueEntryRecoveryHandler begin(TransactionLog log);
+ QueueEntryRecoveryHandler begin(MessageStore log);
public static interface QueueEntryRecoveryHandler
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 6c18b2e229..dea88c4776 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -62,7 +62,6 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -70,10 +69,10 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
@@ -183,6 +182,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
}
_queue = queue;
+
Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
_traceExclude = (String) arguments.get("qpid.trace.exclude");
_trace = (String) arguments.get("qpid.trace.id");
@@ -224,8 +224,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
if (_noLocal && entry.getMessage() instanceof MessageTransferMessage)
{
- Session messageSession= ((MessageTransferMessage)entry.getMessage()).getSession();
- if (messageSession != null && messageSession.getConnection() == _session.getConnection())
+ Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
+ if (connectionRef != null && connectionRef == _session.getReference())
{
return false;
}
@@ -377,35 +377,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
-
-
- Struct[] headers;
- if(msg.getHeader() == null)
- {
- headers = EMPTY_STRUCT_ARRAY;
- }
- else
- {
- headers = msg.getHeader().getStructs();
- }
-
- ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
- DeliveryProperties origDeliveryProps = null;
- for(Struct header : headers)
- {
- if(header instanceof DeliveryProperties)
- {
- origDeliveryProps = (DeliveryProperties) header;
- }
- else
- {
- if(header instanceof MessageProperties)
- {
- messageProps = (MessageProperties) header;
- }
- newHeaders.add(header);
- }
- }
+ DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+ messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
deliveryProps = new DeliveryProperties();
if(origDeliveryProps != null)
@@ -440,17 +413,16 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
deliveryProps.setRedelivered(entry.isRedelivered());
- newHeaders.add(deliveryProps);
-
if(_trace != null && messageProps == null)
{
messageProps = new MessageProperties();
- newHeaders.add(messageProps);
}
- Header header = new Header(newHeaders);
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+
- xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+ xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
}
else if(serverMsg instanceof AMQMessage)
{
@@ -463,8 +435,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
message_0_8.getContent(body, 0);
body.flip();
- Struct[] headers = new Struct[] { deliveryProps, messageProps };
-
BasicContentHeaderProperties properties =
(BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
@@ -505,8 +475,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
messageProps.setApplicationHeaders(appHeaders);
- Header header = new Header(headers);
- xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+ Header header = new Header(deliveryProps, messageProps, null);
+ xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
}
else
{
@@ -519,8 +490,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
serverMsg.getContent(body, 0);
body.flip();
- Struct[] headers = new Struct[] { deliveryProps, messageProps };
-
deliveryProps.setExpiration(serverMsg.getExpiration());
deliveryProps.setImmediate(serverMsg.isImmediate());
@@ -567,8 +536,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
messageProps.setApplicationHeaders(appHeaders);
*/
- Header header = new Header(headers);
- xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+ Header header = new Header(deliveryProps, messageProps, null);
+ xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
}
boolean excludeDueToFederation = false;
@@ -644,28 +614,51 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
}
- private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+ private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
{
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
- txn.dequeue(entry.getQueue(),entry.getMessage(),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- if(restoreCredit)
- {
- restoreCredit(entry);
- }
- entry.discard();
- }
+ _deferredMessageCredit += deferredMessageCredit;
+ _deferredSizeCredit += deferredSizeCredit;
- public void onRollback()
- {
+ }
- }
- });
+ public void flushCreditState()
+ {
+ flushCreditState(false);
+ }
+ public void flushCreditState(boolean strict)
+ {
+ if(strict || !isSuspended() || _deferredMessageCredit >= 200
+ || !(_creditManager instanceof WindowCreditManager)
+ || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+ {
+ _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+ _deferredMessageCredit = 0;
+ _deferredSizeCredit = 0l;
+ }
}
+ private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+ {
+ AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
+ dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ if (restoreCredit)
+ {
+ restoreCredit(entry);
+ }
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
void reject(final QueueEntry entry)
{
entry.setRedelivered();
@@ -704,7 +697,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
final InboundMessage m = new InboundMessageAdapter(entry);
- final ArrayList<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+ final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
if (destinationQueues == null || destinationQueues.isEmpty())
{
@@ -751,6 +744,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _stateChangeLock.tryLock();
}
+
public void getSendLock()
{
_stateChangeLock.lock();
@@ -816,28 +810,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _properties.get(key);
}
- private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
- {
- _deferredMessageCredit += deferredMessageCredit;
- _deferredSizeCredit += deferredSizeCredit;
-
- }
-
- public void flushCreditState()
- {
- flushCreditState(false);
- }
- public void flushCreditState(boolean strict)
- {
- if(strict || !isSuspended() || _deferredMessageCredit >= 200
- || !(_creditManager instanceof WindowCreditManager)
- || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
- {
- _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
- _deferredMessageCredit = 0;
- _deferredSizeCredit = 0l;
- }
- }
public FlowCreditManager_0_10 getCreditManager()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 00f0c9f0f1..ab07ed20f6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _connectionId;
-
+ private final Object _reference = new Object();
private ServerConnectionMBean _mBean;
private VirtualHost _virtualHost;
private AtomicLong _lastIoTime = new AtomicLong();
@@ -90,6 +90,11 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
return _config.getId();
}
+ public Object getReference()
+ {
+ return _reference;
+ }
+
@Override
protected void invoke(Method method)
{
@@ -414,13 +419,11 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
return _connectionId;
}
- @Override
public boolean isSessionNameUnique(byte[] name)
{
return !super.hasSessionWithName(name);
}
- @Override
public String getUserName()
{
return _authorizedPrincipal.getName();
@@ -450,11 +453,11 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
{
for (Session ssn : getChannels())
{
- ((ServerSession)ssn).flushCreditState();
+ ((ServerSession)ssn).receivedComplete();
}
}
- @Override
+
public ManagedObject getManagedObject()
{
return _mBean;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 1ce7b806d8..8e6d33d3bc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.transport;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.util.Serial.gt;
-import java.lang.ref.WeakReference;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -71,6 +70,7 @@ import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.slf4j.Logger;
@@ -86,6 +86,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
+ private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
public static interface MessageDispositionChangeListener
{
@@ -121,8 +122,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private final WeakReference<Session> _reference;
-
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
@@ -134,7 +133,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
- _reference = new WeakReference<Session>(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -161,40 +159,22 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
return isCommandsFull(id);
}
- public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
+ public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
{
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- _transaction.enqueue(queues,message, new ServerTransaction.Action()
- {
-
- BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
-
- public void postCommit()
- {
- MessageReference<?> ref = message.newReference();
- for(int i = 0; i < _queues.length; i++)
- {
- try
- {
- _queues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- ref.release();
- }
-
- public void onRollback()
- {
- // NO-OP
- }
- });
-
- incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
+ PostEnqueueAction postTransactionAction;
+ if(isTransactional())
+ {
+ postTransactionAction = new PostEnqueueAction(queues, message) ;
+ }
+ else
+ {
+ postTransactionAction = _postEnqueueAction;
+ postTransactionAction.setState(queues, message);
+ }
+ _transaction.enqueue(queues,message, postTransactionAction, 0L);
+ incrementOutstandingTxnsIfNecessary();
+ updateTransactionalActivity();
}
@@ -252,7 +232,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public RangeSet acquire(RangeSet transfers)
{
- RangeSet acquired = new RangeSet();
+ RangeSet acquired = RangeSetFactory.createRangeSet();
if(!_messageDispositionListenerMap.isEmpty())
{
@@ -300,41 +280,56 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
- if(ranges != null && !_messageDispositionListenerMap.isEmpty())
+ if(ranges != null)
{
- Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
- Iterator<Range> rangeIter = ranges.iterator();
- if(rangeIter.hasNext())
+ if(ranges.size() == 1)
{
- Range range = rangeIter.next();
+ Range r = ranges.getFirst();
+ for(int i = r.getLower(); i <= r.getUpper(); i++)
+ {
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i);
+ if(changeListener != null)
+ {
+ action.performAction(changeListener);
+ }
+ }
+ }
+ else if(!_messageDispositionListenerMap.isEmpty())
+ {
+ Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+ Iterator<Range> rangeIter = ranges.iterator();
- while(range != null && unacceptedMessages.hasNext())
+ if(rangeIter.hasNext())
{
- int next = unacceptedMessages.next();
- while(gt(next, range.getUpper()))
+ Range range = rangeIter.next();
+
+ while(range != null && unacceptedMessages.hasNext())
{
- if(rangeIter.hasNext())
+ int next = unacceptedMessages.next();
+ while(gt(next, range.getUpper()))
{
- range = rangeIter.next();
+ if(rangeIter.hasNext())
+ {
+ range = rangeIter.next();
+ }
+ else
+ {
+ range = null;
+ break;
+ }
}
- else
+ if(range != null && range.includes(next))
{
- range = null;
- break;
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
+ action.performAction(changeListener);
}
- }
- if(range != null && range.includes(next))
- {
- MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
- action.performAction(changeListener);
- }
- }
+ }
+ }
}
-
}
}
@@ -534,10 +529,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
_taskList.remove(task);
}
- public WeakReference<Session> getReference()
- {
- return _reference;
- }
+ public Object getReference()
+ {
+ return ((ServerConnection) getConnection()).getReference();
+ }
public MessageStore getMessageStore()
{
@@ -697,7 +692,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
- public void flushCreditState()
+ public void receivedComplete()
{
final Collection<Subscription_0_10> subscriptions = getSubscriptions();
for (Subscription_0_10 subscription_0_10 : subscriptions)
@@ -706,6 +701,54 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
+ private static class PostEnqueueAction implements ServerTransaction.Action
+ {
+
+ private List<? extends BaseQueue> _queues;
+ private ServerMessage _message;
+ private final boolean _transactional;
+
+ public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+ {
+ _transactional = true;
+ setState(queues, message);
+ }
+
+ public PostEnqueueAction()
+ {
+ _transactional = false;
+ }
+
+ public void setState(List<? extends BaseQueue> queues, ServerMessage message)
+ {
+ _message = message;
+ _queues = queues;
+ }
+
+ public void postCommit()
+ {
+ MessageReference<?> ref = _message.newReference();
+ for(int i = 0; i < _queues.size(); i++)
+ {
+ try
+ {
+ _queues.get(i).enqueue(_message, _transactional, null);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ ref.release();
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ }
+
public int getUnacknowledgedMessageCount()
{
return _messageDispositionListenerMap.size();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index a0dca53ed0..b6e142a5fd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -55,46 +56,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Acquired;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.ExchangeBind;
-import org.apache.qpid.transport.ExchangeBound;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeDeclare;
-import org.apache.qpid.transport.ExchangeDelete;
-import org.apache.qpid.transport.ExchangeQuery;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExchangeUnbind;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAccept;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquire;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCancel;
-import org.apache.qpid.transport.MessageFlow;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageFlush;
-import org.apache.qpid.transport.MessageReject;
-import org.apache.qpid.transport.MessageRejectCode;
-import org.apache.qpid.transport.MessageRelease;
-import org.apache.qpid.transport.MessageResume;
-import org.apache.qpid.transport.MessageSetFlowMode;
-import org.apache.qpid.transport.MessageStop;
-import org.apache.qpid.transport.MessageSubscribe;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.QueueDeclare;
-import org.apache.qpid.transport.QueueDelete;
-import org.apache.qpid.transport.QueuePurge;
-import org.apache.qpid.transport.QueueQuery;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.TxCommit;
-import org.apache.qpid.transport.TxRollback;
-import org.apache.qpid.transport.TxSelect;
+import org.apache.qpid.transport.*;
public class ServerSessionDelegate extends SessionDelegate
{
@@ -295,7 +257,8 @@ public class ServerSessionDelegate extends SessionDelegate
final Exchange exchange = getExchangeForMessage(ssn, xfr);
DeliveryProperties delvProps = null;
- if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps
+ .hasExpiration())
{
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
@@ -312,7 +275,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
final Exchange exchangeInUse;
- ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData);
+ List<? extends BaseQueue> queues = exchange.route(messageMetaData);
if(queues.isEmpty() && exchange.getAlternateExchange() != null)
{
final Exchange alternateExchange = exchange.getAlternateExchange();
@@ -334,15 +297,16 @@ public class ServerSessionDelegate extends SessionDelegate
if(!queues.isEmpty())
{
final MessageStore store = getVirtualHost(ssn).getMessageStore();
- final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store);
+ final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
((ServerSession) ssn).enqueue(message, queues);
+ storeMessage.flushToStore();
}
else
{
if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- RangeSet rejects = new RangeSet();
+ RangeSet rejects = RangeSetFactory.createRangeSet();
rejects.add(xfr.getId());
MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
ssn.invoke(reject);
@@ -353,11 +317,13 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
+
+
ssn.processed(xfr);
}
- private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr,
- final MessageMetaData_0_10 messageMetaData, final MessageStore store)
+ private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
+ final MessageMetaData_0_10 messageMetaData, final MessageStore store)
{
final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
ByteBuffer body = xfr.getBody();
@@ -365,7 +331,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
storeMessage.addContent(0, body);
}
- storeMessage.flushToStore();
return storeMessage;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index 36e9d78440..a67d4badd1 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -30,7 +31,7 @@ import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
/**
* An implementation of ServerTransaction where each enqueue/dequeue
@@ -43,11 +44,11 @@ public class AutoCommitTransaction implements ServerTransaction
{
protected static final Logger _logger = Logger.getLogger(AutoCommitTransaction.class);
- private final TransactionLog _transactionLog;
+ private final MessageStore _messageStore;
- public AutoCommitTransaction(TransactionLog transactionLog)
+ public AutoCommitTransaction(MessageStore transactionLog)
{
- _transactionLog = transactionLog;
+ _messageStore = transactionLog;
}
public long getTransactionStartTime()
@@ -59,14 +60,14 @@ public class AutoCommitTransaction implements ServerTransaction
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
*/
- public void addPostTransactionAction(Action immediateAction)
+ public void addPostTransactionAction(final Action immediateAction)
{
immediateAction.postCommit();
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent() && queue.isDurable())
@@ -76,8 +77,8 @@ public class AutoCommitTransaction implements ServerTransaction
_logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
}
- txn = _transactionLog.newTransaction();
- txn.dequeueMessage(queue, message.getMessageNumber());
+ txn = _messageStore.newTransaction();
+ txn.dequeueMessage(queue, message);
txn.commitTran();
txn = null;
}
@@ -98,7 +99,7 @@ public class AutoCommitTransaction implements ServerTransaction
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
for(QueueEntry entry : queueEntries)
@@ -115,10 +116,10 @@ public class AutoCommitTransaction implements ServerTransaction
if(txn == null)
{
- txn = _transactionLog.newTransaction();
+ txn = _messageStore.newTransaction();
}
- txn.dequeueMessage(queue, message.getMessageNumber());
+ txn.dequeueMessage(queue, message);
}
}
@@ -145,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent() && queue.isDurable())
@@ -155,8 +156,8 @@ public class AutoCommitTransaction implements ServerTransaction
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
}
- txn = _transactionLog.newTransaction();
- txn.enqueueMessage(queue, message.getMessageNumber());
+ txn = _messageStore.newTransaction();
+ txn.enqueueMessage(queue, message);
txn.commitTran();
txn = null;
}
@@ -176,15 +177,14 @@ public class AutoCommitTransaction implements ServerTransaction
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent())
{
- Long id = message.getMessageNumber();
for(BaseQueue queue : queues)
{
if (queue.isDurable())
@@ -195,22 +195,26 @@ public class AutoCommitTransaction implements ServerTransaction
}
if (txn == null)
{
- txn = _transactionLog.newTransaction();
+ txn = _messageStore.newTransaction();
}
- txn.enqueueMessage(queue, id);
+ txn.enqueueMessage(queue, message);
+
+
}
}
- if (txn != null)
- {
- txn.commitTran();
- txn = null;
-
- }
}
+ if (txn != null)
+ {
+ txn.commitTran();
+ txn = null;
+ }
+
postTransactionAction.postCommit();
postTransactionAction = null;
+
+
}
catch (AMQException e)
{
@@ -225,6 +229,11 @@ public class AutoCommitTransaction implements ServerTransaction
}
+ public void commit(final Runnable immediatePostTransactionAction)
+ {
+ immediatePostTransactionAction.run();
+ }
+
public void commit()
{
}
@@ -233,7 +242,7 @@ public class AutoCommitTransaction implements ServerTransaction
{
}
- private void rollbackIfNecessary(Action postTransactionAction, TransactionLog.Transaction txn)
+ private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
{
if (txn != null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 946dbd7c28..7f5b5fb8b2 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -29,11 +29,7 @@ import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,11 +46,11 @@ public class LocalTransaction implements ServerTransaction
private final List<Action> _postTransactionActions = new ArrayList<Action>();
- private volatile TransactionLog.Transaction _transaction;
- private TransactionLog _transactionLog;
+ private volatile MessageStore.Transaction _transaction;
+ private MessageStore _transactionLog;
private long _txnStartTime = 0L;
- public LocalTransaction(TransactionLog transactionLog)
+ public LocalTransaction(MessageStore transactionLog)
{
_transactionLog = transactionLog;
}
@@ -63,7 +59,7 @@ public class LocalTransaction implements ServerTransaction
{
return _transaction != null;
}
-
+
public long getTransactionStartTime()
{
return _txnStartTime;
@@ -88,7 +84,7 @@ public class LocalTransaction implements ServerTransaction
}
beginTranIfNecessary();
- _transaction.dequeueMessage(queue, message.getMessageNumber());
+ _transaction.dequeueMessage(queue, message);
}
catch(AMQException e)
@@ -118,7 +114,7 @@ public class LocalTransaction implements ServerTransaction
}
beginTranIfNecessary();
- _transaction.dequeueMessage(queue, message.getMessageNumber());
+ _transaction.dequeueMessage(queue, message);
}
}
@@ -191,7 +187,7 @@ public class LocalTransaction implements ServerTransaction
}
beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message.getMessageNumber());
+ _transaction.enqueueMessage(queue, message);
}
catch (Exception e)
{
@@ -202,13 +198,13 @@ public class LocalTransaction implements ServerTransaction
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
_postTransactionActions.add(postTransactionAction);
if (_txnStartTime == 0L)
{
- _txnStartTime = System.currentTimeMillis();
+ _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
}
if(message.isPersistent())
@@ -226,7 +222,7 @@ public class LocalTransaction implements ServerTransaction
beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message.getMessageNumber());
+ _transaction.enqueueMessage(queue, message);
}
}
@@ -242,6 +238,11 @@ public class LocalTransaction implements ServerTransaction
public void commit()
{
+ commit(null);
+ }
+
+ public void commit(Runnable immediateAction)
+ {
try
{
if(_transaction != null)
@@ -249,9 +250,14 @@ public class LocalTransaction implements ServerTransaction
_transaction.commitTran();
}
- for(Action action : _postTransactionActions)
+ if(immediateAction != null)
+ {
+ immediateAction.run();
+ }
+
+ for(int i = 0; i < _postTransactionActions.size(); i++)
{
- action.postCommit();
+ _postTransactionActions.get(i).postCommit();
}
}
catch (Exception e)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index b3c6e1ac3a..64fdc0ba9a 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -42,7 +43,7 @@ import java.util.List;
*/
public interface ServerTransaction
{
- /**
+ /**
* Represents an action to be performed on transaction commit or rollback
*/
public static interface Action
@@ -91,7 +92,7 @@ public interface ServerTransaction
*
* Store operations will result only for a persistent messages on durable queues.
*/
- void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction);
+ void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime);
/**
* Commit the transaction represented by this object.
@@ -101,6 +102,8 @@ public interface ServerTransaction
*/
void commit();
+ void commit(Runnable immediatePostTransactionAction);
+
/** Rollback the transaction represented by this object.
*
* If the caller has registered one or more Actions, the onRollback() method on each will
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 04f19b79bb..24ab29444c 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
{
@@ -57,8 +56,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
MessageStore getMessageStore();
- TransactionLog getTransactionLog();
-
DurableConfigurationStore getDurableConfigurationStore();
AuthenticationManager getAuthenticationManager();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 0fd31973b2..aad3547789 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -73,7 +73,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private List<ProcessAction> _actions;
private MessageStore _store;
- private TransactionLog _transactionLog;
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
@@ -86,7 +85,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
_virtualHost = virtualHost;
}
- public QueueRecoveryHandler begin(MessageStore store)
+ public VirtualHostConfigRecoveryHandler begin(MessageStore store)
{
_logSubject = new MessageStoreLogSubject(_virtualHost,store);
_store = store;
@@ -99,14 +98,12 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
try
{
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+ AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
if (q == null)
{
- q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost,
- arguments);
+ q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost,
+ FieldTable.convertToMap(arguments));
_virtualHost.getQueueRegistry().registerQueue(q);
}
@@ -186,12 +183,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
//To change body of implemented methods use File | Settings | File Templates.
}
- public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log)
- {
- _transactionLog = log;
- return this;
- }
-
private static final class ProcessAction
{
private final AMQQueue _queue;
@@ -316,15 +307,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
else
{
_logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
- txn.dequeueMessage(queue, messageId);
+ MessageStore.Transaction txn = _store.newTransaction();
+ txn.dequeueMessage(queue, new DummyMessage(messageId));
txn.commitTranAsync();
}
}
else
{
_logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
- TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ MessageStore.Transaction txn = _store.newTransaction();
TransactionLogResource mockQueue =
new TransactionLogResource()
{
@@ -334,7 +325,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
return queueName;
}
};
- txn.dequeueMessage(mockQueue, messageId);
+ txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
txn.commitTranAsync();
}
@@ -367,4 +358,32 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
+ private static class DummyMessage implements EnqueableMessage
+ {
+
+
+ private final long _messageId;
+
+ public DummyMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 17c65003e9..9b1c5474a3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -75,7 +75,6 @@ import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
@@ -228,7 +227,10 @@ public class VirtualHostImpl implements VirtualHost
if (store != null)
{
_messageStore = store;
- _durableConfigurationStore = store;
+ if(store instanceof DurableConfigurationStore)
+ {
+ _durableConfigurationStore = (DurableConfigurationStore) store;
+ }
}
else
{
@@ -380,6 +382,8 @@ public class VirtualHostImpl implements VirtualHost
Class clazz = Class.forName(messageStoreClass);
Object o = clazz.newInstance();
+
+
if (!(o instanceof MessageStore))
{
throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
@@ -390,10 +394,18 @@ public class VirtualHostImpl implements VirtualHost
MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
- messageStore.configureConfigStore(this.getName(),
- recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
+
+ if(messageStore instanceof DurableConfigurationStore)
+ {
+ DurableConfigurationStore durableConfigurationStore = (DurableConfigurationStore) messageStore;
+
+ durableConfigurationStore.configureConfigStore(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
+
+ _durableConfigurationStore = durableConfigurationStore;
+ }
messageStore.configureMessageStore(this.getName(),
recoveryHandler,
@@ -405,7 +417,8 @@ public class VirtualHostImpl implements VirtualHost
storeLogSubject);
_messageStore = messageStore;
- _durableConfigurationStore = messageStore;
+
+
}
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -553,11 +566,6 @@ public class VirtualHostImpl implements VirtualHost
return _messageStore;
}
- public TransactionLog getTransactionLog()
- {
- return _messageStore;
- }
-
public DurableConfigurationStore getDurableConfigurationStore()
{
return _durableConfigurationStore;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index ea2fe90da6..24c790d799 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -72,7 +72,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
/**
* Not used in this test, just there to stub out the routing calls
*/
- private MessageStore _store = new MemoryMessageStore();
+ private MemoryMessageStore _store = new MemoryMessageStore();
BindingFactory bindingFactory = new BindingFactory(new DurableConfigurationStore.Source()
@@ -310,7 +310,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
* @throws AMQException
*/
@Override
- public void enqueue(ServerMessage msg, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException
{
messages.add( new HeadersExchangeTest.Message((AMQMessage) msg));
final QueueEntry queueEntry = new QueueEntry()
@@ -318,47 +318,47 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
public AMQQueue getQueue()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public AMQMessage getMessage()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public long getSize()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public boolean getDeliveredToConsumer()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean expired() throws AMQException
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isAvailable()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isAcquired()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean acquire()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean acquire(Subscription sub)
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean delete()
@@ -373,17 +373,17 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
public boolean acquiredBySubscription()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isAcquiredBy(Subscription subscription)
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public void release()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public boolean releaseButRetain()
@@ -393,82 +393,82 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
public boolean immediateAndNotDelivered()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public void setRedelivered()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public AMQMessageHeader getMessageHeader()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public boolean isPersistent()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isRedelivered()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public Subscription getDeliveredSubscription()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void reject()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public boolean isRejectedBy(long subscriptionId)
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public void dequeue()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void dispose()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void discard()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void routeToAlternate()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public boolean isQueueDeleted()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public void addStateChangeListener(StateChangeListener listener)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public boolean removeStateChangeListener(StateChangeListener listener)
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public int compareTo(final QueueEntry o)
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public boolean isDequeued()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 2ce43052d9..d5f8ef3d54 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -64,17 +64,17 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
ArrayList<QueueEntry> msgs = _subscription.getMessages();
try
{
- assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber());
- assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber());
- assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber());
+ assertEquals(1L, msgs.get(0).getMessage().getMessageNumber());
+ assertEquals(6L, msgs.get(1).getMessage().getMessageNumber());
+ assertEquals(8L, msgs.get(2).getMessage().getMessageNumber());
- assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber());
- assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber());
- assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber());
+ assertEquals(2L, msgs.get(3).getMessage().getMessageNumber());
+ assertEquals(5L, msgs.get(4).getMessage().getMessageNumber());
+ assertEquals(7L, msgs.get(5).getMessage().getMessageNumber());
- assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber());
- assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber());
- assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber());
+ assertEquals(3L, msgs.get(6).getMessage().getMessageNumber());
+ assertEquals(4L, msgs.get(7).getMessage().getMessageNumber());
+ assertEquals(9L, msgs.get(8).getMessage().getMessageNumber());
}
catch (AssertionFailedError afe)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 0daf79122c..f43af447ff 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -168,22 +168,22 @@ public class MockAMQQueue implements AMQQueue
public UUID getId()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public QueueConfigType getConfigType()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public ConfiguredObject getParent()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public boolean isDurable()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isAutoDelete()
@@ -199,7 +199,7 @@ public class MockAMQQueue implements AMQQueue
public AMQShortString getOwner()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void setVirtualHost(VirtualHost virtualhost)
@@ -219,22 +219,22 @@ public class MockAMQQueue implements AMQQueue
public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void unregisterSubscription(Subscription subscription) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public int getConsumerCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public int getActiveConsumerCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public boolean hasExclusiveSubscriber()
@@ -244,37 +244,37 @@ public class MockAMQQueue implements AMQQueue
public boolean isUnused()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isEmpty()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public int getMessageCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public int getUndeliveredMessageCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public long getQueueDepth()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public long getReceivedMessageCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public long getOldestMessageArrivalTime()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public boolean isDeleted()
@@ -297,59 +297,58 @@ public class MockAMQQueue implements AMQQueue
}
+ public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
+ {
+ }
+
public void requeue(QueueEntry entry)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void requeue(QueueEntryImpl storeContext, Subscription subscription)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void dequeue(QueueEntry entry, Subscription sub)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public void addQueueDeleteTask(Task task)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void removeQueueDeleteTask(final Task task)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public List<QueueEntry> getMessagesOnTheQueue()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public List<Long> getMessagesOnTheQueue(int num)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public List<Long> getMessagesOnTheQueue(int num, int offest)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public QueueEntry getMessageOnTheQueue(long messageId)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
@@ -359,132 +358,123 @@ public class MockAMQQueue implements AMQQueue
public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getMaximumMessageSize()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMaximumMessageSize(long value)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getMaximumMessageCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMaximumMessageCount(long value)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getMaximumQueueDepth()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMaximumQueueDepth(long value)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getMaximumMessageAge()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMaximumMessageAge(long maximumMessageAge)
{
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean getBlockOnQueueFull()
- {
- return false;
- }
-
- public void setBlockOnQueueFull(boolean block)
- {
+
}
public long getMinimumAlertRepeatGap()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void deleteMessageFromTop()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long clearQueue()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void checkMessageStatus() throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public Set<NotificationCheck> getNotificationChecks()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void flushSubscription(Subscription sub) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void deliverAsync(Subscription sub)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void deliverAsync()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void stop()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public boolean isExclusive()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public Exchange getAlternateExchange()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void setAlternateExchange(Exchange exchange)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public Map<String, Object> getArguments()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void checkCapacity(AMQChannel channel)
@@ -493,12 +483,12 @@ public class MockAMQQueue implements AMQQueue
public ManagedObject getManagedObject()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public int compareTo(AMQQueue o)
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMinimumAlertRepeatGap(long value)
@@ -508,22 +498,22 @@ public class MockAMQQueue implements AMQQueue
public long getCapacity()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setCapacity(long capacity)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getFlowResumeCapacity()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setFlowResumeCapacity(long flowResumeCapacity)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void configure(ConfigurationPlugin config)
@@ -533,7 +523,7 @@ public class MockAMQQueue implements AMQQueue
public ConfigurationPlugin getConfiguration()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public AuthorizationHolder getAuthorizationHolder()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
index 78ed3e9f34..75f633f2af 100755
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -97,7 +96,17 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
return src.limit();
}
- public TransactionLog.StoreFuture flushToStore()
+
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(offsetInMessage, buf);
+ buf.position(0);
+ return buf;
+ }
+
+ public MessageStore.StoreFuture flushToStore()
{
return MessageStore.IMMEDIATE_FUTURE;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index 7a3f6f701c..cf910208e7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -164,7 +164,7 @@ public abstract class QueueEntryListTestBase extends TestCase
final QueueEntry head = getTestList().getHead();
assertNull("Head entry should not contain an actual message", head.getMessage());
assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head)
- .getMessage().getMessageNumber().longValue());
+ .getMessage().getMessageNumber());
}
/**
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 6c7094cac0..28d52f4fd1 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -649,9 +649,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void onRollback()
{
}
- });
-
-
+ }, 0L);
// Check that it is enqueued
AMQQueue data = _store.getMessages().get(1L);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
index f3ba6a5495..a873739ca7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
@@ -162,8 +162,8 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
while (entry != null)
{
assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted());
- assertNotNull("QueueEntry was not found in the list of remaining entries",
- remainingMessages.get(entry.getMessage().getMessageNumber().intValue()));
+ assertNotNull("QueueEntry "+entry.getMessage().getMessageNumber()+" was not found in the list of remaining entries " + remainingMessages,
+ remainingMessages.get((int)(entry.getMessage().getMessageNumber())));
count++;
entry = entry.getNextNode();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index eca845644e..34ad0e5668 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -317,7 +317,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
assertEquals("Sorted queue entry value is not as expected",
expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));
assertEquals("Sorted queue entry id is not as expected",
- Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber());
+ expectedMessageId, entry.getMessage().getMessageNumber());
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 1d0a9d6316..90adaa1319 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -558,7 +558,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
/**
* Delete the Store Environment path
*
- * @param configuration The configuration that contains the store environment path.
+ * @param environmentPath The configuration that contains the store environment path.
*/
private void cleanup(File environmentPath)
{
@@ -636,7 +636,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
{
//To change body of implemented methods use File | Settings | File Templates.
}
- });
+ }, 0L);
}
}
@@ -710,7 +710,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
if (queue.isDurable() && !queue.isAutoDelete())
{
- getVirtualHost().getMessageStore().createQueue(queue, queueArguments);
+ getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments);
}
}
catch (AMQException e)
@@ -754,7 +754,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
getVirtualHost().getExchangeRegistry().registerExchange(exchange);
if (durable)
{
- getVirtualHost().getMessageStore().createExchange(exchange);
+ getVirtualHost().getDurableConfigurationStore().createExchange(exchange);
}
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 5ff84557d8..44006df517 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.exchange.Exchange;
@@ -42,18 +43,11 @@ import java.nio.ByteBuffer;
*/
public class SkeletonMessageStore implements MessageStore
{
- private final AtomicLong _messageId = new AtomicLong(1);
-
- public void configure(String base, Configuration config) throws Exception
- {
- }
-
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
Configuration config,
LogSubject logSubject) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void configureMessageStore(String name,
@@ -61,7 +55,6 @@ public class SkeletonMessageStore implements MessageStore
Configuration config,
LogSubject logSubject) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void close() throws Exception
@@ -70,31 +63,28 @@ public class SkeletonMessageStore implements MessageStore
public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
- public void removeMessage(Long messageId)
- {
- }
public void createExchange(Exchange exchange) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void removeExchange(Exchange exchange) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void createQueue(AMQQueue queue) throws AMQStoreException
@@ -105,63 +95,11 @@ public class SkeletonMessageStore implements MessageStore
{
}
-
-
-
- public List<AMQQueue> createQueues() throws AMQException
- {
- return null;
- }
-
- public Long getNewMessageId()
- {
- return _messageId.getAndIncrement();
- }
-
- public void storeContentBodyChunk(
- Long messageId,
- int index,
- ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
- {
-
- }
-
- public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
- {
-
- }
-
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
- {
- return null;
- }
-
- public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
- {
- return null;
- }
-
public boolean isPersistent()
{
return false;
}
- public void storeMessageHeader(Long messageNumber, ServerMessage message)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void storeContent(Long messageNumber, long offset, ByteBuffer body)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public ServerMessage getMessage(Long messageNumber)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
@@ -172,7 +110,7 @@ public class SkeletonMessageStore implements MessageStore
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public Transaction newTransaction()
@@ -180,19 +118,19 @@ public class SkeletonMessageStore implements MessageStore
return new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void commitTran() throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public StoreFuture commitTranAsync() throws AMQStoreException
@@ -213,7 +151,7 @@ public class SkeletonMessageStore implements MessageStore
public void abortTran() throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
};
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index 4dea13d391..fa698f4cf8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -82,6 +82,12 @@ public class TestMemoryMessageStore extends MemoryMessageStore
return _storedMessage.getContent(offsetInMessage, dst);
}
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return _storedMessage.getContent(offsetInMessage, size);
+ }
+
public StoreFuture flushToStore()
{
return _storedMessage.flushToStore();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 3593297a05..3804d0dc8e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
/**
@@ -66,14 +68,14 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
private class TestableTransaction implements Transaction
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- getMessages().put(messageId, (AMQQueue)queue);
+ getMessages().put(message.getMessageNumber(), (AMQQueue)queue);
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- getMessages().remove(messageId);
+ getMessages().remove(message.getMessageNumber());
}
public void commitTran() throws AMQStoreException
@@ -143,6 +145,12 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
return _storedMessage.getContent(offsetInMessage, dst);
}
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return _storedMessage.getContent(offsetInMessage, size);
+ }
+
public StoreFuture flushToStore()
{
return _storedMessage.flushToStore();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
index 9afed49922..98484db264 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -44,7 +44,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
{
private ServerTransaction _transaction = null; // Class under test
- private TransactionLog _transactionLog;
+ private MessageStore _transactionLog;
private AMQQueue _queue;
private List<AMQQueue> _queues;
private Collection<QueueEntry> _queueEntries;
@@ -137,7 +137,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
_message = createTestMessage(false);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action);
+ _transaction.enqueue(_queues, _message, _action, 0L);
assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -157,7 +157,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action);
+ _transaction.enqueue(_queues, _message, _action, 0L);
assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -175,7 +175,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, true, false, true});
- _transaction.enqueue(_queues, _message, _action);
+ _transaction.enqueue(_queues, _message, _action, 0L);
assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
@@ -198,7 +198,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
try
{
- _transaction.enqueue(_queues, _message, _action);
+ _transaction.enqueue(_queues, _message, _action, 0L);
fail("Exception not thrown");
}
catch (RuntimeException re)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index e81fd8e3f1..484beb8fb4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -51,7 +51,7 @@ public class LocalTransactionTest extends QpidTestCase
private MockAction _action1;
private MockAction _action2;
private MockStoreTransaction _storeTransaction;
- private TransactionLog _transactionLog;
+ private MessageStore _transactionLog;
@Override
@@ -140,7 +140,7 @@ public class LocalTransactionTest extends QpidTestCase
_message = createTestMessage(false);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action1);
+ _transaction.enqueue(_queues, _message, _action1, 0L);
assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -156,7 +156,7 @@ public class LocalTransactionTest extends QpidTestCase
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action1);
+ _transaction.enqueue(_queues, _message, _action1, 0L);
assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -173,7 +173,7 @@ public class LocalTransactionTest extends QpidTestCase
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, true, false, true});
- _transaction.enqueue(_queues, _message, _action1);
+ _transaction.enqueue(_queues, _message, _action1, 0L);
assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
@@ -196,7 +196,7 @@ public class LocalTransactionTest extends QpidTestCase
try
{
- _transaction.enqueue(_queues, _message, _action1);
+ _transaction.enqueue(_queues, _message, _action1, 0L);
fail("Exception not thrown");
}
catch (RuntimeException re)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 422105e410..f3d71c6dea 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.StoredMessage;
/**
* Mock Server Message allowing its persistent flag to be controlled from test.
@@ -81,6 +82,11 @@ class MockServerMessage implements ServerMessage
throw new NotImplementedException();
}
+ public StoredMessage getStoredMessage()
+ {
+ throw new NotImplementedException();
+ }
+
public long getExpiration()
{
throw new NotImplementedException();
@@ -91,12 +97,18 @@ class MockServerMessage implements ServerMessage
throw new NotImplementedException();
}
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ throw new NotImplementedException();
+ }
+
public long getArrivalTime()
{
throw new NotImplementedException();
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
return 0L;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index ff372532ac..bf8fda307a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -24,11 +24,11 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.TransactionLog.StoreFuture;
-import org.apache.qpid.server.store.TransactionLog.Transaction;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.store.MessageStore.StoreFuture;
+import org.apache.qpid.server.store.MessageStore.Transaction;
/**
* Mock implementation of a (Store) Transaction allow its state to be observed.
@@ -61,7 +61,7 @@ class MockStoreTransaction implements Transaction
return _state;
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
if (_throwExceptionOnQueueOp)
{
@@ -82,7 +82,7 @@ class MockStoreTransaction implements Transaction
return _numberOfEnqueuedMessages;
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
if (_throwExceptionOnQueueOp)
{
@@ -107,10 +107,33 @@ class MockStoreTransaction implements Transaction
_state = TransactionState.ABORTED;
}
- public static TransactionLog createTestTransactionLog(final MockStoreTransaction storeTransaction)
+ public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction)
{
- return new TransactionLog()
+ return new MessageStore()
{
+ public void configureMessageStore(final String name,
+ final MessageStoreRecoveryHandler recoveryHandler,
+ final Configuration config,
+ final LogSubject logSubject) throws Exception
+ {
+ //TODO.
+ }
+
+ public void close() throws Exception
+ {
+ //TODO.
+ }
+
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+ {
+ return null; //TODO.
+ }
+
+ public boolean isPersistent()
+ {
+ return false; //TODO.
+ }
+
public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 7aa314bf22..153371c8d9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -41,7 +41,6 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
public class MockVirtualHost implements VirtualHost
{
@@ -159,10 +158,6 @@ public class MockVirtualHost implements VirtualHost
return null;
}
- public TransactionLog getTransactionLog()
- {
- return null;
- }
public void removeBrokerConnection(BrokerLink brokerLink)
{