summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-25 21:00:22 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-25 21:00:22 +0000
commit58f37a88b1cf6e6a3fa27d184238885a875cbb9f (patch)
treeaaad64c2879a64275a41097c58cfff609b603e8f /java
parent3335bfa2ddc5c83890b967792fb2442ee0680b83 (diff)
downloadqpid-python-58f37a88b1cf6e6a3fa27d184238885a875cbb9f.tar.gz
Added mechanism to track connection ids for logging and debugging purposes. Changed format of log/debug messages for RequestManager and ResponseManager.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@499968 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestManager.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java20
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java11
9 files changed, 77 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a479f5f5d4..5b6bd24faf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -137,8 +137,8 @@ public class AMQChannel
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
- _requestManager = new RequestManager(channelId, _session, true);
- _responseManager = new ResponseManager(channelId, methodListener, _session, true);
+ _requestManager = new RequestManager(_session.getConnectionId(), channelId, _session, true);
+ _responseManager = new ResponseManager(_session.getConnectionId(), channelId, methodListener, _session, true);
_txnBuffer = new TxnBuffer(_messageStore);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index a5dfc6d1e5..766a035f80 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -74,6 +74,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
public class AMQMinaProtocolSession implements AMQProtocolSession,
ProtocolVersionList,
@@ -117,6 +118,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private byte _minor;
private FieldTable _clientProperties;
+ // Keeps a tally of connections for logging and debugging
+ private static AtomicInteger _ConnectionId;
+ static { _ConnectionId = new AtomicInteger(0); }
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -127,6 +132,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQCodecFactory codecFactory)
throws AMQException
{
+ _ConnectionId.incrementAndGet();
_stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
@@ -144,6 +150,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
+ _ConnectionId.incrementAndGet();
_stateManager = stateManager;
_minaProtocolSession = session;
session.setAttachment(this);
@@ -700,4 +707,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
throw new RuntimeException("MethodBody version did not match version of current session.");
}
}
+
+ public int getConnectionId()
+ {
+ return _ConnectionId.get();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 7f7fcf20a2..1e5df9b8a5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -149,4 +149,5 @@ public interface AMQProtocolSession extends AMQProtocolWriter
byte getMinor();
boolean versionEquals(byte major, byte minor);
void checkMethodBodyVersion(AMQMethodBody methodBody);
+ int getConnectionId();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 4da086b3bd..bb3c33f2fe 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -78,7 +78,7 @@ import org.apache.qpid.url.URLSyntaxException;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
-
+
private AtomicInteger _idFactory = new AtomicInteger(0);
/**
@@ -155,6 +155,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQException _lastAMQException = null;
+ // Keeps a tally of connections for logging and debugging
+ private static AtomicInteger _ConnectionId;
+ static { _ConnectionId = new AtomicInteger(0); }
+
/*
* The connection meta data
*/
@@ -200,7 +204,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(ConnectionURL connectionURL) throws AMQException
{
_logger.info("Connection:" + connectionURL);
-
+ _ConnectionId.incrementAndGet();
if (connectionURL == null)
{
throw new IllegalArgumentException("Connection must be specified");
@@ -1020,4 +1024,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQConnectionFactory.class.getName(),
null); // factory location
}
+
+ public int getConnectionId()
+ {
+ return _ConnectionId.get();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index edef6c57c2..4d15c3cb35 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -576,4 +576,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_failoverState = failoverState;
}
+
+ public int getConnectionId()
+ {
+ return _connection.getConnectionId();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 2980a86374..de5cd4821d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -106,6 +106,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
*/
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
+
+ protected int _ConnectionId;
/**
* No-arg constructor for use by test subclass - has to initialise final vars
@@ -118,8 +120,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_stateManager = new AMQStateManager(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+ _ConnectionId = 0;
+ _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -131,8 +134,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_stateManager = new AMQStateManager(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+ _ConnectionId = _protocolHandler == null ? 0 : _protocolHandler.getConnectionId();
+ _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
@@ -146,8 +150,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_stateManager.setProtocolSession(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+ _ConnectionId = _protocolHandler == null ? 0 : _protocolHandler.getConnectionId();
+ _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
}
public void init()
@@ -379,12 +384,11 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
// Add request and response handlers, one per channel, if they do not already exist
if (_channelId2RequestMgrMap.get(channelId) == null)
{
- _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this, false));
+ _channelId2RequestMgrMap.put(channelId, new RequestManager(_ConnectionId, channelId, this, false));
}
if (_channelId2ResponseMgrMap.get(channelId) == null)
{
-
- _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this, false));
+ _channelId2ResponseMgrMap.put(channelId, new ResponseManager(_ConnectionId, channelId, _stateManager, this, false));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
index f7178742f9..cd63a60c04 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
@@ -40,6 +40,7 @@ public class RequestManager
* to be known.
*/
private boolean serverFlag;
+ private int connectionId;
/**
* Request and response frames must have a requestID and responseID which
@@ -55,11 +56,12 @@ public class RequestManager
private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap;
- public RequestManager(int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
+ public RequestManager(int connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.protocolWriter = protocolWriter;
this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
requestSentMap = new ConcurrentHashMap<Long, AMQMethodListener>();
@@ -77,11 +79,11 @@ public class RequestManager
protocolWriter.writeFrame(requestFrame);
if (logger.isDebugEnabled())
{
- logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
- " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
- // " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+ //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ // "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
return requestId;
}
@@ -92,11 +94,11 @@ public class RequestManager
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
if (logger.isDebugEnabled())
{
- logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
- " " + responseBody + "; " + responseBody.getMethodPayload());
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+ responseBody + "; " + responseBody.getMethodPayload());
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
- // " " + responseBody + "; " + responseBody.getMethodPayload());
+ //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+ // responseBody + "; " + responseBody.getMethodPayload());
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
AMQMethodListener methodListener = requestSentMap.get(requestId);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index dc7a0a2b75..3148603d65 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -43,6 +43,7 @@ public class ResponseManager
* to be known.
*/
private boolean serverFlag;
+ private int connectionId;
private int maxAccumulatedResponses = 20; // Default
// private Class currentResponseMethodBodyClass;
@@ -83,13 +84,14 @@ public class ResponseManager
private ConcurrentHashMap<Long, ResponseStatus> responseMap;
- public ResponseManager(int channel, AMQMethodListener methodListener,
+ public ResponseManager(int connectionId, int channel, AMQMethodListener methodListener,
AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.methodListener = methodListener;
this.protocolWriter = protocolWriter;
this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
// currentResponseMethodBodyClass = null;
@@ -103,11 +105,11 @@ public class ResponseManager
long requestId = requestBody.getRequestId();
if (logger.isDebugEnabled())
{
- logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
- " " + requestBody + "; " + requestBody.getMethodPayload());
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " +
+ requestBody + "; " + requestBody.getMethodPayload());
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
- // " " + requestBody + "; " + requestBody.getMethodPayload());
+ //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " +
+ // requestBody + "; " + requestBody.getMethodPayload());
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
@@ -122,11 +124,11 @@ public class ResponseManager
{
if (logger.isDebugEnabled())
{
- logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
- " Res[# " + requestId + "]; " + responseMethodBody);
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
- // " Res[# " + requestId + "]; " + responseMethodBody);
+ //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ // "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
throw new RequestResponseMappingException(requestId,
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 12ae522370..292ce6a834 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.store.MessageStore;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A protocol session that can be used for testing purposes.
@@ -45,8 +46,13 @@ public class MockProtocolSession implements AMQProtocolSession
private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ // Keeps a tally of connections for logging and debugging
+ private static AtomicInteger _ConnectionId;
+ static { _ConnectionId = new AtomicInteger(0); }
+
public MockProtocolSession(MessageStore messageStore)
{
+ _ConnectionId.incrementAndGet();
_messageStore = messageStore;
}
@@ -221,4 +227,9 @@ public class MockProtocolSession implements AMQProtocolSession
// TODO Auto-generated method stub
}
+
+ public int getConnectionId()
+ {
+ return _ConnectionId.get();
+ }
}