diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:26 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:26 +0000 |
| commit | 217f1785cf7def05e296990cde21f9cc74d05022 (patch) | |
| tree | 205f2d3ad2daf1913d38be62fe43578c63965c4a /qpid/java | |
| parent | d635bd2f83fc8af867c61ddf19a3c99a7b5cb55c (diff) | |
| download | qpid-python-217f1785cf7def05e296990cde21f9cc74d05022.tar.gz | |
QPID-5802: [Java Broker] Temporarily remove the slow message store and quota message store
test implementations and the tests that rely on them.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600933 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
8 files changed, 0 insertions, 1193 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java deleted file mode 100644 index b6dd1b1b71..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.model.ConfiguredObject; - -public class QuotaMessageStore extends MemoryMessageStore -{ - public static final String TYPE = "QuotaMessageStore"; - private final AtomicLong _messageId = new AtomicLong(1); - - private long _totalStoreSize; - private boolean _limitBusted; - private long _persistentSizeLowThreshold; - private long _persistentSizeHighThreshold; - - private final EventManager _eventManager = new EventManager(); - - - @Override - public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) - { - Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); - _persistentSizeHighThreshold = overfullAttr == null - ? Long.MAX_VALUE - : overfullAttr instanceof Number - ? ((Number)overfullAttr).longValue() - : Long.parseLong(overfullAttr.toString()); - - Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); - - _persistentSizeLowThreshold = overfullAttr == null - ? _persistentSizeHighThreshold - : underfullAttr instanceof Number - ? ((Number)underfullAttr).longValue() - : Long.parseLong(underfullAttr.toString()); - - - if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - } - - - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) - { - final long id = _messageId.getAndIncrement(); - return new StoredMemoryMessage<T>(id, metaData); - } - - @Override - public Transaction newTransaction() - { - return new Transaction() - { - private AtomicLong _storeSizeIncrease = new AtomicLong(); - - @Override - public StoreFuture commitTranAsync() - { - QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue()); - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - _storeSizeIncrease.addAndGet(((MessageContentSource)message).getSize()); - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - _storeSizeIncrease.addAndGet(-((MessageContentSource)message).getSize()); - } - - @Override - public void commitTran() - { - QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue()); - } - - @Override - public void abortTran() - { - } - - @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - } - - @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - } - }; - } - - @Override - public boolean isPersistent() - { - return true; - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - _eventManager.addEventListener(eventListener, events); - } - - private void storedSizeChange(final int delta) - { - if(_persistentSizeHighThreshold > 0) - { - synchronized (this) - { - long newSize = _totalStoreSize += delta; - if(!_limitBusted && newSize > _persistentSizeHighThreshold) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - else if(_limitBusted && newSize < _persistentSizeHighThreshold) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - } - } - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStoreFactory.java deleted file mode 100644 index b4f81e2ad6..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStoreFactory.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.store; - -import java.util.Map; - -import org.apache.qpid.server.plugin.MessageStoreFactory; - -public class QuotaMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public String getType() - { - return QuotaMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new QuotaMessageStore(); - } - - @Override - public void validateAttributes(Map<String, Object> attributes) - { - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java deleted file mode 100644 index 0a686926e3..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; -import org.apache.qpid.server.plugin.MessageStoreFactory; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; - -public class SlowMessageStore implements MessageStore, DurableConfigurationStore -{ - private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); - - public static final String TYPE = "SLOW"; - public static final String DELAYS = "delays"; - public static final String REAL_STORE = "realStore"; - - private static final String DEFAULT_DELAY = "default"; - private static final String PRE = "pre"; - private static final String POST = "post"; - - private HashMap<String, Long> _preDelays = new HashMap<String, Long>(); - private HashMap<String, Long> _postDelays = new HashMap<String, Long>(); - private long _defaultDelay = 0L; - private MessageStore _realMessageStore = null; - private DurableConfigurationStore _realDurableConfigurationStore = null; - - private Map<EventListener, Event[]> _eventListeners = new ConcurrentHashMap<EventListener, Event[]>(); - - @Override - public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) - { - if (storeSettings != null && storeSettings.get(REAL_STORE) != null) - { - final String realStore = (String) storeSettings.get(REAL_STORE); - _realDurableConfigurationStore = DurableConfigurationStoreFactory.FACTORY_LOADER.get(realStore).createDurableConfigurationStore(); - _realDurableConfigurationStore.openConfigurationStore(parent, storeSettings); - } - } - - private void configureDelays(Map<String, Object> delays) - { - - for(Map.Entry<String, Object> entry : delays.entrySet()) - { - String key = entry.getKey(); - if (key.startsWith(PRE)) - { - _preDelays.put(key.substring(PRE.length()), Long.parseLong(String.valueOf(entry.getValue()))); - } - else if (key.startsWith(POST)) - { - _postDelays.put(key.substring(POST.length()), Long.parseLong(String.valueOf(entry.getValue()))); - } - } - } - - private void doPostDelay(String method) - { - long delay = lookupDelay(_postDelays, method); - doDelay(delay); - } - - private void doPreDelay(String method) - { - long delay = lookupDelay(_preDelays, method); - doDelay(delay); - } - - private long lookupDelay(HashMap<String, Long> delays, String method) - { - Long delay = delays.get(method); - return (delay == null) ? _defaultDelay : delay; - } - - private void doDelay(long delay) - { - if (delay > 0) - { - long start = System.nanoTime(); - try - { - - Thread.sleep(delay); - } - catch (InterruptedException e) - { - _logger.warn("Interrupted : " + e); - } - - long slept = (System.nanoTime() - start) / 1000000; - - if (slept >= delay) - { - _logger.info("Done sleep for:" + slept+":"+delay); - } - else - { - _logger.info("Only sleep for:" + slept + " re-sleeping"); - doDelay(delay - slept); - } - } - } - - @Override - public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) - { - Object delaysAttr = messageStoreSettings.get(DELAYS); - - @SuppressWarnings({ "unchecked" }) - Map<String,Object> delays = (delaysAttr instanceof Map) ? (Map<String,Object>) delaysAttr : Collections.<String,Object>emptyMap(); - configureDelays(delays); - - if (delays.containsKey(DEFAULT_DELAY)) - { - _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); - } - - final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryConfigurationStore.TYPE : messageStoreSettings.get(REAL_STORE); - final String realStore = (String) realStoreAttr; - _realMessageStore = MessageStoreFactory.FACTORY_LOADER.get(realStore).createMessageStore(); - - if (!_eventListeners.isEmpty()) - { - for (Iterator<Map.Entry<EventListener, Event[]>> it = _eventListeners.entrySet().iterator(); it.hasNext();) - { - Map.Entry<EventListener, Event[]> entry = it.next(); - _realMessageStore.addEventListener(entry.getKey(), entry.getValue()); - it.remove(); - } - } - _realMessageStore.openMessageStore(parent, messageStoreSettings); - - if (_realDurableConfigurationStore == null) - { - _realDurableConfigurationStore = (DurableConfigurationStore) _realMessageStore; - } - - } - - @Override - public void closeMessageStore() - { - doPreDelay("close"); - _realMessageStore.closeMessageStore(); - doPostDelay("close"); - } - - @Override - public void closeConfigurationStore() - { - _realDurableConfigurationStore.closeConfigurationStore(); - } - - @Override - public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData) - { - return _realMessageStore.addMessage(metaData); - } - - @Override - public void create(ConfiguredObjectRecord record) throws StoreException - { - doPreDelay("create"); - _realDurableConfigurationStore.create(record); - doPostDelay("create"); - } - - - @Override - public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException - { - doPreDelay("remove"); - UUID[] removed = _realDurableConfigurationStore.remove(objects); - doPostDelay("remove"); - return removed; - } - - @Override - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException - { - doPreDelay("update"); - _realDurableConfigurationStore.update(createIfNecessary, records); - doPostDelay("update"); - } - - @Override - public Transaction newTransaction() - { - doPreDelay("beginTran"); - Transaction txn = new SlowTransaction(_realMessageStore.newTransaction()); - doPostDelay("beginTran"); - return txn; - } - - @Override - public boolean isPersistent() - { - return _realMessageStore.isPersistent(); - } - - private class SlowTransaction implements Transaction - { - private final Transaction _underlying; - - private SlowTransaction(Transaction underlying) - { - _underlying = underlying; - } - - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - doPreDelay("enqueueMessage"); - _underlying.enqueueMessage(queue, message); - doPostDelay("enqueueMessage"); - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - doPreDelay("dequeueMessage"); - _underlying.dequeueMessage(queue, message); - doPostDelay("dequeueMessage"); - } - - @Override - public void commitTran() - { - doPreDelay("commitTran"); - _underlying.commitTran(); - doPostDelay("commitTran"); - } - - @Override - public StoreFuture commitTranAsync() - { - doPreDelay("commitTran"); - StoreFuture future = _underlying.commitTranAsync(); - doPostDelay("commitTran"); - return future; - } - - @Override - public void abortTran() - { - doPreDelay("abortTran"); - _underlying.abortTran(); - doPostDelay("abortTran"); - } - - @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - _underlying.removeXid(format, globalId, branchId); - } - - @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - _underlying.recordXid(format, globalId, branchId, enqueues, dequeues); - } - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - if (_realMessageStore == null) - { - _eventListeners .put(eventListener, events); - } - else - { - _realMessageStore.addEventListener(eventListener, events); - } - } - - @Override - public String getStoreLocation() - { - return _realMessageStore.getStoreLocation(); - } - - @Override - public void onDelete() - { - _realMessageStore.onDelete(); - } - - @Override - public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException - { - _realDurableConfigurationStore.visitConfiguredObjectRecords(handler); - } - - @Override - public void visitMessages(MessageHandler handler) throws StoreException - { - _realMessageStore.visitMessages(handler); - } - - @Override - public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException - { - _realMessageStore.visitMessageInstances(handler); - } - - @Override - public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException - { - _realMessageStore.visitDistributedTransactions(handler); - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java deleted file mode 100644 index 62714a75fe..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.apache.qpid.server.store;/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import java.util.Map; - -import org.apache.qpid.server.plugin.MessageStoreFactory; - -public class SlowMessageStoreFactory implements MessageStoreFactory -{ - @Override - public String getType() - { - return SlowMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new SlowMessageStore(); - } - - @Override - public void validateAttributes(Map<String, Object> attributes) - { - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java deleted file mode 100644 index 67484ce01b..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java +++ /dev/null @@ -1,393 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestUtils; -import org.apache.qpid.util.FileUtils; - -public class StoreOverfullTest extends QpidBrokerTestCase -{ - /** Number of messages to send*/ - public static final int TEST_SIZE = 15; - - /** Message payload*/ - private static final byte[] BYTE_32K = new byte[32*1024]; - - private Connection _producerConnection; - private Connection _consumerConnection; - private Session _producerSession; - private Session _consumerSession; - private MessageProducer _producer; - private MessageConsumer _consumer; - private Queue _queue; - - private String _storePath; - - private static final int OVERFULL_SIZE = 400000; - private static final int UNDERFULL_SIZE = 350000; - - @Override - public void setUp() throws Exception - { - Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); - messageStoreSettings.put(MessageStore.STORE_TYPE, QuotaMessageStore.TYPE); - messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE); - messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE); - - _storePath = TestUtils.createStoreWithVirtualHostEntry(messageStoreSettings, getBrokerConfiguration(), getTestProfileVirtualHostNodeType()); - - super.setUp(); - - _producerConnection = getConnection(); - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _producerConnection.start(); - - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - } - - public void tearDown() throws Exception - { - try - { - _producerConnection.close(); - _consumerConnection.close(); - } - finally - { - try - { - super.tearDown(); - } - finally - { - if (_storePath != null) - { - FileUtils.delete(new File(_storePath), true); - } - } - } - } - - /** - * Test: - * - * Send > threshold amount of data : Sender is blocked - * Remove 90% of data : Sender is unblocked - * - */ - public void testCapacityExceededCausesBlock() throws Exception - { - AtomicInteger sentMessages = new AtomicInteger(0); - _queue = getTestQueue(); - ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); - - _producer = _producerSession.createProducer(_queue); - - MessageSender sender = sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); - - long timeoutPoint = System.currentTimeMillis() + 20 * 1000; - - while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) - { - Thread.sleep(100l); - if(System.currentTimeMillis() > timeoutPoint) - { - throw new RuntimeException("Timed out waiting for session to be blocked"); - } - } - int sentCount = sentMessages.get(); - assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - int mostMessages = (int) (0.9 * sentCount); - for(int i = 0; i < mostMessages; i++) - { - if(_consumer.receive(1000l) == null) - { - break; - } - } - - long targetTime = System.currentTimeMillis() + 5000l; - while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime) - { - Thread.sleep(100l); - } - - assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount); - - for(int i = mostMessages; i < TEST_SIZE; i++) - { - if(_consumer.receive(1000l) == null) - { - break; - } - } - - assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE); - assertNull("Unexpected exception on message sending:" + sender.getException(), sender.getException()); - } - - /** - * Two producers on different queues - */ - public void testCapacityExceededCausesBlockTwoConnections() throws Exception - { - AtomicInteger sentMessages = new AtomicInteger(0); - AtomicInteger sentMessages2 = new AtomicInteger(0); - - _queue = getTestQueue(); - AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2"); - - ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); - ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)queue2); - - _producer = _producerSession.createProducer(_queue); - - Connection secondProducerConnection = getConnection(); - Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer secondProducer = secondProducerSession.createProducer(queue2); - - sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); - sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); - - long timeoutPoint = System.currentTimeMillis() + 20 * 1000; - - while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) - { - Thread.sleep(100l); - if(System.currentTimeMillis() > timeoutPoint) - { - throw new RuntimeException("Timed out waiting for session to be blocked"); - } - } - int sentCount = sentMessages.get(); - assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); - - timeoutPoint = System.currentTimeMillis() + 20 * 1000; - - while(!((AMQSession<?,?>)secondProducerSession).isFlowBlocked()) - { - Thread.sleep(100l); - if(System.currentTimeMillis() > timeoutPoint) - { - throw new RuntimeException("Timed out waiting for second session to be blocked"); - } - } - int sentCount2 = sentMessages2.get(); - assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); - - _consumer = _consumerSession.createConsumer(_queue); - MessageConsumer consumer2 = _consumerSession.createConsumer(queue2); - _consumerConnection.start(); - - for(int i = 0; i < 2*TEST_SIZE; i++) - { - if(_consumer.receive(1000l) == null - && consumer2.receive(1000l) == null) - { - break; - } - } - - assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); - assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); - } - - /** - * New producers are blocked - */ - public void testCapacityExceededCausesBlockNewConnection() throws Exception - { - AtomicInteger sentMessages = new AtomicInteger(0); - AtomicInteger sentMessages2 = new AtomicInteger(0); - - _queue = getTestQueue(); - - ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); - - _producer = _producerSession.createProducer(_queue); - - Connection secondProducerConnection = getConnection(); - Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer secondProducer = secondProducerSession.createProducer(_queue); - - sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); - - long timeoutPoint = System.currentTimeMillis() + 20 * 1000; - - while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) - { - Thread.sleep(100l); - if(System.currentTimeMillis() > timeoutPoint) - { - throw new RuntimeException("Timed out waiting for session to be blocked"); - } - } - int sentCount = sentMessages.get(); - assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); - - sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); - - timeoutPoint = System.currentTimeMillis() + 20 * 1000; - - while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) - { - Thread.sleep(100l); - if(System.currentTimeMillis() > timeoutPoint) - { - throw new RuntimeException("Timed out waiting for session to be blocked"); - } - } - int sentCount2 = sentMessages2.get(); - assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - for(int i = 0; i < 2*TEST_SIZE; i++) - { - if(_consumer.receive(2000l) == null) - { - break; - } - } - - assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); - assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); - - } - - private MessageSender sendMessagesAsync(final MessageProducer producer, - final Session producerSession, - final int numMessages, - long sleepPeriod, - AtomicInteger sentMessages) - { - MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages); - new Thread(sender).start(); - return sender; - } - - private class MessageSender implements Runnable - { - private final MessageProducer _senderProducer; - private final Session _senderSession; - private final int _numMessages; - private volatile JMSException _exception; - private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1); - private long _sleepPeriod; - private final AtomicInteger _sentMessages; - - public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) - { - _senderProducer = producer; - _senderSession = producerSession; - _numMessages = numMessages; - _sleepPeriod = sleepPeriod; - _sentMessages = sentMessages; - } - - public void run() - { - try - { - sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages); - } - catch (JMSException e) - { - _exception = e; - _exceptionThrownLatch.countDown(); - } - } - - public Exception getException() - { - return _exception; - } - - } - - private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) - throws JMSException - { - - for (int msg = 0; msg < numMessages; msg++) - { - producer.send(nextMessage(msg, producerSession)); - sentMessages.incrementAndGet(); - - try - { - ((AMQSession<?,?>)producerSession).sync(); - } - catch (AMQException e) - { - _logger.error(e); - throw new RuntimeException(e); - } - - try - { - Thread.sleep(sleepPeriod); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - - private Message nextMessage(int msg, Session producerSession) throws JMSException - { - BytesMessage send = producerSession.createBytesMessage(); - send.writeBytes(BYTE_32K); - send.setIntProperty("msg", msg); - return send; - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java deleted file mode 100644 index 67c3f51d46..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.client.timeouts; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SlowMessageStore; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout - * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure - * that the default value is being replaced. - */ -public class SyncWaitDelayTest extends QpidBrokerTestCase -{ - protected static final Logger _logger = LoggerFactory.getLogger(SyncWaitDelayTest.class); - - protected long POST_COMMIT_DELAY = 1000L; - protected long SYNC_WRITE_TIMEOUT = POST_COMMIT_DELAY + 1000; - - protected Connection _connection; - protected Session _session; - protected Queue _queue; - protected MessageConsumer _consumer; - - public void setUp() throws Exception - { - Map<String, Object> slowMessageStoreDelays = new HashMap<String,Object>(); - slowMessageStoreDelays.put("postcommitTran", POST_COMMIT_DELAY); - - Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); - messageStoreSettings.put(MessageStore.STORE_TYPE, SlowMessageStore.TYPE); - messageStoreSettings.put(SlowMessageStore.DELAYS, slowMessageStoreDelays); - - TestUtils.createStoreWithVirtualHostEntry(messageStoreSettings, getBrokerConfiguration(), getTestProfileVirtualHostNodeType()); - - super.setUp(); - - //Set the syncWrite timeout to be just larger than the delay on the commitTran. - setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT)); - - _connection = getConnection(); - - //Create Queue - _queue = (Queue) getInitialContext().lookup("queue"); - - //Create Consumer - _session = _connection.createSession(true, Session.SESSION_TRANSACTED); - - //Ensure Queue exists - _session.createConsumer(_queue).close(); - } - - - public void test() throws JMSException - { - MessageProducer producer = _session.createProducer(_queue); - - Message message = _session.createTextMessage("Message"); - - producer.send(message); - - long start = System.nanoTime(); - - _logger.info("Calling Commit"); - - try - { - _session.commit(); - long end = System.nanoTime(); - long time = (end - start); - // As we are using Nano time ensure to multiply up the millis. - assertTrue("Commit was quickier than the built in delay:" + time, time > 1000000L * POST_COMMIT_DELAY); - assertFalse("Commit was slower than the built in default", time > 1000000L * 1000 * 30); - } - catch (JMSException e) - { - fail(e.getMessage()); - } - - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java deleted file mode 100644 index 28467231ed..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.client.timeouts; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQTimeoutException; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; - -/** This tests that when the syncWrite timeout is set that it will timeout on that time rather than the default 30s. */ -public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest -{ - protected static final Logger _logger = Logger.getLogger(SyncWaitTimeoutDelayTest.class); - - public void setUp() throws Exception - { - POST_COMMIT_DELAY = 1000L; - - //Set the syncWrite timeout to be less than the COMMIT Delay so we can validate that it is being applied - SYNC_WRITE_TIMEOUT = 500L; - - super.setUp(); - } - - @Override - public void test() throws JMSException - { - MessageProducer producer = _session.createProducer(_queue); - - Message message = _session.createTextMessage("Message"); - - producer.send(message); - - _logger.info("Calling Commit"); - - long start = System.nanoTime(); - try - { - _session.commit(); - fail("Commit occured even though syncWait timeout is shorter than delay in commit"); - } - catch (JMSException e) - { - assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException); - assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Exception during commit")); - // As we are using Nano time ensure to multiply up the millis. - assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30)); - } - - } -} diff --git a/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory deleted file mode 100644 index 9e2efc1031..0000000000 --- a/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -org.apache.qpid.server.store.SlowMessageStoreFactory -org.apache.qpid.server.store.QuotaMessageStoreFactory |
