summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-11-19 20:31:57 +0000
committerRobert Greig <rgreig@apache.org>2006-11-19 20:31:57 +0000
commitb78fce98ef10e47539419cd44c09b8c8e8525264 (patch)
treef6515b549750a748c18185ed658481311710367c
parentb958d39017652d1a21a233d3058105932b8d77ac (diff)
downloadqpid-python-b78fce98ef10e47539419cd44c09b8c8e8525264.tar.gz
QPID-32: sync of changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@476911 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/config.xml3
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQMessage.java19
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java4
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java5
4 files changed, 25 insertions, 6 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 77997a3685..0862588a0d 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -82,7 +82,8 @@
<auto_register>true</auto_register>
</queue>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
</store>
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
index b9be952b6c..27461f12cd 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
@@ -23,6 +23,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.log4j.Logger;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class AMQMessage
{
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
/**
* Used in clustering
*/
@@ -260,7 +263,7 @@ public class AMQMessage
public boolean isAllContentReceived() throws AMQException
{
- return _bodyLengthReceived == _messageHandle.getBodySize();
+ return _bodyLengthReceived == _contentHeaderBody.bodySize;
}
public long getMessageId()
@@ -274,6 +277,10 @@ public class AMQMessage
public void incrementReference()
{
_referenceCount.incrementAndGet();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount);
+ }
}
/**
@@ -289,11 +296,15 @@ public class AMQMessage
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
// the message has been passed to all queues. i.e. we are
// not relying on the all the increments having taken place before the delivery manager decrements.
- /*if (_referenceCount.decrementAndGet() == 0)
+ if (_referenceCount.decrementAndGet() == 0)
{
try
{
- _store.removeMessage(_messageId);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message");
+ }
+ _messageHandle.removeMessage();
}
catch (AMQException e)
{
@@ -301,7 +312,7 @@ public class AMQMessage
incrementReference();
throw new MessageCleanupException(_messageId, e);
}
- } */
+ }
}
public void setPublisher(AMQProtocolSession publisher)
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
index 67015021f1..b2a1cf9810 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -47,6 +47,8 @@ public interface AMQMessageHandle
boolean isPersistent() throws AMQException;
void setPublishAndContentHeaderBody(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody)
- throws AMQException;
+ throws AMQException;
+
+ void removeMessage() throws AMQException;
} \ No newline at end of file
diff --git a/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index 63075d980c..0b882ee169 100644
--- a/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -128,4 +128,9 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_publishBody = new WeakReference<BasicPublishBody>(publishBody);
_contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
}
+
+ public void removeMessage() throws AMQException
+ {
+ _messageStore.removeMessage(_messageId);
+ }
}