diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-08-12 18:05:34 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-08-12 18:05:34 +0000 |
| commit | 0a0eb533a2de22c0c27732034e97be617e361e2f (patch) | |
| tree | f8b655dc12d6976e864e5a485728c1470756c35d /qpid | |
| parent | 4c86290c98f282e053b90ae9236651c9422bc4c3 (diff) | |
| download | qpid-python-0a0eb533a2de22c0c27732034e97be617e361e2f.tar.gz | |
QPID-2002 : Addition of a QueueActor to be set during running of the processQueue thread
Made QueueLogSubject public so it can be reused by QueueActor
Updated SAMQQ to create a QueueActor for use during the processQueue thread run
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
4 files changed, 183 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java new file mode 100644 index 0000000000..acac447ff6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.queue.AMQQueue; + +import java.text.MessageFormat; + +/** + * This Actor is used when while the queue is performing an asynchronous process + * of its queue. + */ +public class QueueActor extends AbstractActor +{ + + /** + * Create an QueueLogSubject that Logs in the following format. + * + * @param queue The queue that this Actor is working for + * @param rootLogger the Root logger to use. + */ + public QueueActor(AMQQueue queue, RootMessageLogger rootLogger) + { + super(rootLogger); + + _logString = "[" + MessageFormat.format(QueueLogSubject.LOG_FORMAT, + queue.getVirtualHost().getName(), + queue.getName()) + "] "; + + } +} +
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java index 89f31ef477..b132d9e93f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java @@ -33,12 +33,12 @@ public class QueueLogSubject extends AbstractLogSubject * 0 - Virtualhost name * 1 - queue name */ - protected static String BINDING_FORMAT = "vh(/{0})/qu({1})"; + public static String LOG_FORMAT = "vh(/{0})/qu({1})"; /** Create an QueueLogSubject that Logs in the following format. */ public QueueLogSubject(AMQQueue queue) { - setLogStringWithFormat(BINDING_FORMAT, + setLogStringWithFormat(LOG_FORMAT, queue.getVirtualHost().getName(), queue.getName()); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 82dd4b0195..b14b92b014 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -30,8 +30,10 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.QueueMessages; /* @@ -118,6 +120,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); private LogSubject _logSubject; + private LogActor _logActor; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -154,6 +157,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); _logSubject = new QueueLogSubject(this); + _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); // Log the correct creation message @@ -1189,12 +1193,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { try { + CurrentActor.set(_logActor); processQueue(this); } catch (AMQException e) { _logger.error(e); } + finally + { + CurrentActor.remove(); + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java new file mode 100644 index 0000000000..5d2fe26707 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import junit.framework.TestCase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; + +import java.util.List; + +public class QueueActorTest extends TestCase +{ + LogActor _amqpActor; + UnitTestMessageLogger _rawLogger; + + public void setUp() throws ConfigurationException + { + Configuration config = new PropertiesConfiguration(); + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + RootMessageLogger rootLogger = + new RootMessageLoggerImpl(serverConfig, _rawLogger); + + MockAMQQueue queue = new MockAMQQueue(getName()); + + queue.setVirtualHost(ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next()); + + _amqpActor = new QueueActor(queue, rootLogger); + } + + public void tearDown() + { + _rawLogger.clearLogMessages(); + ApplicationRegistry.remove(); + } + + /** + * Test the QueueActor as a logger. + * + * The test logs a message then verifies that it entered the logs correctly + * + * The log message should be fully repalaced (no '{n}' values) and should + * contain the correct queue identification. + */ + public void testQueueActor() + { + final String message = "test logging"; + + _amqpActor.message(new LogSubject() + { + public String toString() + { + return "[AMQPActorTest]"; + } + + }, new LogMessage() + { + public String toString() + { + return message; + } + }); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 1, logs.size()); + + String log = logs.get(0).toString(); + + // Verify that the logged message is present in the output + assertTrue("Message was not found in log message", + log.contains(message)); + + // Verify that all the values were presented to the MessageFormatter + // so we will not end up with '{n}' entries in the log. + assertFalse("Verify that the string does not contain any '{':" + log, + log.contains("{")); + + // Verify that the message has the correct type + assertTrue("Message contains the [vh: prefix:" + log, + log.contains("[vh(")); + + // Verify that the logged message contains the 'qu(' marker + String expected = "qu(" + getName() + ")"; + assertTrue("Message was not logged with a queue identifer '"+expected+"' actual:" + log, + log.contains(expected)); + } + +} + |
