summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java99
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java84
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java10
6 files changed, 98 insertions, 111 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index e0c181a5fc..db32f13d8d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -30,20 +30,17 @@ import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.queue.AMQQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage implements ServerMessage
+public class AMQMessage extends AbstractServerMessageImpl
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- private final AtomicInteger _referenceCount = new AtomicInteger(0);
-
/** Flag to indicate that this message requires 'immediate' delivery. */
private static final byte IMMEDIATE = 0x01;
@@ -76,6 +73,8 @@ public class AMQMessage implements ServerMessage
public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
{
+ super(handle);
+
_handle = handle;
final MessageMetaData metaData = handle.getMetaData();
_size = metaData.getContentSize();
@@ -89,12 +88,6 @@ public class AMQMessage implements ServerMessage
_channelRef = channelRef;
}
-
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
- }
-
public void setExpiration(final long expiration)
{
@@ -102,11 +95,6 @@ public class AMQMessage implements ServerMessage
}
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
-
public MessageMetaData getMessageMetaData()
{
return _handle.getMetaData();
@@ -117,88 +105,12 @@ public class AMQMessage implements ServerMessage
return getMessageMetaData().getContentHeaderBody();
}
-
-
public Long getMessageId()
{
return _handle.getMessageNumber();
}
/**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
-
- if(_referenceCount.addAndGet(count) <= 0)
- {
- _referenceCount.addAndGet(-count);
- return false;
- }
- else
- {
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- *
- * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference()
- {
- int count = _referenceCount.decrementAndGet();
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
-
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_handle != null)
- {
- _handle.remove();
-
- }
- }
- else
- {
- if (count < 0)
- {
- throw new RuntimeException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
-
- /**
* Called selectors to determin if the message has already been sent
*
* @return _deliveredToConsumer
@@ -323,10 +235,7 @@ public class AMQMessage implements ServerMessage
public String toString()
{
- // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- // _taken + " by :" + _takenBySubcription;
-
- return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
+ return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount();
}
public int getContent(ByteBuffer buf, int offset)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
new file mode 100644
index 0000000000..186bb8601c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -0,0 +1,84 @@
+package org.apache.qpid.server.message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.store.StoredMessage;
+
+public abstract class AbstractServerMessageImpl implements ServerMessage
+{
+ private final AtomicInteger _referenceCount = new AtomicInteger(0);
+ private final StoredMessage<?> _handle;
+
+ public AbstractServerMessageImpl(StoredMessage<?> handle)
+ {
+ _handle = handle;
+ }
+
+ public boolean incrementReference()
+ {
+ return incrementReference(1);
+ }
+
+ public boolean incrementReference(int count)
+ {
+ if(_referenceCount.addAndGet(count) <= 0)
+ {
+ _referenceCount.addAndGet(-count);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ *
+ * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference()
+ {
+ int count = _referenceCount.decrementAndGet();
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_handle != null)
+ {
+ _handle.remove();
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new RuntimeException("Reference count for message id " + debugIdentity()
+ + " has gone below 0.");
+ }
+ }
+ }
+
+ public String debugIdentity()
+ {
+ return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")";
+ }
+
+ protected int getReferenceCount()
+ {
+ return _referenceCount.get();
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index 08006435f8..51841e6dd0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -29,18 +29,14 @@ import java.nio.ByteBuffer;
import java.lang.ref.WeakReference;
-public class MessageTransferMessage implements InboundMessage, ServerMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage
{
-
-
private StoredMessage<MessageMetaData_0_10> _storeMessage;
-
-
private WeakReference<Session> _sessionRef;
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
{
-
+ super(storeMessage);
_storeMessage = storeMessage;
_sessionRef = sessionRef;
}
@@ -145,5 +141,4 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
{
return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
}
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java b/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
index ed189c49c4..cb44f80b91 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
@@ -29,11 +29,11 @@ public class TransferMessageReference extends MessageReference<MessageTransferMe
protected void onReference(MessageTransferMessage message)
{
-
+ message.incrementReference();
}
protected void onRelease(MessageTransferMessage message)
{
-
+ message.decrementReference();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 67ddd6ca77..fbc3a10e6a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -53,6 +53,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -172,6 +173,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void postCommit()
{
+ MessageReference<?> ref = message.newReference();
for(int i = 0; i < _queues.length; i++)
{
try
@@ -184,6 +186,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
throw new RuntimeException(e);
}
}
+ ref.release();
}
public void onRollback()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
index 2d41eb9899..2ffa157ca8 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
@@ -86,11 +86,7 @@ public class ReferenceCountingTest extends QpidTestCase
AMQMessage message = new AMQMessage(storedMessage);
- message = message.takeReference();
-
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
+ message.incrementReference();
assertEquals(1, _store.getMessageCount());
message.decrementReference();
@@ -146,12 +142,12 @@ public class ReferenceCountingTest extends QpidTestCase
AMQMessage message = new AMQMessage(storedMessage);
- message = message.takeReference();
+ message.incrementReference();
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertEquals(1, _store.getMessageCount());
- message = message.takeReference();
+ message.incrementReference();
message.decrementReference();
assertEquals(1, _store.getMessageCount());
}