summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-08-12 18:06:35 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-08-12 18:06:35 +0000
commit19551c7503e23bfc655df20e366b8a9324a700c6 (patch)
tree007c3ba279cb51618223165a8f16ecfe8b84bcf4 /qpid
parent0a0eb533a2de22c0c27732034e97be617e361e2f (diff)
downloadqpid-python-19551c7503e23bfc655df20e366b8a9324a700c6.tar.gz
QPID-2002 : Added new SUB-1003 Message with testing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java12
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java135
5 files changed, 148 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
index 2cd2149712..3380bf2aa6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
@@ -100,3 +100,5 @@ BND-1002 = Deleted
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
+# 0 - The current subscription state
+SUB-1003 = State : {0} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index 5fc8c82e15..6d358652b8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -266,3 +266,4 @@ BND-1002 = Deleted
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
+SUB-1003 = State : {0} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 54bd568bca..b34ef1c382 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -599,6 +599,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
{
_stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
}
else
{
@@ -611,6 +612,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
{
_stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
index 80ebcc79cd..7752b873b6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
@@ -68,4 +68,16 @@ public class SubscriptionMessagesTest extends AbstractTestMessages
validateLogMessage(log, "SUB-1002", expected);
}
+
+ public void testMessage1003()
+ {
+ String state = "ACTIVE";
+
+ _logMessage = SubscriptionMessages.SUB_1003(state);
+ List<Object> log = performLog();
+
+ String[] expected = {"State :", state};
+
+ validateLogMessage(log, "SUB-1003", expected);
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
index 11e345de5e..2274af520b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.server.logging;
+import junit.framework.AssertionFailedError;
+import org.apache.qpid.client.AMQConnection;
+
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
@@ -37,6 +41,7 @@ import java.util.List;
*
* SUB-1001 : Create : [Durable] [Arguments : <key=value>]
* SUB-1002 : Close
+ * SUB-1003 : State : <state>
*/
public class SubscriptionLoggingTest extends AbstractTestLogging
{
@@ -69,7 +74,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
*
* 1. Running Broker
* 2. Create a new Subscription to a transient queue/topic.
- * Output:
+ * Output: 6
*
* <date> SUB-1001 : Create
*
@@ -170,7 +175,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
assertTrue("AutoClose not on log message:" + message, message.contains("AutoClose"));
// Beacause it is an auto close and we have no messages on the queue we
- // will get a close message
+ // will get a close message
log = getLog(results.get(1));
validateMessageID("SUB-1002", log);
@@ -276,8 +281,6 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
{
_session.createConsumer(_queue).close();
-
-
//Validate
List<String> results = _monitor.findMatches(SUB_PREFIX);
@@ -296,4 +299,128 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
}
+ /**
+ * Description:
+ * When a Subscription fills its prefetch it will become suspended. This
+ * will be logged as a SUB-1003 message.
+ * Input:
+ *
+ * 1. Running broker
+ * 2. Message Producer to put more data on the queue than the client's prefetch
+ * 3. Client that ensures that its prefetch becomes full
+ * Output:
+ *
+ * <date> SUB-1003 : State : <state>
+ *
+ * Validation Steps:
+ * 1. The SUB ID is correct
+ * 2. The state is correct
+ *
+ * @throws java.io.IOException - if there is a problem getting the matches
+ * @throws javax.jms.JMSException - if there is a problem creating the consumer
+ */
+ public void testSubscriptionSuspend() throws Exception, IOException
+ {
+ //Close session with large prefetch
+ _connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close();
+
+ int PREFETCH = 15;
+
+ //Create new session with small prefetch
+ _session = ((AMQConnection) _connection).createSession(false, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+
+ MessageConsumer consumer = _session.createConsumer(_queue);
+
+ _connection.start();
+
+ //Fill the prefetch and two extra so that our receive bellow allows the
+ // subscription to become active then return to a suspended state.
+ sendMessage(_session, _queue, 17);
+
+ // Retreive the first message, and start the flow of messages
+ assertNotNull("First message not retreived", consumer.receive(1000));
+
+ //Give the internal broker time to respond to the ack that the above
+ // receive will perform.
+ if (!isExternalBroker())
+ {
+ Thread.sleep(1000);
+ }
+
+ _connection.close();
+
+ //Validate
+ List<String> results = _monitor.findMatches("SUB-1003");
+
+ try
+ {
+ // Validation expects three messages.
+ // The first will be logged by the QueueActor as part of the processQueue thread
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+ // The second will be by the connnection as it acknowledges and activates the subscription
+// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
+ // The final one will be the subscription suspending as part of the SubFlushRunner
+// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+
+ assertEquals("Result set larger than expected.", 3, results.size());
+
+ // Validate Initial Suspension
+ String expectedState = "SUSPENDED";
+ String log = getLog(results.get(0));
+ validateSubscriptionState(log, expectedState);
+
+ // Validate that the logActor is the the queue
+ String actor = fromActor(log);
+ assertTrue("Actor string does not contain expected queue("
+ + _queue.getQueueName() + ") name." + actor,
+ actor.contains("qu(" + _queue.getQueueName() + ")"));
+
+ // After being suspended the subscription should become active.
+ expectedState = "ACTIVE";
+ log = getLog(results.get(1));
+ validateSubscriptionState(log, expectedState);
+ // Validate we have a connection Actor
+ actor = fromActor(log);
+ assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:"));
+
+ // Validate that it was re-suspended
+ expectedState = "SUSPENDED";
+ log = getLog(results.get(2));
+ validateSubscriptionState(log, expectedState);
+ // Validate we have a subscription Actor
+ actor = fromActor(log);
+ assertTrue("The actor is not a subscription actor:" + actor, actor.startsWith("sub:"));
+
+ }
+ catch (AssertionFailedError afe)
+ {
+ System.err.println("Log Dump:");
+ for (String log : results)
+ {
+ System.err.println(log);
+ }
+ throw afe;
+ }
+
+ }
+
+ /**
+ * Validate that the given log statement is a well formatted SUB-1003
+ * message. That means the ID and expected state are correct.
+ *
+ * @param log the log to test
+ * @param expectedState the state that should be logged.
+ */
+ private void validateSubscriptionState(String log, String expectedState)
+ {
+ validateMessageID("SUB-1003", log);
+ String logMessage = getMessageString(fromMessage(log));
+ assertTrue("Log Message does not start with 'State'" + logMessage,
+ logMessage.startsWith("State"));
+
+ assertTrue("Log Message does not have expected State of '"
+ + expectedState + "'" + logMessage,
+ logMessage.endsWith(expectedState));
+ }
+
}