summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorPhil Harvey <philharveyonline@apache.org>2012-12-20 09:48:35 +0000
committerPhil Harvey <philharveyonline@apache.org>2012-12-20 09:48:35 +0000
commit7e577de7a0bd77c87f7b2c1961ec11b0f3b35502 (patch)
treed0414712ffed8f3e5aec75b8ad5f2a40be2c3063 /java/broker
parenta25269e5ba5801b787eebf2fd12d466c9fdba70e (diff)
downloadqpid-python-7e577de7a0bd77c87f7b2c1961ec11b0f3b35502.tar.gz
QPID-4515: improved broker logging, particularly when receiving/sending AMQP 0-8/0-9 frames and when committing transactions.
Work done by Keith (kwall) and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java117
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java17
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java10
4 files changed, 110 insertions, 41 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 76a3a7f224..0826f182fd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -378,7 +378,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (_logger.isDebugEnabled())
{
- _logger.debug(debugIdentity() + "Content body received on channel " + _channelId);
+ _logger.debug(debugIdentity() + " content body received on channel " + _channelId);
}
try
@@ -1583,6 +1583,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void sync()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("sync() called on channel " + debugIdentity());
+ }
+
AsyncCommand cmd;
while((cmd = _unfinishedCommandsQueue.poll()) != null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
index a68ac5439c..917215a42f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
@@ -218,55 +218,71 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final boolean isRedelivered = entry.isRedelivered();
- final AMQBody returnBlock = new AMQBody()
- {
-
- private AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return _methodRegistry.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
+ final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered);
+ return returnBlock;
+ }
+ private class EncodedDeliveryBody implements AMQBody
+ {
+ private final long _deliveryTag;
+ private final AMQShortString _routingKey;
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _consumerTag;
+ private final boolean _isRedelivered;
+ private AMQBody _underlyingBody;
+
+ private EncodedDeliveryBody(long deliveryTag, AMQShortString routingKey, AMQShortString exchangeName, AMQShortString consumerTag, boolean isRedelivered)
+ {
+ _deliveryTag = deliveryTag;
+ _routingKey = routingKey;
+ _exchangeName = exchangeName;
+ _consumerTag = consumerTag;
+ _isRedelivered = isRedelivered;
+ }
+ public AMQBody createAMQBody()
+ {
+ return _methodRegistry.createBasicDeliverBody(_consumerTag,
+ _deliveryTag,
+ _isRedelivered,
+ _exchangeName,
+ _routingKey);
+ }
- }
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
- public byte getFrameType()
+ public int getSize()
+ {
+ if(_underlyingBody == null)
{
- return AMQMethodBody.TYPE;
+ _underlyingBody = createAMQBody();
}
+ return _underlyingBody.getSize();
+ }
- public int getSize()
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_underlyingBody == null)
{
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
+ _underlyingBody = createAMQBody();
}
+ _underlyingBody.writePayload(buffer);
+ }
- public void writePayload(DataOutput buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
+ @Override
+ public String toString()
+ {
+ return "[" + getClass().getSimpleName() + " underlyingBody: " + String.valueOf(_underlyingBody) + "]";
+ }
}
private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
@@ -368,7 +384,6 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
_methodBody = methodBody;
_headerBody = headerBody;
_contentBody = contentBody;
-
}
public long getSize()
@@ -380,6 +395,19 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[").append(getClass().getSimpleName())
+ .append(" methodBody=").append(_methodBody)
+ .append(", headerBody=").append(_headerBody)
+ .append(", contentBody=").append(_contentBody)
+ .append(", channel=").append(_channel).append("]");
+ return builder.toString();
+ }
+
}
public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
@@ -408,6 +436,17 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append(getClass().getSimpleName())
+ .append("methodBody=").append(_methodBody)
+ .append(", headerBody=").append(_headerBody)
+ .append(", channel=").append(_channel).append("]");
+ return builder.toString();
+ }
}
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 72c21d357e..f77f3a764a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -303,9 +303,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
+ long startTime = 0;
+ String frameToString = null;
if (_logger.isDebugEnabled())
{
- _logger.debug("Frame Received: " + frame);
+ startTime = System.currentTimeMillis();
+ frameToString = frame.toString();
+ _logger.debug("RECV: " + frame);
}
// Check that this channel is not closing
@@ -340,6 +344,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeChannel(channelId);
throw e;
}
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
+ }
}
finally
{
@@ -543,6 +552,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("SEND: " + frame);
+ }
+
_sender.send(buf);
final long time = System.currentTimeMillis();
_lastIoTime = time;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index f11fb1086e..df95ce46d5 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -384,10 +384,20 @@ public class LocalTransaction implements ServerTransaction
private void doPostTransactionActions()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions");
+ }
+
for(int i = 0; i < _postTransactionActions.size(); i++)
{
_postTransactionActions.get(i).postCommit();
}
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Completed post transaction actions");
+ }
}
public void rollback()