diff options
Diffstat (limited to 'java')
6 files changed, 548 insertions, 6 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 25439aba94..6e886f25a8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -162,7 +162,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ - private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30); /** * Creates a new protocol handler, associated with the specified client connection instance. diff --git a/java/systests/pom.xml b/java/systests/pom.xml index 73c8fb7351..38e7f3bf34 100644 --- a/java/systests/pom.xml +++ b/java/systests/pom.xml @@ -96,6 +96,10 @@ <name>QPID_HOME</name> <value>${basedir}/${topDirectoryLocation}/broker</value> </property> + <property> + <name>amqj.logging.level</name> + <value>${amqj.logging.level}</value> + </property> </systemProperties> <excludes> diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java new file mode 100644 index 0000000000..232295811c --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -0,0 +1,282 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; + +import java.util.HashMap; +import java.util.Iterator; + +public class SlowMessageStore implements MessageStore +{ + private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); + private static final String DELAYS = "delays"; + private HashMap<String, Long> _preDelays = new HashMap<String, Long>(); + private HashMap<String, Long> _postDelays = new HashMap<String, Long>(); + private long _defaultDelay = 0L; + private MessageStore _realStore = new MemoryMessageStore(); + private static final String PRE = "pre"; + private static final String POST = "post"; + private String DEFAULT_DELAY = "default"; + + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + { + Configuration delays = config.subset(base + "." + DELAYS); + + configureDelays(delays); + + String messageStoreClass = config.getString(base + ".store.class"); + + if (delays.containsKey(DEFAULT_DELAY)) + { + _defaultDelay = delays.getLong(DEFAULT_DELAY); + } + + if (messageStoreClass != null) + { + Class clazz = Class.forName(messageStoreClass); + + Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + " does not."); + } + _realStore = (MessageStore) o; + _realStore.configure(virtualHost, base + ".store", config); + } + else + { + _realStore.configure(virtualHost, base + ".store", config); + } + } + + private void configureDelays(Configuration config) + { + Iterator delays = config.getKeys(); + + while (delays.hasNext()) + { + String key = (String) delays.next(); + if (key.endsWith(PRE)) + { + _preDelays.put(key.substring(0, key.length() - PRE.length() - 1), config.getLong(key)); + } + else if (key.endsWith(POST)) + { + _postDelays.put(key.substring(0, key.length() - POST.length() - 1), config.getLong(key)); + } + } + } + + private void doPostDelay(String method) + { + long delay = lookupDelay(_postDelays, method); + doDelay(delay); + } + + private void doPreDelay(String method) + { + long delay = lookupDelay(_preDelays, method); + doDelay(delay); + } + + private long lookupDelay(HashMap<String, Long> delays, String method) + { + Long delay = delays.get(method); + return (delay == null) ? _defaultDelay : delay; + } + + private void doDelay(long delay) + { + if (delay > 0) + { + try + { + Thread.sleep(delay); + } + catch (InterruptedException e) + { + _logger.warn("Interrupted : " + e); + } + } + } + + // ***** MessageStore Interface. + + public void close() throws Exception + { + doPreDelay("close"); + _realStore.close(); + doPostDelay("close"); + } + + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + { + doPreDelay("removeMessage"); + _realStore.removeMessage(storeContext, messageId); + doPostDelay("removeMessage"); + } + + public void createExchange(Exchange exchange) throws AMQException + { + doPreDelay("createExchange"); + _realStore.createExchange(exchange); + doPostDelay("createExchange"); + } + + public void removeExchange(Exchange exchange) throws AMQException + { + doPreDelay("removeExchange"); + _realStore.removeExchange(exchange); + doPostDelay("removeExchange"); + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + doPreDelay("bindQueue"); + _realStore.bindQueue(exchange, routingKey, queue, args); + doPostDelay("bindQueue"); + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + doPreDelay("unbindQueue"); + _realStore.unbindQueue(exchange, routingKey, queue, args); + doPostDelay("unbindQueue"); + } + + public void createQueue(AMQQueue queue) throws AMQException + { + createQueue(queue, null); + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + { + doPreDelay("createQueue"); + _realStore.createQueue(queue, arguments); + doPostDelay("createQueue"); + } + + public void removeQueue(AMQQueue queue) throws AMQException + { + doPreDelay("removeQueue"); + _realStore.removeQueue(queue); + doPostDelay("removeQueue"); + } + + public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + doPreDelay("enqueueMessage"); + _realStore.enqueueMessage(context, queue, messageId); + doPostDelay("enqueueMessage"); + } + + public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + doPreDelay("dequeueMessage"); + _realStore.dequeueMessage(context, queue, messageId); + doPostDelay("dequeueMessage"); + } + + public void beginTran(StoreContext context) throws AMQException + { + doPreDelay("beginTran"); + _realStore.beginTran(context); + doPostDelay("beginTran"); + } + + public void commitTran(StoreContext context) throws AMQException + { + doPreDelay("commitTran"); + _realStore.commitTran(context); + doPostDelay("commitTran"); + } + + public void abortTran(StoreContext context) throws AMQException + { + doPreDelay("abortTran"); + _realStore.abortTran(context); + doPostDelay("abortTran"); + } + + public boolean inTran(StoreContext context) + { + doPreDelay("inTran"); + boolean b = _realStore.inTran(context); + doPostDelay("inTran"); + return b; + } + + public Long getNewMessageId() + { + doPreDelay("getNewMessageId"); + Long l = _realStore.getNewMessageId(); + doPostDelay("getNewMessageId"); + return l; + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + doPreDelay("storeContentBodyChunk"); + _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); + doPostDelay("storeContentBodyChunk"); + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + doPreDelay("storeMessageMetaData"); + _realStore.storeMessageMetaData(context, messageId, messageMetaData); + doPostDelay("storeMessageMetaData"); + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + { + doPreDelay("getMessageMetaData"); + MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId); + doPostDelay("getMessageMetaData"); + return mmd; + } + + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + { + doPreDelay("getContentBodyChunk"); + ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index); + doPostDelay("getContentBodyChunk"); + return c; + } + + public boolean isPersistent() + { + return _realStore.isPersistent(); + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java new file mode 100644 index 0000000000..f2c8a5e1f5 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.client.timeouts; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.File; + +/** + * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout + * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure + * that the default value is being replaced. + */ +public class SyncWaitDelayTest extends QpidTestCase +{ + protected static final Logger _logger = LoggerFactory.getLogger(SyncWaitDelayTest.class); + + final String QpidHome = System.getProperty("QPID_HOME"); + final File _configFile = new File(QpidHome, "etc/config.xml"); + + private String VIRTUALHOST = "test"; + protected long POST_COMMIT_DELAY = 1000L; + protected long SYNC_WRITE_TIMEOUT = POST_COMMIT_DELAY + 1000; + + protected Connection _connection; + protected Session _session; + protected Queue _queue; + protected MessageConsumer _consumer; + + public void setUp() throws Exception + { + super.setUp(); + stopBroker(); + if (!_configFile.exists()) + { + fail("Unable to test without config file:" + _configFile); + } + + ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(_configFile); + + //Disable management on broker. + config.getConfiguration().setProperty("management.enabled", "false"); + + Configuration testVirtualhost = config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST); + testVirtualhost.setProperty("store.class", "org.apache.qpid.server.store.SlowMessageStore"); + testVirtualhost.setProperty("store.delays.commitTran.post", POST_COMMIT_DELAY); + + startBroker(1, config); + + //Set the syncWrite timeout to be just larger than the delay on the commitTran. + setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT)); + + _connection = getConnection(); + + //Create Queue + _queue = (Queue) getInitialContext().lookup("queue"); + + //Create Consumer + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + + //Ensure Queue exists + _session.createConsumer(_queue).close(); + } + + + public void test() throws JMSException + { + MessageProducer producer = _session.createProducer(_queue); + + Message message = _session.createTextMessage("Message"); + + producer.send(message); + + long start = System.nanoTime(); + + _logger.info("Calling Commit"); + + try + { + _session.commit(); + long end = System.nanoTime(); + long time = (end - start); + // As we are using Nano time ensure to multiply up the millis. + assertTrue("Commit was quickier than the delay:" + time, time > 1000000L * POST_COMMIT_DELAY); + assertFalse("Commit was to slower than the build in default", time > 1000000L * 1000 * 30); + } + catch (JMSException e) + { + fail(e.getMessage()); + } + + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java new file mode 100644 index 0000000000..2b736ed392 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.client.timeouts; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQTimeoutException; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; + +/** This tests that when the syncWrite timeout is set that it will timeout on that time rather than the default 30s. */ +public class SyncWaitTimeoutDelayTest extends SyncWaitDelayTest +{ + protected static final Logger _logger = Logger.getLogger(SyncWaitTimeoutDelayTest.class); + + public void setUp() throws Exception + { + POST_COMMIT_DELAY = 1000L; + + //Set the syncWrite timeout to be less than the COMMIT Delay so we can validate that it is being applied + SYNC_WRITE_TIMEOUT = 500L; + + super.setUp(); + } + + public void test() throws JMSException + { + MessageProducer producer = _session.createProducer(_queue); + + Message message = _session.createTextMessage("Message"); + + producer.send(message); + + _logger.info("Calling Commit"); + + long start = System.nanoTime(); + try + { + _session.commit(); + fail("Commit occured even though syncWait timeout is shorter than delay in commit"); + } + catch (JMSException e) + { + assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException); + assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit")); + // As we are using Nano time ensure to multiply up the millis. + assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30)); + } + + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 65939e1fb7..74b3d5e049 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.StringTokenizer; import java.util.Map; +import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -35,6 +36,7 @@ import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +52,8 @@ public class QpidTestCase extends TestCase protected long RECEIVE_TIMEOUT = 1000l; + private Map<String, String> _setProperties = new HashMap<String, String>(); + /** * Some tests are excluded when the property test.excludes is set to true. * An exclusion list is either a file (prop test.excludesfile) which contains one test name @@ -131,6 +135,8 @@ public class QpidTestCase extends TestCase private static final String QPID_HOME = "QPID_HOME"; + protected int DEFAULT_VM_PORT = 1; + protected String _broker = System.getProperty(BROKER, VM); private String _brokerClean = System.getProperty(BROKER_CLEAN, null); private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); @@ -282,12 +288,26 @@ public class QpidTestCase extends TestCase } } + public void startBroker(int port, ConfigurationFileApplicationRegistry config) throws Exception + { + ApplicationRegistry.initialise(config, port); + startBroker(port); + } + public void startBroker() throws Exception { + startBroker(0); + } + + public void startBroker(int port) throws Exception + { if (_broker.equals(VM)) { + //If we are starting on port 0 use the default VM_PORT + port = port == 0 ? DEFAULT_VM_PORT : port; + // create an in_VM broker - TransportConnection.createVMBroker(1); + TransportConnection.createVMBroker(port); } else if (!_broker.equals(EXTERNAL)) { @@ -362,6 +382,11 @@ public class QpidTestCase extends TestCase public void stopBroker() throws Exception { + stopBroker(0); + } + + public void stopBroker(int port) throws Exception + { _logger.info("stopping broker: " + _broker); if (_brokerProcess != null) { @@ -372,12 +397,40 @@ public class QpidTestCase extends TestCase } else if (_broker.equals(VM)) { - TransportConnection.killAllVMBrokers(); - ApplicationRegistry.removeAll(); + port = port == 0 ? DEFAULT_VM_PORT : port; + + TransportConnection.killVMBroker(port); + ApplicationRegistry.remove(port); } _brokerStarted = false; } + protected void setSystemProperty(String property, String value) + { + if (!_setProperties.containsKey(property)) + { + _setProperties.put(property, System.getProperty(property)); + } + + System.setProperty(property, value); + } + + protected void revertSystemProperties() + { + for (String key : _setProperties.keySet()) + { + String value = _setProperties.get(key); + if (value != null) + { + System.setProperty(key, value); + } + else + { + System.clearProperty(key); + } + } + } + /** * Check whether the broker is an 0.8 * @@ -395,8 +448,13 @@ public class QpidTestCase extends TestCase public void restartBroker() throws Exception { - stopBroker(); - startBroker(); + restartBroker(0); + } + + public void restartBroker(int port) throws Exception + { + stopBroker(port); + startBroker(port); } /** @@ -508,6 +566,8 @@ public class QpidTestCase extends TestCase c.close(); } } + + revertSystemProperties(); } } |
