From 9d87aefce83a6848c3dbabc32a7d3338f6a66dfd Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 11 Jan 2007 15:15:50 +0000 Subject: Split the RequestResponseManager into RequestManager and ResponseManager since these two functions are independent of each other. Also added a new exception. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@495251 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/framing/RequestManager.java | 98 +++++++++ .../qpid/framing/RequestResponseManager.java | 219 -------------------- .../framing/RequestResponseMappingException.java | 82 ++++++++ .../org/apache/qpid/framing/ResponseManager.java | 221 +++++++++++++++++++++ 4 files changed, 401 insertions(+), 219 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/framing/RequestManager.java delete mode 100644 java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/RequestResponseMappingException.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java new file mode 100644 index 0000000000..e673a8e343 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import java.util.Hashtable; + +import org.apache.qpid.protocol.AMQProtocolWriter; + +public class RequestManager +{ + private int channel; + AMQProtocolWriter protocolSession; + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long requestIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastReceivedResponseId; + + private Hashtable requestSentMap; + + public RequestManager(int channel, AMQProtocolWriter protocolSession) + { + this.channel = channel; + this.protocolSession = protocolSession; + requestIdCount = 1L; + lastReceivedResponseId = 0L; + requestSentMap = new Hashtable(); + } + + // *** Functions to originate a request *** + + public long sendRequest(AMQMethodBody requestMethodBody, + AMQResponseCallback responseCallback) + { + long requestId = getNextRequestId(); // Get new request ID + AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, + lastReceivedResponseId, requestMethodBody); + protocolSession.writeFrame(requestFrame); + requestSentMap.put(requestId, responseCallback); + return requestId; + } + + public void responseReceived(AMQResponseBody responseBody) + throws RequestResponseMappingException + { + lastReceivedResponseId = responseBody.getResponseId(); + long requestIdStart = responseBody.getRequestId(); + long requestIdStop = requestIdStart + responseBody.getBatchOffset(); + for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) + { + AMQResponseCallback responseCallback = requestSentMap.get(requestId); + if (responseCallback == null) + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in requestSentMap."); + responseCallback.responseFrameReceived(responseBody); + requestSentMap.remove(requestId); + } + } + + // *** Management functions *** + + public int requestsMapSize() + { + return requestSentMap.size(); + } + + // *** Private helper functions *** + + private long getNextRequestId() + { + return requestIdCount++; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java deleted file mode 100644 index 3e86ed7194..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import java.util.Iterator; -import java.util.TreeMap; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQProtocolWriter; - -public class RequestResponseManager -{ - private int channel; - AMQProtocolWriter protocolSession; - - /** - * Determines the batch behaviour of the manager. - * - * Responses are sent to the RequestResponseManager through sendResponse(). - * These may be internally stored/accumulated for batching purposes, depending - * on the batching strategy/mode of the RequestResponseManager. - * - * The following modes are possibe: - * - * NONE: Each request results in an immediate single response, no batching - * takes place. - * DELAY_FIXED: Waits until a fixed period has passed to batch - * accumulated responses. An optional fixed threshold may be set, which - * if reached or exceeded within the delay period will trigger the batch. (TODO) - * MANUAL: No response is sent until it is explicitly released by calling - * function xxxx(). (TODO) - */ - public enum batchResponseModeEnum { NONE } - private batchResponseModeEnum batchResponseMode; - - /** - * Request and response frames must have a requestID and responseID which - * indepenedently increment from 0 on a per-channel basis. These are the - * counters, and contain the value of the next (not yet used) frame. - */ - private long requestIdCount; - private long responseIdCount; - - /** - * These keep track of the last requestId and responseId to be received. - */ - private long lastReceivedRequestId; - private long lastReceivedResponseId; - - /** - * Last requestID sent in a response (for batching) - */ - private long lastSentRequestId; - - private class ResponseStatus implements Comparable - { - public long requestId; - public AMQMethodBody responseMethodBody; - - public ResponseStatus(long requestId) - { - this.requestId = requestId; - responseMethodBody = null; - } - - public int compareTo(ResponseStatus o) - { - return (int)(requestId - o.requestId); - } - } - - private TreeMap requestSentMap; - private TreeMap responseMap; - - public RequestResponseManager(int channel, AMQProtocolWriter protocolSession) - { - this.channel = channel; - this.protocolSession = protocolSession; - requestIdCount = 1L; - responseIdCount = 1L; - lastReceivedRequestId = 0L; - lastReceivedResponseId = 0L; - requestSentMap = new TreeMap(); - responseMap = new TreeMap(); - } - - // *** Functions to originate a request *** - - public long sendRequest(AMQMethodBody requestMethodBody, AMQResponseCallback responseCallback) - { - long requestId = getRequestId(); // Get new request ID - AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, - lastReceivedResponseId, requestMethodBody); - protocolSession.writeFrame(requestFrame); - requestSentMap.put(requestId, responseCallback); - return requestId; - } - - public void responseReceived(AMQResponseBody responseBody) throws AMQException - { - lastReceivedResponseId = responseBody.getResponseId(); - long requestIdStart = responseBody.getRequestId(); - long requestIdStop = requestIdStart + responseBody.getBatchOffset(); - for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) - { - AMQResponseCallback responseCallback = requestSentMap.get(requestId); - if (responseCallback == null) - throw new AMQException("Failed to locate requestId " + requestId + - " in requestSentMap."); - responseCallback.responseFrameReceived(responseBody); - requestSentMap.remove(requestId); - } - } - - // *** Functions to handle an incoming request *** - - public void requestReceived(AMQRequestBody requestBody) - { - long requestId = requestBody.getRequestId(); - // TODO: responseMark is used in HA, but until then, ignore... - long responseMark = requestBody.getResponseMark(); - lastReceivedRequestId = requestId; - responseMap.put(requestId, new ResponseStatus(requestId)); - - // TODO: Initiate some action based on the MethodBody - like send to handlers, - // but how to do this in a way that will work for both client and server? - } - - public void sendResponse(long requestId, AMQMethodBody responseMethodBody) throws AMQException - { - ResponseStatus responseStatus = responseMap.get(requestId); - if (responseStatus == null) - throw new AMQException("Failed to locate requestId " + requestId + - " in responseMap."); - if (responseStatus.responseMethodBody != null) - throw new AMQException("RequestId " + requestId + " already has a response."); - responseStatus.responseMethodBody = responseMethodBody; - doBatches(); - } - - // *** Management functions *** - - public batchResponseModeEnum getBatchResponseMode() - { - return batchResponseMode; - } - - public void setBatchResponseMode(batchResponseModeEnum batchResponseMode) - { - if (this.batchResponseMode != batchResponseMode) - { - this.batchResponseMode = batchResponseMode; - doBatches(); - } - } - - // *** Private helper functions *** - - private long getRequestId() - { - return requestIdCount++; - } - - private long getResponseId() - { - return responseIdCount++; - } - - private void doBatches() - { - switch (batchResponseMode) - { - case NONE: - Iterator lItr = responseMap.keySet().iterator(); - while (lItr.hasNext()) - { - long requestId = lItr.next(); - ResponseStatus responseStatus = responseMap.get(requestId); - if (responseStatus.responseMethodBody != null) - { - sendResponseBatch(requestId, 0, responseStatus.responseMethodBody); - lItr.remove(); - } - } - break; - - // TODO: Add additional batch mode handlers here... - // case DELAY_FIXED: - // case MANUAL: - } - } - - private void sendResponseBatch(long firstRequestId, int numAdditionalRequests, - AMQMethodBody responseMethodBody) - { - long responseId = getResponseId(); // Get new request ID - AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, - firstRequestId, numAdditionalRequests, responseMethodBody); - protocolSession.writeFrame(responseFrame); - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseMappingException.java b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseMappingException.java new file mode 100644 index 0000000000..25b38dd664 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseMappingException.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + +public class RequestResponseMappingException extends AMQException +{ + private long requestResponseId; + + public RequestResponseMappingException(String msg) + { + super(msg); + } + + public RequestResponseMappingException(long requestResponseId, String msg) + { + super(msg); + this.requestResponseId = requestResponseId; + } + + public RequestResponseMappingException(String msg, Throwable t) + { + super(msg, t); + } + + public RequestResponseMappingException(long requestResponseId, String msg, Throwable t) + { + super(msg, t); + this.requestResponseId = requestResponseId; + } + + public RequestResponseMappingException(Logger logger, String msg) + { + super(msg); + logger.error(getMessage(), this); + } + + public RequestResponseMappingException(Logger logger, long requestResponseId, String msg) + { + super(msg); + this.requestResponseId = requestResponseId; + logger.error(getMessage(), this); + } + + public RequestResponseMappingException(Logger logger, String msg, Throwable t) + { + super(msg, t); + logger.error(getMessage(), this); + } + + public RequestResponseMappingException(Logger logger, long requestResponseId, String msg, Throwable t) + { + super(msg, t); + this.requestResponseId = requestResponseId; + logger.error(getMessage(), this); + } + + public long getRequestResponseId() + { + return requestResponseId; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java new file mode 100644 index 0000000000..a895464a1f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -0,0 +1,221 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import java.util.Iterator; +import java.util.Hashtable; + +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQProtocolWriter; + +public class ResponseManager +{ + private int channel; + AMQProtocolWriter protocolSession; + + /** + * Determines the batch behaviour of the manager. + * + * Responses are sent to the RequestResponseManager through sendResponse(). + * These may be internally stored/accumulated for batching purposes, depending + * on the batching strategy/mode of the RequestResponseManager. + * + * The following modes are possibe: + * + * NONE: Each request results in an immediate single response, no batching + * takes place. + * DELAY_FIXED: Waits until a fixed period has passed to batch + * accumulated responses. An optional fixed threshold may be set, which + * if reached or exceeded within the delay period will trigger the batch. (TODO) + * MANUAL: No response is sent until it is explicitly released by calling + * function xxxx(). (TODO) + */ + public enum batchResponseModeEnum { NONE } + private batchResponseModeEnum batchResponseMode; + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long responseIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastReceivedRequestId; + + /** + * Last requestID sent in a response (for batching) + */ + private long lastSentRequestId; + + private class ResponseStatus implements Comparable + { + public long requestId; + public AMQMethodBody responseMethodBody; + + public ResponseStatus(long requestId) + { + this.requestId = requestId; + responseMethodBody = null; + } + + public int compareTo(ResponseStatus o) + { + return (int)(requestId - o.requestId); + } + } + + private Hashtable responseMap; + + public ResponseManager(int channel, AMQProtocolWriter protocolSession) + { + this.channel = channel; + this.protocolSession = protocolSession; + responseIdCount = 1L; + lastReceivedRequestId = 0L; + responseMap = new Hashtable(); + } + + // *** Functions to handle an incoming request *** + + public void requestReceived(AMQRequestBody requestBody) + { + long requestId = requestBody.getRequestId(); + // TODO: responseMark is used in HA, but until then, ignore... + long responseMark = requestBody.getResponseMark(); + lastReceivedRequestId = requestId; + responseMap.put(requestId, new ResponseStatus(requestId)); + + // TODO: Initiate some action based on the MethodBody - like send to handlers, + // but how to do this in a way that will work for both client and server? + } + + public void sendResponse(long requestId, AMQMethodBody responseMethodBody) + throws RequestResponseMappingException + { + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus == null) + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in responseMap."); + if (responseStatus.responseMethodBody != null) + throw new RequestResponseMappingException(requestId, "RequestId " + + requestId + " already has a response in responseMap."); + responseStatus.responseMethodBody = responseMethodBody; + doBatches(); + } + + // *** Management functions *** + + public batchResponseModeEnum getBatchResponseMode() + { + return batchResponseMode; + } + + public void setBatchResponseMode(batchResponseModeEnum batchResponseMode) + { + if (this.batchResponseMode != batchResponseMode) + { + this.batchResponseMode = batchResponseMode; + doBatches(); + } + } + + public int responsesMapSize() + { + return responseMap.size(); + } + + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody null. + */ + public int outstandingResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody == null) + cnt++; + } + return cnt; + } + + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody not null. + */ + public int batchedResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody != null) + cnt++; + } + return cnt; + } + + // *** Private helper functions *** + + private long getNextResponseId() + { + return responseIdCount++; + } + + private void doBatches() + { + switch (batchResponseMode) + { + case NONE: + Iterator lItr = responseMap.keySet().iterator(); + while (lItr.hasNext()) + { + long requestId = lItr.next(); + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus.responseMethodBody != null) + { + sendResponseBatch(requestId, 0, responseStatus.responseMethodBody); + lItr.remove(); + } + } + break; + + // TODO: Add additional batch mode handlers here... + // case DELAY_FIXED: + // case MANUAL: + } + } + + private void sendResponseBatch(long firstRequestId, int numAdditionalRequests, + AMQMethodBody responseMethodBody) + { + long responseId = getNextResponseId(); // Get new request ID + AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, + firstRequestId, numAdditionalRequests, responseMethodBody); + protocolSession.writeFrame(responseFrame); + } +} -- cgit v1.2.1