diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-10-05 13:38:13 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-10-05 13:38:13 +0000 |
| commit | 525fc55e8bfd4110558c565445a2d8f73dc6551e (patch) | |
| tree | aabee976aa8bd379e77005144d41bfefae781732 /java | |
| parent | b6ea5f51c8cf1b383e7cdd459a6bf2db82bd996c (diff) | |
| download | qpid-python-525fc55e8bfd4110558c565445a2d8f73dc6551e.tar.gz | |
Qpid-623 : When only selectors are used on a queue the main _messages queue causes a leak. Here is a new test provided by Aidan Skinner and a simple fix that will prevent OOME when only selectors are connected to the Queue.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@582263 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
2 files changed, 262 insertions, 9 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 8e72e995d0..15a517a6b2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -422,7 +422,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. message.decrementReference(storeContext); - } + } _lock.unlock(); } @@ -462,15 +462,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private AMQMessage getNextMessage() throws AMQException { - return getNextMessage(_messages, null); + return getNextMessage(_messages, null, false); } - private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException + private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub, boolean purgeOnly) throws AMQException { AMQMessage message = messages.peek(); //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) - while (purgeMessage(message, sub)) + while (purgeMessage(message, sub, purgeOnly)) { // if we are purging then ensure we mark this message taken for the current subscriber // the current subscriber may be null in the case of a get or a purge but this is ok. @@ -527,6 +527,24 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException { + return purgeMessage(message, sub, false); + } + + /** + * This method will return true if the message is to be purged from the queue. + * \ + * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the current Queue(_queue) when purgeOnly is false + * + * @param message + * @param sub + * @param purgeOnly When set to false the message will be taken by the given Subscription. + * + * @return if the msg should be purged + * + * @throws AMQException + */ + private boolean purgeMessage(AMQMessage message, Subscription sub, boolean purgeOnly) throws AMQException + { //Original.. complicated while loop control // (message != null // && ( @@ -561,9 +579,18 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - // if we are purging then ensure we mark this message taken for the current subscriber - // the current subscriber may be null in the case of a get or a purge but this is ok. - return purge && message.taken(_queue, sub); + if (purgeOnly) + { + // If we are simply purging the queue don't take the message + // just purge up to the next non-taken msg. + return purge && message.isTaken(_queue); + } + else + { + // if we are purging then ensure we mark this message taken for the current subscriber + // the current subscriber may be null in the case of a get or a purge but this is ok. + return purge && message.taken(_queue, sub); + } } public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue) @@ -594,7 +621,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { synchronized (_queueHeadLock) { - message = getNextMessage(messageQueue, sub); + message = getNextMessage(messageQueue, sub, false); // message will be null if we have no messages in the messageQueue. if (message == null) @@ -661,7 +688,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //fixme - we should do the clean up as the message remains on the _message queue // this is resulting in the next consumer receiving the message and then attempting to purge it // - _log.info(debugIdentity() + "We should do clean up of the main _message queue here"); + cleanMainQueue(sub); } } @@ -680,6 +707,18 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + private void cleanMainQueue(Subscription sub) + { + try + { + getNextMessage(_messages, sub, true); + } + catch (AMQException e) + { + _log.warn("Problem during main queue purge:" + e.getMessage()); + } + } + /** * enqueues the messages in the list on the queue and all required predelivery queues * diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java new file mode 100644 index 0000000000..7bbfdb5543 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import java.util.Hashtable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; + +import junit.framework.TestCase; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + + +/** + * Test Case to ensure that messages are correctly returned. + * This includes checking: + * - The message is returned. + * - The broker doesn't leak memory. + * - The broker's state is correct after test. + */ +public class QueueDepthWithSelectorTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class); + + protected final String BROKER = "vm://:1"; + protected final String VHOST = "test"; + protected final String QUEUE = this.getClass().getName(); + + private Context _context; + + private Connection _clientConnection, _producerConnection; + private Session _clientSession, _producerSession; + private MessageProducer _producer; + private MessageConsumer _consumer; + + private static final int MSG_COUNT = 50; + + private Message[] _messages = new Message[MSG_COUNT]; + + protected void setUp() throws Exception + { + if (BROKER.startsWith("vm://")) + { + TransportConnection.createVMBroker(1); + } + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + _context = factory.getInitialContext(env); + + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + if (_producerConnection != null) + { + _producerConnection.close(); + } + + if (_clientConnection != null) + { + _clientConnection.close(); + } + + if (BROKER.startsWith("vm://")) + { + TransportConnection.killAllVMBrokers(); + } + } + + public void test() throws Exception + { + init(); + //Send messages + _logger.info("Starting to send messages"); + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg)); + } + _logger.info("Closing connection"); + //Close the connection.. .giving the broker time to clean up its state. + _producerConnection.close(); + + //Verify we get all the messages. + _logger.info("Verifying messages"); + verifyAllMessagesRecevied(); + + //Close the connection.. .giving the broker time to clean up its state. + _clientConnection.close(); + + //Verify Broker state + _logger.info("Verifying broker state"); + verifyBrokerState(); + } + + private void init() throws NamingException, JMSException + { + _messages = new Message[MSG_COUNT]; + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + _producerConnection.start(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _producer = _producerSession.createProducer((Queue) _context.lookup("queue")); + + // Create consumer + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + _clientConnection.start(); + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumer = _clientSession.createConsumer((Queue) _context.lookup("queue"), "key = 23"); + } + + private void verifyBrokerState() + { + IApplicationRegistry registry = ApplicationRegistry.getInstance(); + + VirtualHost testVhost = registry.getVirtualHostRegistry().getVirtualHost(VHOST); + assertNotNull("Unable to get test Vhost", testVhost); + assertNotNull("Unable to get test queue registry", testVhost.getQueueRegistry()); + AMQQueue q = testVhost.getQueueRegistry().getQueue(new AMQShortString(QUEUE)); + assertNotNull("Unable to get test queue", q); + assertEquals("Queue count too big", 0, q.getMessageCount()); + } + + private void verifyAllMessagesRecevied() throws JMSException + { + + boolean[] msgIdRecevied = new boolean[MSG_COUNT]; + + + for (int i = 0; i < MSG_COUNT; i++) + { + _messages[i] = _consumer.receive(1000); + assertNotNull("should have received a message but didn't", _messages[i]); + } + + //Check received messages + int msgId = 0; + for (Message msg : _messages) + { + assertNotNull("Message should not be null", msg); + assertEquals("msgId was wrong", msgId, msg.getIntProperty("ID")); + assertFalse("Already received msg id " + msgId, msgIdRecevied[msgId]); + msgIdRecevied[msgId] = true; + msgId++; + } + + //Check all received + for (msgId = 0; msgId < MSG_COUNT; msgId++) + { + assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]); + } + } + + /** + * Get the next message putting the given count into the intProperties as ID. + * + * @param msgNo the message count to store as ID. + * + * @return + * + * @throws JMSException + */ + + private Message nextMessage(int msgNo) throws JMSException + { + Message send = _producerSession.createTextMessage("MessageReturnTest"); + send.setIntProperty("ID", msgNo); + send.setIntProperty("key", 23); + return send; + } + +} |
