summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-08-03 13:27:39 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-08-03 13:27:39 +0000
commit014ebe841ae307d4077a45cb522b49fe758ce9d5 (patch)
tree8a80bca827718ab6b1e188d4c32bddd66182ec69 /qpid/java/broker
parent4377b1a6910c7dc0c18060349e2fd6ca704b94ff (diff)
downloadqpid-python-014ebe841ae307d4077a45cb522b49fe758ce9d5.tar.gz
QPID-2002: Added testing of Channel Logging
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@800368 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java6
6 files changed, 57 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index aa390d6c26..2a46ee53b4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -57,6 +57,12 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.logging.actors.AMQPChannelActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
public class AMQChannel
{
@@ -111,13 +117,22 @@ public class AMQChannel
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- private boolean _closing;
+ private boolean _closing;
+
+ private LogActor _actor;
+ private LogSubject _logSubject;
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
_session = session;
_channelId = channelId;
+
+ _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
+ _logSubject = new ChannelLogSubject(this);
+
+ _actor.message(ChannelMessages.CHN_1001());
+
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
@@ -371,6 +386,8 @@ public class AMQChannel
private void setClosing(boolean closing)
{
_closing = closing;
+
+ CurrentActor.get().message(_logSubject, ChannelMessages.CHN_1003());
}
private void unsubscribeAllConsumers() throws AMQException
@@ -774,6 +791,8 @@ public class AMQChannel
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
+ _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+
if (wasSuspended)
{
// may need to deliver queued messages
@@ -865,6 +884,8 @@ public class AMQChannel
public void setCredit(final long prefetchSize, final int prefetchCount)
{
+ //fixme
+// _actor.message(ChannelMessages.CHN_100X(prefetchSize, prefetchCount);
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
@@ -906,4 +927,9 @@ public class AMQChannel
{
return _recordDeliveryMethod;
}
+
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index d9f95ecb8e..24df17683f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -239,8 +239,8 @@ CON-1001 = Open[ : Client ID : {0}][ : Protocol Version : {1}]
CON-1002 = Close
#Channel
-# 0 - count
-CHN-1001 = Create : Prefetch {0, number}
+CHN-1001 = Create
+# : Prefetch Size {0,number} : Count {1,number}
# 0 - flow
CHN-1002 = Flow {0}
CHN-1003 = Close
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 1da5b1c26e..e8ea56bafd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -205,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _sessionID;
}
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -230,7 +236,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
- CurrentActor.set(_actor);
+ //Look up the Channel's Actor and set that as the current actor
+ // If that is not available then we can use the ConnectionActor
+ // that is associated with this AMQMPSession.
+ LogActor channelActor = null;
+ if (_channelMap.get(channelId) != null)
+ {
+ channelActor = _channelMap.get(channelId).getLogActor();
+ }
+ CurrentActor.set(channelActor == null ? _actor : channelActor);
+
try
{
if (_logger.isDebugEnabled())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index f721730d9c..fff406bb3d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -28,6 +28,8 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -38,6 +40,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
long getSessionID();
+ LogActor getLogActor();
+
public static final class ProtocolSessionIdentifier
{
private final Object _sessionIdentifier;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
index 2a37eae728..b4dd3da2e6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
@@ -27,15 +27,12 @@ public class ChannelMessagesTest extends AbstractTestMessages
{
public void testMessage1001()
{
- Integer prefetch = 12345;
-
- _logMessage = ChannelMessages.CHN_1001(prefetch);
+ _logMessage = ChannelMessages.CHN_1001();
List<Object> log = performLog();
// We use the MessageFormat here as that is what the ChannelMessage
// will do, this makes the resulting value 12,345
- String[] expected = {"Create", "Prefetch",
- MessageFormat.format("{0, number}", prefetch)};
+ String[] expected = {"Create"};
validateLogMessage(log, "CHN-1001", expected);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index b9dcd972b1..c301969ae5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -62,6 +63,11 @@ public class MockProtocolSession implements AMQProtocolSession
return _sessionID;
}
+ public LogActor getLogActor()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
}