diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-03-09 15:57:33 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-03-09 15:57:33 +0000 |
| commit | 038d1727cc37edfc9f22ebb71e28f35122366c57 (patch) | |
| tree | e32b45217d4cef7933f70be351c232de6966edde /qpid/java | |
| parent | bf6b5856c7abb109f958bd07000cdc0ebd133656 (diff) | |
| download | qpid-python-038d1727cc37edfc9f22ebb71e28f35122366c57.tar.gz | |
QPID-949 : Removed all getMessage() calls as this will cause a flowed message to be read in to memory from disk. In all instances the reason was to perform methods that exist on the the QueueEntry. Added accessor to MessageID on QueueEntry. Outstanding getMessage() calls have been left in to perform NO_LOCAL work. Moving Publisher and PublisherClient identifer to the QEI would remove this need.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@751718 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
15 files changed, 77 insertions, 64 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 9b29c7b921..72a2780c32 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -431,7 +431,7 @@ public class AMQChannel { if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + _log.debug(debugIdentity() + " Adding unacked message(" + entry.toString() + " DT:" + deliveryTag + ") with a queue(" + entry.getQueue() + ") for " + subscription); } } @@ -551,7 +551,7 @@ public class AMQChannel } else { - _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); unacked.dequeueAndDelete(_storeContext); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 918fcd8407..95de0dc8c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -102,7 +102,7 @@ public class TxAck implements TxnOp //buffer must be marked as persistent: for (QueueEntry msg : _unacked.values()) { - if (msg.getMessage().isPersistent()) + if (msg.isPersistent()) { return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index bd70cd7776..8b04315d33 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -96,7 +96,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.debugIdentity() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 36da1c4cb1..2ff54fb748 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -274,8 +274,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que /** * Checks if there is any notification to be send to the listeners + * @param queueEntry */ - public void checkForNotification(AMQMessage msg) throws AMQException + public void checkForNotification(QueueEntry queueEntry) throws AMQException { final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); @@ -289,7 +290,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) { - if (check.notifyIfNecessary(msg, _queue, this)) + if (check.notifyIfNecessary(queueEntry, _queue, this)) { _lastNotificationTimes[check.ordinal()] = currentTime; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index e33b0c83c7..a83d661de2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -20,14 +20,12 @@ */
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-
public enum NotificationCheck
{
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,19 +39,19 @@ public enum NotificationCheck },
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+ long messageSize = (queueEntry == null) ? 0 : queueEntry.getSize();
if (messageSize >= maximumMessageSize)
{
listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
maximumMessageSize + ") breached. [Message ID=" +
- (msg == null ? "null" : msg.getMessageId()) + "]");
+ (queueEntry == null ? "null" : queueEntry.getMessageId()) + "]");
return true;
}
}
@@ -63,7 +61,7 @@ public enum NotificationCheck },
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -84,7 +82,7 @@ public enum NotificationCheck },
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -126,6 +124,6 @@ public enum NotificationCheck return _messageSpecific;
}
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ abstract boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 2cb1493a8f..7fc5df4e9e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -135,6 +135,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept AMQMessage getMessage(); + Long getMessageId(); + long getSize(); /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 5114529419..fceaf75a9e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -83,6 +83,7 @@ public class QueueEntryImpl implements QueueEntry private long _expiration; private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + private boolean _persistent; QueueEntryImpl(SimpleQueueEntryList queueEntryList) { @@ -111,6 +112,7 @@ public class QueueEntryImpl implements QueueEntry _flags |= IMMEDIATE; } _expiration = message.getExpiration(); + _persistent = message.isPersistent(); } _backingStore = queueEntryList.getBackingStore(); _flowed = new AtomicBoolean(false); @@ -140,6 +142,11 @@ public class QueueEntryImpl implements QueueEntry return _message; } + public Long getMessageId() + { + return _messageId; + } + public long getSize() { return _messageSize; @@ -245,12 +252,12 @@ public class QueueEntryImpl implements QueueEntry public ContentHeaderBody getContentHeaderBody() throws AMQException { - return _message.getContentHeaderBody(); + return getMessage().getContentHeaderBody(); } public boolean isPersistent() throws AMQException { - return _message.isPersistent(); + return _persistent; } public boolean isRedelivered() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 9e9895c53b..63ec56c1af 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -417,7 +417,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliverAsync(); } - _managedObject.checkForNotification(entry.getMessage()); + _managedObject.checkForNotification(entry); return entry; } @@ -567,10 +567,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { - AMQMessage msg = entry.getMessage(); - if (msg.isPersistent()) + if (entry.isPersistent()) { - _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId()); + _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, entry.getMessageId()); } } @@ -761,7 +760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessageId(); return messageId >= fromMessageId && messageId <= toMessageId; } @@ -780,7 +779,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - _complete = entry.getMessage().getMessageId() == messageId; + _complete = entry.getMessageId() == messageId; return _complete; } @@ -829,7 +828,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessageId(); return (messageId >= fromMessageId) && (messageId <= toMessageId) && entry.acquire(); @@ -848,11 +847,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Move the messages in the transaction log. for (QueueEntry entry : entries) { - AMQMessage message = entry.getMessage(); - - if (message.isPersistent() && toQueue.isDurable()) + if (entry.isPersistent() && toQueue.isDurable()) { - transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); + transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId()); } // dequeue will remove the messages from the queue entry.dequeue(storeContext); @@ -887,6 +884,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener for (QueueEntry entry : entries) { toQueue.enqueue(storeContext, entry.getMessage()); + // As we only did a dequeue above now that we have moved the message we should perform a delete. + // We cannot do this earlier as the message will be lost if flowed. + //entry.delete(); } } catch (MessageCleanupException e) @@ -913,7 +913,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessageId(); if ((messageId >= fromMessageId) && (messageId <= toMessageId)) { @@ -939,11 +939,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Move the messages in on the transaction log. for (QueueEntry entry : entries) { - AMQMessage message = entry.getMessage(); - - if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable()) + if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable()) { - transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); + transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId()); } } @@ -1002,7 +1000,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { QueueEntry node = queueListIterator.getNode(); - final long messageId = node.getMessage().getMessageId(); + final long messageId = node.getMessageId(); if ((messageId >= fromMessageId) && (messageId <= toMessageId) @@ -1438,7 +1436,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { - _managedObject.checkForNotification(node.getMessage()); + _managedObject.checkForNotification(node); } } @@ -1605,7 +1603,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener for (int i = 0; i < num && !it.atTail(); i++) { it.advance(); - ids.add(it.getNode().getMessage().getMessageId()); + ids.add(it.getNode().getMessageId()); } return ids; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index d3e69fc1fa..bc1f56fee1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -387,6 +387,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage //todo - client id should be recoreded and this test removed but handled below if (_noLocal) { + //todo getPublisherClientInstance should be moved to QueueEntryImpl final Object publisherId = entry.getMessage().getPublisherClientInstance(); // We don't want local messages so check to see if message is one we sent @@ -407,6 +408,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage //todo - client id should be recoreded and this test removed but handled here + //todo getPublisherIdentifier should be moved to QueueEntryImpl if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier())) { return false; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 0ca8135f71..2f27e1405a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -127,9 +127,9 @@ public class NonTransactionalContext implements TransactionalContext { if (debug) { - _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessageId()); } - if(queueEntry.getMessage().isPersistent()) + if(queueEntry.isPersistent()) { beginTranIfNecessary(); } @@ -175,9 +175,9 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { - _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessageId()); } - if(queueEntry.getMessage().isPersistent()) + if(queueEntry.isPersistent()) { beginTranIfNecessary(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java index a8dd58ca83..7fe16062fc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java @@ -172,7 +172,7 @@ public class Move extends AbstractCommand { for (QueueEntry msg : messages) { - ids.add(msg.getMessage().getMessageId()); + ids.add(msg.getMessageId()); } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 2a97db6066..3ab127b59d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -87,7 +87,7 @@ public class ExtractResendAndRequeueTest extends TestCase while(queueEntries.advance()) { QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry); + _unacknowledgedMessageMap.add(entry.getMessageId(), entry); // Store the entry for future inspection _referenceList.add(entry); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index c50770d7ba..4c9de73a0b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -215,6 +215,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase return null; //To change body of implemented methods use File | Settings | File Templates. } + public Long getMessageId() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public long getSize() { return 0; //To change body of implemented methods use File | Settings | File Templates. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index f8544a33bd..890b641540 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -100,7 +100,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -140,7 +140,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -159,7 +159,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -198,7 +198,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -217,7 +217,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -236,7 +236,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -254,7 +254,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -294,7 +294,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -312,7 +312,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -352,7 +352,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -384,7 +384,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -425,7 +425,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -464,7 +464,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 800bb1ac9c..d7844730d1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -68,17 +68,17 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest ArrayList<QueueEntry> msgs = _subscription.getQueueEntries(); try { - assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId()); - assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessage().getMessageId()); - assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessage().getMessageId()); + assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessageId()); + assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessageId()); + assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessageId()); - assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessage().getMessageId()); - assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessage().getMessageId()); - assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessage().getMessageId()); + assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessageId()); + assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessageId()); + assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessageId()); - assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessage().getMessageId()); - assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessage().getMessageId()); - assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessage().getMessageId()); + assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessageId()); + assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessageId()); + assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessageId()); } catch (AssertionFailedError afe) { |
