diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-08-12 18:06:35 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-08-12 18:06:35 +0000 |
| commit | 19551c7503e23bfc655df20e366b8a9324a700c6 (patch) | |
| tree | 007c3ba279cb51618223165a8f16ecfe8b84bcf4 /qpid | |
| parent | 0a0eb533a2de22c0c27732034e97be617e361e2f (diff) | |
| download | qpid-python-19551c7503e23bfc655df20e366b8a9324a700c6.tar.gz | |
QPID-2002 : Added new SUB-1003 Message with testing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
5 files changed, 148 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties index 2cd2149712..3380bf2aa6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties @@ -100,3 +100,5 @@ BND-1002 = Deleted #Subscription SUB-1001 = Create[ : Durable][ : Arguments : {0}] SUB-1002 = Close +# 0 - The current subscription state +SUB-1003 = State : {0}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index 5fc8c82e15..6d358652b8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -266,3 +266,4 @@ BND-1002 = Deleted #Subscription SUB-1001 = Create[ : Durable][ : Arguments : {0}] SUB-1002 = Close +SUB-1003 = State : {0}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 54bd568bca..b34ef1c382 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -599,6 +599,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) { _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); + CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString())); } else { @@ -611,6 +612,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) { _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString())); } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java index 80ebcc79cd..7752b873b6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java @@ -68,4 +68,16 @@ public class SubscriptionMessagesTest extends AbstractTestMessages validateLogMessage(log, "SUB-1002", expected); } + + public void testMessage1003() + { + String state = "ACTIVE"; + + _logMessage = SubscriptionMessages.SUB_1003(state); + List<Object> log = performLog(); + + String[] expected = {"State :", state}; + + validateLogMessage(log, "SUB-1003", expected); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java index 11e345de5e..2274af520b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java @@ -20,8 +20,12 @@ */ package org.apache.qpid.server.logging; +import junit.framework.AssertionFailedError; +import org.apache.qpid.client.AMQConnection; + import javax.jms.Connection; import javax.jms.JMSException; +import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; @@ -37,6 +41,7 @@ import java.util.List; * * SUB-1001 : Create : [Durable] [Arguments : <key=value>] * SUB-1002 : Close + * SUB-1003 : State : <state> */ public class SubscriptionLoggingTest extends AbstractTestLogging { @@ -69,7 +74,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging * * 1. Running Broker * 2. Create a new Subscription to a transient queue/topic. - * Output: + * Output: 6 * * <date> SUB-1001 : Create * @@ -170,7 +175,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging assertTrue("AutoClose not on log message:" + message, message.contains("AutoClose")); // Beacause it is an auto close and we have no messages on the queue we - // will get a close message + // will get a close message log = getLog(results.get(1)); validateMessageID("SUB-1002", log); @@ -276,8 +281,6 @@ public class SubscriptionLoggingTest extends AbstractTestLogging { _session.createConsumer(_queue).close(); - - //Validate List<String> results = _monitor.findMatches(SUB_PREFIX); @@ -296,4 +299,128 @@ public class SubscriptionLoggingTest extends AbstractTestLogging } + /** + * Description: + * When a Subscription fills its prefetch it will become suspended. This + * will be logged as a SUB-1003 message. + * Input: + * + * 1. Running broker + * 2. Message Producer to put more data on the queue than the client's prefetch + * 3. Client that ensures that its prefetch becomes full + * Output: + * + * <date> SUB-1003 : State : <state> + * + * Validation Steps: + * 1. The SUB ID is correct + * 2. The state is correct + * + * @throws java.io.IOException - if there is a problem getting the matches + * @throws javax.jms.JMSException - if there is a problem creating the consumer + */ + public void testSubscriptionSuspend() throws Exception, IOException + { + //Close session with large prefetch + _connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close(); + + int PREFETCH = 15; + + //Create new session with small prefetch + _session = ((AMQConnection) _connection).createSession(false, Session.AUTO_ACKNOWLEDGE, PREFETCH); + + MessageConsumer consumer = _session.createConsumer(_queue); + + _connection.start(); + + //Fill the prefetch and two extra so that our receive bellow allows the + // subscription to become active then return to a suspended state. + sendMessage(_session, _queue, 17); + + // Retreive the first message, and start the flow of messages + assertNotNull("First message not retreived", consumer.receive(1000)); + + //Give the internal broker time to respond to the ack that the above + // receive will perform. + if (!isExternalBroker()) + { + Thread.sleep(1000); + } + + _connection.close(); + + //Validate + List<String> results = _monitor.findMatches("SUB-1003"); + + try + { + // Validation expects three messages. + // The first will be logged by the QueueActor as part of the processQueue thread +// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED + // The second will be by the connnection as it acknowledges and activates the subscription +// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE + // The final one will be the subscription suspending as part of the SubFlushRunner +// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED + + assertEquals("Result set larger than expected.", 3, results.size()); + + // Validate Initial Suspension + String expectedState = "SUSPENDED"; + String log = getLog(results.get(0)); + validateSubscriptionState(log, expectedState); + + // Validate that the logActor is the the queue + String actor = fromActor(log); + assertTrue("Actor string does not contain expected queue(" + + _queue.getQueueName() + ") name." + actor, + actor.contains("qu(" + _queue.getQueueName() + ")")); + + // After being suspended the subscription should become active. + expectedState = "ACTIVE"; + log = getLog(results.get(1)); + validateSubscriptionState(log, expectedState); + // Validate we have a connection Actor + actor = fromActor(log); + assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:")); + + // Validate that it was re-suspended + expectedState = "SUSPENDED"; + log = getLog(results.get(2)); + validateSubscriptionState(log, expectedState); + // Validate we have a subscription Actor + actor = fromActor(log); + assertTrue("The actor is not a subscription actor:" + actor, actor.startsWith("sub:")); + + } + catch (AssertionFailedError afe) + { + System.err.println("Log Dump:"); + for (String log : results) + { + System.err.println(log); + } + throw afe; + } + + } + + /** + * Validate that the given log statement is a well formatted SUB-1003 + * message. That means the ID and expected state are correct. + * + * @param log the log to test + * @param expectedState the state that should be logged. + */ + private void validateSubscriptionState(String log, String expectedState) + { + validateMessageID("SUB-1003", log); + String logMessage = getMessageString(fromMessage(log)); + assertTrue("Log Message does not start with 'State'" + logMessage, + logMessage.startsWith("State")); + + assertTrue("Log Message does not have expected State of '" + + expectedState + "'" + logMessage, + logMessage.endsWith(expectedState)); + } + } |
