diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-25 21:00:22 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-25 21:00:22 +0000 |
| commit | 58f37a88b1cf6e6a3fa27d184238885a875cbb9f (patch) | |
| tree | aaad64c2879a64275a41097c58cfff609b603e8f /java | |
| parent | 3335bfa2ddc5c83890b967792fb2442ee0680b83 (diff) | |
| download | qpid-python-58f37a88b1cf6e6a3fa27d184238885a875cbb9f.tar.gz | |
Added mechanism to track connection ids for logging and debugging purposes. Changed format of log/debug messages for RequestManager and ResponseManager.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@499968 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
9 files changed, 77 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a479f5f5d4..5b6bd24faf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -137,8 +137,8 @@ public class AMQChannel _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; - _requestManager = new RequestManager(channelId, _session, true); - _responseManager = new ResponseManager(channelId, methodListener, _session, true); + _requestManager = new RequestManager(_session.getConnectionId(), channelId, _session, true); + _responseManager = new ResponseManager(_session.getConnectionId(), channelId, methodListener, _session, true); _txnBuffer = new TxnBuffer(_messageStore); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index a5dfc6d1e5..766a035f80 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -74,6 +74,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; public class AMQMinaProtocolSession implements AMQProtocolSession, ProtocolVersionList, @@ -117,6 +118,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private byte _minor; private FieldTable _clientProperties; + // Keeps a tally of connections for logging and debugging + private static AtomicInteger _ConnectionId; + static { _ConnectionId = new AtomicInteger(0); } + public ManagedObject getManagedObject() { return _managedObject; @@ -127,6 +132,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQCodecFactory codecFactory) throws AMQException { + _ConnectionId.incrementAndGet(); _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this); _minaProtocolSession = session; session.setAttachment(this); @@ -144,6 +150,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException { + _ConnectionId.incrementAndGet(); _stateManager = stateManager; _minaProtocolSession = session; session.setAttachment(this); @@ -700,4 +707,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, throw new RuntimeException("MethodBody version did not match version of current session."); } } + + public int getConnectionId() + { + return _ConnectionId.get(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 7f7fcf20a2..1e5df9b8a5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -149,4 +149,5 @@ public interface AMQProtocolSession extends AMQProtocolWriter byte getMinor(); boolean versionEquals(byte major, byte minor); void checkMethodBodyVersion(AMQMethodBody methodBody); + int getConnectionId(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4da086b3bd..bb3c33f2fe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -78,7 +78,7 @@ import org.apache.qpid.url.URLSyntaxException; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); - + private AtomicInteger _idFactory = new AtomicInteger(0); /** @@ -155,6 +155,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQException _lastAMQException = null; + // Keeps a tally of connections for logging and debugging + private static AtomicInteger _ConnectionId; + static { _ConnectionId = new AtomicInteger(0); } + /* * The connection meta data */ @@ -200,7 +204,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL) throws AMQException { _logger.info("Connection:" + connectionURL); - + _ConnectionId.incrementAndGet(); if (connectionURL == null) { throw new IllegalArgumentException("Connection must be specified"); @@ -1020,4 +1024,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQConnectionFactory.class.getName(), null); // factory location } + + public int getConnectionId() + { + return _ConnectionId.get(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index edef6c57c2..4d15c3cb35 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -576,4 +576,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _failoverState = failoverState; } + + public int getConnectionId() + { + return _connection.getConnectionId(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 2980a86374..de5cd4821d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -106,6 +106,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis */ protected int _queueId = 1; protected final Object _queueIdLock = new Object(); + + protected int _ConnectionId; /** * No-arg constructor for use by test subclass - has to initialise final vars @@ -118,8 +120,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _stateManager = new AMQStateManager(this); // Add channel 0 request and response managers, since they will not be added through the usual mechanism - _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false)); - _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false)); + _ConnectionId = 0; + _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false)); + _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false)); } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) @@ -131,8 +134,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _stateManager = new AMQStateManager(this); // Add channel 0 request and response managers, since they will not be added through the usual mechanism - _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false)); - _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false)); + _ConnectionId = _protocolHandler == null ? 0 : _protocolHandler.getConnectionId(); + _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false)); + _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false)); } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager) @@ -146,8 +150,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _stateManager.setProtocolSession(this); // Add channel 0 request and response managers, since they will not be added through the usual mechanism - _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false)); - _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false)); + _ConnectionId = _protocolHandler == null ? 0 : _protocolHandler.getConnectionId(); + _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false)); + _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false)); } public void init() @@ -379,12 +384,11 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis // Add request and response handlers, one per channel, if they do not already exist if (_channelId2RequestMgrMap.get(channelId) == null) { - _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this, false)); + _channelId2RequestMgrMap.put(channelId, new RequestManager(_ConnectionId, channelId, this, false)); } if (_channelId2ResponseMgrMap.get(channelId) == null) { - - _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this, false)); + _channelId2ResponseMgrMap.put(channelId, new ResponseManager(_ConnectionId, channelId, _stateManager, this, false)); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index f7178742f9..cd63a60c04 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -40,6 +40,7 @@ public class RequestManager * to be known. */ private boolean serverFlag; + private int connectionId; /** * Request and response frames must have a requestID and responseID which @@ -55,11 +56,12 @@ public class RequestManager private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap; - public RequestManager(int channel, AMQProtocolWriter protocolWriter, boolean serverFlag) + public RequestManager(int connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag) { this.channel = channel; this.protocolWriter = protocolWriter; this.serverFlag = serverFlag; + this.connectionId = connectionId; requestIdCount = 1L; lastProcessedResponseId = 0L; requestSentMap = new ConcurrentHashMap<Long, AMQMethodListener>(); @@ -77,11 +79,11 @@ public class RequestManager protocolWriter.writeFrame(requestFrame); if (logger.isDebugEnabled()) { - logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + - " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); } - //System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + - // " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); + //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + // "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); return requestId; } @@ -92,11 +94,11 @@ public class RequestManager long requestIdStop = requestIdStart + responseBody.getBatchOffset(); if (logger.isDebugEnabled()) { - logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + - " " + responseBody + "; " + responseBody.getMethodPayload()); + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " + + responseBody + "; " + responseBody.getMethodPayload()); } - //System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + - // " " + responseBody + "; " + responseBody.getMethodPayload()); + //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " + + // responseBody + "; " + responseBody.getMethodPayload()); for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { AMQMethodListener methodListener = requestSentMap.get(requestId); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index dc7a0a2b75..3148603d65 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -43,6 +43,7 @@ public class ResponseManager * to be known. */ private boolean serverFlag; + private int connectionId; private int maxAccumulatedResponses = 20; // Default // private Class currentResponseMethodBodyClass; @@ -83,13 +84,14 @@ public class ResponseManager private ConcurrentHashMap<Long, ResponseStatus> responseMap; - public ResponseManager(int channel, AMQMethodListener methodListener, + public ResponseManager(int connectionId, int channel, AMQMethodListener methodListener, AMQProtocolWriter protocolWriter, boolean serverFlag) { this.channel = channel; this.methodListener = methodListener; this.protocolWriter = protocolWriter; this.serverFlag = serverFlag; + this.connectionId = connectionId; responseIdCount = 1L; lastReceivedRequestId = 0L; // currentResponseMethodBodyClass = null; @@ -103,11 +105,11 @@ public class ResponseManager long requestId = requestBody.getRequestId(); if (logger.isDebugEnabled()) { - logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + - " " + requestBody + "; " + requestBody.getMethodPayload()); + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " + + requestBody + "; " + requestBody.getMethodPayload()); } - //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + - // " " + requestBody + "; " + requestBody.getMethodPayload()); + //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " + + // requestBody + "; " + requestBody.getMethodPayload()); // TODO: responseMark is used in HA, but until then, ignore... long responseMark = requestBody.getResponseMark(); lastReceivedRequestId = requestId; @@ -122,11 +124,11 @@ public class ResponseManager { if (logger.isDebugEnabled()) { - logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + - " Res[# " + requestId + "]; " + responseMethodBody); + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + "] TX RES: Res[# " + requestId + "]; " + responseMethodBody); } - //System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + - // " Res[# " + requestId + "]; " + responseMethodBody); + //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + // "] TX RES: Res[# " + requestId + "]; " + responseMethodBody); ResponseStatus responseStatus = responseMap.get(requestId); if (responseStatus == null) throw new RequestResponseMappingException(requestId, diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index 12ae522370..292ce6a834 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.store.MessageStore; import javax.security.sasl.SaslServer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * A protocol session that can be used for testing purposes. @@ -45,8 +46,13 @@ public class MockProtocolSession implements AMQProtocolSession private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); + // Keeps a tally of connections for logging and debugging + private static AtomicInteger _ConnectionId; + static { _ConnectionId = new AtomicInteger(0); } + public MockProtocolSession(MessageStore messageStore) { + _ConnectionId.incrementAndGet(); _messageStore = messageStore; } @@ -221,4 +227,9 @@ public class MockProtocolSession implements AMQProtocolSession // TODO Auto-generated method stub } + + public int getConnectionId() + { + return _ConnectionId.get(); + } } |
