diff options
Diffstat (limited to 'qpid/java/broker')
30 files changed, 430 insertions, 464 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 125518358b..5a01888ccf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -499,7 +499,7 @@ public class AMQChannel } else { - unacked.discard(_storeContext); + unacked.dequeueAndDelete(_storeContext); } } @@ -555,7 +555,7 @@ public class AMQChannel _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - unacked.discard(_storeContext); + unacked.dequeueAndDelete(_storeContext); } } else @@ -712,7 +712,7 @@ public class AMQChannel { try { - message.discard(_storeContext); + message.dequeueAndDelete(_storeContext); message.setQueueDeleted(true); } @@ -831,9 +831,7 @@ public class AMQChannel { AMQMessage message = bouncedMessage.getAMQMessage(); _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); - - message.decrementReference(_storeContext); + new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 1723d46ef0..8d41cc58d2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -82,13 +82,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor } else { - queueEntry.discard(_storeContext); + queueEntry.dequeueAndDelete(_storeContext); _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry); } } else { - queueEntry.discard(_storeContext); + queueEntry.dequeueAndDelete(_storeContext); _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 415f1fe8be..a81b2cc2db 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -56,14 +56,10 @@ public abstract class RequiredDeliveryException extends AMQException public void setMessage(final AMQMessage payload) { - - // Increment the reference as this message is in the routing phase - // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the - // handler. So increment here. - payload.incrementReference(1); + // handler. + // Messages are all kept in memory now. Only queues can push messages out of memory. _amqMessage = payload; - } public AMQMessage getAMQMessage() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 0f40e00624..918fcd8407 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.ack; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.HashMap; @@ -116,22 +115,20 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (QueueEntry msg : _unacked.values()) { - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(storeContext); - + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + msg.dequeueAndDelete(storeContext); } } public void undoPrepare() { - //decrementReference is annoyingly untransactional (due to - //in memory counter) so if we failed in prepare for full - //txn, this op will have to compensate by fixing the count - //in memory (persistent changes will be rolled back by store) - for (QueueEntry msg : _unacked.values()) - { - msg.getMessage().incrementReference(1); - } + //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the + // transactionLog. Only when the transaction succesfully completes will it perform any + // update of the internal transactionLog reference counting and any resulting message data deletion. + // The success or failure of the data deletion is not important to this transaction only that the ack has been + // successfully recorded. } public void commit(StoreContext storeContext) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index efdadd4922..ac3b0b5e49 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -174,8 +174,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - unacked.getValue().discard(storeContext); + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + unacked.getValue().dequeueAndDelete(storeContext); it.remove(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index f3cab10ed7..bd70cd7776 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -65,38 +65,38 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR long deliveryTag = body.getDeliveryTag(); - QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + QueueEntry queueEntry = channel.getUnacknowledgedMessageMap().get(deliveryTag); - if (message == null) + if (queueEntry == null) { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); // throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); } else { - if (message.isQueueDeleted()) + if (queueEntry.isQueueDeleted()) { _logger.warn("Message's Queue as already been purged, unable to Reject. " + "Dropping message should use Dead Letter Queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - if(message != null) + queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag); + if(queueEntry != null) { - message.discard(channel.getStoreContext()); + queueEntry.dequeueAndDelete(channel.getStoreContext()); } //sendtoDeadLetterQueue(msg) return; } - if (!message.getMessage().isReferenced()) + if (queueEntry.isDeleted()) { - _logger.warn("Message as already been purged, unable to Reject."); + _logger.warn("QueueEntry as already been deleted, unable to Reject."); return; } if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); @@ -105,7 +105,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR // If we haven't requested message to be resent to this consumer then reject it from ever getting it. //if (!evt.getMethod().resend) { - message.reject(); + queueEntry.reject(); } if (body.getRequeue()) @@ -115,7 +115,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR else { _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); + queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag); //sendtoDeadLetterQueue(AMQMessage message) // message.queue = channel.getDefaultDeadLetterQueue(); // channel.requeue(deliveryTag); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 1f56b2ccd2..e96d2ba874 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -114,17 +114,8 @@ public interface AMQMessage throws AMQException; - void removeMessage(StoreContext storeContext) throws AMQException; String toString(); String debugIdentity(); - - // Reference counting methods - - void decrementReference(StoreContext storeContext) throws MessageCleanupException; - - boolean incrementReference(int queueCount); - - boolean isReferenced(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0838b71c54..9fadbb0cdc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -81,25 +81,18 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> boolean isDeleted(); - int delete() throws AMQException; - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; - - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - - void addQueueDeleteTask(final Task task); - List<QueueEntry> getMessagesOnTheQueue(); List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 5d4322c4fc..5eafd281c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -156,8 +156,7 @@ public class IncomingMessage implements Filterable<RuntimeException> _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues); } - try - { + // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody()); @@ -196,7 +195,6 @@ public class IncomingMessage implements Filterable<RuntimeException> { int offset; final int queueCount = _destinationQueues.size(); - _message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; @@ -222,12 +220,8 @@ public class IncomingMessage implements Filterable<RuntimeException> } return _message; - } - finally - { - // Remove refence for routing process . Reference count should now == delivered queue count - if(_message != null) _message.decrementReference(_txnContext.getStoreContext()); - } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java index ec48a2afb0..92c10b0347 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -58,13 +58,6 @@ public class PersistentAMQMessage extends TransientAMQMessage } @Override - public void removeMessage(StoreContext storeContext) throws AMQException - { - _log.info("PAMQM : removing message:" + _messageId); - _transactionLog.removeMessage(storeContext, _messageId); - } - - @Override public boolean isPersistent() { return true; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 0df976a620..09600b9d28 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -49,7 +49,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept public abstract State getState(); } - public final class AvailableState extends EntryState { @@ -59,7 +58,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class DequeuedState extends EntryState { @@ -69,7 +67,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class DeletedState extends EntryState { @@ -88,7 +85,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class NonSubscriptionAcquiredState extends EntryState { public State getState() @@ -106,7 +102,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept _subscription = subscription; } - public State getState() { return State.ACQUIRED; @@ -118,16 +113,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - final static EntryState AVAILABLE_STATE = new AvailableState(); final static EntryState DELETED_STATE = new DeletedState(); final static EntryState DEQUEUED_STATE = new DequeuedState(); final static EntryState EXPIRED_STATE = new ExpiredState(); final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); - - - AMQQueue getQueue(); AMQMessage getMessage(); @@ -141,9 +132,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept boolean isAcquired(); boolean acquire(); + boolean acquire(Subscription sub); boolean delete(); + boolean isDeleted(); boolean acquiredBySubscription(); @@ -170,12 +163,21 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept void dequeue(final StoreContext storeContext) throws FailedDequeueException; - void dispose(final StoreContext storeContext) throws MessageCleanupException; - - void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; + /** + * Message has been ack so dequeueAndDelete it. + * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + * from the transaciton log + * + * @param storeContext the transactional Context in which to perform the deletion + * + * @throws FailedDequeueException + * @throws MessageCleanupException + */ + void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException; boolean isQueueDeleted(); void addStateChangeListener(StateChangeListener listener); + boolean removeStateChangeListener(StateChangeListener listener); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 3eb1636884..911ed8321b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -282,13 +282,12 @@ public class QueueEntryImpl implements QueueEntry } getQueue().dequeue(storeContext, this); - if(_stateChangeListeners != null) + + if (_stateChangeListeners != null) { - notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); + notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED); } - } - } private void notifyStateChange(final State oldState, final State newState) @@ -299,29 +298,15 @@ public class QueueEntryImpl implements QueueEntry } } - public void dispose(final StoreContext storeContext) throws MessageCleanupException - { - _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state); - if(delete()) - { - _log.info("QEI delete message:" + getMessage().getMessageId()); - getMessage().decrementReference(storeContext); - } - else - { - _log.info("QEI delete state wrong:" + getMessage().getMessageId()); - } - } - - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException { - //if the queue is null then the message is waiting to be acked, but has been removed. + //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d; if (getQueue() != null) { dequeue(storeContext); } - dispose(storeContext); + delete(); } public boolean isQueueDeleted() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a0f21033c7..501e90b4d7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -408,8 +408,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (entry.immediateAndNotDelivered()) { - dequeue(storeContext, entry); - entry.dispose(storeContext); + //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content + // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks + // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses. + entry.acquire(); + entry.dequeueAndDelete(storeContext); } else if (!(entry.isAcquired() || entry.isDeleted())) { @@ -562,6 +565,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + /** + * Only call from queue Entry + * @param storeContext + * @param entry + * @throws FailedDequeueException + */ public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { decrementQueueCount(); @@ -578,7 +587,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId()); } - //entry.dispose(storeContext); } catch (MessageCleanupException e) @@ -814,11 +822,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, String queueName, StoreContext storeContext) { + // The move is a two step process. First the messages are moved in the _transactionLog. + // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the + // existing queue. + // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery + // this is done as the message is recieved. + // So The final step is to enqueue the messages on the new queue. AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); TransactionLog transactionLog = getVirtualHost().getTransactionLog(); @@ -844,7 +859,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { transactionLog.beginTran(storeContext); - // Move the messages in on the transaction log. + // Move the messages in the transaction log. for (QueueEntry entry : entries) { AMQMessage message = entry.getMessage(); @@ -853,7 +868,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); } - // dequeue does not decrement the refence count + // dequeue will remove the messages from the queue entry.dequeue(storeContext); } @@ -882,10 +897,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { + // Add messages to new queue for (QueueEntry entry : entries) { toQueue.enqueue(storeContext, entry.getMessage()); - } } catch (MessageCleanupException e) @@ -918,7 +933,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!entry.isDeleted()) { - return entry.getMessage().incrementReference(1); + return true; } } @@ -940,7 +955,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { AMQMessage message = entry.getMessage(); - if (message.isReferenced() && message.isPersistent() && toQueue.isDurable()) + if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable()) { transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); } @@ -973,7 +988,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { for (QueueEntry entry : entries) { - if (entry.getMessage().isReferenced()) + if (!entry.isDeleted()) { toQueue.enqueue(storeContext, entry.getMessage()); } @@ -1008,7 +1023,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener && !node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); } } @@ -1032,7 +1047,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); noDeletes = false; } @@ -1050,7 +1065,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); count++; } @@ -1315,8 +1330,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (node.acquire()) { + // creating a new final store context per message seems wasteful. final StoreContext reapingStoreContext = new StoreContext(); - node.discard(reapingStoreContext); + node.dequeueAndDelete(reapingStoreContext); } } QueueEntry newNode = _entries.next(node); @@ -1431,7 +1447,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.expired() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java index f3d74fb01c..fa4e85a043 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -172,11 +172,6 @@ public class TransientAMQMessage implements AMQMessage _expiration = expiration; } - public boolean isReferenced() - { - return _referenceCount.get() > 0; - } - public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { return new BodyFrameIterator(protocolSession, channel); @@ -197,76 +192,6 @@ public class TransientAMQMessage implements AMQMessage return _messageId; } - /* Threadsafe. Increment the reference count on the message. */ - public boolean incrementReference(int count) - { - if (_referenceCount.addAndGet(count) <= 1) - { - int newcount = _referenceCount.addAndGet(-count); - _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount); - return false; - } - else - { - _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1(" - + _referenceCount.get() + ")"); - return true; - } - - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - * - * @param storeContext - * - * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that - * failed - */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException - { - - int count = _referenceCount.decrementAndGet(); - - _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE / 2); - - try - { - _log.debug("Reference Count hit 0, removing message"); - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op - removeMessage(storeContext); - } - catch (AMQException e) - { - // to maintain consistency, we revert the count - incrementReference(1); - throw new MessageCleanupException(getMessageId(), e); - } - } - else - { - if (count < 0) - { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() - + " has gone below 0."); - } - } - } - /** * Called selectors to determin if the message has already been sent * @@ -435,11 +360,6 @@ public class TransientAMQMessage implements AMQMessage return _arrivalTime; } - public void removeMessage(StoreContext storeContext) throws AMQException - { - //no-op - } - public String toString() { // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index fe81346c8c..33b3d8608e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -1357,7 +1357,8 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable if(message != null) { - message.incrementReference(1); + //todo must enqueue message to build reference table +// message.incrementReference(1); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index aa7b6e2542..cd0f0c1769 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -30,24 +30,28 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -/** A simple message store that stores the messages in a threadsafe structure in memory. +/** + * A simple message store that stores the messages in a threadsafe structure in memory. * * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog * * This class really should have no storage unless we want to do inMemory Recovery. - * */ public class MemoryMessageStore implements TransactionLog, RoutingTable { @@ -63,6 +67,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable private final AtomicLong _messageId = new AtomicLong(1); private AtomicBoolean _closed = new AtomicBoolean(false); + protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>(); public void configure() { @@ -112,6 +117,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } _metaDataMap.remove(messageId); _contentBodyMap.remove(messageId); + _messageEnqueueMap.remove(messageId); } public void createExchange(Exchange exchange) throws AMQException @@ -134,7 +140,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } - public void createQueue(AMQQueue queue) throws AMQException { // Not requred to do anything @@ -152,12 +157,39 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - // Not required to do anything + synchronized (_messageEnqueueMap) + { + List<AMQQueue> queues = _messageEnqueueMap.get(messageId); + if (queues == null) + { + queues = new LinkedList<AMQQueue>(); + _messageEnqueueMap.put(messageId, queues); + } + + queues.add(queue); + } } public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - // Not required to do anything + synchronized (_messageEnqueueMap) + { + List<AMQQueue> queues = _messageEnqueueMap.get(messageId); + if (queues == null || !queues.contains(queue)) + { + throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName() + + " but it is not enqueued on that queue."); + } + else + { + queues.remove(queue); + if (queues.isEmpty()) + { + removeMessage(context,messageId); + } + } + } + } public void beginTran(StoreContext context) throws AMQException @@ -238,7 +270,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } private void checkNotClosed() throws MessageStoreClosedException - { + { if (_closed.get()) { throw new MessageStoreClosedException(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 1c58e644e9..119a4b1692 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -144,7 +144,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage StoreContext storeContext = getChannel().getStoreContext(); try - { // if we do not need to wait for client acknowledgements + { + // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. // By doing this _before_ the send we ensure that it @@ -153,7 +154,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - entry.dequeue(storeContext); + entry.dequeueAndDelete(storeContext); synchronized (getChannel()) @@ -163,7 +164,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage sendToClient(entry, deliveryTag); } - entry.dispose(storeContext); } finally { @@ -316,7 +316,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _autoClose = false; } - + _logger.info(debugIdentity()+" Created subscription:"); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 8e63b95f0d..abfb60c5bf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -92,20 +92,12 @@ public class LocalTransactionalContext implements TransactionalContext public void process() throws AMQException { - _message.incrementReference(1); - try - { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); if(entry.immediateAndNotDelivered()) { getReturnMessages().add(new NoConsumersException(_message)); } - } - finally - { - _message.decrementReference(getStoreContext()); - } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 145d7f8b13..561f998b98 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -123,18 +123,18 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.size()); unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException { if (debug) { - _log.debug("Discarding message: " + message.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); } - if(message.getMessage().isPersistent()) + if(queueEntry.getMessage().isPersistent()) { beginTranIfNecessary(); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - message.discard(_storeContext); + //Message has been ack so dequeueAndDelete it. + queueEntry.dequeueAndDelete(_storeContext); return false; } @@ -157,10 +157,10 @@ public class NonTransactionalContext implements TransactionalContext } else { - QueueEntry msg; - msg = unacknowledgedMessageMap.get(deliveryTag); + QueueEntry queueEntry; + queueEntry = unacknowledgedMessageMap.get(deliveryTag); - if (msg == null) + if (queueEntry == null) { _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); @@ -170,15 +170,17 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); } - if(msg.getMessage().isPersistent()) + if(queueEntry.getMessage().isPersistent()) { beginTranIfNecessary(); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + queueEntry.dequeueAndDelete(_storeContext); unacknowledgedMessageMap.remove(deliveryTag); @@ -186,7 +188,7 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + - msg.getMessage().getMessageId()); + queueEntry.getMessage().getMessageId()); } } if(_inTran) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 1d729a82a5..52b8b0ad19 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.queue.AMQMessage; @@ -37,9 +38,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.TransientAMQMessage; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -81,20 +83,6 @@ public class TxAckTest extends TestCase combined.stop(); } - public void testPrepare() throws AMQException - { - individual.prepare(); - multiple.prepare(); - combined.prepare(); - } - - public void testUndoPrepare() throws AMQException - { - individual.undoPrepare(); - multiple.undoPrepare(); - combined.undoPrepare(); - } - public void testCommit() throws AMQException { individual.commit(); @@ -115,12 +103,13 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); private AMQQueue _queue; + private TransactionLog _transactionLog = new TestableMemoryMessageStore(); private static final int MESSAGE_SIZE=100; Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { - TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), + TransactionalContext txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); @@ -138,12 +127,15 @@ public class TxAckTest extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - AMQMessage message = new TestMessage(deliveryTag, info); + AMQMessage message = new TestMessage(deliveryTag, info, (TestTransactionLog) _transactionLog); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; message.setPublishAndContentHeaderBody(_storeContext, info, header); + + + _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); } _acked = acked; @@ -165,25 +157,6 @@ public class TxAckTest extends TestCase } } - void prepare() throws AMQException - { - _op.consolidate(); - _op.prepare(_storeContext); - - assertCount(_acked, -1); - assertCount(_unacked, 0); - - } - - void undoPrepare() - { - _op.consolidate(); - _op.undoPrepare(); - - assertCount(_acked, 1); - assertCount(_unacked, 0); - } - void commit() { _op.consolidate(); @@ -232,30 +205,22 @@ public class TxAckTest extends TestCase private class TestMessage extends TransientAMQMessage { private final long _tag; - private int _count; + private TestTransactionLog _transactionLog; - TestMessage(long tag, MessagePublishInfo publishBody) + public TestMessage(long tag, MessagePublishInfo publishBody, TestTransactionLog transactionLog) throws AMQException { super(createMessage( publishBody)); _tag = tag; + _transactionLog = transactionLog; } - public boolean incrementReference(int count) - { - _count+=count; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } - void assertCountEquals(int expected) { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); + List<AMQQueue> list = _transactionLog.getMessageReferenceMap(_messageId); + int actual = (list == null ? 0 : list.size()); + assertEquals("Wrong count for message with tag " + _tag, expected, actual); } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 40b08a2e39..78cf610f28 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -324,7 +324,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 98465eda20..9f8d5f9a99 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -30,14 +30,16 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; @@ -57,7 +59,7 @@ public class AckTest extends TestCase private MockProtocolSession _protocolSession; - private TestMemoryMessageStore _messageStore; + private TestableMemoryMessageStore _messageStore; private StoreContext _storeContext = new StoreContext(); @@ -72,14 +74,15 @@ public class AckTest extends TestCase super.setUp(); ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); - _messageStore = new TestMemoryMessageStore(); + VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog()); _protocolSession = new MockProtocolSession(_messageStore); _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), - null); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), + true, vhost, null); } protected void tearDown() @@ -185,7 +188,7 @@ public class AckTest extends TestCase /** * Tests that in no-ack mode no messages are retained */ - public void testPersistentNoAckMode() throws AMQException + public void testPersistentNoAckMode() throws AMQException, InterruptedException { // false arg means no acks expected _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); @@ -194,7 +197,7 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); + assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0); assertTrue(_messageStore.getContentBodyMap().size() == 0); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java deleted file mode 100644 index 44e9851db7..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; - -public class MessageReferenceCountingTest extends TestCase -{ - AMQMessage _message; - - public void setUp() - { - _message = MessageFactory.getInstance().createMessage(null, false); - } - - public void testInitialState() - { - - assertTrue("New messages should have a reference", _message.isReferenced()); - } - - public void testIncrementReference() - { - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed ",_message.incrementReference(1)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - } - - public void testDecrementReference() - { - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - try - { - _message.decrementReference(null); - } - catch (MessageCleanupException e) - { - fail("Decrement should be allowed:"+e.getMessage()); - } - - assertFalse("Message should not be Referenced state", _message.isReferenced()); - - try - { - _message.decrementReference(null); - fail("Decrement should not be allowed as we should have a ref count of 0"); - } - catch (MessageCleanupException e) - { - assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0")); - } - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 12ff91cdad..92235648ec 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -38,4 +38,9 @@ public class MockQueueEntry extends QueueEntryImpl { super(_defaultList, message); } + + public MockQueueEntry(AMQMessage message, SimpleAMQQueue queue) + { + super(new SimpleQueueEntryList(queue) ,message); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index fdaf2c309f..4551ae5af8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -20,18 +20,46 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; +import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; public class PersistentMessageTest extends TransientMessageTest { - private MemoryMessageStore _messageStore; + private TestableMemoryMessageStore _messageStore; + + protected SimpleAMQQueue _queue; + protected AMQShortString _q1name = new AMQShortString("q1name"); + protected AMQShortString _owner = new AMQShortString("owner"); + protected AMQShortString _routingKey = new AMQShortString("routing key"); + private TransactionalContext _messageDeliveryContext; + private static final long MESSAGE_SIZE = 0L; + private List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); - public void setUp() + public void setUp() throws Exception { - _messageStore = new MemoryMessageStore(); - _messageStore.configure(); + _messageStore = new TestableMemoryMessageStore(); + _storeContext = new StoreContext(); + VirtualHost vhost = new VirtualHost(PersistentMessageTest.class.getName(), _messageStore); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null); + // Create IncomingMessage and nondurable queue + _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages); + } @Override @@ -47,4 +75,86 @@ public class PersistentMessageTest extends TransientMessageTest assertTrue(_message.isPersistent()); } + /** + * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and + * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right + * contents. TransactionLog = Empty, returnMessages 1 item. + * + * @throws Exception + */ + public void testImmediateReturnNotInLog() throws Exception + { + MessagePublishInfo info = new MessagePublishInfoImpl(null, true, false, null); + IncomingMessage msg = createMessage(info); + + // Send persistent message + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + + // equivalent to amqChannel.routeMessage() + msg.enqueue(qs); + + msg.routingComplete(_messageStore); + + // equivalent to amqChannel.deliverCurrentMessageIfComplete + msg.deliverToQueues(); + + // Check that data has been stored to disk + long messageId = msg.getMessageId(); + checkMessageMetaDataExists(messageId); + + // Check that it was not enqueued + List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId); + assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList); + checkMessageMetaDataRemoved(messageId); + + assertEquals("Return message count not correct", 1, _returnMessages.size()); + } + + protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException + { + IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext, + new MockProtocolSession(_messageStore), _messageStore); + + // equivalent to amqChannel.publishContenHeader + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID; + // This message has no bodies + contentHeaderBody.bodySize = MESSAGE_SIZE; + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + + msg.setContentHeaderBody(contentHeaderBody); + msg.setExpiration(); + + return msg; + } + + protected void checkMessageMetaDataExists(long messageId) + { + try + { + _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId); + } + catch (AMQException amqe) + { + fail("Message MetaData does not exist for message:" + messageId); + } + } + + protected void checkMessageMetaDataRemoved(long messageId) + { + try + { + assertNull("Message MetaData still exists for message:" + messageId, + _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); + assertNull("Message still has values in the reference map:" + messageId, + _messageStore.getMessageReferenceMap(messageId)); + + } + catch (AMQException e) + { + fail("AMQE thrown whilst trying to getMessageMetaData:" + e.getMessage()); + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 665ca089da..7a97837208 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -36,10 +36,12 @@ import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.ArrayList; import java.util.List; @@ -319,7 +321,7 @@ public class SimpleAMQQueueTest extends TestCase { // Create IncomingMessage and nondurable queue NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); - IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store); + IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.properties = new BasicContentHeaderProperties(); @@ -338,21 +340,22 @@ public class SimpleAMQQueueTest extends TestCase _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1)); // Check that it is enqueued - AMQQueue data = _store.getMessages().get(messageId); + List<AMQQueue> data = _store.getMessageReferenceMap(messageId); assertNotNull(data); // Dequeue message - ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store); message.setPublishAndContentHeaderBody(new StoreContext(), info, header); - MockQueueEntry entry = new MockQueueEntry(message); - _queue.dequeue(null, entry); + MockQueueEntry entry = new MockQueueEntry(message, _queue); + entry.getQueueEntryList().add(message); + entry.acquire(); + entry.dequeue(null); // Check that it is dequeued - data = _store.getMessages().get(messageId); + data = _store.getMessageReferenceMap(messageId); assertNull(data); } @@ -381,7 +384,7 @@ public class SimpleAMQQueueTest extends TestCase public AMQMessage createMessage() throws AMQException { - AMQMessage message = new TestMessage(info); + AMQMessage message = new TestMessage(info, _store); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; @@ -397,29 +400,21 @@ public class SimpleAMQQueueTest extends TestCase public class TestMessage extends TransientAMQMessage { private final long _tag; - private int _count; + private TestTransactionLog _transactionLog; - TestMessage(MessagePublishInfo publishBody) + TestMessage(MessagePublishInfo publishBody, TestTransactionLog transactionLog) throws AMQException { super(SimpleAMQQueueTest.createMessage(publishBody)); _tag = getMessageId(); + _transactionLog = transactionLog; } - public boolean incrementReference(int count) - { - _count+=count; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } void assertCountEquals(int expected) { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); + assertEquals("Wrong count for message with tag " + _tag, expected, + _transactionLog.getMessageReferenceMap(_messageId).size()); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index e8acfc2fda..5a4c435e59 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -34,7 +34,7 @@ import org.apache.qpid.server.queue.AMQMessage; */ public class TestReferenceCounting extends TestCase { - private TestMemoryMessageStore _store; + private TestableMemoryMessageStore _store; private StoreContext _storeContext = new StoreContext(); @@ -42,7 +42,7 @@ public class TestReferenceCounting extends TestCase protected void setUp() throws Exception { super.setUp(); - _store = new TestMemoryMessageStore(); + _store = new TestableMemoryMessageStore(); } /** @@ -54,19 +54,9 @@ public class TestReferenceCounting extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - final long messageId = _store.getNewMessageId(); - AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message.incrementReference(1); - - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - - assertEquals(1, _store.getMessageMetaDataMap().size()); - message.decrementReference(_storeContext); assertEquals(1, _store.getMessageMetaDataMap().size()); } @@ -84,16 +74,10 @@ public class TestReferenceCounting extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message.incrementReference(1); - - assertEquals(1, _store.getMessageMetaDataMap().size()); - message.incrementReference(1); - message.decrementReference(_storeContext); assertEquals(1, _store.getMessageMetaDataMap().size()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java index 4e48435962..bb051693c3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java @@ -20,32 +20,12 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.queue.AMQQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; import java.util.List; -/** - * Adds some extra methods to the memory message store for testing purposes. - */ -public class TestMemoryMessageStore extends MemoryMessageStore +public interface TestTransactionLog { - public TestMemoryMessageStore() - { - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); - } - - public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() - { - return _metaDataMap; - } - - public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() - { - return _contentBodyMap; - } + public List<AMQQueue> getMessageReferenceMap(Long messageID); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 9146fe88ae..882f88b8f3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -23,22 +23,28 @@ package org.apache.qpid.server.store; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.commons.configuration.Configuration; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Adds some extra methods to the memory message store for testing purposes. */ -public class TestableMemoryMessageStore extends MemoryMessageStore +public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable { MemoryMessageStore _mms = null; - private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>(); public TestableMemoryMessageStore(MemoryMessageStore mms) { @@ -47,46 +53,127 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public TestableMemoryMessageStore() { - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); + _mms = new MemoryMessageStore(); + _mms.configure(); } public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() { - if (_mms != null) - { - return _mms._metaDataMap; - } - else - { - return _metaDataMap; - } + return _mms._metaDataMap; } public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { - if (_mms != null) - { - return _mms._contentBodyMap; - } - else - { - return _contentBodyMap; - } + return _mms._contentBodyMap; } - - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + + public List<AMQQueue> getMessageReferenceMap(Long messageId) + { + return _mms._messageEnqueueMap.get(messageId); + } + + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + { + _mms.configure(virtualHost,base,config); + } + + public void close() throws Exception + { + _mms.close(); + } + + public void createExchange(Exchange exchange) throws AMQException + { + _mms.createExchange(exchange); + } + + public void removeExchange(Exchange exchange) throws AMQException + { + _mms.removeExchange(exchange); + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + _mms.bindQueue(exchange,routingKey,queue,args); + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + _mms.unbindQueue(exchange,routingKey,queue,args); + } + + public void createQueue(AMQQueue queue) throws AMQException + { + _mms.createQueue(queue); + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + { + _mms.createQueue(queue,arguments); + } + + public void removeQueue(AMQQueue queue) throws AMQException + { + _mms.removeQueue(queue); + } + + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + { + _mms.removeMessage(storeContext, messageId); + } + + public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + _mms.enqueueMessage(context,queue,messageId); + } + + public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + _mms.dequeueMessage(context,queue,messageId); + } + + public void beginTran(StoreContext context) throws AMQException + { + _mms.beginTran(context); + } + + public void commitTran(StoreContext context) throws AMQException + { + _mms.commitTran(context); + } + + public void abortTran(StoreContext context) throws AMQException + { + _mms.abortTran(context); + } + + public boolean inTran(StoreContext context) + { + return _mms.inTran(context); + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody); + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + _mms.storeMessageMetaData(context,messageId,messageMetaData); + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { - getMessages().put(messageId, queue); + return _mms.getMessageMetaData(context,messageId); } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { - getMessages().remove(messageId); + return _mms.getContentBodyChunk(context,messageId,index); } - public HashMap<Long, AMQQueue> getMessages() + public boolean isPersistent() { - return _messages; + return _mms.isPersistent(); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java index ca6644d141..26802b4210 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.txn; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.LinkedList; @@ -194,7 +194,7 @@ public class TxnBufferTest extends TestCase } } - class MockStore extends TestMemoryMessageStore + class MockStore extends TestableMemoryMessageStore { final Object BEGIN = "BEGIN"; final Object ABORT = "ABORT"; |
