diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-25 14:24:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-25 14:24:36 +0000 |
| commit | 42bfb186da9e911c208f22dd5f6c794b9bddd859 (patch) | |
| tree | 5c0d345cd36b6ef2d17f0abf39f247fc7fdc21c9 /qpid/java/broker-plugins | |
| parent | 9a08d3ffc0a21d501a33a2b318fca72f85d0c096 (diff) | |
| download | qpid-python-42bfb186da9e911c208f22dd5f6c794b9bddd859.tar.gz | |
QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613440 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
17 files changed, 83 insertions, 77 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index f9bf697a0d..dfdc4e230c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -102,12 +102,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index 2a4aeb8b7e..ad99d14170 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -102,12 +102,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index af1bd00ae9..5adeba66b1 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -334,11 +334,7 @@ public class ServerSessionDelegate extends SessionDelegate int enqueues = serverSession.enqueue(message, instanceProperties, exchange); - if(enqueues != 0) - { - storeMessage.flushToStore(); - } - else + if(enqueues == 0) { if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 200be71187..4b37823898 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -34,8 +34,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -93,7 +91,6 @@ import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -152,9 +149,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); - // Set of messages being acknowledged in the current transaction - private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>(); - private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; @@ -422,7 +416,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> else { incrementOutstandingTxnsIfNecessary(); - handle.flushToStore(); } } } @@ -1412,7 +1405,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } finally { - _acknowledgedMessages.clear(); + _ackedMessages.clear(); } } @@ -1435,7 +1428,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } finally { - _acknowledgedMessages.clear(); + _ackedMessages.clear(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index 2ad6fc2ca6..b7d7e5b236 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -114,12 +114,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java index 50145e5c6d..c4de7a252b 100755 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java @@ -104,11 +104,6 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> return buf; } - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - public void remove() { } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 0ff22f9d51..e0f0fc98a5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -20,15 +20,21 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.MessageCounter; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -85,8 +91,9 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - storedMessage.flushToStore(); - + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); + txn.commitTran(); AMQMessage message = new AMQMessage(storedMessage); MessageReference ref = message.newReference(); @@ -151,14 +158,13 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - storedMessage.flushToStore(); - + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); + txn.commitTran(); AMQMessage message = new AMQMessage(storedMessage); MessageReference ref = message.newReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertEquals(1, getStoreMessageCount()); MessageReference ref2 = message.newReference(); @@ -166,6 +172,54 @@ public class ReferenceCountingTest extends QpidTestCase assertEquals(1, getStoreMessageCount()); } + private TransactionLogResource createTransactionLogResource(final String queueName) + { + return new TransactionLogResource() + { + @Override + public String getName() + { + return queueName; + } + + @Override + public UUID getId() + { + return UUID.nameUUIDFromBytes(queueName.getBytes()); + } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } + }; + } + + private EnqueueableMessage createEnqueueableMessage(final StoredMessage storedMessage) + { + return new EnqueueableMessage() + { + @Override + public long getMessageNumber() + { + return storedMessage.getMessageNumber(); + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public StoredMessage getStoredMessage() + { + return storedMessage; + } + }; + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ReferenceCountingTest.class); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index b8157b8f9e..f6d849bf79 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -261,12 +261,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 3944774dfa..2e7e1cf097 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -159,8 +159,6 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv offset += bareMessageBuf.remaining(); } - storedMessage.flushToStore(); - Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); MessageReference<Message_1_0> reference = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 3974198f62..bc2d3fe375 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -111,12 +111,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java index 69479b73d6..5412a09b4c 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -210,12 +210,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index b8073d149b..46b7c322e6 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -99,12 +99,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index d6abe94f30..639c16a71a 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -115,12 +115,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ } @Override - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override public void remove() { throw new UnsupportedOperationException(); diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index b65181966c..2c40e536be 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -1010,9 +1011,9 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return false; + return MessageDurability.NEVER; } private class ConsumedMessageInstance implements MessageInstance diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html index 9a24e23407..c32c1d3bba 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html @@ -33,6 +33,17 @@ <td><input type="checkbox" name="durable" id="formAddQueue.durable" value="durable" checked="checked" dojoType="dijit.form.CheckBox" /></td> </tr> <tr> + <td valign="top"><strong>Persist Messages? </strong></td> + <td> + <select id="formAddQueue.messageDurability" name="messageDurability" data-dojo-type="dijit.form.FilteringSelect" + data-dojo-props="name: 'messageDurability', value: '', searchAttr: 'name', placeHolder: '', value: '', required: false "> + <option value="ALWAYS">Always</option> + <option value="DEFAULT">Default</option> + <option value="NEVER">Never</option> + </select> + </td> + </tr> + <tr> <td valign="top"><strong>Queue Type: </strong></td> <td> <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" /> diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 039437a0bf..6c0924d09b 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -277,6 +277,7 @@ define(["dojo/_base/xhr", storeNodes(["name", "state", "durable", + "messageDurability", "exclusive", "owner", "lifetimePolicy", @@ -351,6 +352,7 @@ define(["dojo/_base/xhr", this.exclusive.innerHTML = entities.encode(String(this.queueData[ "exclusive" ])); this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ; this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ])); + this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ])); this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ; this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"])); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 89b7327957..8f2bf3364d 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -33,6 +33,10 @@ <div class="durable" style="float:left;"></div> </div> <div style="clear:both"> + <div class="formLabel-labelCell" style="float:left; width: 150px;">Persist Messages:</div> + <div class="messageDurability" style="float:left;"></div> + </div> + <div style="clear:both"> <div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div> <div class="exclusive" style="float:left;"></div> </div> |
