diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:08 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:08 +0000 |
| commit | 39249098b7b374c5e45d7139aa8b9df3aebad385 (patch) | |
| tree | ab13b41b26d2036f5765e3a95b8692fe3903ce54 /qpid/java/broker-plugins | |
| parent | 53fd008b70676ce1382bec414bcd0d86299a4ced (diff) | |
| download | qpid-python-39249098b7b374c5e45d7139aa8b9df3aebad385.tar.gz | |
QPID-5800: [Java Broker} Refactor MessageStore implementations extracting a MessageStoreProvider interface.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600931 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
13 files changed, 230 insertions, 101 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index e1e37ad3bd..47456f2675 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -21,6 +21,10 @@ package org.apache.qpid.server.protocol.v0_8; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Set; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -39,10 +43,6 @@ import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.Set; - /** * Tests that acknowledgements are handled correctly. */ @@ -90,8 +90,6 @@ public class AckTest extends QpidTestCase { for (int i = 1; i <= count; i++) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. MessagePublishInfo publishBody = new MessagePublishInfo() { @@ -130,13 +128,15 @@ public class AckTest extends QpidTestCase b.setDeliveryMode((byte) 2); } - // we increment the reference here since we are not delivering the messaging to any queues, which is where - // the reference is normally incremented. The test is easier to construct if we have direct access to the - // subscription + // The test is easier to construct if we have direct access to the subscription ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); - MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis()); - final StoredMessage storedMessage = _messageStore.addMessage(mmd); + + final MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis()); + + final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd); + + final StoredMessage storedMessage = result; final AMQMessage message = new AMQMessage(storedMessage); ServerTransaction txn = new AutoCommitTransaction(_messageStore); txn.enqueue(_queue, message, diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index e9c37e7b42..69b3069ddb 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -81,7 +81,8 @@ public class ReferenceCountingTest extends QpidTestCase - MessageMetaData mmd = new MessageMetaData(info, chb); + final MessageMetaData mmd = new MessageMetaData(info, chb); + StoredMessage storedMessage = _store.addMessage(mmd); storedMessage.flushToStore(); @@ -139,7 +140,8 @@ public class ReferenceCountingTest extends QpidTestCase final ContentHeaderBody chb = createPersistentContentHeader(); - MessageMetaData mmd = new MessageMetaData(info, chb); + final MessageMetaData mmd = new MessageMetaData(info, chb); + StoredMessage storedMessage = _store.addMessage(mmd); storedMessage.flushToStore(); diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index d682076350..38b4c66ebe 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -35,11 +35,21 @@ import java.util.List; import java.util.Map; import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.AbstractJDBCMessageStore; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.util.FileUtils; /** @@ -47,7 +57,8 @@ import org.apache.qpid.util.FileUtils; * mechanism. * */ -public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore +public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider, + DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); @@ -72,6 +83,8 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa private String _storeLocation; private Class<Driver> _driverClass; + private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); + public DerbyMessageStore() { } @@ -239,8 +252,6 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa } } - - @Override public String getStoreLocation() { return _storeLocation; @@ -446,4 +457,81 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa { return DriverManager.getConnection(_connectionURL); } + + @Override + public MessageStore getMessageStore() + { + return _messageStoreFacade; + } + + private class MessageStoreWrapper implements MessageStore + { + + @Override + public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + DerbyMessageStore.this.openMessageStore(parent, messageStoreSettings); + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) + { + return DerbyMessageStore.this.addMessage(metaData); + } + + @Override + public boolean isPersistent() + { + return DerbyMessageStore.this.isPersistent(); + } + + @Override + public Transaction newTransaction() + { + return DerbyMessageStore.this.newTransaction(); + } + + @Override + public void closeMessageStore() + { + DerbyMessageStore.this.closeMessageStore(); + } + + @Override + public void addEventListener(final EventListener eventListener, final Event... events) + { + DerbyMessageStore.this.addEventListener(eventListener, events); + } + + @Override + public String getStoreLocation() + { + return DerbyMessageStore.this.getStoreLocation(); + } + + @Override + public void onDelete() + { + DerbyMessageStore.this.onDelete(); + } + + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + DerbyMessageStore.this.visitMessages(handler); + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + DerbyMessageStore.this.visitMessageInstances(handler); + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + DerbyMessageStore.this.visitDistributedTransactions(handler); + } + } + } diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java index 13c897135d..9bc3780a71 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -48,7 +48,7 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory, DurableCon @Override public MessageStore createMessageStore() { - return new DerbyMessageStore(); + return (new DerbyMessageStore()).getMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java index ba7ae26292..1d35b9ef83 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -50,7 +50,7 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes @Override protected MessageStore createStore() throws Exception { - return new DerbyMessageStore(); + return (new DerbyMessageStore()).getMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java index 9a2d945494..4594b7f223 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -83,7 +83,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return new DerbyMessageStore(); + return (new DerbyMessageStore()).getMessageStore(); } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index d70f2a3d78..ddafa83bb3 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -31,12 +31,22 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.util.MapValueConverter; /** @@ -44,7 +54,7 @@ import org.apache.qpid.server.util.MapValueConverter; * mechanism. * */ -public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore +public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider { private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); @@ -60,6 +70,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag protected String _connectionURL; private ConnectionProvider _connectionProvider; + private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); private static class JDBCDetails { @@ -331,7 +342,6 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag { } - @Override public String getStoreLocation() { return _connectionURL; @@ -428,4 +438,80 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } } + @Override + public MessageStore getMessageStore() + { + return _messageStoreFacade; + } + + private class MessageStoreWrapper implements MessageStore + { + + @Override + public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings); + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) + { + return JDBCMessageStore.this.addMessage(metaData); + } + + @Override + public boolean isPersistent() + { + return JDBCMessageStore.this.isPersistent(); + } + + @Override + public Transaction newTransaction() + { + return JDBCMessageStore.this.newTransaction(); + } + + @Override + public void closeMessageStore() + { + JDBCMessageStore.this.closeMessageStore(); + } + + @Override + public void addEventListener(final EventListener eventListener, final Event... events) + { + JDBCMessageStore.this.addEventListener(eventListener, events); + } + + @Override + public String getStoreLocation() + { + return JDBCMessageStore.this.getStoreLocation(); + } + + @Override + public void onDelete() + { + JDBCMessageStore.this.onDelete(); + } + + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + JDBCMessageStore.this.visitMessages(handler); + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + JDBCMessageStore.this.visitMessageInstances(handler); + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + JDBCMessageStore.this.visitDistributedTransactions(handler); + } + } + } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java index e1db859a98..ab7ac6c671 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java @@ -42,7 +42,7 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory, DurableConf @Override public MessageStore createMessageStore() { - return new JDBCMessageStore(); + return (new JDBCMessageStore()).getMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index 2322fa7102..1f03b7f75f 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return new JDBCMessageStore(); + return (new JDBCMessageStore()).getMessageStore(); } private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java new file mode 100644 index 0000000000..0b0e6705d5 --- /dev/null +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryConfigurationStore.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + + +/** A simple message store that stores the messages in a thread-safe structure in memory. */ +public class MemoryConfigurationStore extends AbstractMemoryStore +{ + public static final String TYPE = "Memory"; + +} diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index c8dd2e6e61..e69de29bb2 100644 --- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - - -/** A simple message store that stores the messages in a thread-safe structure in memory. */ -public class MemoryMessageStore extends AbstractMemoryMessageStore -{ - public static final String TYPE = "Memory"; - -} diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java index d5d5969a47..8148ff9371 100644 --- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java @@ -33,7 +33,7 @@ public class MemoryMessageStoreFactory implements MessageStoreFactory, DurableCo @Override public String getType() { - return MemoryMessageStore.TYPE; + return MemoryConfigurationStore.TYPE; } @Override @@ -50,7 +50,7 @@ public class MemoryMessageStoreFactory implements MessageStoreFactory, DurableCo @Override public DurableConfigurationStore createDurableConfigurationStore() { - return new MemoryMessageStore(); + return new MemoryConfigurationStore(); } @Override diff --git a/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java index 8fd3cbb1fe..e69de29bb2 100644 --- a/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java +++ b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.util.Collections; -import java.util.Map; - -public class MemoryMessageStoreTest extends MessageStoreTestCase -{ - - @Override - protected Map<String, Object> getStoreSettings() throws Exception - { - return Collections.<String, Object>emptyMap(); - } - - @Override - protected MessageStore createMessageStore() - { - return new MemoryMessageStore(); - } - - @Override - protected void reopenStore() throws Exception - { - // cannot re-open memory message store as it is not persistent - } - -} |
