summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-06 15:43:26 +0000
committerKeith Wall <kwall@apache.org>2014-06-06 15:43:26 +0000
commit217f1785cf7def05e296990cde21f9cc74d05022 (patch)
tree205f2d3ad2daf1913d38be62fe43578c63965c4a /qpid/java
parentd635bd2f83fc8af867c61ddf19a3c99a7b5cb55c (diff)
downloadqpid-python-217f1785cf7def05e296990cde21f9cc74d05022.tar.gz
QPID-5802: [Java Broker] Temporarily remove the slow message store and quota message store
test implementations and the tests that rely on them. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600933 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java157
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStoreFactory.java48
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java344
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java44
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java393
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java115
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java72
-rw-r--r--qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory20
8 files changed, 0 insertions, 1193 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
deleted file mode 100644
index b6dd1b1b71..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.model.ConfiguredObject;
-
-public class QuotaMessageStore extends MemoryMessageStore
-{
- public static final String TYPE = "QuotaMessageStore";
- private final AtomicLong _messageId = new AtomicLong(1);
-
- private long _totalStoreSize;
- private boolean _limitBusted;
- private long _persistentSizeLowThreshold;
- private long _persistentSizeHighThreshold;
-
- private final EventManager _eventManager = new EventManager();
-
-
- @Override
- public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
- {
- Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
- _persistentSizeHighThreshold = overfullAttr == null
- ? Long.MAX_VALUE
- : overfullAttr instanceof Number
- ? ((Number)overfullAttr).longValue()
- : Long.parseLong(overfullAttr.toString());
-
- Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
-
- _persistentSizeLowThreshold = overfullAttr == null
- ? _persistentSizeHighThreshold
- : underfullAttr instanceof Number
- ? ((Number)underfullAttr).longValue()
- : Long.parseLong(underfullAttr.toString());
-
-
- if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
- }
-
-
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
- {
- final long id = _messageId.getAndIncrement();
- return new StoredMemoryMessage<T>(id, metaData);
- }
-
- @Override
- public Transaction newTransaction()
- {
- return new Transaction()
- {
- private AtomicLong _storeSizeIncrease = new AtomicLong();
-
- @Override
- public StoreFuture commitTranAsync()
- {
- QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue());
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- _storeSizeIncrease.addAndGet(((MessageContentSource)message).getSize());
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- _storeSizeIncrease.addAndGet(-((MessageContentSource)message).getSize());
- }
-
- @Override
- public void commitTran()
- {
- QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue());
- }
-
- @Override
- public void abortTran()
- {
- }
-
- @Override
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- }
-
- @Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- {
- }
- };
- }
-
- @Override
- public boolean isPersistent()
- {
- return true;
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- _eventManager.addEventListener(eventListener, events);
- }
-
- private void storedSizeChange(final int delta)
- {
- if(_persistentSizeHighThreshold > 0)
- {
- synchronized (this)
- {
- long newSize = _totalStoreSize += delta;
- if(!_limitBusted && newSize > _persistentSizeHighThreshold)
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
- else if(_limitBusted && newSize < _persistentSizeHighThreshold)
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
- }
- }
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStoreFactory.java
deleted file mode 100644
index b4f81e2ad6..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStoreFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.store;
-
-import java.util.Map;
-
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-
-public class QuotaMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public String getType()
- {
- return QuotaMessageStore.TYPE;
- }
-
- @Override
- public MessageStore createMessageStore()
- {
- return new QuotaMessageStore();
- }
-
- @Override
- public void validateAttributes(Map<String, Object> attributes)
- {
- }
-
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
deleted file mode 100644
index 0a686926e3..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-
-public class SlowMessageStore implements MessageStore, DurableConfigurationStore
-{
- private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
-
- public static final String TYPE = "SLOW";
- public static final String DELAYS = "delays";
- public static final String REAL_STORE = "realStore";
-
- private static final String DEFAULT_DELAY = "default";
- private static final String PRE = "pre";
- private static final String POST = "post";
-
- private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
- private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
- private long _defaultDelay = 0L;
- private MessageStore _realMessageStore = null;
- private DurableConfigurationStore _realDurableConfigurationStore = null;
-
- private Map<EventListener, Event[]> _eventListeners = new ConcurrentHashMap<EventListener, Event[]>();
-
- @Override
- public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
- {
- if (storeSettings != null && storeSettings.get(REAL_STORE) != null)
- {
- final String realStore = (String) storeSettings.get(REAL_STORE);
- _realDurableConfigurationStore = DurableConfigurationStoreFactory.FACTORY_LOADER.get(realStore).createDurableConfigurationStore();
- _realDurableConfigurationStore.openConfigurationStore(parent, storeSettings);
- }
- }
-
- private void configureDelays(Map<String, Object> delays)
- {
-
- for(Map.Entry<String, Object> entry : delays.entrySet())
- {
- String key = entry.getKey();
- if (key.startsWith(PRE))
- {
- _preDelays.put(key.substring(PRE.length()), Long.parseLong(String.valueOf(entry.getValue())));
- }
- else if (key.startsWith(POST))
- {
- _postDelays.put(key.substring(POST.length()), Long.parseLong(String.valueOf(entry.getValue())));
- }
- }
- }
-
- private void doPostDelay(String method)
- {
- long delay = lookupDelay(_postDelays, method);
- doDelay(delay);
- }
-
- private void doPreDelay(String method)
- {
- long delay = lookupDelay(_preDelays, method);
- doDelay(delay);
- }
-
- private long lookupDelay(HashMap<String, Long> delays, String method)
- {
- Long delay = delays.get(method);
- return (delay == null) ? _defaultDelay : delay;
- }
-
- private void doDelay(long delay)
- {
- if (delay > 0)
- {
- long start = System.nanoTime();
- try
- {
-
- Thread.sleep(delay);
- }
- catch (InterruptedException e)
- {
- _logger.warn("Interrupted : " + e);
- }
-
- long slept = (System.nanoTime() - start) / 1000000;
-
- if (slept >= delay)
- {
- _logger.info("Done sleep for:" + slept+":"+delay);
- }
- else
- {
- _logger.info("Only sleep for:" + slept + " re-sleeping");
- doDelay(delay - slept);
- }
- }
- }
-
- @Override
- public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
- {
- Object delaysAttr = messageStoreSettings.get(DELAYS);
-
- @SuppressWarnings({ "unchecked" })
- Map<String,Object> delays = (delaysAttr instanceof Map) ? (Map<String,Object>) delaysAttr : Collections.<String,Object>emptyMap();
- configureDelays(delays);
-
- if (delays.containsKey(DEFAULT_DELAY))
- {
- _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY)));
- }
-
- final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryConfigurationStore.TYPE : messageStoreSettings.get(REAL_STORE);
- final String realStore = (String) realStoreAttr;
- _realMessageStore = MessageStoreFactory.FACTORY_LOADER.get(realStore).createMessageStore();
-
- if (!_eventListeners.isEmpty())
- {
- for (Iterator<Map.Entry<EventListener, Event[]>> it = _eventListeners.entrySet().iterator(); it.hasNext();)
- {
- Map.Entry<EventListener, Event[]> entry = it.next();
- _realMessageStore.addEventListener(entry.getKey(), entry.getValue());
- it.remove();
- }
- }
- _realMessageStore.openMessageStore(parent, messageStoreSettings);
-
- if (_realDurableConfigurationStore == null)
- {
- _realDurableConfigurationStore = (DurableConfigurationStore) _realMessageStore;
- }
-
- }
-
- @Override
- public void closeMessageStore()
- {
- doPreDelay("close");
- _realMessageStore.closeMessageStore();
- doPostDelay("close");
- }
-
- @Override
- public void closeConfigurationStore()
- {
- _realDurableConfigurationStore.closeConfigurationStore();
- }
-
- @Override
- public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
- {
- return _realMessageStore.addMessage(metaData);
- }
-
- @Override
- public void create(ConfiguredObjectRecord record) throws StoreException
- {
- doPreDelay("create");
- _realDurableConfigurationStore.create(record);
- doPostDelay("create");
- }
-
-
- @Override
- public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
- {
- doPreDelay("remove");
- UUID[] removed = _realDurableConfigurationStore.remove(objects);
- doPostDelay("remove");
- return removed;
- }
-
- @Override
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
- {
- doPreDelay("update");
- _realDurableConfigurationStore.update(createIfNecessary, records);
- doPostDelay("update");
- }
-
- @Override
- public Transaction newTransaction()
- {
- doPreDelay("beginTran");
- Transaction txn = new SlowTransaction(_realMessageStore.newTransaction());
- doPostDelay("beginTran");
- return txn;
- }
-
- @Override
- public boolean isPersistent()
- {
- return _realMessageStore.isPersistent();
- }
-
- private class SlowTransaction implements Transaction
- {
- private final Transaction _underlying;
-
- private SlowTransaction(Transaction underlying)
- {
- _underlying = underlying;
- }
-
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- doPreDelay("enqueueMessage");
- _underlying.enqueueMessage(queue, message);
- doPostDelay("enqueueMessage");
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- doPreDelay("dequeueMessage");
- _underlying.dequeueMessage(queue, message);
- doPostDelay("dequeueMessage");
- }
-
- @Override
- public void commitTran()
- {
- doPreDelay("commitTran");
- _underlying.commitTran();
- doPostDelay("commitTran");
- }
-
- @Override
- public StoreFuture commitTranAsync()
- {
- doPreDelay("commitTran");
- StoreFuture future = _underlying.commitTranAsync();
- doPostDelay("commitTran");
- return future;
- }
-
- @Override
- public void abortTran()
- {
- doPreDelay("abortTran");
- _underlying.abortTran();
- doPostDelay("abortTran");
- }
-
- @Override
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- _underlying.removeXid(format, globalId, branchId);
- }
-
- @Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- {
- _underlying.recordXid(format, globalId, branchId, enqueues, dequeues);
- }
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- if (_realMessageStore == null)
- {
- _eventListeners .put(eventListener, events);
- }
- else
- {
- _realMessageStore.addEventListener(eventListener, events);
- }
- }
-
- @Override
- public String getStoreLocation()
- {
- return _realMessageStore.getStoreLocation();
- }
-
- @Override
- public void onDelete()
- {
- _realMessageStore.onDelete();
- }
-
- @Override
- public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
- {
- _realDurableConfigurationStore.visitConfiguredObjectRecords(handler);
- }
-
- @Override
- public void visitMessages(MessageHandler handler) throws StoreException
- {
- _realMessageStore.visitMessages(handler);
- }
-
- @Override
- public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
- {
- _realMessageStore.visitMessageInstances(handler);
- }
-
- @Override
- public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
- {
- _realMessageStore.visitDistributedTransactions(handler);
- }
-
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
deleted file mode 100644
index 62714a75fe..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.qpid.server.store;/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import java.util.Map;
-
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-
-public class SlowMessageStoreFactory implements MessageStoreFactory
-{
- @Override
- public String getType()
- {
- return SlowMessageStore.TYPE;
- }
-
- @Override
- public MessageStore createMessageStore()
- {
- return new SlowMessageStore();
- }
-
- @Override
- public void validateAttributes(Map<String, Object> attributes)
- {
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
deleted file mode 100644
index 67484ce01b..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestUtils;
-import org.apache.qpid.util.FileUtils;
-
-public class StoreOverfullTest extends QpidBrokerTestCase
-{
- /** Number of messages to send*/
- public static final int TEST_SIZE = 15;
-
- /** Message payload*/
- private static final byte[] BYTE_32K = new byte[32*1024];
-
- private Connection _producerConnection;
- private Connection _consumerConnection;
- private Session _producerSession;
- private Session _consumerSession;
- private MessageProducer _producer;
- private MessageConsumer _consumer;
- private Queue _queue;
-
- private String _storePath;
-
- private static final int OVERFULL_SIZE = 400000;
- private static final int UNDERFULL_SIZE = 350000;
-
- @Override
- public void setUp() throws Exception
- {
- Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
- messageStoreSettings.put(MessageStore.STORE_TYPE, QuotaMessageStore.TYPE);
- messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE);
- messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE);
-
- _storePath = TestUtils.createStoreWithVirtualHostEntry(messageStoreSettings, getBrokerConfiguration(), getTestProfileVirtualHostNodeType());
-
- super.setUp();
-
- _producerConnection = getConnection();
- _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _producerConnection.start();
-
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- _producerConnection.close();
- _consumerConnection.close();
- }
- finally
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- if (_storePath != null)
- {
- FileUtils.delete(new File(_storePath), true);
- }
- }
- }
- }
-
- /**
- * Test:
- *
- * Send > threshold amount of data : Sender is blocked
- * Remove 90% of data : Sender is unblocked
- *
- */
- public void testCapacityExceededCausesBlock() throws Exception
- {
- AtomicInteger sentMessages = new AtomicInteger(0);
- _queue = getTestQueue();
- ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
-
- _producer = _producerSession.createProducer(_queue);
-
- MessageSender sender = sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
-
- long timeoutPoint = System.currentTimeMillis() + 20 * 1000;
-
- while(!((AMQSession<?,?>)_producerSession).isFlowBlocked())
- {
- Thread.sleep(100l);
- if(System.currentTimeMillis() > timeoutPoint)
- {
- throw new RuntimeException("Timed out waiting for session to be blocked");
- }
- }
- int sentCount = sentMessages.get();
- assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
-
- int mostMessages = (int) (0.9 * sentCount);
- for(int i = 0; i < mostMessages; i++)
- {
- if(_consumer.receive(1000l) == null)
- {
- break;
- }
- }
-
- long targetTime = System.currentTimeMillis() + 5000l;
- while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime)
- {
- Thread.sleep(100l);
- }
-
- assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount);
-
- for(int i = mostMessages; i < TEST_SIZE; i++)
- {
- if(_consumer.receive(1000l) == null)
- {
- break;
- }
- }
-
- assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE);
- assertNull("Unexpected exception on message sending:" + sender.getException(), sender.getException());
- }
-
- /**
- * Two producers on different queues
- */
- public void testCapacityExceededCausesBlockTwoConnections() throws Exception
- {
- AtomicInteger sentMessages = new AtomicInteger(0);
- AtomicInteger sentMessages2 = new AtomicInteger(0);
-
- _queue = getTestQueue();
- AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2");
-
- ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
- ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)queue2);
-
- _producer = _producerSession.createProducer(_queue);
-
- Connection secondProducerConnection = getConnection();
- Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer secondProducer = secondProducerSession.createProducer(queue2);
-
- sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
- sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2);
-
- long timeoutPoint = System.currentTimeMillis() + 20 * 1000;
-
- while(!((AMQSession<?,?>)_producerSession).isFlowBlocked())
- {
- Thread.sleep(100l);
- if(System.currentTimeMillis() > timeoutPoint)
- {
- throw new RuntimeException("Timed out waiting for session to be blocked");
- }
- }
- int sentCount = sentMessages.get();
- assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
-
- timeoutPoint = System.currentTimeMillis() + 20 * 1000;
-
- while(!((AMQSession<?,?>)secondProducerSession).isFlowBlocked())
- {
- Thread.sleep(100l);
- if(System.currentTimeMillis() > timeoutPoint)
- {
- throw new RuntimeException("Timed out waiting for second session to be blocked");
- }
- }
- int sentCount2 = sentMessages2.get();
- assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2);
-
- _consumer = _consumerSession.createConsumer(_queue);
- MessageConsumer consumer2 = _consumerSession.createConsumer(queue2);
- _consumerConnection.start();
-
- for(int i = 0; i < 2*TEST_SIZE; i++)
- {
- if(_consumer.receive(1000l) == null
- && consumer2.receive(1000l) == null)
- {
- break;
- }
- }
-
- assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get());
- assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get());
- }
-
- /**
- * New producers are blocked
- */
- public void testCapacityExceededCausesBlockNewConnection() throws Exception
- {
- AtomicInteger sentMessages = new AtomicInteger(0);
- AtomicInteger sentMessages2 = new AtomicInteger(0);
-
- _queue = getTestQueue();
-
- ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
-
- _producer = _producerSession.createProducer(_queue);
-
- Connection secondProducerConnection = getConnection();
- Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer secondProducer = secondProducerSession.createProducer(_queue);
-
- sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
-
- long timeoutPoint = System.currentTimeMillis() + 20 * 1000;
-
- while(!((AMQSession<?,?>)_producerSession).isFlowBlocked())
- {
- Thread.sleep(100l);
- if(System.currentTimeMillis() > timeoutPoint)
- {
- throw new RuntimeException("Timed out waiting for session to be blocked");
- }
- }
- int sentCount = sentMessages.get();
- assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
-
- sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2);
-
- timeoutPoint = System.currentTimeMillis() + 20 * 1000;
-
- while(!((AMQSession<?,?>)_producerSession).isFlowBlocked())
- {
- Thread.sleep(100l);
- if(System.currentTimeMillis() > timeoutPoint)
- {
- throw new RuntimeException("Timed out waiting for session to be blocked");
- }
- }
- int sentCount2 = sentMessages2.get();
- assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2);
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
-
- for(int i = 0; i < 2*TEST_SIZE; i++)
- {
- if(_consumer.receive(2000l) == null)
- {
- break;
- }
- }
-
- assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get());
- assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get());
-
- }
-
- private MessageSender sendMessagesAsync(final MessageProducer producer,
- final Session producerSession,
- final int numMessages,
- long sleepPeriod,
- AtomicInteger sentMessages)
- {
- MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages);
- new Thread(sender).start();
- return sender;
- }
-
- private class MessageSender implements Runnable
- {
- private final MessageProducer _senderProducer;
- private final Session _senderSession;
- private final int _numMessages;
- private volatile JMSException _exception;
- private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
- private long _sleepPeriod;
- private final AtomicInteger _sentMessages;
-
- public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
- {
- _senderProducer = producer;
- _senderSession = producerSession;
- _numMessages = numMessages;
- _sleepPeriod = sleepPeriod;
- _sentMessages = sentMessages;
- }
-
- public void run()
- {
- try
- {
- sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages);
- }
- catch (JMSException e)
- {
- _exception = e;
- _exceptionThrownLatch.countDown();
- }
- }
-
- public Exception getException()
- {
- return _exception;
- }
-
- }
-
- private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
- throws JMSException
- {
-
- for (int msg = 0; msg < numMessages; msg++)
- {
- producer.send(nextMessage(msg, producerSession));
- sentMessages.incrementAndGet();
-
- try
- {
- ((AMQSession<?,?>)producerSession).sync();
- }
- catch (AMQException e)
- {
- _logger.error(e);
- throw new RuntimeException(e);
- }
-
- try
- {
- Thread.sleep(sleepPeriod);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- private Message nextMessage(int msg, Session producerSession) throws JMSException
- {
- BytesMessage send = producerSession.createBytesMessage();
- send.writeBytes(BYTE_32K);
- send.setIntProperty("msg", msg);
- return send;
- }
-
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
deleted file mode 100644
index 67c3f51d46..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.client.timeouts;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SlowMessageStore;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout
- * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure
- * that the default value is being replaced.
- */
-public class SyncWaitDelayTest extends QpidBrokerTestCase
-{
- protected static final Logger _logger = LoggerFactory.getLogger(SyncWaitDelayTest.class);
-
- protected long POST_COMMIT_DELAY = 1000L;
- protected long SYNC_WRITE_TIMEOUT = POST_COMMIT_DELAY + 1000;
-
- protected Connection _connection;
- protected Session _session;
- protected Queue _queue;
- protected MessageConsumer _consumer;
-
- public void setUp() throws Exception
- {
- Map<String, Object> slowMessageStoreDelays = new HashMap<String,Object>();
- slowMessageStoreDelays.put("postcommitTran", POST_COMMIT_DELAY);
-
- Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
- messageStoreSettings.put(MessageStore.STORE_TYPE, SlowMessageStore.TYPE);
- messageStoreSettings.put(SlowMessageStore.DELAYS, slowMessageStoreDelays);
-
- TestUtils.createStoreWithVirtualHostEntry(messageStoreSettings, getBrokerConfiguration(), getTestProfileVirtualHostNodeType());
-
- super.setUp();
-
- //Set the syncWrite timeout to be just larger than the delay on the commitTran.
- setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT));
-
- _connection = getConnection();
-
- //Create Queue
- _queue = (Queue) getInitialContext().lookup("queue");
-
- //Create Consumer
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
-
- //Ensure Queue exists
- _session.createConsumer(_queue).close();
- }
-
-
- public void test() throws JMSException
- {
- MessageProducer producer = _session.createProducer(_queue);
-
- Message message = _session.createTextMessage("Message");
-
- producer.send(message);
-
- long start = System.nanoTime();
-
- _logger.info("Calling Commit");
-
- try
- {
- _session.commit();
- long end = System.nanoTime();
- long time = (end - start);
- // As we are using Nano time ensure to multiply up the millis.
- assertTrue("Commit was quickier than the built in delay:" + time, time > 1000000L * POST_COMMIT_DELAY);
- assertFalse("Commit was slower than the built in default", time > 1000000L * 1000 * 30);
- }
- catch (JMSException e)
- {
- fail(e.getMessage());
- }
-
- }
-
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
deleted file mode 100644
index 28467231ed..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.client.timeouts;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQTimeoutException;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-
-/** This tests that when the syncWrite timeout is set that it will timeout on that time rather than the default 30s. */
-public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest
-{
- protected static final Logger _logger = Logger.getLogger(SyncWaitTimeoutDelayTest.class);
-
- public void setUp() throws Exception
- {
- POST_COMMIT_DELAY = 1000L;
-
- //Set the syncWrite timeout to be less than the COMMIT Delay so we can validate that it is being applied
- SYNC_WRITE_TIMEOUT = 500L;
-
- super.setUp();
- }
-
- @Override
- public void test() throws JMSException
- {
- MessageProducer producer = _session.createProducer(_queue);
-
- Message message = _session.createTextMessage("Message");
-
- producer.send(message);
-
- _logger.info("Calling Commit");
-
- long start = System.nanoTime();
- try
- {
- _session.commit();
- fail("Commit occured even though syncWait timeout is shorter than delay in commit");
- }
- catch (JMSException e)
- {
- assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
- assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Exception during commit"));
- // As we are using Nano time ensure to multiply up the millis.
- assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
- }
-
- }
-}
diff --git a/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
deleted file mode 100644
index 9e2efc1031..0000000000
--- a/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-org.apache.qpid.server.store.SlowMessageStoreFactory
-org.apache.qpid.server.store.QuotaMessageStoreFactory