diff options
author | Keith Wall <kwall@apache.org> | 2011-11-20 23:41:18 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2011-11-20 23:41:18 +0000 |
commit | 9e65efd1879b066a7325a1612dc040c84165e531 (patch) | |
tree | 50fd3ba7e7d3f0de11ba9d73170a9f7c3f6c0d2f /java/broker/src/test/java/org/apache/qpid | |
parent | a0fccd9dbfabd912a906225f817cd00072d7fc8d (diff) | |
download | qpid-python-9e65efd1879b066a7325a1612dc040c84165e531.tar.gz |
QPID-3559: SimpleDateFormat used in thread unsafe manner in JMX Managed Queue interface.
Resolved thread safe issue. Added supporting unit and system test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1204296 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid')
-rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java | 232 |
1 files changed, 148 insertions, 84 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 070d105805..8f3023f269 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.lang.time.FastDateFormat; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.message.AMQMessage; @@ -39,11 +40,20 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.TabularData; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; /** - * Test class to test AMQQueueMBean attribtues and operations + * Test class to test AMQQueueMBean attributes and operations */ public class AMQQueueMBeanTest extends InternalBrokerBaseCase { @@ -139,6 +149,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase verifyBrokerState(); } + // todo: collect to a general testing class -duplicated from Systest/MessageReturntest private void verifyBrokerState() { @@ -219,7 +230,43 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse("Exclusive property should be false.", getQueue().isExclusive()); } - public void testExceptions() throws Exception + /** + * Tests view messages with two test messages. The first message is non-persistent, the second persistent + * and has timestamp/expiration. + * + */ + public void testViewMessages() throws Exception + { + sendMessages(1, false); + final Date msg2Timestamp = new Date(); + final Date msg2Expiration = new Date(msg2Timestamp.getTime() + 1000); + sendMessages(1, true, msg2Timestamp.getTime(), msg2Expiration.getTime()); + + final TabularData tab = _queueMBean.viewMessages(1l, 2l); + assertEquals("Unexpected number of rows in table", 2, tab.size()); + final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator(); + + // Check row1 + final CompositeDataSupport row1 = rowItr.next(); + assertEquals("Message should have AMQ message id", 1l, row1.get(ManagedQueue.MSG_AMQ_ID)); + assertNotNull("Expected message header array", row1.get(ManagedQueue.MSG_HEADER)); + final Map<String, String> row1Headers = headerArrayToMap((String[])row1.get(ManagedQueue.MSG_HEADER)); + assertEquals("Unexpected JMSPriority within header", "Non_Persistent", row1Headers.get("JMSDeliveryMode")); + assertEquals("Unexpected JMSTimestamp within header", "null", row1Headers.get("JMSTimestamp")); + assertEquals("Unexpected JMSExpiration within header", "null", row1Headers.get("JMSExpiration")); + + final CompositeDataSupport row2 = rowItr.next(); + assertEquals("Message should have AMQ message id", 2l, row2.get(ManagedQueue.MSG_AMQ_ID)); + assertNotNull("Expected message header array", row2.get(ManagedQueue.MSG_HEADER)); + final Map<String, String> row2Headers = headerArrayToMap((String[])row2.get(ManagedQueue.MSG_HEADER)); + assertEquals("Unexpected JMSPriority within header", "Persistent", row2Headers.get("JMSDeliveryMode")); + assertEquals("Unexpected JMSTimestamp within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Timestamp), + row2Headers.get("JMSTimestamp")); + assertEquals("Unexpected JMSExpiration within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Expiration), + row2Headers.get("JMSExpiration")); + } + + public void testViewMessageWithIllegalStartEndRanges() throws Exception { try { @@ -228,7 +275,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -238,7 +285,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -248,7 +295,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -260,45 +307,24 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } + } - IncomingMessage msg = message(false, false); - getQueue().clearQueue(); - ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - qs.add(getQueue()); - msg.enqueue(qs); - MessageMetaData mmd = msg.headersReceived(); - msg.setStoredMessage(getMessageStore().addMessage(mmd)); - long id = msg.getMessageNumber(); - - msg.addContentBodyFrame(new ContentChunk() - { - byte[] _data = new byte[((int)MESSAGE_SIZE)]; - - public int getSize() - { - return (int) MESSAGE_SIZE; - } - - public byte[] getData() - { - return _data; - } + public void testViewMessageContent() throws Exception + { + final List<AMQMessage> sentMessages = sendMessages(1, true); + final Long id = sentMessages.get(0).getMessageId(); - public void reduceToFit() - { + final CompositeData messageData = _queueMBean.viewMessageContent(id); + assertNotNull(messageData); + } - } - }); + public void testViewMessageContentWithUnknownMessageId() throws Exception + { + final List<AMQMessage> sentMessages = sendMessages(1, true); + final Long id = sentMessages.get(0).getMessageId(); - AMQMessage m = new AMQMessage(msg.getStoredMessage()); - for(BaseQueue q : msg.getDestinationQueues()) - { - q.enqueue(m); - } -// _queue.process(_storeContext, new QueueEntry(_queue, msg), false); - _queueMBean.viewMessageContent(id); try { _queueMBean.viewMessageContent(id + 1); @@ -306,7 +332,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } } @@ -364,46 +390,6 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse(channel.getBlocking()); } - private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException - { - MessagePublishInfo publish = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return immediate; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes - contentHeaderBody.setProperties(new BasicContentHeaderProperties()); - ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1)); - IncomingMessage msg = new IncomingMessage(publish); - msg.setContentHeaderBody(contentHeaderBody); - return msg; - - } @Override public void setUp() throws Exception @@ -418,11 +404,18 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase ApplicationRegistry.remove(); } - private void sendMessages(int messageCount, boolean persistent) throws AMQException + private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException { + return sendMessages(messageCount, persistent, 0l, 0l); + } + + private List<AMQMessage> sendMessages(int messageCount, boolean persistent, long timestamp, long expiration) throws AMQException + { + final List<AMQMessage> sentMessages = new ArrayList<AMQMessage>(); + for (int i = 0; i < messageCount; i++) { - IncomingMessage currentMessage = message(false, persistent); + IncomingMessage currentMessage = createIncomingMessage(false, persistent, timestamp, expiration); ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(getQueue()); currentMessage.enqueue(qs); @@ -431,7 +424,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase MessageMetaData mmd = currentMessage.headersReceived(); currentMessage.setStoredMessage(getMessageStore().addMessage(mmd)); - // Add the body so we have somthing to test later + // Add the body so we have something to test later currentMessage.addContentBodyFrame( getSession().getMethodRegistry() .getProtocolVersionMethodConverter() @@ -444,7 +437,78 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase q.enqueue(m); } + sentMessages.add(m); + } + + return sentMessages; + } + + private IncomingMessage createIncomingMessage(final boolean immediate, boolean persistent, long timestamp, long expiration) throws AMQException + { + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public void setExchange(AMQShortString exchange) + { + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + public AMQShortString getRoutingKey() + { + return null; + } + }; + + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes + final BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + contentHeaderBody.setProperties(props); + props.setDeliveryMode((byte) (persistent ? 2 : 1)); + if (timestamp > 0) + { + props.setTimestamp(timestamp); + } + if (expiration > 0) + { + props.setExpiration(expiration); } + IncomingMessage msg = new IncomingMessage(publish); + msg.setContentHeaderBody(contentHeaderBody); + return msg; } + + /** + * + * Utility method to convert array of Strings in the form x = y into a + * map with key/value x => y. + * + */ + private Map<String,String> headerArrayToMap(final String[] headerArray) + { + final Map<String, String> headerMap = new HashMap<String, String>(); + final List<String> headerList = Arrays.asList(headerArray); + for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();) + { + final String nameValuePair = iterator.next(); + final String[] nameValue = nameValuePair.split(" *= *", 2); + headerMap.put(nameValue[0], nameValue[1]); + } + return headerMap; + } + + } |