summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-20 23:41:18 +0000
committerKeith Wall <kwall@apache.org>2011-11-20 23:41:18 +0000
commit6e0368297f1faad241abdba36bf8ab15978120aa (patch)
treec842cea5c68d4550305748e11a0dbad6929985ab /qpid/java
parent26df8c6fd2959210deab55d0c57a2f98ef7d6542 (diff)
downloadqpid-python-6e0368297f1faad241abdba36bf8ab15978120aa.tar.gz
QPID-3559: SimpleDateFormat used in thread unsafe manner in JMX Managed Queue interface.
Resolved thread safe issue. Added supporting unit and system test. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1204296 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java50
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java232
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java129
3 files changed, 309 insertions, 102 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index c8eb118b11..d58d95c801 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
@@ -63,20 +62,22 @@ import java.util.*;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
*
- * <p/><tablse id="crc"><caption>CRC Caption</caption>
+ * <p/><table id="crc"><caption>CRC Caption</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
*/
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+
/** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
- private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
+ /** Date/time format used for message expiration and message timestamp formatting */
+ public static final String JMSTIMESTAMP_DATETIME_FORMAT = "MM-dd-yy HH:mm:ss.SSS z";
- private AMQQueue _queue = null;
- private String _queueName = null;
+ private final AMQQueue _queue;
+ private final String _queueName;
// OpenMBean data types for viewMessages method
private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types.
@@ -523,13 +524,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
list.add("JMSPriority = " + headerProperties.getPriority());
list.add("JMSType = " + headerProperties.getType());
- long longDate = headerProperties.getExpiration();
- String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSExpiration = " + strDate);
+ final long expirationDate = headerProperties.getExpiration();
+ final long timestampDate = headerProperties.getTimestamp();
- longDate = headerProperties.getTimestamp();
- strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSTimestamp = " + strDate);
+ addStringifiedJMSTimestamoAndJMSExpiration(list, expirationDate,
+ timestampDate);
return list.toArray(new String[list.size()]);
}
@@ -561,17 +560,32 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
list.add("JMSPriority = " + header.getPriority());
list.add("JMSType = " + header.getType());
- long longDate = header.getExpiration();
- String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSExpiration = " + strDate);
-
- longDate = header.getTimestamp();
- strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSTimestamp = " + strDate);
+ final long expirationDate = header.getExpiration();
+ final long timestampDate = header.getTimestamp();
+ addStringifiedJMSTimestamoAndJMSExpiration(list, expirationDate, timestampDate);
return list.toArray(new String[list.size()]);
}
+ private void addStringifiedJMSTimestamoAndJMSExpiration(final List<String> list,
+ final long expirationDate, final long timestampDate)
+ {
+ final SimpleDateFormat dateFormat;
+ if (expirationDate != 0 || timestampDate != 0)
+ {
+ dateFormat = new SimpleDateFormat(JMSTIMESTAMP_DATETIME_FORMAT);
+ }
+ else
+ {
+ dateFormat = null;
+ }
+
+ final String formattedExpirationDate = (expirationDate != 0) ? dateFormat.format(new Date(expirationDate)) : null;
+ final String formattedTimestampDate = (timestampDate != 0) ? dateFormat.format(new Date(timestampDate)) : null;
+ list.add("JMSExpiration = " + formattedExpirationDate);
+ list.add("JMSTimestamp = " + formattedTimestampDate);
+ }
+
/**
* @see ManagedQueue#moveMessages
* @param fromMessageId
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 070d105805..8f3023f269 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.lang.time.FastDateFormat;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.message.AMQMessage;
@@ -39,11 +40,20 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
/**
- * Test class to test AMQQueueMBean attribtues and operations
+ * Test class to test AMQQueueMBean attributes and operations
*/
public class AMQQueueMBeanTest extends InternalBrokerBaseCase
{
@@ -139,6 +149,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
verifyBrokerState();
}
+
// todo: collect to a general testing class -duplicated from Systest/MessageReturntest
private void verifyBrokerState()
{
@@ -219,7 +230,43 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertFalse("Exclusive property should be false.", getQueue().isExclusive());
}
- public void testExceptions() throws Exception
+ /**
+ * Tests view messages with two test messages. The first message is non-persistent, the second persistent
+ * and has timestamp/expiration.
+ *
+ */
+ public void testViewMessages() throws Exception
+ {
+ sendMessages(1, false);
+ final Date msg2Timestamp = new Date();
+ final Date msg2Expiration = new Date(msg2Timestamp.getTime() + 1000);
+ sendMessages(1, true, msg2Timestamp.getTime(), msg2Expiration.getTime());
+
+ final TabularData tab = _queueMBean.viewMessages(1l, 2l);
+ assertEquals("Unexpected number of rows in table", 2, tab.size());
+ final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator();
+
+ // Check row1
+ final CompositeDataSupport row1 = rowItr.next();
+ assertEquals("Message should have AMQ message id", 1l, row1.get(ManagedQueue.MSG_AMQ_ID));
+ assertNotNull("Expected message header array", row1.get(ManagedQueue.MSG_HEADER));
+ final Map<String, String> row1Headers = headerArrayToMap((String[])row1.get(ManagedQueue.MSG_HEADER));
+ assertEquals("Unexpected JMSPriority within header", "Non_Persistent", row1Headers.get("JMSDeliveryMode"));
+ assertEquals("Unexpected JMSTimestamp within header", "null", row1Headers.get("JMSTimestamp"));
+ assertEquals("Unexpected JMSExpiration within header", "null", row1Headers.get("JMSExpiration"));
+
+ final CompositeDataSupport row2 = rowItr.next();
+ assertEquals("Message should have AMQ message id", 2l, row2.get(ManagedQueue.MSG_AMQ_ID));
+ assertNotNull("Expected message header array", row2.get(ManagedQueue.MSG_HEADER));
+ final Map<String, String> row2Headers = headerArrayToMap((String[])row2.get(ManagedQueue.MSG_HEADER));
+ assertEquals("Unexpected JMSPriority within header", "Persistent", row2Headers.get("JMSDeliveryMode"));
+ assertEquals("Unexpected JMSTimestamp within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Timestamp),
+ row2Headers.get("JMSTimestamp"));
+ assertEquals("Unexpected JMSExpiration within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Expiration),
+ row2Headers.get("JMSExpiration"));
+ }
+
+ public void testViewMessageWithIllegalStartEndRanges() throws Exception
{
try
{
@@ -228,7 +275,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
try
@@ -238,7 +285,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
try
@@ -248,7 +295,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
try
@@ -260,45 +307,24 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
+ }
- IncomingMessage msg = message(false, false);
- getQueue().clearQueue();
- ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
- qs.add(getQueue());
- msg.enqueue(qs);
- MessageMetaData mmd = msg.headersReceived();
- msg.setStoredMessage(getMessageStore().addMessage(mmd));
- long id = msg.getMessageNumber();
-
- msg.addContentBodyFrame(new ContentChunk()
- {
- byte[] _data = new byte[((int)MESSAGE_SIZE)];
-
- public int getSize()
- {
- return (int) MESSAGE_SIZE;
- }
-
- public byte[] getData()
- {
- return _data;
- }
+ public void testViewMessageContent() throws Exception
+ {
+ final List<AMQMessage> sentMessages = sendMessages(1, true);
+ final Long id = sentMessages.get(0).getMessageId();
- public void reduceToFit()
- {
+ final CompositeData messageData = _queueMBean.viewMessageContent(id);
+ assertNotNull(messageData);
+ }
- }
- });
+ public void testViewMessageContentWithUnknownMessageId() throws Exception
+ {
+ final List<AMQMessage> sentMessages = sendMessages(1, true);
+ final Long id = sentMessages.get(0).getMessageId();
- AMQMessage m = new AMQMessage(msg.getStoredMessage());
- for(BaseQueue q : msg.getDestinationQueues())
- {
- q.enqueue(m);
- }
-// _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
- _queueMBean.viewMessageContent(id);
try
{
_queueMBean.viewMessageContent(id + 1);
@@ -306,7 +332,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
}
@@ -364,46 +390,6 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertFalse(channel.getBlocking());
}
- private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException
- {
- MessagePublishInfo publish = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
-
- ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
- contentHeaderBody.setProperties(new BasicContentHeaderProperties());
- ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(publish);
- msg.setContentHeaderBody(contentHeaderBody);
- return msg;
-
- }
@Override
public void setUp() throws Exception
@@ -418,11 +404,18 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
ApplicationRegistry.remove();
}
- private void sendMessages(int messageCount, boolean persistent) throws AMQException
+ private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException
{
+ return sendMessages(messageCount, persistent, 0l, 0l);
+ }
+
+ private List<AMQMessage> sendMessages(int messageCount, boolean persistent, long timestamp, long expiration) throws AMQException
+ {
+ final List<AMQMessage> sentMessages = new ArrayList<AMQMessage>();
+
for (int i = 0; i < messageCount; i++)
{
- IncomingMessage currentMessage = message(false, persistent);
+ IncomingMessage currentMessage = createIncomingMessage(false, persistent, timestamp, expiration);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(getQueue());
currentMessage.enqueue(qs);
@@ -431,7 +424,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
MessageMetaData mmd = currentMessage.headersReceived();
currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
- // Add the body so we have somthing to test later
+ // Add the body so we have something to test later
currentMessage.addContentBodyFrame(
getSession().getMethodRegistry()
.getProtocolVersionMethodConverter()
@@ -444,7 +437,78 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
q.enqueue(m);
}
+ sentMessages.add(m);
+ }
+
+ return sentMessages;
+ }
+
+ private IncomingMessage createIncomingMessage(final boolean immediate, boolean persistent, long timestamp, long expiration) throws AMQException
+ {
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
+ final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ contentHeaderBody.setProperties(props);
+ props.setDeliveryMode((byte) (persistent ? 2 : 1));
+ if (timestamp > 0)
+ {
+ props.setTimestamp(timestamp);
+ }
+ if (expiration > 0)
+ {
+ props.setExpiration(expiration);
}
+ IncomingMessage msg = new IncomingMessage(publish);
+ msg.setContentHeaderBody(contentHeaderBody);
+ return msg;
}
+
+ /**
+ *
+ * Utility method to convert array of Strings in the form x = y into a
+ * map with key/value x =&gt; y.
+ *
+ */
+ private Map<String,String> headerArrayToMap(final String[] headerArray)
+ {
+ final Map<String, String> headerMap = new HashMap<String, String>();
+ final List<String> headerList = Arrays.asList(headerArray);
+ for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();)
+ {
+ final String nameValuePair = iterator.next();
+ final String[] nameValue = nameValuePair.split(" *= *", 2);
+ headerMap.put(nameValue[0], nameValue[1]);
+ }
+ return headerMap;
+ }
+
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
new file mode 100644
index 0000000000..0e60cc5d8f
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.management.jmx;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.queue.AMQQueueMBean;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Tests the JMX API for the Managed Queue.
+ *
+ */
+public class ManagedQueueMBeanTest extends QpidBrokerTestCase
+{
+ /**
+ * JMX helper.
+ */
+ private JMXTestUtils _jmxUtils;
+
+ public void setUp() throws Exception
+ {
+ _jmxUtils = new JMXTestUtils(this);
+ _jmxUtils.setUp();
+ super.setUp();
+ _jmxUtils.open();
+ }
+
+ public void tearDown() throws Exception
+ {
+ if (_jmxUtils != null)
+ {
+ _jmxUtils.close();
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Tests {@link ManagedQueue#viewMessages(long, long)} interface.
+ */
+ public void testViewSingleMessage() throws Exception
+ {
+ final String queueName = getTestQueueName();
+
+ // Create queue and send numMessages messages to it.
+ final Connection con = getConnection();
+ final Session session = con.createSession(true, Session.SESSION_TRANSACTED);
+ final Destination dest = session.createQueue(queueName);
+ session.createConsumer(dest).close(); // Create a consumer only to cause queue creation
+
+ final List<Message> sentMessages = sendMessage(session, dest, 1);
+ final Message sentMessage = sentMessages.get(0);
+
+ // Obtain the management interface.
+ final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+ assertNotNull("ManagedQueue expected to be available", managedQueue);
+ assertEquals("Unexpected queue depth", 1, managedQueue.getMessageCount().intValue());
+
+ // Check the contents of the message
+ final TabularData tab = managedQueue.viewMessages(1l, 1l);
+ assertEquals("Unexpected number of rows in table", 1, tab.size());
+ final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator();
+
+ final CompositeDataSupport row1 = rowItr.next();
+ assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID));
+ assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS));
+ assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED));
+
+ // Check the contents of header (encoded in a string array)
+ final String[] headerArray = (String[]) row1.get(ManagedQueue.MSG_HEADER);
+ assertNotNull("Expected message header array", headerArray);
+ final Map<String, String> headers = headerArrayToMap(headerArray);
+
+ final String expectedJMSMessageID = isBroker010() ? sentMessage.getJMSMessageID().replace("ID:", "") : sentMessage.getJMSMessageID();
+ final String expectedFormattedJMSTimestamp = FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(sentMessage.getJMSTimestamp());
+ assertEquals("Unexpected JMSMessageID within header", expectedJMSMessageID, headers.get("JMSMessageID"));
+ assertEquals("Unexpected JMSPriority within header", String.valueOf(sentMessage.getJMSPriority()), headers.get("JMSPriority"));
+ assertEquals("Unexpected JMSTimestamp within header", expectedFormattedJMSTimestamp, headers.get("JMSTimestamp"));
+ }
+
+ /**
+ *
+ * Utility method to convert array of Strings in the form x = y into a
+ * map with key/value x =&gt; y.
+ *
+ */
+ private Map<String,String> headerArrayToMap(final String[] headerArray)
+ {
+ final Map<String, String> headerMap = new HashMap<String, String>();
+ final List<String> headerList = Arrays.asList(headerArray);
+ for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();)
+ {
+ final String nameValuePair = iterator.next();
+ final String[] nameValue = nameValuePair.split(" *= *", 2);
+ headerMap.put(nameValue[0], nameValue[1]);
+ }
+ return headerMap;
+ }
+}