diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-20 23:41:18 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-20 23:41:18 +0000 |
| commit | 6e0368297f1faad241abdba36bf8ab15978120aa (patch) | |
| tree | c842cea5c68d4550305748e11a0dbad6929985ab /qpid/java | |
| parent | 26df8c6fd2959210deab55d0c57a2f98ef7d6542 (diff) | |
| download | qpid-python-6e0368297f1faad241abdba36bf8ab15978120aa.tar.gz | |
QPID-3559: SimpleDateFormat used in thread unsafe manner in JMX Managed Queue interface.
Resolved thread safe issue. Added supporting unit and system test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1204296 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 309 insertions, 102 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index c8eb118b11..d58d95c801 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; @@ -63,20 +62,22 @@ import java.util.*; /** * AMQQueueMBean is the management bean for an {@link AMQQueue}. * - * <p/><tablse id="crc"><caption>CRC Caption</caption> + * <p/><table id="crc"><caption>CRC Caption</caption> * <tr><th> Responsibilities <th> Collaborations * </table> */ @MBeanDescription("Management Interface for AMQQueue") public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { + /** Used for debugging purposes. */ private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); - private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z"); + /** Date/time format used for message expiration and message timestamp formatting */ + public static final String JMSTIMESTAMP_DATETIME_FORMAT = "MM-dd-yy HH:mm:ss.SSS z"; - private AMQQueue _queue = null; - private String _queueName = null; + private final AMQQueue _queue; + private final String _queueName; // OpenMBean data types for viewMessages method private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types. @@ -523,13 +524,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que list.add("JMSPriority = " + headerProperties.getPriority()); list.add("JMSType = " + headerProperties.getType()); - long longDate = headerProperties.getExpiration(); - String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSExpiration = " + strDate); + final long expirationDate = headerProperties.getExpiration(); + final long timestampDate = headerProperties.getTimestamp(); - longDate = headerProperties.getTimestamp(); - strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSTimestamp = " + strDate); + addStringifiedJMSTimestamoAndJMSExpiration(list, expirationDate, + timestampDate); return list.toArray(new String[list.size()]); } @@ -561,17 +560,32 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que list.add("JMSPriority = " + header.getPriority()); list.add("JMSType = " + header.getType()); - long longDate = header.getExpiration(); - String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSExpiration = " + strDate); - - longDate = header.getTimestamp(); - strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; - list.add("JMSTimestamp = " + strDate); + final long expirationDate = header.getExpiration(); + final long timestampDate = header.getTimestamp(); + addStringifiedJMSTimestamoAndJMSExpiration(list, expirationDate, timestampDate); return list.toArray(new String[list.size()]); } + private void addStringifiedJMSTimestamoAndJMSExpiration(final List<String> list, + final long expirationDate, final long timestampDate) + { + final SimpleDateFormat dateFormat; + if (expirationDate != 0 || timestampDate != 0) + { + dateFormat = new SimpleDateFormat(JMSTIMESTAMP_DATETIME_FORMAT); + } + else + { + dateFormat = null; + } + + final String formattedExpirationDate = (expirationDate != 0) ? dateFormat.format(new Date(expirationDate)) : null; + final String formattedTimestampDate = (timestampDate != 0) ? dateFormat.format(new Date(timestampDate)) : null; + list.add("JMSExpiration = " + formattedExpirationDate); + list.add("JMSTimestamp = " + formattedTimestampDate); + } + /** * @see ManagedQueue#moveMessages * @param fromMessageId diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 070d105805..8f3023f269 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.lang.time.FastDateFormat; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.message.AMQMessage; @@ -39,11 +40,20 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.TabularData; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; /** - * Test class to test AMQQueueMBean attribtues and operations + * Test class to test AMQQueueMBean attributes and operations */ public class AMQQueueMBeanTest extends InternalBrokerBaseCase { @@ -139,6 +149,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase verifyBrokerState(); } + // todo: collect to a general testing class -duplicated from Systest/MessageReturntest private void verifyBrokerState() { @@ -219,7 +230,43 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse("Exclusive property should be false.", getQueue().isExclusive()); } - public void testExceptions() throws Exception + /** + * Tests view messages with two test messages. The first message is non-persistent, the second persistent + * and has timestamp/expiration. + * + */ + public void testViewMessages() throws Exception + { + sendMessages(1, false); + final Date msg2Timestamp = new Date(); + final Date msg2Expiration = new Date(msg2Timestamp.getTime() + 1000); + sendMessages(1, true, msg2Timestamp.getTime(), msg2Expiration.getTime()); + + final TabularData tab = _queueMBean.viewMessages(1l, 2l); + assertEquals("Unexpected number of rows in table", 2, tab.size()); + final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator(); + + // Check row1 + final CompositeDataSupport row1 = rowItr.next(); + assertEquals("Message should have AMQ message id", 1l, row1.get(ManagedQueue.MSG_AMQ_ID)); + assertNotNull("Expected message header array", row1.get(ManagedQueue.MSG_HEADER)); + final Map<String, String> row1Headers = headerArrayToMap((String[])row1.get(ManagedQueue.MSG_HEADER)); + assertEquals("Unexpected JMSPriority within header", "Non_Persistent", row1Headers.get("JMSDeliveryMode")); + assertEquals("Unexpected JMSTimestamp within header", "null", row1Headers.get("JMSTimestamp")); + assertEquals("Unexpected JMSExpiration within header", "null", row1Headers.get("JMSExpiration")); + + final CompositeDataSupport row2 = rowItr.next(); + assertEquals("Message should have AMQ message id", 2l, row2.get(ManagedQueue.MSG_AMQ_ID)); + assertNotNull("Expected message header array", row2.get(ManagedQueue.MSG_HEADER)); + final Map<String, String> row2Headers = headerArrayToMap((String[])row2.get(ManagedQueue.MSG_HEADER)); + assertEquals("Unexpected JMSPriority within header", "Persistent", row2Headers.get("JMSDeliveryMode")); + assertEquals("Unexpected JMSTimestamp within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Timestamp), + row2Headers.get("JMSTimestamp")); + assertEquals("Unexpected JMSExpiration within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Expiration), + row2Headers.get("JMSExpiration")); + } + + public void testViewMessageWithIllegalStartEndRanges() throws Exception { try { @@ -228,7 +275,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -238,7 +285,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -248,7 +295,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -260,45 +307,24 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } + } - IncomingMessage msg = message(false, false); - getQueue().clearQueue(); - ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - qs.add(getQueue()); - msg.enqueue(qs); - MessageMetaData mmd = msg.headersReceived(); - msg.setStoredMessage(getMessageStore().addMessage(mmd)); - long id = msg.getMessageNumber(); - - msg.addContentBodyFrame(new ContentChunk() - { - byte[] _data = new byte[((int)MESSAGE_SIZE)]; - - public int getSize() - { - return (int) MESSAGE_SIZE; - } - - public byte[] getData() - { - return _data; - } + public void testViewMessageContent() throws Exception + { + final List<AMQMessage> sentMessages = sendMessages(1, true); + final Long id = sentMessages.get(0).getMessageId(); - public void reduceToFit() - { + final CompositeData messageData = _queueMBean.viewMessageContent(id); + assertNotNull(messageData); + } - } - }); + public void testViewMessageContentWithUnknownMessageId() throws Exception + { + final List<AMQMessage> sentMessages = sendMessages(1, true); + final Long id = sentMessages.get(0).getMessageId(); - AMQMessage m = new AMQMessage(msg.getStoredMessage()); - for(BaseQueue q : msg.getDestinationQueues()) - { - q.enqueue(m); - } -// _queue.process(_storeContext, new QueueEntry(_queue, msg), false); - _queueMBean.viewMessageContent(id); try { _queueMBean.viewMessageContent(id + 1); @@ -306,7 +332,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } } @@ -364,46 +390,6 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse(channel.getBlocking()); } - private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException - { - MessagePublishInfo publish = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return immediate; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes - contentHeaderBody.setProperties(new BasicContentHeaderProperties()); - ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1)); - IncomingMessage msg = new IncomingMessage(publish); - msg.setContentHeaderBody(contentHeaderBody); - return msg; - - } @Override public void setUp() throws Exception @@ -418,11 +404,18 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase ApplicationRegistry.remove(); } - private void sendMessages(int messageCount, boolean persistent) throws AMQException + private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException { + return sendMessages(messageCount, persistent, 0l, 0l); + } + + private List<AMQMessage> sendMessages(int messageCount, boolean persistent, long timestamp, long expiration) throws AMQException + { + final List<AMQMessage> sentMessages = new ArrayList<AMQMessage>(); + for (int i = 0; i < messageCount; i++) { - IncomingMessage currentMessage = message(false, persistent); + IncomingMessage currentMessage = createIncomingMessage(false, persistent, timestamp, expiration); ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(getQueue()); currentMessage.enqueue(qs); @@ -431,7 +424,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase MessageMetaData mmd = currentMessage.headersReceived(); currentMessage.setStoredMessage(getMessageStore().addMessage(mmd)); - // Add the body so we have somthing to test later + // Add the body so we have something to test later currentMessage.addContentBodyFrame( getSession().getMethodRegistry() .getProtocolVersionMethodConverter() @@ -444,7 +437,78 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase q.enqueue(m); } + sentMessages.add(m); + } + + return sentMessages; + } + + private IncomingMessage createIncomingMessage(final boolean immediate, boolean persistent, long timestamp, long expiration) throws AMQException + { + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public void setExchange(AMQShortString exchange) + { + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + public AMQShortString getRoutingKey() + { + return null; + } + }; + + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes + final BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + contentHeaderBody.setProperties(props); + props.setDeliveryMode((byte) (persistent ? 2 : 1)); + if (timestamp > 0) + { + props.setTimestamp(timestamp); + } + if (expiration > 0) + { + props.setExpiration(expiration); } + IncomingMessage msg = new IncomingMessage(publish); + msg.setContentHeaderBody(contentHeaderBody); + return msg; } + + /** + * + * Utility method to convert array of Strings in the form x = y into a + * map with key/value x => y. + * + */ + private Map<String,String> headerArrayToMap(final String[] headerArray) + { + final Map<String, String> headerMap = new HashMap<String, String>(); + final List<String> headerList = Arrays.asList(headerArray); + for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();) + { + final String nameValuePair = iterator.next(); + final String[] nameValue = nameValuePair.split(" *= *", 2); + headerMap.put(nameValue[0], nameValue[1]); + } + return headerMap; + } + + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java new file mode 100644 index 0000000000..0e60cc5d8f --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.management.jmx; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.Connection; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.TabularData; + +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.queue.AMQQueueMBean; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Tests the JMX API for the Managed Queue. + * + */ +public class ManagedQueueMBeanTest extends QpidBrokerTestCase +{ + /** + * JMX helper. + */ + private JMXTestUtils _jmxUtils; + + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this); + _jmxUtils.setUp(); + super.setUp(); + _jmxUtils.open(); + } + + public void tearDown() throws Exception + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + super.tearDown(); + } + + /** + * Tests {@link ManagedQueue#viewMessages(long, long)} interface. + */ + public void testViewSingleMessage() throws Exception + { + final String queueName = getTestQueueName(); + + // Create queue and send numMessages messages to it. + final Connection con = getConnection(); + final Session session = con.createSession(true, Session.SESSION_TRANSACTED); + final Destination dest = session.createQueue(queueName); + session.createConsumer(dest).close(); // Create a consumer only to cause queue creation + + final List<Message> sentMessages = sendMessage(session, dest, 1); + final Message sentMessage = sentMessages.get(0); + + // Obtain the management interface. + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertNotNull("ManagedQueue expected to be available", managedQueue); + assertEquals("Unexpected queue depth", 1, managedQueue.getMessageCount().intValue()); + + // Check the contents of the message + final TabularData tab = managedQueue.viewMessages(1l, 1l); + assertEquals("Unexpected number of rows in table", 1, tab.size()); + final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator(); + + final CompositeDataSupport row1 = rowItr.next(); + assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID)); + assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS)); + assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED)); + + // Check the contents of header (encoded in a string array) + final String[] headerArray = (String[]) row1.get(ManagedQueue.MSG_HEADER); + assertNotNull("Expected message header array", headerArray); + final Map<String, String> headers = headerArrayToMap(headerArray); + + final String expectedJMSMessageID = isBroker010() ? sentMessage.getJMSMessageID().replace("ID:", "") : sentMessage.getJMSMessageID(); + final String expectedFormattedJMSTimestamp = FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(sentMessage.getJMSTimestamp()); + assertEquals("Unexpected JMSMessageID within header", expectedJMSMessageID, headers.get("JMSMessageID")); + assertEquals("Unexpected JMSPriority within header", String.valueOf(sentMessage.getJMSPriority()), headers.get("JMSPriority")); + assertEquals("Unexpected JMSTimestamp within header", expectedFormattedJMSTimestamp, headers.get("JMSTimestamp")); + } + + /** + * + * Utility method to convert array of Strings in the form x = y into a + * map with key/value x => y. + * + */ + private Map<String,String> headerArrayToMap(final String[] headerArray) + { + final Map<String, String> headerMap = new HashMap<String, String>(); + final List<String> headerList = Arrays.asList(headerArray); + for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();) + { + final String nameValuePair = iterator.next(); + final String[] nameValue = nameValuePair.split(" *= *", 2); + headerMap.put(nameValue[0], nameValue[1]); + } + return headerMap; + } +} |
