diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-11 15:15:50 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-11 15:15:50 +0000 |
| commit | 9d87aefce83a6848c3dbabc32a7d3338f6a66dfd (patch) | |
| tree | 9588fb95c69277f8cd8f43264998a9a239d80204 /java/common/src/main | |
| parent | 259878b1ab4ed36a0e2e04cef30c2e69f1490f94 (diff) | |
| download | qpid-python-9d87aefce83a6848c3dbabc32a7d3338f6a66dfd.tar.gz | |
Split the RequestResponseManager into RequestManager and ResponseManager since these two functions are independent of each other. Also added a new exception.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@495251 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main')
| -rw-r--r-- | java/common/src/main/java/org/apache/qpid/framing/RequestManager.java | 98 | ||||
| -rw-r--r-- | java/common/src/main/java/org/apache/qpid/framing/RequestResponseMappingException.java | 82 | ||||
| -rw-r--r-- | java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (renamed from java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java) | 98 |
3 files changed, 230 insertions, 48 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java new file mode 100644 index 0000000000..e673a8e343 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import java.util.Hashtable; + +import org.apache.qpid.protocol.AMQProtocolWriter; + +public class RequestManager +{ + private int channel; + AMQProtocolWriter protocolSession; + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long requestIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastReceivedResponseId; + + private Hashtable<Long, AMQResponseCallback> requestSentMap; + + public RequestManager(int channel, AMQProtocolWriter protocolSession) + { + this.channel = channel; + this.protocolSession = protocolSession; + requestIdCount = 1L; + lastReceivedResponseId = 0L; + requestSentMap = new Hashtable<Long, AMQResponseCallback>(); + } + + // *** Functions to originate a request *** + + public long sendRequest(AMQMethodBody requestMethodBody, + AMQResponseCallback responseCallback) + { + long requestId = getNextRequestId(); // Get new request ID + AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, + lastReceivedResponseId, requestMethodBody); + protocolSession.writeFrame(requestFrame); + requestSentMap.put(requestId, responseCallback); + return requestId; + } + + public void responseReceived(AMQResponseBody responseBody) + throws RequestResponseMappingException + { + lastReceivedResponseId = responseBody.getResponseId(); + long requestIdStart = responseBody.getRequestId(); + long requestIdStop = requestIdStart + responseBody.getBatchOffset(); + for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) + { + AMQResponseCallback responseCallback = requestSentMap.get(requestId); + if (responseCallback == null) + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in requestSentMap."); + responseCallback.responseFrameReceived(responseBody); + requestSentMap.remove(requestId); + } + } + + // *** Management functions *** + + public int requestsMapSize() + { + return requestSentMap.size(); + } + + // *** Private helper functions *** + + private long getNextRequestId() + { + return requestIdCount++; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseMappingException.java b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseMappingException.java new file mode 100644 index 0000000000..25b38dd664 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseMappingException.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + +public class RequestResponseMappingException extends AMQException +{ + private long requestResponseId; + + public RequestResponseMappingException(String msg) + { + super(msg); + } + + public RequestResponseMappingException(long requestResponseId, String msg) + { + super(msg); + this.requestResponseId = requestResponseId; + } + + public RequestResponseMappingException(String msg, Throwable t) + { + super(msg, t); + } + + public RequestResponseMappingException(long requestResponseId, String msg, Throwable t) + { + super(msg, t); + this.requestResponseId = requestResponseId; + } + + public RequestResponseMappingException(Logger logger, String msg) + { + super(msg); + logger.error(getMessage(), this); + } + + public RequestResponseMappingException(Logger logger, long requestResponseId, String msg) + { + super(msg); + this.requestResponseId = requestResponseId; + logger.error(getMessage(), this); + } + + public RequestResponseMappingException(Logger logger, String msg, Throwable t) + { + super(msg, t); + logger.error(getMessage(), this); + } + + public RequestResponseMappingException(Logger logger, long requestResponseId, String msg, Throwable t) + { + super(msg, t); + this.requestResponseId = requestResponseId; + logger.error(getMessage(), this); + } + + public long getRequestResponseId() + { + return requestResponseId; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index 3e86ed7194..a895464a1f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -21,12 +21,12 @@ package org.apache.qpid.framing; import java.util.Iterator; -import java.util.TreeMap; +import java.util.Hashtable; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQProtocolWriter; -public class RequestResponseManager +public class ResponseManager { private int channel; AMQProtocolWriter protocolSession; @@ -56,14 +56,12 @@ public class RequestResponseManager * indepenedently increment from 0 on a per-channel basis. These are the * counters, and contain the value of the next (not yet used) frame. */ - private long requestIdCount; private long responseIdCount; /** * These keep track of the last requestId and responseId to be received. */ private long lastReceivedRequestId; - private long lastReceivedResponseId; /** * Last requestID sent in a response (for batching) @@ -87,49 +85,17 @@ public class RequestResponseManager } } - private TreeMap<Long, AMQResponseCallback> requestSentMap; - private TreeMap<Long, ResponseStatus> responseMap; + private Hashtable<Long, ResponseStatus> responseMap; - public RequestResponseManager(int channel, AMQProtocolWriter protocolSession) + public ResponseManager(int channel, AMQProtocolWriter protocolSession) { this.channel = channel; this.protocolSession = protocolSession; - requestIdCount = 1L; responseIdCount = 1L; lastReceivedRequestId = 0L; - lastReceivedResponseId = 0L; - requestSentMap = new TreeMap<Long, AMQResponseCallback>(); - responseMap = new TreeMap<Long, ResponseStatus>(); + responseMap = new Hashtable<Long, ResponseStatus>(); } - // *** Functions to originate a request *** - - public long sendRequest(AMQMethodBody requestMethodBody, AMQResponseCallback responseCallback) - { - long requestId = getRequestId(); // Get new request ID - AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, - lastReceivedResponseId, requestMethodBody); - protocolSession.writeFrame(requestFrame); - requestSentMap.put(requestId, responseCallback); - return requestId; - } - - public void responseReceived(AMQResponseBody responseBody) throws AMQException - { - lastReceivedResponseId = responseBody.getResponseId(); - long requestIdStart = responseBody.getRequestId(); - long requestIdStop = requestIdStart + responseBody.getBatchOffset(); - for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) - { - AMQResponseCallback responseCallback = requestSentMap.get(requestId); - if (responseCallback == null) - throw new AMQException("Failed to locate requestId " + requestId + - " in requestSentMap."); - responseCallback.responseFrameReceived(responseBody); - requestSentMap.remove(requestId); - } - } - // *** Functions to handle an incoming request *** public void requestReceived(AMQRequestBody requestBody) @@ -144,14 +110,16 @@ public class RequestResponseManager // but how to do this in a way that will work for both client and server? } - public void sendResponse(long requestId, AMQMethodBody responseMethodBody) throws AMQException + public void sendResponse(long requestId, AMQMethodBody responseMethodBody) + throws RequestResponseMappingException { ResponseStatus responseStatus = responseMap.get(requestId); if (responseStatus == null) - throw new AMQException("Failed to locate requestId " + requestId + - " in responseMap."); + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in responseMap."); if (responseStatus.responseMethodBody != null) - throw new AMQException("RequestId " + requestId + " already has a response."); + throw new RequestResponseMappingException(requestId, "RequestId " + + requestId + " already has a response in responseMap."); responseStatus.responseMethodBody = responseMethodBody; doBatches(); } @@ -172,14 +140,48 @@ public class RequestResponseManager } } - // *** Private helper functions *** + public int responsesMapSize() + { + return responseMap.size(); + } - private long getRequestId() + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody null. + */ + public int outstandingResponses() { - return requestIdCount++; + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody == null) + cnt++; + } + return cnt; } - private long getResponseId() + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody not null. + */ + public int batchedResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody != null) + cnt++; + } + return cnt; + } + + // *** Private helper functions *** + + private long getNextResponseId() { return responseIdCount++; } @@ -211,7 +213,7 @@ public class RequestResponseManager private void sendResponseBatch(long firstRequestId, int numAdditionalRequests, AMQMethodBody responseMethodBody) { - long responseId = getResponseId(); // Get new request ID + long responseId = getNextResponseId(); // Get new request ID AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, firstRequestId, numAdditionalRequests, responseMethodBody); protocolSession.writeFrame(responseFrame); |
