diff options
| author | Robert Greig <rgreig@apache.org> | 2006-11-19 20:31:57 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-11-19 20:31:57 +0000 |
| commit | b78fce98ef10e47539419cd44c09b8c8e8525264 (patch) | |
| tree | f6515b549750a748c18185ed658481311710367c | |
| parent | b958d39017652d1a21a233d3058105932b8d77ac (diff) | |
| download | qpid-python-b78fce98ef10e47539419cd44c09b8c8e8525264.tar.gz | |
QPID-32: sync of changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@476911 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 25 insertions, 6 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 77997a3685..0862588a0d 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -82,7 +82,8 @@ <auto_register>true</auto_register> </queue> <store> - <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>--> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> </store> <virtualhosts>${conf}/virtualhosts.xml</virtualhosts> </broker> diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java index b9be952b6c..27461f12cd 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java @@ -23,6 +23,7 @@ import org.apache.qpid.framing.*; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.log4j.Logger; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; @@ -32,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class AMQMessage { + private static final Logger _log = Logger.getLogger(AMQMessage.class); + /** * Used in clustering */ @@ -260,7 +263,7 @@ public class AMQMessage public boolean isAllContentReceived() throws AMQException { - return _bodyLengthReceived == _messageHandle.getBodySize(); + return _bodyLengthReceived == _contentHeaderBody.bodySize; } public long getMessageId() @@ -274,6 +277,10 @@ public class AMQMessage public void incrementReference() { _referenceCount.incrementAndGet(); + if (_log.isDebugEnabled()) + { + _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount); + } } /** @@ -289,11 +296,15 @@ public class AMQMessage // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after // the message has been passed to all queues. i.e. we are // not relying on the all the increments having taken place before the delivery manager decrements. - /*if (_referenceCount.decrementAndGet() == 0) + if (_referenceCount.decrementAndGet() == 0) { try { - _store.removeMessage(_messageId); + if (_log.isDebugEnabled()) + { + _log.debug("Ref count on message " + _messageId + " is zero; removing message"); + } + _messageHandle.removeMessage(); } catch (AMQException e) { @@ -301,7 +312,7 @@ public class AMQMessage incrementReference(); throw new MessageCleanupException(_messageId, e); } - } */ + } } public void setPublisher(AMQProtocolSession publisher) diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java index 67015021f1..b2a1cf9810 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -47,6 +47,8 @@ public interface AMQMessageHandle boolean isPersistent() throws AMQException; void setPublishAndContentHeaderBody(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) - throws AMQException; + throws AMQException; + + void removeMessage() throws AMQException; }
\ No newline at end of file diff --git a/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 63075d980c..0b882ee169 100644 --- a/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -128,4 +128,9 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _publishBody = new WeakReference<BasicPublishBody>(publishBody); _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody); } + + public void removeMessage() throws AMQException + { + _messageStore.removeMessage(_messageId); + } } |
