From 42bfb186da9e911c208f22dd5f6c794b9bddd859 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 25 Jul 2014 14:24:36 +0000 Subject: QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613440 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 43 ++-- .../tuple/PreparedTransactionBinding.java | 5 +- .../store/berkeleydb/BDBMessageStoreTest.java | 12 +- .../qpid/server/plugin/PluggableService.java | 2 +- .../server/message/internal/InternalMessage.java | 6 - .../java/org/apache/qpid/server/model/Queue.java | 6 + .../apache/qpid/server/queue/AbstractQueue.java | 39 +++- .../qpid/server/queue/QueueArgumentsConverter.java | 11 +- .../server/store/AbstractJDBCMessageStore.java | 88 +++------ .../qpid/server/store/MemoryMessageStore.java | 35 ++-- .../qpid/server/store/MessageDurability.java | 42 ++++ .../qpid/server/store/StoredMemoryMessage.java | 6 - .../apache/qpid/server/store/StoredMessage.java | 2 - .../qpid/server/store/TransactionLogResource.java | 4 +- .../server/txn/AsyncAutoCommitTransaction.java | 38 ++-- .../qpid/server/txn/AutoCommitTransaction.java | 36 ++-- .../java/org/apache/qpid/server/txn/DtxBranch.java | 2 +- .../apache/qpid/server/txn/LocalTransaction.java | 49 +++-- .../SynchronousMessageStoreRecoverer.java | 5 +- .../AbstractDurableConfigurationStoreTestCase.java | 4 +- .../store/MessageStoreQuotaEventsTestBase.java | 16 +- .../qpid/server/store/MessageStoreTestCase.java | 80 +++++--- .../server/txn/AsyncAutoCommitTransactionTest.java | 2 + .../qpid/server/txn/AutoCommitTransactionTest.java | 16 +- .../qpid/server/txn/LocalTransactionTest.java | 16 +- .../SynchronousMessageStoreRecovererTest.java | 2 + .../v0_10/MessageConverter_Internal_to_v0_10.java | 6 - .../protocol/v0_10/MessageConverter_v0_10.java | 6 - .../protocol/v0_10/ServerSessionDelegate.java | 6 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 11 +- .../v0_8/MessageConverter_Internal_to_v0_8.java | 6 - .../server/protocol/v0_8/MockStoredMessage.java | 5 - .../protocol/v0_8/ReferenceCountingTest.java | 66 ++++++- .../protocol/v1_0/MessageConverter_to_1_0.java | 6 - .../server/protocol/v1_0/ReceivingLink_1_0.java | 2 - .../v0_10_v1_0/MessageConverter_1_0_to_v0_10.java | 6 - .../v0_8_v0_10/MessageConverter_0_10_to_0_8.java | 6 - .../v0_8_v0_10/MessageConverter_0_8_to_0_10.java | 6 - .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 6 - .../server/management/amqp/ManagementNode.java | 5 +- .../src/main/java/resources/addQueue.html | 11 ++ .../java/resources/js/qpid/management/Queue.js | 2 + .../src/main/java/resources/showQueue.html | 4 + .../java/org/apache/qpid/client/AMQSession.java | 4 +- .../org/apache/qpid/client/AMQSession_0_10.java | 63 +++--- .../server/queue/QueueMessageDurabilityTest.java | 216 +++++++++++++++++++++ .../server/store/VirtualHostMessageStoreTest.java | 5 +- qpid/java/test-profiles/CPPExcludes | 4 + qpid/java/test-profiles/JavaTransientExcludes | 2 + 49 files changed, 672 insertions(+), 349 deletions(-) create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index cf187fe1e9..be592a0d42 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; @@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore long newMessageId = getNextMessageId(); - if (metaData.isPersistent()) - { - return (StoredMessage) new StoredBDBMessage(newMessageId, metaData); - } - else - { - return new StoredMemoryMessage(newMessageId, metaData); - } + return new StoredBDBMessage(newMessageId, metaData); } public long getNextMessageId() @@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected abstract Logger getLogger(); - private class StoredBDBMessage implements StoredMessage + class StoredBDBMessage implements StoredMessage { private final long _messageId; @@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - @Override - public synchronized StoreFuture flushToStore() + synchronized StoreFuture flushToStore() { if(!stored()) { @@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private Transaction _txn; private int _storeSizeIncrease; + private final List _onCommitActions = new ArrayList<>(); private BDBTransaction() throws StoreException { @@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + _onCommitActions.add(new Runnable() + { + @Override + public void run() + { + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + }); + } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); @@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void commitTran() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.commitTranImpl(_txn, true); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } + private void doPreCommitActions() + { + for(Runnable action : _onCommitActions) + { + action.run(); + } + _onCommitActions.clear(); + } + @Override public StoreFuture commitTranAsync() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); } @@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void abortTran() throws StoreException { checkMessageStoreOpen(); - + _onCommitActions.clear(); AbstractBDBMessageStore.this.abortTran(_txn); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 8fb011152c..1f4cf45ce1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding storedMessage_0_8 = store.addMessage(messageMetaData_0_8); storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); return storedMessage_0_8; } diff --git a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java index 59fe72e377..c9c3afccba 100644 --- a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java +++ b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java @@ -24,7 +24,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Retention(RetentionPolicy.SOURCE) +@Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) public @interface PluggableService { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index fdc2fa90a5..4f2327adee 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -234,12 +234,6 @@ public class InternalMessage extends AbstractServerMessageImpl> extends ConfiguredObject @@ -35,6 +36,7 @@ public interface Queue> extends ConfiguredObject String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages"; String ALTERNATE_EXCHANGE = "alternateExchange"; String EXCLUSIVE = "exclusive"; + String MESSAGE_DURABILITY = "messageDurability"; String MESSAGE_GROUP_KEY = "messageGroupKey"; String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; @@ -130,6 +132,10 @@ public interface Queue> extends ConfiguredObject @ManagedAttribute( defaultValue = "${queue.alertRepeatGap}") long getAlertRepeatGap(); + @ManagedAttribute( defaultValue = "DEFAULT" ) + MessageDurability getMessageDurability(); + + //children Collection getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 479029093f..2aeca6f45f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -77,6 +77,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; @@ -175,6 +176,9 @@ public abstract class AbstractQueue> @ManagedAttributeField private ExclusivityPolicy _exclusive; + @ManagedAttributeField + private MessageDurability _messageDurability; + private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name private final Set _notificationChecks = @@ -245,12 +249,38 @@ public abstract class AbstractQueue> { super.onCreate(); + if(isDurable() && (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)) + { + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction() + { + @Override + public Object run() + { + setAttribute(AbstractConfiguredObject.DURABLE, true, false); + return null; + } + }); + } - if (isDurable() && !(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)) + if (isDurable()) { _virtualHost.getDurableConfigurationStore().create(asObjectRecord()); } + else if(getMessageDurability() != MessageDurability.NEVER) + { + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction() + { + @Override + public Object run() + { + setAttribute(Queue.MESSAGE_DURABILITY, getMessageDurability(), MessageDurability.NEVER); + return null; + } + }); + } _recovering.set(false); } @@ -510,6 +540,11 @@ public abstract class AbstractQueue> } } + @Override + public final MessageDurability getMessageDurability() + { + return _messageDurability; + } @Override public Collection getAvailableAttributes() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 15df952e61..37e82b0771 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -52,6 +52,9 @@ public class QueueArgumentsConverter public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; + + public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability"; + public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude"; public static final String QPID_TRACE_ID = "qpid.trace.id"; @@ -91,6 +94,7 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP); ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); + ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY); } @@ -138,7 +142,12 @@ public class QueueArgumentsConverter { if(modelArguments.containsKey(entry.getValue())) { - wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue())); + Object value = modelArguments.get(entry.getValue()); + if(value instanceof Enum) + { + value = ((Enum) value).name(); + } + wireArguments.put(entry.getKey(), value); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 24866e4e2e..f2f85e1387 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -447,14 +447,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { checkMessageStoreOpen(); - if(metaData.isPersistent()) - { - return new StoredJDBCMessage(getNextMessageId(), metaData); - } - else - { - return new StoredMemoryMessage(getNextMessageId(), metaData); - } + return new StoredJDBCMessage(getNextMessageId(), metaData); + } @Override @@ -970,9 +964,9 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return true; + return MessageDurability.DEFAULT; } } @@ -1122,7 +1116,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { private final ConnectionWrapper _connWrapper; private int _storeSizeIncrease; - + private final List _onCommitActions = new ArrayList<>(); protected JDBCTransaction() { @@ -1144,16 +1138,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore final StoredMessage storedMessage = message.getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) { - try - { - ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); - } - catch (SQLException e) + _onCommitActions.add(new Runnable() { - throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); - } + @Override + public void run() + { + try + { + ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + catch (SQLException e) + { + throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); + } + } + }); } - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); } @@ -1170,7 +1171,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public void commitTran() { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractJDBCMessageStore.this.commitTran(_connWrapper); storedSizeChange(_storeSizeIncrease); } @@ -1179,17 +1180,26 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public StoreFuture commitTranAsync() { checkMessageStoreOpen(); - + doPreCommitActions(); StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); return storeFuture; } + private void doPreCommitActions() + { + for(Runnable action : _onCommitActions) + { + action.run(); + } + _onCommitActions.clear(); + } + @Override public void abortTran() { checkMessageStoreOpen(); - + _onCommitActions.clear(); AbstractJDBCMessageStore.this.abortTran(_connWrapper); } @@ -1215,7 +1225,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore private final long _messageId; private final boolean _isRecovered; - private StorableMessageMetaData _metaData; private volatile SoftReference _metaDataRef; private byte[] _data; @@ -1319,39 +1328,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore return buf; } - @Override - public synchronized StoreFuture flushToStore() - { - checkMessageStoreOpen(); - - Connection conn = null; - try - { - if(!stored()) - { - conn = newConnection(); - - store(conn); - - conn.commit(); - storedSizeChange(getMetaData().getContentSize()); - } - } - catch (SQLException e) - { - if(getLogger().isDebugEnabled()) - { - getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e); - } - throw new StoreException(e); - } - finally - { - JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger()); - } - return StoreFuture.IMMEDIATE_FUTURE; - } - @Override public void remove() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index f4551aae05..f3b2cac52e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -64,6 +64,12 @@ public class MemoryMessageStore implements MessageStore @Override public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { + + if(message.getStoredMessage() instanceof StoredMemoryMessage) + { + _messages.putIfAbsent(message.getMessageNumber(), (StoredMemoryMessage) message.getStoredMessage()); + } + Set messageIds = _localEnqueueMap.get(queue.getId()); if (messageIds == null) { @@ -196,31 +202,20 @@ public class MemoryMessageStore implements MessageStore { long id = getNextMessageId(); - if(metaData.isPersistent()) + StoredMemoryMessage storedMemoryMessage = new StoredMemoryMessage(id, metaData) { - return new StoredMemoryMessage(id, metaData) + + @Override + public void remove() { + _messages.remove(getMessageNumber()); + super.remove(); + } - @Override - public StoreFuture flushToStore() - { - _messages.putIfAbsent(getMessageNumber(), this) ; - return super.flushToStore(); - } + }; - @Override - public void remove() - { - _messages.remove(getMessageNumber()); - super.remove(); - } + return storedMemoryMessage; - }; - } - else - { - return new StoredMemoryMessage(id, metaData); - } } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java new file mode 100644 index 0000000000..4dd621bda0 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public enum MessageDurability +{ + DEFAULT(false,true), + ALWAYS(true,true), + NEVER(false,false); + + private final boolean _nonPersistent; + private final boolean _persistent; + + MessageDurability(final boolean nonPersistent, final boolean persistent) + { + _nonPersistent = nonPersistent; + _persistent = persistent; + } + + public boolean persist(final boolean persistent) + { + return persistent ? _persistent : _nonPersistent; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index f4e1376980..e1043e8807 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -122,12 +122,6 @@ public class StoredMemoryMessage implements S return buf; } - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - public T getMetaData() { return _metaData; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 7909003855..6beb74f4ae 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -34,7 +34,5 @@ public interface StoredMessage ByteBuffer getContent(int offsetInMessage, int size); - StoreFuture flushToStore(); - void remove(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java index 18b3125641..f5b1aa6ce7 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java @@ -24,7 +24,9 @@ import java.util.UUID; public interface TransactionLogResource { + String getName(); public UUID getId(); - boolean isDurable(); + //boolean isDurable(); + MessageDurability getMessageDurability(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 013e9f32ed..65064b015c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.txn; +import java.util.Collection; +import java.util.List; + import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; @@ -31,9 +34,6 @@ import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; -import java.util.Collection; -import java.util.List; - /** * An implementation of ServerTransaction where each enqueue/dequeue * operation takes place within it own transaction. @@ -93,7 +93,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { StoreFuture future; - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -162,7 +162,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -205,7 +205,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { StoreFuture future; - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -237,28 +237,24 @@ public class AsyncAutoCommitTransaction implements ServerTransaction try { - if(message.isPersistent()) + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if (queue.getMessageDurability().persist(message.isPersistent())) { - if (queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); - } - if (txn == null) - { - txn = _messageStore.newTransaction(); - } - - txn.enqueueMessage(queue, message); + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); + } + if (txn == null) + { + txn = _messageStore.newTransaction(); + } + txn.enqueueMessage(queue, message); - } } - } + StoreFuture future; if (txn != null) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index ec3b6f69fb..2ecd847719 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.txn; +import java.util.Collection; +import java.util.List; + import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; @@ -30,9 +33,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; -import java.util.Collection; -import java.util.List; - /** * An implementation of ServerTransaction where each enqueue/dequeue * operation takes place within it own transaction. @@ -77,7 +77,7 @@ public class AutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -109,7 +109,7 @@ public class AutoCommitTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -146,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -175,25 +175,21 @@ public class AutoCommitTransaction implements ServerTransaction try { - if(message.isPersistent()) + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if (queue.getMessageDurability().persist(message.isPersistent())) { - if (queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); - } - if (txn == null) - { - txn = _messageStore.newTransaction(); - } - - txn.enqueueMessage(queue, message); + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName()); + } + if (txn == null) + { + txn = _messageStore.newTransaction(); + } + txn.enqueueMessage(queue, message); - } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index 535ad77ea4..4195c72a28 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -381,7 +381,7 @@ public class DtxBranch public boolean isDurable() { - return _message.isPersistent() && _resource.isDurable(); + return _resource.getMessageDurability().persist(_message.isPersistent()); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index b3d013c99f..e371bcdb02 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.txn; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.TransactionLogResource; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import org.apache.qpid.server.store.TransactionLogResource; /** * A concrete implementation of ServerTransaction where enqueue/dequeue @@ -97,7 +97,7 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { try { @@ -129,7 +129,7 @@ public class LocalTransaction implements ServerTransaction ServerMessage message = entry.getMessage(); TransactionLogResource queue = entry.getOwningResource(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) { @@ -186,7 +186,7 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent() && queue.isDurable()) + if(queue.getMessageDurability().persist(message.isPersistent())) { try { @@ -211,29 +211,26 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(message.isPersistent()) + try { - try + for(BaseQueue queue : queues) { - for(BaseQueue queue : queues) + if(queue.getMessageDurability().persist(message.isPersistent())) { - if(queue.isDurable()) + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() ); - } + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() ); + } - beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message); + beginTranIfNecessary(); + _transaction.enqueueMessage(queue, message); - } } } - catch(RuntimeException e) - { - tidyUpOnError(e); - } + } + catch(RuntimeException e) + { + tidyUpOnError(e); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java index 9fc71e23d7..becf4a073c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; @@ -220,9 +221,9 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return false; + return MessageDurability.DEFAULT; } }; txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 6b6de5b66a..4ddf421901 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -34,8 +34,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.model.ConfiguredObjectFactory; -import org.apache.qpid.server.model.VirtualHostNode; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -49,6 +47,7 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -56,6 +55,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 51b33276ec..be4505bc80 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.server.store; -import static org.mockito.Mockito.mock; - import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; + import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; @@ -148,12 +146,6 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple return _transactionResource; } - @Override - public boolean isDurable() - { - return true; - } - private static class TestMessage implements EnqueueableMessage { private final StoredMessage _handle; @@ -180,4 +172,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple return _handle; } } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index d4b990da07..758799b81f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -28,13 +28,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import org.hamcrest.Description; +import org.mockito.ArgumentMatcher; + import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.UUIDGenerator; @@ -45,9 +46,6 @@ import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.test.utils.QpidTestCase; -import org.hamcrest.Description; -import org.mockito.ArgumentMatcher; - public abstract class MessageStoreTestCase extends QpidTestCase { private MessageStore _store; @@ -117,8 +115,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); MessageHandler handler = mock(MessageHandler.class); _store.visitMessages(handler); @@ -127,14 +124,60 @@ public abstract class MessageStoreTestCase extends QpidTestCase } + public void enqueueMessage(final StoredMessage message, final String queueName) + { + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(new TransactionLogResource() + { + private final UUID _id = UUID.nameUUIDFromBytes(queueName.getBytes()); + + @Override + public String getName() + { + return queueName; + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } + }, new EnqueueableMessage() + { + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public StoredMessage getStoredMessage() + { + return message; + } + }); + txn.commitTran(); + } + public void testVisitMessagesAborted() throws Exception { int contentSize = 0; for (int i = 0; i < 3; i++) { final StoredMessage message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); } MessageHandler handler = mock(MessageHandler.class); @@ -151,16 +194,16 @@ public abstract class MessageStoreTestCase extends QpidTestCase for (int i = 0; i < 3; i++) { final StoredMessage message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); + } reopenStore(); final StoredMessage message = _store.addMessage(new TestMessageMetaData(4, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); + assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4); } @@ -170,8 +213,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message); @@ -305,8 +346,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); MessageHandler handler = mock(MessageHandler.class); _store.visitMessages(handler); @@ -319,8 +358,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase long messageId = 1; int contentSize = 0; final StoredMessage message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - StoreFuture flushFuture = message.flushToStore(); - flushFuture.waitForCompletion(); + enqueueMessage(message, "dummyQ"); final AtomicReference> retrievedMessageRef = new AtomicReference>(); _store.visitMessages(new MessageHandler() @@ -360,7 +398,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase TransactionLogResource queue = mock(TransactionLogResource.class); when(queue.getId()).thenReturn(queueId); when(queue.getName()).thenReturn("testQueue"); - when(queue.isDurable()).thenReturn(true); + when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); return queue; } @@ -391,8 +429,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase private EnqueueableMessage createEnqueueableMessage(long messageId1) { final StoredMessage message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)); - StoreFuture flushFuture = message1.flushToStore(); - flushFuture.waitForCompletion(); EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1); return enqueueableMessage1; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java index 8285bdba4c..ec0908efba 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -25,6 +25,7 @@ import java.util.Collections; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; @@ -53,6 +54,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase when(_messageStore.newTransaction()).thenReturn(_storeTransaction); when(_storeTransaction.commitTranAsync()).thenReturn(_future); when(_queue.isDurable()).thenReturn(true); + when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); } public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 3e1183d203..5abbd7352b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.txn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * A unit test ensuring that AutoCommitTransaction creates a separate transaction for * each dequeue/enqueue operation that involves enlistable messages. Verifies @@ -428,6 +429,7 @@ public class AutoCommitTransactionTest extends QpidTestCase { BaseQueue queue = mock(BaseQueue.class); when(queue.isDurable()).thenReturn(durable); + when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER); return queue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index 58c7401c60..c905e52715 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.txn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * A unit test ensuring that LocalTransactionTest creates a long-lived store transaction * that spans many dequeue/enqueue operations of enlistable messages. Verifies @@ -652,6 +653,7 @@ public class LocalTransactionTest extends QpidTestCase { BaseQueue queue = mock(BaseQueue.class); when(queue.isDurable()).thenReturn(durable); + when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER); return queue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java index 685fea207b..9c05ed564a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.NullMessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -379,6 +380,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase { AMQQueue queue = mock(AMQQueue.class); final UUID queueId = UUID.randomUUID(); + when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); when(queue.getId()).thenReturn(queueId); when(queue.getName()).thenReturn("test-queue"); when(_virtualHost.getQueue(queueId)).thenReturn(queue); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index f9bf697a0d..dfdc4e230c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -101,12 +101,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter> private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); - // Set of messages being acknowledged in the current transaction - private SortedSet _acknowledgedMessages = new TreeSet(); - private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; @@ -422,7 +416,6 @@ public class AMQChannel> else { incrementOutstandingTxnsIfNecessary(); - handle.flushToStore(); } } } @@ -1412,7 +1405,7 @@ public class AMQChannel> } finally { - _acknowledgedMessages.clear(); + _ackedMessages.clear(); } } @@ -1435,7 +1428,7 @@ public class AMQChannel> } finally { - _acknowledgedMessages.clear(); + _ackedMessages.clear(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index 2ad6fc2ca6..b7d7e5b236 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -113,12 +113,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter return buf; } - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - public void remove() { } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 0ff22f9d51..e0f0fc98a5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -20,15 +20,21 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.MessageCounter; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -85,8 +91,9 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - storedMessage.flushToStore(); - + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); + txn.commitTran(); AMQMessage message = new AMQMessage(storedMessage); MessageReference ref = message.newReference(); @@ -151,14 +158,13 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - storedMessage.flushToStore(); - + Transaction txn = _store.newTransaction(); + txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); + txn.commitTran(); AMQMessage message = new AMQMessage(storedMessage); MessageReference ref = message.newReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertEquals(1, getStoreMessageCount()); MessageReference ref2 = message.newReference(); @@ -166,6 +172,54 @@ public class ReferenceCountingTest extends QpidTestCase assertEquals(1, getStoreMessageCount()); } + private TransactionLogResource createTransactionLogResource(final String queueName) + { + return new TransactionLogResource() + { + @Override + public String getName() + { + return queueName; + } + + @Override + public UUID getId() + { + return UUID.nameUUIDFromBytes(queueName.getBytes()); + } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.DEFAULT; + } + }; + } + + private EnqueueableMessage createEnqueueableMessage(final StoredMessage storedMessage) + { + return new EnqueueableMessage() + { + @Override + public long getMessageNumber() + { + return storedMessage.getMessageNumber(); + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public StoredMessage getStoredMessage() + { + return storedMessage; + } + }; + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ReferenceCountingTest.class); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index b8157b8f9e..f6d849bf79 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -260,12 +260,6 @@ public abstract class MessageConverter_to_1_0 implement return buf; } - @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - @Override public void remove() { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 3944774dfa..2e7e1cf097 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -159,8 +159,6 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv offset += bareMessageBuf.remaining(); } - storedMessage.flushToStore(); - Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); MessageReference reference = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 3974198f62..bc2d3fe375 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -110,12 +110,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverterDurable? + + Persist Messages? + + + + Queue Type: diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 039437a0bf..6c0924d09b 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -277,6 +277,7 @@ define(["dojo/_base/xhr", storeNodes(["name", "state", "durable", + "messageDurability", "exclusive", "owner", "lifetimePolicy", @@ -351,6 +352,7 @@ define(["dojo/_base/xhr", this.exclusive.innerHTML = entities.encode(String(this.queueData[ "exclusive" ])); this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ; this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ])); + this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ])); this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ; this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"])); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 89b7327957..8f2bf3364d 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -32,6 +32,10 @@
Durable:
+
+
Persist Messages:
+
+
Exclusive:
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a37b532617..3316ae801b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -118,7 +118,7 @@ public abstract class AMQSession arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name()); + amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name()); + amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name()); + amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments); + + arguments = new HashMap<>(); + arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name()); + amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME), + AMQShortString.valueOf("Y.*.*.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME), + AMQShortString.valueOf("*.Y.*.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME), + AMQShortString.valueOf("*.*.Y.*"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + + amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME), + AMQShortString.valueOf("*.*.*.Y"), + null, + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME), + null); + } + + public void testSendPersistentMessageToAll() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + conn.start(); + producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test")); + session.commit(); + + AMQSession amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + restartBroker(); + + conn = getConnection(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + + assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + } + + + public void testSendNonPersistentMessageToAll() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + conn.start(); + producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test")); + session.commit(); + + AMQSession amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + restartBroker(); + + conn = getConnection(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + amqSession = (AMQSession) session; + assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + + assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME))); + + } + + public void testNonPersistentContentRetained() throws Exception + { + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + conn.start(); + producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1")); + producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2")); + session.commit(); + MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)); + Message msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test2", ((TextMessage) msg).getText()); + session.rollback(); + restartBroker(); + conn = getConnection(); + conn.start(); + session = conn.createSession(true, Session.SESSION_TRANSACTED); + AMQSession amqSession = (AMQSession) session; + assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME))); + assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME))); + consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)); + msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test2", ((TextMessage)msg).getText()); + session.commit(); + } + + public void testPersistentContentRetainedOnTransientQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false"); + Connection conn = getConnection(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + conn.start(); + producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1")); + session.commit(); + MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)); + Message msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test1", ((TextMessage)msg).getText()); + session.commit(); + System.gc(); + consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)); + msg = consumer.receive(1000l); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("test1", ((TextMessage)msg).getText()); + session.commit(); + } + + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index 2c38a04895..8550c804a6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; -import java.io.IOException; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; @@ -34,6 +33,8 @@ import java.util.Map; import javax.security.auth.Subject; +import org.codehaus.jackson.map.ObjectMapper; + import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -77,7 +78,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; -import org.codehaus.jackson.map.ObjectMapper; /** * @@ -604,7 +604,6 @@ public class VirtualHostMessageStoreTest extends QpidTestCase MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); final StoredMessage storedMessage = _virtualHost.getMessageStore().addMessage(mmd); - storedMessage.flushToStore(); final AMQMessage currentMessage = new AMQMessage(storedMessage); diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 9e9708bf3c..3d87da11c8 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -193,3 +193,7 @@ org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide // Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange + +// Tests queue message durability settings which are a Java Broker specific feature +org.apache.qpid.server.queue.QueueMessageDurabilityTest#* + diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index a50a8bd599..ff4485d599 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -33,6 +33,8 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testRecover +org.apache.qpid.server.queue.QueueMessageDurabilityTest#* + org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence -- cgit v1.2.1