diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-03-06 12:28:33 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-03-06 12:28:33 +0000 |
| commit | 5728d63f9bbfbc7919dbed0b79d6781a083d7f67 (patch) | |
| tree | aaf483d5b5b137ab680bd098cb2e0725a43c2046 /qpid/java/broker | |
| parent | fff143ba920066a1f127c4e69ed5f3c145f73443 (diff) | |
| download | qpid-python-5728d63f9bbfbc7919dbed0b79d6781a083d7f67.tar.gz | |
QPID-1628 : Ensured all getSize() calls are done on the QueueEntry to prevent the inadvertent loading of a flowed message with a call to getMessage()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@750873 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
9 files changed, 26 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index ac3b0b5e49..5c38185696 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -89,7 +89,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap QueueEntry message = _map.remove(deliveryTag); if(message != null) { - _unackedSize -= message.getMessage().getSize(); + _unackedSize -= message.getSize(); } @@ -115,7 +115,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.put(deliveryTag, message); - _unackedSize += message.getMessage().getSize(); + _unackedSize += message.getSize(); _lastDeliveryTag = deliveryTag; } } @@ -181,7 +181,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap it.remove(); - _unackedSize -= unacked.getValue().getMessage().getSize(); + _unackedSize -= unacked.getValue().getSize(); if (unacked.getKey() == deliveryTag) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java index 0743e4bb8d..0bb428cd8f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java @@ -1,6 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import java.util.concurrent.atomic.AtomicLong; @@ -49,9 +49,9 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager return _bytesCredit.get() > 0L; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(QueueEntry queueEntry) { - final long msgSize = msg.getSize(); + final long msgSize = queueEntry.getSize(); if(hasCredit()) { if(_bytesCredit.addAndGet(-msgSize) >= 0) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index a249a6e63a..297e5a4826 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -1,6 +1,7 @@ package org.apache.qpid.server.flow; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; /* * @@ -40,5 +41,5 @@ public interface FlowCreditManager public boolean hasCredit(); - public boolean useCreditForMessage(AMQMessage msg); + public boolean useCreditForMessage(QueueEntry queueEntry); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java index d63431c3eb..437b7b0469 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java @@ -1,6 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; /* * @@ -37,7 +37,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements return true; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(QueueEntry queueEntry) { return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java index 5f0acec03f..15ecb5f292 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java @@ -1,6 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; /* * @@ -52,7 +52,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl return (_messageCredit > 0L) && ( _bytesCredit > 0L ); } - public synchronized boolean useCreditForMessage(AMQMessage msg) + public synchronized boolean useCreditForMessage(QueueEntry queueEntry) { if(_messageCredit == 0L) { @@ -61,7 +61,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl } else { - final long msgSize = msg.getSize(); + final long msgSize = queueEntry.getSize(); if(msgSize > _bytesCredit) { setSuspended(true); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java index c1b3a09006..3e28d779b1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java @@ -1,6 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import java.util.concurrent.atomic.AtomicLong; @@ -50,7 +50,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen return _messageCredit.get() > 0L; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(QueueEntry queueEntry) { if(hasCredit()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java index be0300f2c1..5cdd3a0328 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager { @@ -123,7 +123,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F && (_messageCreditLimit == 0L || _messageCredit > 0); } - public synchronized boolean useCreditForMessage(final AMQMessage msg) + public synchronized boolean useCreditForMessage(final QueueEntry queueEntry) { if(_messageCreditLimit != 0L) { @@ -137,10 +137,10 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } else { - if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit)) + if((_bytesCredit >= queueEntry.getSize()) || (_bytesCredit == _bytesCreditLimit)) { _messageCredit--; - _bytesCredit -= msg.getSize(); + _bytesCredit -= queueEntry.getSize(); return true; } @@ -166,9 +166,9 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } else { - if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit)) + if((_bytesCredit >= queueEntry.getSize()) || (_bytesCredit == _bytesCreditLimit)) { - _bytesCredit -= msg.getSize(); + _bytesCredit -= queueEntry.getSize(); return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 0f492a21bb..a626114792 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -129,7 +129,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- singleMessageCredit.useCreditForMessage(entry.getMessage());
+ singleMessageCredit.useCreditForMessage(entry);
session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
deliveryTag, queue.getMessageCount());
@@ -181,9 +181,9 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
- public boolean wouldSuspend(QueueEntry msg)
+ public boolean wouldSuspend(QueueEntry queueEntry)
{
- return !getCreditManager().useCreditForMessage(msg.getMessage());
+ return !getCreditManager().useCreditForMessage(queueEntry);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 119a4b1692..d3e69fc1fa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -498,9 +498,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } - public boolean wouldSuspend(QueueEntry msg) + public boolean wouldSuspend(QueueEntry queueEntry) { - return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage()); + return !_creditManager.useCreditForMessage(queueEntry); } public void getSendLock() |
