diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-07 22:28:45 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-07 22:28:45 +0000 |
| commit | 4a997d0ccf34f790cf9e55e6b2e22cc65b0428e1 (patch) | |
| tree | f2e9d8c5acb42f068c30d6c0939e83b187d79c11 | |
| parent | d9319171b0e761c86e39facf365db62a88ee1ce6 (diff) | |
| download | qpid-python-4a997d0ccf34f790cf9e55e6b2e22cc65b0428e1.tar.gz | |
QPID-942 : Added tests for broker and client log messages produced when flow control invoked
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@822949 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 10 | ||||
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java | 85 |
2 files changed, 88 insertions, 7 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 8b6c15c0c3..08c4e94d1e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -127,6 +127,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); + private final AtomicBoolean _overfull = new AtomicBoolean(false); protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -1187,6 +1188,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if(_atomicQueueSize.get() > _capacity) { + _overfull.set(true); //Overfull log message _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity)); @@ -1217,10 +1219,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if(_capacity != 0L) { - if(_atomicQueueSize.get() <= _flowResumeCapacity) + if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) { - //Underfull log message - _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + if(_overfull.compareAndSet(true,false)) + {//Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + } for(AMQChannel c : _blockedChannels.keySet()) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index 97147904e1..02db144694 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -26,15 +26,18 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.AbstractTestLogging; import org.apache.qpid.framing.AMQShortString; import javax.jms.*; import javax.naming.NamingException; import java.util.HashMap; import java.util.Map; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.io.IOException; -public class ProducerFlowControlTest extends QpidTestCase +public class ProducerFlowControlTest extends AbstractTestLogging { private static final int TIMEOUT = 1500; @@ -56,10 +59,12 @@ public class ProducerFlowControlTest extends QpidTestCase private MessageConsumer consumer; private final AtomicInteger _sentMessages = new AtomicInteger(); - protected void setUp() throws Exception + public void setUp() throws Exception { super.setUp(); + _monitor.reset(); + producerConnection = getConnection(); producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -70,7 +75,7 @@ public class ProducerFlowControlTest extends QpidTestCase } - protected void tearDown() throws Exception + public void tearDown() throws Exception { producerConnection.close(); consumerConnection.close(); @@ -117,6 +122,79 @@ public class ProducerFlowControlTest extends QpidTestCase } + public void testBrokerLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + List<String> results = _monitor.findMatches("QUE-1003"); + + assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + while(consumer.receive(1000) != null); + + results = _monitor.findMatches("QUE-1004"); + + assertEquals("Did not find correct number of QUE_1004 queue underfull messages", 1, results.size()); + + + + } + + + public void testClientLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + System.setProperty("qpid.flow_control_wait_failure","3000"); + System.setProperty("qpid.flow_control_wait_notify_period","1000"); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(10000); + List<String> results = _monitor.findMatches("Message send delayed by"); + assertEquals("Incorrect number of delay messages logged by client",3,results.size()); + results = _monitor.findMatches("Message send failed due to timeout waiting on broker enforced flow control"); + assertEquals("Incorrect number of send failure messages logged by client",1,results.size()); + + System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue)); + System.setProperty("qpid.flow_control_wait_notify_period","5000"); + + + } + public void testFlowControlOnCapacityResumeEqual() throws JMSException, NamingException, AMQException, InterruptedException @@ -131,7 +209,6 @@ public class ProducerFlowControlTest extends QpidTestCase _sentMessages.set(0); - // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); |
