summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java80
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java65
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java17
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java77
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java120
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java35
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java)28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java143
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java4
30 files changed, 430 insertions, 464 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 125518358b..5a01888ccf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -499,7 +499,7 @@ public class AMQChannel
}
else
{
- unacked.discard(_storeContext);
+ unacked.dequeueAndDelete(_storeContext);
}
}
@@ -555,7 +555,7 @@ public class AMQChannel
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
- unacked.discard(_storeContext);
+ unacked.dequeueAndDelete(_storeContext);
}
}
else
@@ -712,7 +712,7 @@ public class AMQChannel
{
try
{
- message.discard(_storeContext);
+ message.dequeueAndDelete(_storeContext);
message.setQueueDeleted(true);
}
@@ -831,9 +831,7 @@ public class AMQChannel
{
AMQMessage message = bouncedMessage.getAMQMessage();
_session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
-
- message.decrementReference(_storeContext);
+ new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
index 1723d46ef0..8d41cc58d2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
@@ -82,13 +82,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
}
else
{
- queueEntry.discard(_storeContext);
+ queueEntry.dequeueAndDelete(_storeContext);
_log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry);
}
}
else
{
- queueEntry.discard(_storeContext);
+ queueEntry.dequeueAndDelete(_storeContext);
_log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 415f1fe8be..a81b2cc2db 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -56,14 +56,10 @@ public abstract class RequiredDeliveryException extends AMQException
public void setMessage(final AMQMessage payload)
{
-
- // Increment the reference as this message is in the routing phase
- // and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
- // handler. So increment here.
- payload.incrementReference(1);
+ // handler.
+ // Messages are all kept in memory now. Only queues can push messages out of memory.
_amqMessage = payload;
-
}
public AMQMessage getAMQMessage()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index 0f40e00624..918fcd8407 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.ack;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -116,22 +115,20 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (QueueEntry msg : _unacked.values())
{
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(storeContext);
-
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ msg.dequeueAndDelete(storeContext);
}
}
public void undoPrepare()
{
- //decrementReference is annoyingly untransactional (due to
- //in memory counter) so if we failed in prepare for full
- //txn, this op will have to compensate by fixing the count
- //in memory (persistent changes will be rolled back by store)
- for (QueueEntry msg : _unacked.values())
- {
- msg.getMessage().incrementReference(1);
- }
+ //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the
+ // transactionLog. Only when the transaction succesfully completes will it perform any
+ // update of the internal transactionLog reference counting and any resulting message data deletion.
+ // The success or failure of the data deletion is not important to this transaction only that the ack has been
+ // successfully recorded.
}
public void commit(StoreContext storeContext)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index efdadd4922..ac3b0b5e49 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -174,8 +174,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- unacked.getValue().discard(storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ unacked.getValue().dequeueAndDelete(storeContext);
it.remove();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index f3cab10ed7..bd70cd7776 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -65,38 +65,38 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
long deliveryTag = body.getDeliveryTag();
- QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+ QueueEntry queueEntry = channel.getUnacknowledgedMessageMap().get(deliveryTag);
- if (message == null)
+ if (queueEntry == null)
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
}
else
{
- if (message.isQueueDeleted())
+ if (queueEntry.isQueueDeleted())
{
_logger.warn("Message's Queue as already been purged, unable to Reject. " +
"Dropping message should use Dead Letter Queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- if(message != null)
+ queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+ if(queueEntry != null)
{
- message.discard(channel.getStoreContext());
+ queueEntry.dequeueAndDelete(channel.getStoreContext());
}
//sendtoDeadLetterQueue(msg)
return;
}
- if (!message.getMessage().isReferenced())
+ if (queueEntry.isDeleted())
{
- _logger.warn("Message as already been purged, unable to Reject.");
+ _logger.warn("QueueEntry as already been deleted, unable to Reject.");
return;
}
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
@@ -105,7 +105,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
//if (!evt.getMethod().resend)
{
- message.reject();
+ queueEntry.reject();
}
if (body.getRequeue())
@@ -115,7 +115,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
else
{
_logger.warn("Dropping message as requeue not required and there is no dead letter queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+ queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
//sendtoDeadLetterQueue(AMQMessage message)
// message.queue = channel.getDefaultDeadLetterQueue();
// channel.requeue(deliveryTag);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 1f56b2ccd2..e96d2ba874 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -114,17 +114,8 @@ public interface AMQMessage
throws AMQException;
- void removeMessage(StoreContext storeContext) throws AMQException;
String toString();
String debugIdentity();
-
- // Reference counting methods
-
- void decrementReference(StoreContext storeContext) throws MessageCleanupException;
-
- boolean incrementReference(int queueCount);
-
- boolean isReferenced();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 0838b71c54..9fadbb0cdc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -81,25 +81,18 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
boolean isDeleted();
-
int delete() throws AMQException;
-
QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
-
-
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
-
void addQueueDeleteTask(final Task task);
-
List<QueueEntry> getMessagesOnTheQueue();
List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 5d4322c4fc..5eafd281c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -156,8 +156,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
_logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues);
}
- try
- {
+
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
_message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
@@ -196,7 +195,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
{
int offset;
final int queueCount = _destinationQueues.size();
- _message.incrementReference(queueCount);
if(queueCount == 1)
{
offset = 0;
@@ -222,12 +220,8 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
return _message;
- }
- finally
- {
- // Remove refence for routing process . Reference count should now == delivered queue count
- if(_message != null) _message.decrementReference(_txnContext.getStoreContext());
- }
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
index ec48a2afb0..92c10b0347 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -58,13 +58,6 @@ public class PersistentAMQMessage extends TransientAMQMessage
}
@Override
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- _log.info("PAMQM : removing message:" + _messageId);
- _transactionLog.removeMessage(storeContext, _messageId);
- }
-
- @Override
public boolean isPersistent()
{
return true;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 0df976a620..09600b9d28 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -49,7 +49,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
public abstract State getState();
}
-
public final class AvailableState extends EntryState
{
@@ -59,7 +58,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class DequeuedState extends EntryState
{
@@ -69,7 +67,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class DeletedState extends EntryState
{
@@ -88,7 +85,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class NonSubscriptionAcquiredState extends EntryState
{
public State getState()
@@ -106,7 +102,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
_subscription = subscription;
}
-
public State getState()
{
return State.ACQUIRED;
@@ -118,16 +113,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
final static EntryState AVAILABLE_STATE = new AvailableState();
final static EntryState DELETED_STATE = new DeletedState();
final static EntryState DEQUEUED_STATE = new DequeuedState();
final static EntryState EXPIRED_STATE = new ExpiredState();
final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
AMQQueue getQueue();
AMQMessage getMessage();
@@ -141,9 +132,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean isAcquired();
boolean acquire();
+
boolean acquire(Subscription sub);
boolean delete();
+
boolean isDeleted();
boolean acquiredBySubscription();
@@ -170,12 +163,21 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
void dequeue(final StoreContext storeContext) throws FailedDequeueException;
- void dispose(final StoreContext storeContext) throws MessageCleanupException;
-
- void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
+ /**
+ * Message has been ack so dequeueAndDelete it.
+ * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ * from the transaciton log
+ *
+ * @param storeContext the transactional Context in which to perform the deletion
+ *
+ * @throws FailedDequeueException
+ * @throws MessageCleanupException
+ */
+ void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException;
boolean isQueueDeleted();
void addStateChangeListener(StateChangeListener listener);
+
boolean removeStateChangeListener(StateChangeListener listener);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 3eb1636884..911ed8321b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -282,13 +282,12 @@ public class QueueEntryImpl implements QueueEntry
}
getQueue().dequeue(storeContext, this);
- if(_stateChangeListeners != null)
+
+ if (_stateChangeListeners != null)
{
- notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+ notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED);
}
-
}
-
}
private void notifyStateChange(final State oldState, final State newState)
@@ -299,29 +298,15 @@ public class QueueEntryImpl implements QueueEntry
}
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
- {
- _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state);
- if(delete())
- {
- _log.info("QEI delete message:" + getMessage().getMessageId());
- getMessage().decrementReference(storeContext);
- }
- else
- {
- _log.info("QEI delete state wrong:" + getMessage().getMessageId());
- }
- }
-
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
- //if the queue is null then the message is waiting to be acked, but has been removed.
+ //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d;
if (getQueue() != null)
{
dequeue(storeContext);
}
- dispose(storeContext);
+ delete();
}
public boolean isQueueDeleted()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a0f21033c7..501e90b4d7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -408,8 +408,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (entry.immediateAndNotDelivered())
{
- dequeue(storeContext, entry);
- entry.dispose(storeContext);
+ //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content
+ // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks
+ // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses.
+ entry.acquire();
+ entry.dequeueAndDelete(storeContext);
}
else if (!(entry.isAcquired() || entry.isDeleted()))
{
@@ -562,6 +565,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ /**
+ * Only call from queue Entry
+ * @param storeContext
+ * @param entry
+ * @throws FailedDequeueException
+ */
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
decrementQueueCount();
@@ -578,7 +587,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
}
- //entry.dispose(storeContext);
}
catch (MessageCleanupException e)
@@ -814,11 +822,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+
public void moveMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
String queueName,
StoreContext storeContext)
{
+ // The move is a two step process. First the messages are moved in the _transactionLog.
+ // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the
+ // existing queue.
+ // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery
+ // this is done as the message is recieved.
+ // So The final step is to enqueue the messages on the new queue.
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
TransactionLog transactionLog = getVirtualHost().getTransactionLog();
@@ -844,7 +859,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
transactionLog.beginTran(storeContext);
- // Move the messages in on the transaction log.
+ // Move the messages in the transaction log.
for (QueueEntry entry : entries)
{
AMQMessage message = entry.getMessage();
@@ -853,7 +868,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
- // dequeue does not decrement the refence count
+ // dequeue will remove the messages from the queue
entry.dequeue(storeContext);
}
@@ -882,10 +897,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
+ // Add messages to new queue
for (QueueEntry entry : entries)
{
toQueue.enqueue(storeContext, entry.getMessage());
-
}
}
catch (MessageCleanupException e)
@@ -918,7 +933,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!entry.isDeleted())
{
- return entry.getMessage().incrementReference(1);
+ return true;
}
}
@@ -940,7 +955,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
AMQMessage message = entry.getMessage();
- if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+ if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable())
{
transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
@@ -973,7 +988,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
for (QueueEntry entry : entries)
{
- if (entry.getMessage().isReferenced())
+ if (!entry.isDeleted())
{
toQueue.enqueue(storeContext, entry.getMessage());
}
@@ -1008,7 +1023,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
&& !node.isDeleted()
&& node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
}
}
@@ -1032,7 +1047,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
noDeletes = false;
}
@@ -1050,7 +1065,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
count++;
}
@@ -1315,8 +1330,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (node.acquire())
{
+ // creating a new final store context per message seems wasteful.
final StoreContext reapingStoreContext = new StoreContext();
- node.discard(reapingStoreContext);
+ node.dequeueAndDelete(reapingStoreContext);
}
}
QueueEntry newNode = _entries.next(node);
@@ -1431,7 +1447,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.expired() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
index f3d74fb01c..fa4e85a043 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -172,11 +172,6 @@ public class TransientAMQMessage implements AMQMessage
_expiration = expiration;
}
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
-
public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
return new BodyFrameIterator(protocolSession, channel);
@@ -197,76 +192,6 @@ public class TransientAMQMessage implements AMQMessage
return _messageId;
}
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
- if (_referenceCount.addAndGet(count) <= 1)
- {
- int newcount = _referenceCount.addAndGet(-count);
- _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount);
- return false;
- }
- else
- {
- _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1("
- + _referenceCount.get() + ")");
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- * @param storeContext
- *
- * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
- {
-
- int count = _referenceCount.decrementAndGet();
-
- _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count);
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE / 2);
-
- try
- {
- _log.debug("Reference Count hit 0, removing message");
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
- removeMessage(storeContext);
- }
- catch (AMQException e)
- {
- // to maintain consistency, we revert the count
- incrementReference(1);
- throw new MessageCleanupException(getMessageId(), e);
- }
- }
- else
- {
- if (count < 0)
- {
- throw new MessageCleanupException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
/**
* Called selectors to determin if the message has already been sent
*
@@ -435,11 +360,6 @@ public class TransientAMQMessage implements AMQMessage
return _arrivalTime;
}
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- //no-op
- }
-
public String toString()
{
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index fe81346c8c..33b3d8608e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -1357,7 +1357,8 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
if(message != null)
{
- message.incrementReference(1);
+ //todo must enqueue message to build reference table
+// message.incrementReference(1);
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index aa7b6e2542..cd0f0c1769 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -30,24 +30,28 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-/** A simple message store that stores the messages in a threadsafe structure in memory.
+/**
+ * A simple message store that stores the messages in a threadsafe structure in memory.
*
* NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog
*
* This class really should have no storage unless we want to do inMemory Recovery.
- *
*/
public class MemoryMessageStore implements TransactionLog, RoutingTable
{
@@ -63,6 +67,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
+ protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
public void configure()
{
@@ -112,6 +117,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
_metaDataMap.remove(messageId);
_contentBodyMap.remove(messageId);
+ _messageEnqueueMap.remove(messageId);
}
public void createExchange(Exchange exchange) throws AMQException
@@ -134,7 +140,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
-
public void createQueue(AMQQueue queue) throws AMQException
{
// Not requred to do anything
@@ -152,12 +157,39 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
- // Not required to do anything
+ synchronized (_messageEnqueueMap)
+ {
+ List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
+ if (queues == null)
+ {
+ queues = new LinkedList<AMQQueue>();
+ _messageEnqueueMap.put(messageId, queues);
+ }
+
+ queues.add(queue);
+ }
}
public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
- // Not required to do anything
+ synchronized (_messageEnqueueMap)
+ {
+ List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
+ if (queues == null || !queues.contains(queue))
+ {
+ throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
+ + " but it is not enqueued on that queue.");
+ }
+ else
+ {
+ queues.remove(queue);
+ if (queues.isEmpty())
+ {
+ removeMessage(context,messageId);
+ }
+ }
+ }
+
}
public void beginTran(StoreContext context) throws AMQException
@@ -238,7 +270,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
private void checkNotClosed() throws MessageStoreClosedException
- {
+ {
if (_closed.get())
{
throw new MessageStoreClosedException();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 1c58e644e9..119a4b1692 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -144,7 +144,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
StoreContext storeContext = getChannel().getStoreContext();
try
- { // if we do not need to wait for client acknowledgements
+ {
+ // if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
// By doing this _before_ the send we ensure that it
@@ -153,7 +154,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
- entry.dequeue(storeContext);
+ entry.dequeueAndDelete(storeContext);
synchronized (getChannel())
@@ -163,7 +164,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
sendToClient(entry, deliveryTag);
}
- entry.dispose(storeContext);
}
finally
{
@@ -316,7 +316,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_autoClose = false;
}
-
+ _logger.info(debugIdentity()+" Created subscription:");
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 8e63b95f0d..abfb60c5bf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -92,20 +92,12 @@ public class LocalTransactionalContext implements TransactionalContext
public void process() throws AMQException
{
- _message.incrementReference(1);
- try
- {
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
if(entry.immediateAndNotDelivered())
{
getReturnMessages().add(new NoConsumersException(_message));
}
- }
- finally
- {
- _message.decrementReference(getStoreContext());
- }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 145d7f8b13..561f998b98 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -123,18 +123,18 @@ public class NonTransactionalContext implements TransactionalContext
unacknowledgedMessageMap.size());
unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException
{
if (debug)
{
- _log.debug("Discarding message: " + message.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
}
- if(message.getMessage().isPersistent())
+ if(queueEntry.getMessage().isPersistent())
{
beginTranIfNecessary();
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- message.discard(_storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ queueEntry.dequeueAndDelete(_storeContext);
return false;
}
@@ -157,10 +157,10 @@ public class NonTransactionalContext implements TransactionalContext
}
else
{
- QueueEntry msg;
- msg = unacknowledgedMessageMap.get(deliveryTag);
+ QueueEntry queueEntry;
+ queueEntry = unacknowledgedMessageMap.get(deliveryTag);
- if (msg == null)
+ if (queueEntry == null)
{
_log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
@@ -170,15 +170,17 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
}
- if(msg.getMessage().isPersistent())
+ if(queueEntry.getMessage().isPersistent())
{
beginTranIfNecessary();
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ queueEntry.dequeueAndDelete(_storeContext);
unacknowledgedMessageMap.remove(deliveryTag);
@@ -186,7 +188,7 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.getMessage().getMessageId());
+ queueEntry.getMessage().getMessageId());
}
}
if(_inTran)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 1d729a82a5..52b8b0ad19 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.queue.AMQMessage;
@@ -37,9 +38,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.TransientAMQMessage;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -81,20 +83,6 @@ public class TxAckTest extends TestCase
combined.stop();
}
- public void testPrepare() throws AMQException
- {
- individual.prepare();
- multiple.prepare();
- combined.prepare();
- }
-
- public void testUndoPrepare() throws AMQException
- {
- individual.undoPrepare();
- multiple.undoPrepare();
- combined.undoPrepare();
- }
-
public void testCommit() throws AMQException
{
individual.commit();
@@ -115,12 +103,13 @@ public class TxAckTest extends TestCase
private final List<Long> _unacked;
private StoreContext _storeContext = new StoreContext();
private AMQQueue _queue;
+ private TransactionLog _transactionLog = new TestableMemoryMessageStore();
private static final int MESSAGE_SIZE=100;
Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
- TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
+ TransactionalContext txnContext = new NonTransactionalContext(_transactionLog,
_storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
@@ -138,12 +127,15 @@ public class TxAckTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- AMQMessage message = new TestMessage(deliveryTag, info);
+ AMQMessage message = new TestMessage(deliveryTag, info, (TestTransactionLog) _transactionLog);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
message.setPublishAndContentHeaderBody(_storeContext, info, header);
+
+
+
_map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
}
_acked = acked;
@@ -165,25 +157,6 @@ public class TxAckTest extends TestCase
}
}
- void prepare() throws AMQException
- {
- _op.consolidate();
- _op.prepare(_storeContext);
-
- assertCount(_acked, -1);
- assertCount(_unacked, 0);
-
- }
-
- void undoPrepare()
- {
- _op.consolidate();
- _op.undoPrepare();
-
- assertCount(_acked, 1);
- assertCount(_unacked, 0);
- }
-
void commit()
{
_op.consolidate();
@@ -232,30 +205,22 @@ public class TxAckTest extends TestCase
private class TestMessage extends TransientAMQMessage
{
private final long _tag;
- private int _count;
+ private TestTransactionLog _transactionLog;
- TestMessage(long tag, MessagePublishInfo publishBody)
+ public TestMessage(long tag, MessagePublishInfo publishBody, TestTransactionLog transactionLog)
throws AMQException
{
super(createMessage( publishBody));
_tag = tag;
+ _transactionLog = transactionLog;
}
- public boolean incrementReference(int count)
- {
- _count+=count;
- return true;
- }
-
- public void decrementReference(StoreContext context)
- {
- _count--;
- }
-
void assertCountEquals(int expected)
{
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ List<AMQQueue> list = _transactionLog.getMessageReferenceMap(_messageId);
+ int actual = (list == null ? 0 : list.size());
+ assertEquals("Wrong count for message with tag " + _tag, expected, actual);
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 40b08a2e39..78cf610f28 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -324,7 +324,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 98465eda20..9f8d5f9a99 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -30,14 +30,16 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
@@ -57,7 +59,7 @@ public class AckTest extends TestCase
private MockProtocolSession _protocolSession;
- private TestMemoryMessageStore _messageStore;
+ private TestableMemoryMessageStore _messageStore;
private StoreContext _storeContext = new StoreContext();
@@ -72,14 +74,15 @@ public class AckTest extends TestCase
super.setUp();
ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
- _messageStore = new TestMemoryMessageStore();
+ VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog());
_protocolSession = new MockProtocolSession(_messageStore);
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
- _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
- null);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"),
+ true, vhost, null);
}
protected void tearDown()
@@ -185,7 +188,7 @@ public class AckTest extends TestCase
/**
* Tests that in no-ack mode no messages are retained
*/
- public void testPersistentNoAckMode() throws AMQException
+ public void testPersistentNoAckMode() throws AMQException, InterruptedException
{
// false arg means no acks expected
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
@@ -194,7 +197,7 @@ public class AckTest extends TestCase
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
+ assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0);
assertTrue(_messageStore.getContentBodyMap().size() == 0);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
deleted file mode 100644
index 44e9851db7..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-
-public class MessageReferenceCountingTest extends TestCase
-{
- AMQMessage _message;
-
- public void setUp()
- {
- _message = MessageFactory.getInstance().createMessage(null, false);
- }
-
- public void testInitialState()
- {
-
- assertTrue("New messages should have a reference", _message.isReferenced());
- }
-
- public void testIncrementReference()
- {
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed ",_message.incrementReference(1));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- }
-
- public void testDecrementReference()
- {
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- try
- {
- _message.decrementReference(null);
- }
- catch (MessageCleanupException e)
- {
- fail("Decrement should be allowed:"+e.getMessage());
- }
-
- assertFalse("Message should not be Referenced state", _message.isReferenced());
-
- try
- {
- _message.decrementReference(null);
- fail("Decrement should not be allowed as we should have a ref count of 0");
- }
- catch (MessageCleanupException e)
- {
- assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0"));
- }
-
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 12ff91cdad..92235648ec 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -38,4 +38,9 @@ public class MockQueueEntry extends QueueEntryImpl
{
super(_defaultList, message);
}
+
+ public MockQueueEntry(AMQMessage message, SimpleAMQQueue queue)
+ {
+ super(new SimpleQueueEntryList(queue) ,message);
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index fdaf2c309f..4551ae5af8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -20,18 +20,46 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
public class PersistentMessageTest extends TransientMessageTest
{
- private MemoryMessageStore _messageStore;
+ private TestableMemoryMessageStore _messageStore;
+
+ protected SimpleAMQQueue _queue;
+ protected AMQShortString _q1name = new AMQShortString("q1name");
+ protected AMQShortString _owner = new AMQShortString("owner");
+ protected AMQShortString _routingKey = new AMQShortString("routing key");
+ private TransactionalContext _messageDeliveryContext;
+ private static final long MESSAGE_SIZE = 0L;
+ private List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- public void setUp()
+ public void setUp() throws Exception
{
- _messageStore = new MemoryMessageStore();
- _messageStore.configure();
+ _messageStore = new TestableMemoryMessageStore();
+
_storeContext = new StoreContext();
+ VirtualHost vhost = new VirtualHost(PersistentMessageTest.class.getName(), _messageStore);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null);
+ // Create IncomingMessage and nondurable queue
+ _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages);
+
}
@Override
@@ -47,4 +75,86 @@ public class PersistentMessageTest extends TransientMessageTest
assertTrue(_message.isPersistent());
}
+ /**
+ * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and
+ * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right
+ * contents. TransactionLog = Empty, returnMessages 1 item.
+ *
+ * @throws Exception
+ */
+ public void testImmediateReturnNotInLog() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl(null, true, false, null);
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_messageStore);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it was not enqueued
+ List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
+ assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList);
+ checkMessageMetaDataRemoved(messageId);
+
+ assertEquals("Return message count not correct", 1, _returnMessages.size());
+ }
+
+ protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
+ new MockProtocolSession(_messageStore), _messageStore);
+
+ // equivalent to amqChannel.publishContenHeader
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ // This message has no bodies
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+
+ msg.setContentHeaderBody(contentHeaderBody);
+ msg.setExpiration();
+
+ return msg;
+ }
+
+ protected void checkMessageMetaDataExists(long messageId)
+ {
+ try
+ {
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+ }
+ catch (AMQException amqe)
+ {
+ fail("Message MetaData does not exist for message:" + messageId);
+ }
+ }
+
+ protected void checkMessageMetaDataRemoved(long messageId)
+ {
+ try
+ {
+ assertNull("Message MetaData still exists for message:" + messageId,
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
+ assertNull("Message still has values in the reference map:" + messageId,
+ _messageStore.getMessageReferenceMap(messageId));
+
+ }
+ catch (AMQException e)
+ {
+ fail("AMQE thrown whilst trying to getMessageMetaData:" + e.getMessage());
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 665ca089da..7a97837208 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -36,10 +36,12 @@ import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestTransactionLog;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.ArrayList;
import java.util.List;
@@ -319,7 +321,7 @@ public class SimpleAMQQueueTest extends TestCase
{
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -338,21 +340,22 @@ public class SimpleAMQQueueTest extends TestCase
_store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
- AMQQueue data = _store.getMessages().get(messageId);
+ List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
assertNotNull(data);
// Dequeue message
-
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
- MockQueueEntry entry = new MockQueueEntry(message);
- _queue.dequeue(null, entry);
+ MockQueueEntry entry = new MockQueueEntry(message, _queue);
+ entry.getQueueEntryList().add(message);
+ entry.acquire();
+ entry.dequeue(null);
// Check that it is dequeued
- data = _store.getMessages().get(messageId);
+ data = _store.getMessageReferenceMap(messageId);
assertNull(data);
}
@@ -381,7 +384,7 @@ public class SimpleAMQQueueTest extends TestCase
public AMQMessage createMessage() throws AMQException
{
- AMQMessage message = new TestMessage(info);
+ AMQMessage message = new TestMessage(info, _store);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -397,29 +400,21 @@ public class SimpleAMQQueueTest extends TestCase
public class TestMessage extends TransientAMQMessage
{
private final long _tag;
- private int _count;
+ private TestTransactionLog _transactionLog;
- TestMessage(MessagePublishInfo publishBody)
+ TestMessage(MessagePublishInfo publishBody, TestTransactionLog transactionLog)
throws AMQException
{
super(SimpleAMQQueueTest.createMessage(publishBody));
_tag = getMessageId();
+ _transactionLog = transactionLog;
}
- public boolean incrementReference(int count)
- {
- _count+=count;
- return true;
- }
-
- public void decrementReference(StoreContext context)
- {
- _count--;
- }
void assertCountEquals(int expected)
{
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ assertEquals("Wrong count for message with tag " + _tag, expected,
+ _transactionLog.getMessageReferenceMap(_messageId).size());
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index e8acfc2fda..5a4c435e59 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -34,7 +34,7 @@ import org.apache.qpid.server.queue.AMQMessage;
*/
public class TestReferenceCounting extends TestCase
{
- private TestMemoryMessageStore _store;
+ private TestableMemoryMessageStore _store;
private StoreContext _storeContext = new StoreContext();
@@ -42,7 +42,7 @@ public class TestReferenceCounting extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _store = new TestMemoryMessageStore();
+ _store = new TestableMemoryMessageStore();
}
/**
@@ -54,19 +54,9 @@ public class TestReferenceCounting extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- final long messageId = _store.getNewMessageId();
-
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message.incrementReference(1);
-
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
-
- assertEquals(1, _store.getMessageMetaDataMap().size());
- message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}
@@ -84,16 +74,10 @@ public class TestReferenceCounting extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message.incrementReference(1);
-
- assertEquals(1, _store.getMessageMetaDataMap().size());
- message.incrementReference(1);
- message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
index 4e48435962..bb051693c3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
@@ -20,32 +20,12 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.AMQQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
import java.util.List;
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestMemoryMessageStore extends MemoryMessageStore
+public interface TestTransactionLog
{
- public TestMemoryMessageStore()
- {
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
- }
-
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
- {
- return _metaDataMap;
- }
-
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
- {
- return _contentBodyMap;
- }
+ public List<AMQQueue> getMessageReferenceMap(Long messageID);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 9146fe88ae..882f88b8f3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -23,22 +23,28 @@ package org.apache.qpid.server.store;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Adds some extra methods to the memory message store for testing purposes.
*/
-public class TestableMemoryMessageStore extends MemoryMessageStore
+public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
{
MemoryMessageStore _mms = null;
- private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
@@ -47,46 +53,127 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
public TestableMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+ _mms = new MemoryMessageStore();
+ _mms.configure();
}
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
{
- if (_mms != null)
- {
- return _mms._metaDataMap;
- }
- else
- {
- return _metaDataMap;
- }
+ return _mms._metaDataMap;
}
public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
- if (_mms != null)
- {
- return _mms._contentBodyMap;
- }
- else
- {
- return _contentBodyMap;
- }
+ return _mms._contentBodyMap;
}
-
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+
+ public List<AMQQueue> getMessageReferenceMap(Long messageId)
+ {
+ return _mms._messageEnqueueMap.get(messageId);
+ }
+
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+ {
+ _mms.configure(virtualHost,base,config);
+ }
+
+ public void close() throws Exception
+ {
+ _mms.close();
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ _mms.createExchange(exchange);
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ _mms.removeExchange(exchange);
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ _mms.bindQueue(exchange,routingKey,queue,args);
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ _mms.unbindQueue(exchange,routingKey,queue,args);
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ _mms.createQueue(queue);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ {
+ _mms.createQueue(queue,arguments);
+ }
+
+ public void removeQueue(AMQQueue queue) throws AMQException
+ {
+ _mms.removeQueue(queue);
+ }
+
+ public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ {
+ _mms.removeMessage(storeContext, messageId);
+ }
+
+ public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ _mms.enqueueMessage(context,queue,messageId);
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ _mms.dequeueMessage(context,queue,messageId);
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ _mms.beginTran(context);
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ _mms.commitTran(context);
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ _mms.abortTran(context);
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return _mms.inTran(context);
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody);
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ _mms.storeMessageMetaData(context,messageId,messageMetaData);
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
- getMessages().put(messageId, queue);
+ return _mms.getMessageMetaData(context,messageId);
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
- getMessages().remove(messageId);
+ return _mms.getContentBodyChunk(context,messageId,index);
}
- public HashMap<Long, AMQQueue> getMessages()
+ public boolean isPersistent()
{
- return _messages;
+ return _mms.isPersistent();
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
index ca6644d141..26802b4210 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.txn;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.LinkedList;
@@ -194,7 +194,7 @@ public class TxnBufferTest extends TestCase
}
}
- class MockStore extends TestMemoryMessageStore
+ class MockStore extends TestableMemoryMessageStore
{
final Object BEGIN = "BEGIN";
final Object ABORT = "ABORT";