From 67d5fb5c48b224efea5134c455719670398dd5eb Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 20 Feb 2009 14:50:01 +0000 Subject: QPID-1632 - Removal of reference counting and update to tests, TxAckTest was reduced in size as reference counting is now not modified until the transaction completes. Replaced MessageReferenceCoutingTest with PersistentMessageTest and further tests wil be created when DerbyDBMessageStore is also updated. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@746260 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 10 +- .../qpid/server/ExtractResendAndRequeue.java | 4 +- .../qpid/server/RequiredDeliveryException.java | 8 +- .../java/org/apache/qpid/server/ack/TxAck.java | 21 ++- .../server/ack/UnacknowledgedMessageMapImpl.java | 6 +- .../server/handler/BasicRejectMethodHandler.java | 22 ++-- .../org/apache/qpid/server/queue/AMQMessage.java | 9 -- .../org/apache/qpid/server/queue/AMQQueue.java | 7 - .../apache/qpid/server/queue/IncomingMessage.java | 12 +- .../qpid/server/queue/PersistentAMQMessage.java | 7 - .../org/apache/qpid/server/queue/QueueEntry.java | 26 ++-- .../apache/qpid/server/queue/QueueEntryImpl.java | 27 +--- .../apache/qpid/server/queue/SimpleAMQQueue.java | 44 +++++-- .../qpid/server/queue/TransientAMQMessage.java | 80 ------------ .../qpid/server/store/DerbyMessageStore.java | 3 +- .../qpid/server/store/MemoryMessageStore.java | 48 +++++-- .../qpid/server/subscription/SubscriptionImpl.java | 8 +- .../qpid/server/txn/LocalTransactionalContext.java | 8 -- .../qpid/server/txn/NonTransactionalContext.java | 28 ++-- .../java/org/apache/qpid/server/ack/TxAckTest.java | 65 +++------- .../exchange/AbstractHeadersExchangeTestBase.java | 2 +- .../java/org/apache/qpid/server/queue/AckTest.java | 17 ++- .../server/queue/MessageReferenceCountingTest.java | 77 ----------- .../apache/qpid/server/queue/MockQueueEntry.java | 5 + .../qpid/server/queue/PersistentMessageTest.java | 120 ++++++++++++++++- .../qpid/server/queue/SimpleAMQQueueTest.java | 35 +++-- .../qpid/server/store/TestMemoryMessageStore.java | 51 -------- .../qpid/server/store/TestReferenceCounting.java | 20 +-- .../qpid/server/store/TestTransactionLog.java | 31 +++++ .../server/store/TestableMemoryMessageStore.java | 143 +++++++++++++++++---- .../org/apache/qpid/server/txn/TxnBufferTest.java | 4 +- 31 files changed, 457 insertions(+), 491 deletions(-) delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 125518358b..5a01888ccf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -499,7 +499,7 @@ public class AMQChannel } else { - unacked.discard(_storeContext); + unacked.dequeueAndDelete(_storeContext); } } @@ -555,7 +555,7 @@ public class AMQChannel _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - unacked.discard(_storeContext); + unacked.dequeueAndDelete(_storeContext); } } else @@ -712,7 +712,7 @@ public class AMQChannel { try { - message.discard(_storeContext); + message.dequeueAndDelete(_storeContext); message.setQueueDeleted(true); } @@ -831,9 +831,7 @@ public class AMQChannel { AMQMessage message = bouncedMessage.getAMQMessage(); _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); - - message.decrementReference(_storeContext); + new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 1723d46ef0..8d41cc58d2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -82,13 +82,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor } else { - queueEntry.discard(_storeContext); + queueEntry.dequeueAndDelete(_storeContext); _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry); } } else { - queueEntry.discard(_storeContext); + queueEntry.dequeueAndDelete(_storeContext); _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 415f1fe8be..a81b2cc2db 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -56,14 +56,10 @@ public abstract class RequiredDeliveryException extends AMQException public void setMessage(final AMQMessage payload) { - - // Increment the reference as this message is in the routing phase - // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the - // handler. So increment here. - payload.incrementReference(1); + // handler. + // Messages are all kept in memory now. Only queues can push messages out of memory. _amqMessage = payload; - } public AMQMessage getAMQMessage() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 0f40e00624..918fcd8407 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.ack; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.HashMap; @@ -116,22 +115,20 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (QueueEntry msg : _unacked.values()) { - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(storeContext); - + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + msg.dequeueAndDelete(storeContext); } } public void undoPrepare() { - //decrementReference is annoyingly untransactional (due to - //in memory counter) so if we failed in prepare for full - //txn, this op will have to compensate by fixing the count - //in memory (persistent changes will be rolled back by store) - for (QueueEntry msg : _unacked.values()) - { - msg.getMessage().incrementReference(1); - } + //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the + // transactionLog. Only when the transaction succesfully completes will it perform any + // update of the internal transactionLog reference counting and any resulting message data deletion. + // The success or failure of the data deletion is not important to this transaction only that the ack has been + // successfully recorded. } public void commit(StoreContext storeContext) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index efdadd4922..ac3b0b5e49 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -174,8 +174,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - unacked.getValue().discard(storeContext); + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + unacked.getValue().dequeueAndDelete(storeContext); it.remove(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index f3cab10ed7..bd70cd7776 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -65,38 +65,38 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener boolean isDeleted(); - int delete() throws AMQException; - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; - - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - - void addQueueDeleteTask(final Task task); - List getMessagesOnTheQueue(); List getMessagesOnTheQueue(long fromMessageId, long toMessageId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 5d4322c4fc..5eafd281c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -156,8 +156,7 @@ public class IncomingMessage implements Filterable _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues); } - try - { + // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody()); @@ -196,7 +195,6 @@ public class IncomingMessage implements Filterable { int offset; final int queueCount = _destinationQueues.size(); - _message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; @@ -222,12 +220,8 @@ public class IncomingMessage implements Filterable } return _message; - } - finally - { - // Remove refence for routing process . Reference count should now == delivered queue count - if(_message != null) _message.decrementReference(_txnContext.getStoreContext()); - } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java index ec48a2afb0..92c10b0347 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -57,13 +57,6 @@ public class PersistentAMQMessage extends TransientAMQMessage _transactionLog.storeMessageMetaData(storeContext, _messageId, mmd); } - @Override - public void removeMessage(StoreContext storeContext) throws AMQException - { - _log.info("PAMQM : removing message:" + _messageId); - _transactionLog.removeMessage(storeContext, _messageId); - } - @Override public boolean isPersistent() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 0df976a620..09600b9d28 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -49,7 +49,6 @@ public interface QueueEntry extends Comparable, Filterable, Filterable, Filterable, Filterable, Filterable, Filterable, Filterable, Filterable 0; - } - public Iterator getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { return new BodyFrameIterator(protocolSession, channel); @@ -197,76 +192,6 @@ public class TransientAMQMessage implements AMQMessage return _messageId; } - /* Threadsafe. Increment the reference count on the message. */ - public boolean incrementReference(int count) - { - if (_referenceCount.addAndGet(count) <= 1) - { - int newcount = _referenceCount.addAndGet(-count); - _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount); - return false; - } - else - { - _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1(" - + _referenceCount.get() + ")"); - return true; - } - - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - * - * @param storeContext - * - * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that - * failed - */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException - { - - int count = _referenceCount.decrementAndGet(); - - _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE / 2); - - try - { - _log.debug("Reference Count hit 0, removing message"); - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op - removeMessage(storeContext); - } - catch (AMQException e) - { - // to maintain consistency, we revert the count - incrementReference(1); - throw new MessageCleanupException(getMessageId(), e); - } - } - else - { - if (count < 0) - { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() - + " has gone below 0."); - } - } - } - /** * Called selectors to determin if the message has already been sent * @@ -435,11 +360,6 @@ public class TransientAMQMessage implements AMQMessage return _arrivalTime; } - public void removeMessage(StoreContext storeContext) throws AMQException - { - //no-op - } - public String toString() { // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index fe81346c8c..33b3d8608e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -1357,7 +1357,8 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable if(message != null) { - message.incrementReference(1); + //todo must enqueue message to build reference table +// message.incrementReference(1); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index aa7b6e2542..cd0f0c1769 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -30,24 +30,28 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -/** A simple message store that stores the messages in a threadsafe structure in memory. +/** + * A simple message store that stores the messages in a threadsafe structure in memory. * * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog * * This class really should have no storage unless we want to do inMemory Recovery. - * */ public class MemoryMessageStore implements TransactionLog, RoutingTable { @@ -63,6 +67,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable private final AtomicLong _messageId = new AtomicLong(1); private AtomicBoolean _closed = new AtomicBoolean(false); + protected final Map> _messageEnqueueMap = new HashMap>(); public void configure() { @@ -112,6 +117,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } _metaDataMap.remove(messageId); _contentBodyMap.remove(messageId); + _messageEnqueueMap.remove(messageId); } public void createExchange(Exchange exchange) throws AMQException @@ -134,7 +140,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } - public void createQueue(AMQQueue queue) throws AMQException { // Not requred to do anything @@ -152,12 +157,39 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - // Not required to do anything + synchronized (_messageEnqueueMap) + { + List queues = _messageEnqueueMap.get(messageId); + if (queues == null) + { + queues = new LinkedList(); + _messageEnqueueMap.put(messageId, queues); + } + + queues.add(queue); + } } public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - // Not required to do anything + synchronized (_messageEnqueueMap) + { + List queues = _messageEnqueueMap.get(messageId); + if (queues == null || !queues.contains(queue)) + { + throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName() + + " but it is not enqueued on that queue."); + } + else + { + queues.remove(queue); + if (queues.isEmpty()) + { + removeMessage(context,messageId); + } + } + } + } public void beginTran(StoreContext context) throws AMQException @@ -238,7 +270,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } private void checkNotClosed() throws MessageStoreClosedException - { + { if (_closed.get()) { throw new MessageStoreClosedException(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 1c58e644e9..119a4b1692 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -144,7 +144,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage StoreContext storeContext = getChannel().getStoreContext(); try - { // if we do not need to wait for client acknowledgements + { + // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. // By doing this _before_ the send we ensure that it @@ -153,7 +154,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - entry.dequeue(storeContext); + entry.dequeueAndDelete(storeContext); synchronized (getChannel()) @@ -163,7 +164,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage sendToClient(entry, deliveryTag); } - entry.dispose(storeContext); } finally { @@ -316,7 +316,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _autoClose = false; } - + _logger.info(debugIdentity()+" Created subscription:"); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 8e63b95f0d..abfb60c5bf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -92,20 +92,12 @@ public class LocalTransactionalContext implements TransactionalContext public void process() throws AMQException { - _message.incrementReference(1); - try - { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); if(entry.immediateAndNotDelivered()) { getReturnMessages().add(new NoConsumersException(_message)); } - } - finally - { - _message.decrementReference(getStoreContext()); - } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 145d7f8b13..561f998b98 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -123,18 +123,18 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.size()); unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException { if (debug) { - _log.debug("Discarding message: " + message.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); } - if(message.getMessage().isPersistent()) + if(queueEntry.getMessage().isPersistent()) { beginTranIfNecessary(); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - message.discard(_storeContext); + //Message has been ack so dequeueAndDelete it. + queueEntry.dequeueAndDelete(_storeContext); return false; } @@ -157,10 +157,10 @@ public class NonTransactionalContext implements TransactionalContext } else { - QueueEntry msg; - msg = unacknowledgedMessageMap.get(deliveryTag); + QueueEntry queueEntry; + queueEntry = unacknowledgedMessageMap.get(deliveryTag); - if (msg == null) + if (queueEntry == null) { _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); @@ -170,15 +170,17 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); } - if(msg.getMessage().isPersistent()) + if(queueEntry.getMessage().isPersistent()) { beginTranIfNecessary(); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + queueEntry.dequeueAndDelete(_storeContext); unacknowledgedMessageMap.remove(deliveryTag); @@ -186,7 +188,7 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + - msg.getMessage().getMessageId()); + queueEntry.getMessage().getMessageId()); } } if(_inTran) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 1d729a82a5..52b8b0ad19 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.queue.AMQMessage; @@ -37,9 +38,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.TransientAMQMessage; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -81,20 +83,6 @@ public class TxAckTest extends TestCase combined.stop(); } - public void testPrepare() throws AMQException - { - individual.prepare(); - multiple.prepare(); - combined.prepare(); - } - - public void testUndoPrepare() throws AMQException - { - individual.undoPrepare(); - multiple.undoPrepare(); - combined.undoPrepare(); - } - public void testCommit() throws AMQException { individual.commit(); @@ -115,12 +103,13 @@ public class TxAckTest extends TestCase private final List _unacked; private StoreContext _storeContext = new StoreContext(); private AMQQueue _queue; + private TransactionLog _transactionLog = new TestableMemoryMessageStore(); private static final int MESSAGE_SIZE=100; Scenario(int messageCount, List acked, List unacked) throws Exception { - TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), + TransactionalContext txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList() ); @@ -138,12 +127,15 @@ public class TxAckTest extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - AMQMessage message = new TestMessage(deliveryTag, info); + AMQMessage message = new TestMessage(deliveryTag, info, (TestTransactionLog) _transactionLog); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; message.setPublishAndContentHeaderBody(_storeContext, info, header); + + + _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); } _acked = acked; @@ -165,25 +157,6 @@ public class TxAckTest extends TestCase } } - void prepare() throws AMQException - { - _op.consolidate(); - _op.prepare(_storeContext); - - assertCount(_acked, -1); - assertCount(_unacked, 0); - - } - - void undoPrepare() - { - _op.consolidate(); - _op.undoPrepare(); - - assertCount(_acked, 1); - assertCount(_unacked, 0); - } - void commit() { _op.consolidate(); @@ -232,30 +205,22 @@ public class TxAckTest extends TestCase private class TestMessage extends TransientAMQMessage { private final long _tag; - private int _count; + private TestTransactionLog _transactionLog; - TestMessage(long tag, MessagePublishInfo publishBody) + public TestMessage(long tag, MessagePublishInfo publishBody, TestTransactionLog transactionLog) throws AMQException { super(createMessage( publishBody)); _tag = tag; + _transactionLog = transactionLog; } - public boolean incrementReference(int count) - { - _count+=count; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } - void assertCountEquals(int expected) { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); + List list = _transactionLog.getMessageReferenceMap(_messageId); + int actual = (list == null ? 0 : list.size()); + assertEquals("Wrong count for message with tag " + _tag, expected, actual); } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 40b08a2e39..78cf610f28 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -324,7 +324,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 98465eda20..9f8d5f9a99 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -30,14 +30,16 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; @@ -57,7 +59,7 @@ public class AckTest extends TestCase private MockProtocolSession _protocolSession; - private TestMemoryMessageStore _messageStore; + private TestableMemoryMessageStore _messageStore; private StoreContext _storeContext = new StoreContext(); @@ -72,14 +74,15 @@ public class AckTest extends TestCase super.setUp(); ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); - _messageStore = new TestMemoryMessageStore(); + VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog()); _protocolSession = new MockProtocolSession(_messageStore); _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), - null); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), + true, vhost, null); } protected void tearDown() @@ -185,7 +188,7 @@ public class AckTest extends TestCase /** * Tests that in no-ack mode no messages are retained */ - public void testPersistentNoAckMode() throws AMQException + public void testPersistentNoAckMode() throws AMQException, InterruptedException { // false arg means no acks expected _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); @@ -194,7 +197,7 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); + assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0); assertTrue(_messageStore.getContentBodyMap().size() == 0); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java deleted file mode 100644 index 44e9851db7..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; - -public class MessageReferenceCountingTest extends TestCase -{ - AMQMessage _message; - - public void setUp() - { - _message = MessageFactory.getInstance().createMessage(null, false); - } - - public void testInitialState() - { - - assertTrue("New messages should have a reference", _message.isReferenced()); - } - - public void testIncrementReference() - { - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed ",_message.incrementReference(1)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - } - - public void testDecrementReference() - { - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - try - { - _message.decrementReference(null); - } - catch (MessageCleanupException e) - { - fail("Decrement should be allowed:"+e.getMessage()); - } - - assertFalse("Message should not be Referenced state", _message.isReferenced()); - - try - { - _message.decrementReference(null); - fail("Decrement should not be allowed as we should have a ref count of 0"); - } - catch (MessageCleanupException e) - { - assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0")); - } - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 12ff91cdad..92235648ec 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -38,4 +38,9 @@ public class MockQueueEntry extends QueueEntryImpl { super(_defaultList, message); } + + public MockQueueEntry(AMQMessage message, SimpleAMQQueue queue) + { + super(new SimpleQueueEntryList(queue) ,message); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index fdaf2c309f..4551ae5af8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -20,18 +20,46 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; +import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; public class PersistentMessageTest extends TransientMessageTest { - private MemoryMessageStore _messageStore; + private TestableMemoryMessageStore _messageStore; + + protected SimpleAMQQueue _queue; + protected AMQShortString _q1name = new AMQShortString("q1name"); + protected AMQShortString _owner = new AMQShortString("owner"); + protected AMQShortString _routingKey = new AMQShortString("routing key"); + private TransactionalContext _messageDeliveryContext; + private static final long MESSAGE_SIZE = 0L; + private List _returnMessages = new LinkedList(); - public void setUp() + public void setUp() throws Exception { - _messageStore = new MemoryMessageStore(); - _messageStore.configure(); + _messageStore = new TestableMemoryMessageStore(); + _storeContext = new StoreContext(); + VirtualHost vhost = new VirtualHost(PersistentMessageTest.class.getName(), _messageStore); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null); + // Create IncomingMessage and nondurable queue + _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages); + } @Override @@ -47,4 +75,86 @@ public class PersistentMessageTest extends TransientMessageTest assertTrue(_message.isPersistent()); } + /** + * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and + * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right + * contents. TransactionLog = Empty, returnMessages 1 item. + * + * @throws Exception + */ + public void testImmediateReturnNotInLog() throws Exception + { + MessagePublishInfo info = new MessagePublishInfoImpl(null, true, false, null); + IncomingMessage msg = createMessage(info); + + // Send persistent message + ArrayList qs = new ArrayList(); + qs.add(_queue); + + // equivalent to amqChannel.routeMessage() + msg.enqueue(qs); + + msg.routingComplete(_messageStore); + + // equivalent to amqChannel.deliverCurrentMessageIfComplete + msg.deliverToQueues(); + + // Check that data has been stored to disk + long messageId = msg.getMessageId(); + checkMessageMetaDataExists(messageId); + + // Check that it was not enqueued + List queueList = _messageStore.getMessageReferenceMap(messageId); + assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList); + checkMessageMetaDataRemoved(messageId); + + assertEquals("Return message count not correct", 1, _returnMessages.size()); + } + + protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException + { + IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext, + new MockProtocolSession(_messageStore), _messageStore); + + // equivalent to amqChannel.publishContenHeader + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID; + // This message has no bodies + contentHeaderBody.bodySize = MESSAGE_SIZE; + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + + msg.setContentHeaderBody(contentHeaderBody); + msg.setExpiration(); + + return msg; + } + + protected void checkMessageMetaDataExists(long messageId) + { + try + { + _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId); + } + catch (AMQException amqe) + { + fail("Message MetaData does not exist for message:" + messageId); + } + } + + protected void checkMessageMetaDataRemoved(long messageId) + { + try + { + assertNull("Message MetaData still exists for message:" + messageId, + _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); + assertNull("Message still has values in the reference map:" + messageId, + _messageStore.getMessageReferenceMap(messageId)); + + } + catch (AMQException e) + { + fail("AMQE thrown whilst trying to getMessageMetaData:" + e.getMessage()); + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 665ca089da..7a97837208 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -36,10 +36,12 @@ import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.ArrayList; import java.util.List; @@ -319,7 +321,7 @@ public class SimpleAMQQueueTest extends TestCase { // Create IncomingMessage and nondurable queue NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); - IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store); + IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.properties = new BasicContentHeaderProperties(); @@ -338,21 +340,22 @@ public class SimpleAMQQueueTest extends TestCase _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1)); // Check that it is enqueued - AMQQueue data = _store.getMessages().get(messageId); + List data = _store.getMessageReferenceMap(messageId); assertNotNull(data); // Dequeue message - ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store); message.setPublishAndContentHeaderBody(new StoreContext(), info, header); - MockQueueEntry entry = new MockQueueEntry(message); - _queue.dequeue(null, entry); + MockQueueEntry entry = new MockQueueEntry(message, _queue); + entry.getQueueEntryList().add(message); + entry.acquire(); + entry.dequeue(null); // Check that it is dequeued - data = _store.getMessages().get(messageId); + data = _store.getMessageReferenceMap(messageId); assertNull(data); } @@ -381,7 +384,7 @@ public class SimpleAMQQueueTest extends TestCase public AMQMessage createMessage() throws AMQException { - AMQMessage message = new TestMessage(info); + AMQMessage message = new TestMessage(info, _store); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; @@ -397,29 +400,21 @@ public class SimpleAMQQueueTest extends TestCase public class TestMessage extends TransientAMQMessage { private final long _tag; - private int _count; + private TestTransactionLog _transactionLog; - TestMessage(MessagePublishInfo publishBody) + TestMessage(MessagePublishInfo publishBody, TestTransactionLog transactionLog) throws AMQException { super(SimpleAMQQueueTest.createMessage(publishBody)); _tag = getMessageId(); + _transactionLog = transactionLog; } - public boolean incrementReference(int count) - { - _count+=count; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } void assertCountEquals(int expected) { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); + assertEquals("Wrong count for message with tag " + _tag, expected, + _transactionLog.getMessageReferenceMap(_messageId).size()); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java deleted file mode 100644 index 4e48435962..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.abstraction.ContentChunk; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.List; - -/** - * Adds some extra methods to the memory message store for testing purposes. - */ -public class TestMemoryMessageStore extends MemoryMessageStore -{ - public TestMemoryMessageStore() - { - _metaDataMap = new ConcurrentHashMap(); - _contentBodyMap = new ConcurrentHashMap>(); - } - - public ConcurrentMap getMessageMetaDataMap() - { - return _metaDataMap; - } - - public ConcurrentMap> getContentBodyMap() - { - return _contentBodyMap; - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index e8acfc2fda..5a4c435e59 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -34,7 +34,7 @@ import org.apache.qpid.server.queue.AMQMessage; */ public class TestReferenceCounting extends TestCase { - private TestMemoryMessageStore _store; + private TestableMemoryMessageStore _store; private StoreContext _storeContext = new StoreContext(); @@ -42,7 +42,7 @@ public class TestReferenceCounting extends TestCase protected void setUp() throws Exception { super.setUp(); - _store = new TestMemoryMessageStore(); + _store = new TestableMemoryMessageStore(); } /** @@ -54,19 +54,9 @@ public class TestReferenceCounting extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - final long messageId = _store.getNewMessageId(); - AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message.incrementReference(1); - - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - - assertEquals(1, _store.getMessageMetaDataMap().size()); - message.decrementReference(_storeContext); assertEquals(1, _store.getMessageMetaDataMap().size()); } @@ -84,16 +74,10 @@ public class TestReferenceCounting extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message.incrementReference(1); - - assertEquals(1, _store.getMessageMetaDataMap().size()); - message.incrementReference(1); - message.decrementReference(_storeContext); assertEquals(1, _store.getMessageMetaDataMap().size()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java new file mode 100644 index 0000000000..bb051693c3 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.queue.AMQQueue; + +import java.util.Map; +import java.util.List; + +public interface TestTransactionLog +{ + public List getMessageReferenceMap(Long messageID); +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 9146fe88ae..882f88b8f3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -23,22 +23,28 @@ package org.apache.qpid.server.store; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.commons.configuration.Configuration; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Adds some extra methods to the memory message store for testing purposes. */ -public class TestableMemoryMessageStore extends MemoryMessageStore +public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable { MemoryMessageStore _mms = null; - private HashMap _messages = new HashMap(); public TestableMemoryMessageStore(MemoryMessageStore mms) { @@ -47,46 +53,127 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public TestableMemoryMessageStore() { - _metaDataMap = new ConcurrentHashMap(); - _contentBodyMap = new ConcurrentHashMap>(); + _mms = new MemoryMessageStore(); + _mms.configure(); } public ConcurrentMap getMessageMetaDataMap() { - if (_mms != null) - { - return _mms._metaDataMap; - } - else - { - return _metaDataMap; - } + return _mms._metaDataMap; } public ConcurrentMap> getContentBodyMap() { - if (_mms != null) - { - return _mms._contentBodyMap; - } - else - { - return _contentBodyMap; - } + return _mms._contentBodyMap; } - - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + + public List getMessageReferenceMap(Long messageId) + { + return _mms._messageEnqueueMap.get(messageId); + } + + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + { + _mms.configure(virtualHost,base,config); + } + + public void close() throws Exception + { + _mms.close(); + } + + public void createExchange(Exchange exchange) throws AMQException + { + _mms.createExchange(exchange); + } + + public void removeExchange(Exchange exchange) throws AMQException + { + _mms.removeExchange(exchange); + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + _mms.bindQueue(exchange,routingKey,queue,args); + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + _mms.unbindQueue(exchange,routingKey,queue,args); + } + + public void createQueue(AMQQueue queue) throws AMQException + { + _mms.createQueue(queue); + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + { + _mms.createQueue(queue,arguments); + } + + public void removeQueue(AMQQueue queue) throws AMQException + { + _mms.removeQueue(queue); + } + + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + { + _mms.removeMessage(storeContext, messageId); + } + + public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + _mms.enqueueMessage(context,queue,messageId); + } + + public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + _mms.dequeueMessage(context,queue,messageId); + } + + public void beginTran(StoreContext context) throws AMQException + { + _mms.beginTran(context); + } + + public void commitTran(StoreContext context) throws AMQException + { + _mms.commitTran(context); + } + + public void abortTran(StoreContext context) throws AMQException + { + _mms.abortTran(context); + } + + public boolean inTran(StoreContext context) + { + return _mms.inTran(context); + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody); + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + _mms.storeMessageMetaData(context,messageId,messageMetaData); + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { - getMessages().put(messageId, queue); + return _mms.getMessageMetaData(context,messageId); } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { - getMessages().remove(messageId); + return _mms.getContentBodyChunk(context,messageId,index); } - public HashMap getMessages() + public boolean isPersistent() { - return _messages; + return _mms.isPersistent(); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java index ca6644d141..26802b4210 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.txn; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.LinkedList; @@ -194,7 +194,7 @@ public class TxnBufferTest extends TestCase } } - class MockStore extends TestMemoryMessageStore + class MockStore extends TestableMemoryMessageStore { final Object BEGIN = "BEGIN"; final Object ABORT = "ABORT"; -- cgit v1.2.1