diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-08-03 13:27:39 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-08-03 13:27:39 +0000 |
| commit | 014ebe841ae307d4077a45cb522b49fe758ce9d5 (patch) | |
| tree | 8a80bca827718ab6b1e188d4c32bddd66182ec69 /qpid/java/broker | |
| parent | 4377b1a6910c7dc0c18060349e2fd6ca704b94ff (diff) | |
| download | qpid-python-014ebe841ae307d4077a45cb522b49fe758ce9d5.tar.gz | |
QPID-2002: Added testing of Channel Logging
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@800368 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
6 files changed, 57 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index aa390d6c26..2a46ee53b4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -57,6 +57,12 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.logging.actors.AMQPChannelActor; +import org.apache.qpid.server.logging.actors.CurrentActor; public class AMQChannel { @@ -111,13 +117,22 @@ public class AMQChannel // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - private boolean _closing; + private boolean _closing; + + private LogActor _actor; + private LogSubject _logSubject; public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { _session = session; _channelId = channelId; + + _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); + _logSubject = new ChannelLogSubject(this); + + _actor.message(ChannelMessages.CHN_1001()); + _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); @@ -371,6 +386,8 @@ public class AMQChannel private void setClosing(boolean closing) { _closing = closing; + + CurrentActor.get().message(_logSubject, ChannelMessages.CHN_1003()); } private void unsubscribeAllConsumers() throws AMQException @@ -774,6 +791,8 @@ public class AMQChannel boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { + _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started")); + if (wasSuspended) { // may need to deliver queued messages @@ -865,6 +884,8 @@ public class AMQChannel public void setCredit(final long prefetchSize, final int prefetchCount) { + //fixme +// _actor.message(ChannelMessages.CHN_100X(prefetchSize, prefetchCount); _creditManager.setCreditLimits(prefetchSize, prefetchCount); } @@ -906,4 +927,9 @@ public class AMQChannel { return _recordDeliveryMethod; } + + public LogActor getLogActor() + { + return _actor; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index d9f95ecb8e..24df17683f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -239,8 +239,8 @@ CON-1001 = Open[ : Client ID : {0}][ : Protocol Version : {1}] CON-1002 = Close #Channel -# 0 - count -CHN-1001 = Create : Prefetch {0, number} +CHN-1001 = Create +# : Prefetch Size {0,number} : Count {1,number} # 0 - flow CHN-1002 = Flow {0} CHN-1003 = Close diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 1da5b1c26e..e8ea56bafd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -205,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _sessionID; } + public LogActor getLogActor() + { + return _actor; + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -230,7 +236,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); - CurrentActor.set(_actor); + //Look up the Channel's Actor and set that as the current actor + // If that is not available then we can use the ConnectionActor + // that is associated with this AMQMPSession. + LogActor channelActor = null; + if (_channelMap.get(channelId) != null) + { + channelActor = _channelMap.get(channelId).getLogActor(); + } + CurrentActor.set(channelActor == null ? _actor : channelActor); + try { if (_logger.isDebugEnabled()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index f721730d9c..fff406bb3d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -28,6 +28,8 @@ import org.apache.qpid.framing.*; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -38,6 +40,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession { long getSessionID(); + LogActor getLogActor(); + public static final class ProtocolSessionIdentifier { private final Object _sessionIdentifier; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java index 2a37eae728..b4dd3da2e6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java @@ -27,15 +27,12 @@ public class ChannelMessagesTest extends AbstractTestMessages { public void testMessage1001() { - Integer prefetch = 12345; - - _logMessage = ChannelMessages.CHN_1001(prefetch); + _logMessage = ChannelMessages.CHN_1001(); List<Object> log = performLog(); // We use the MessageFormat here as that is what the ChannelMessage // will do, this makes the resulting value 12,345 - String[] expected = {"Create", "Prefetch", - MessageFormat.format("{0, number}", prefetch)}; + String[] expected = {"Create"}; validateLogMessage(log, "CHN-1001", expected); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index b9dcd972b1..c301969ae5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -62,6 +63,11 @@ public class MockProtocolSession implements AMQProtocolSession return _sessionID; } + public LogActor getLogActor() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { } |
