diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-12 14:31:18 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-12 14:31:18 +0000 |
| commit | 482ff88ac8fcaf14207db6e23d3b9981365dfd62 (patch) | |
| tree | 7e4e929ba09510de4a129dd718b37a0830a91c75 /java/common/src/main | |
| parent | 03f090715e8744bedc344495fbc03468f260a85a (diff) | |
| download | qpid-python-482ff88ac8fcaf14207db6e23d3b9981365dfd62.tar.gz | |
Refactored to create a common AMQMethodEvent class; Added clinet Method* handlers, removed old Basic* handlers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@495581 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main')
6 files changed, 183 insertions, 91 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java index af43ab6474..00a27a8869 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java @@ -33,11 +33,11 @@ public class AMQRequestBody extends AMQBody // Constructor public AMQRequestBody() {} public AMQRequestBody(long requestId, long responseMark, - AMQMethodBody methodPayload) + AMQMethodBody methodPayload) { - this.requestId = requestId; - this.responseMark = responseMark; - this.methodPayload = methodPayload; + this.requestId = requestId; + this.responseMark = responseMark; + this.methodPayload = methodPayload; } @@ -49,42 +49,42 @@ public class AMQRequestBody extends AMQBody protected byte getFrameType() { - return (byte)AmqpConstants.frameRequestAsInt(); + return (byte)AmqpConstants.frameRequestAsInt(); } protected int getSize() { - return 8 + 8 + 4 + methodPayload.getBodySize(); + return 8 + 8 + 4 + methodPayload.getBodySize(); } protected void writePayload(ByteBuffer buffer) { - EncodingUtils.writeLong(buffer, requestId); - EncodingUtils.writeLong(buffer, responseMark); - EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0 + EncodingUtils.writeLong(buffer, requestId); + EncodingUtils.writeLong(buffer, responseMark); + EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0 methodPayload.writePayload(buffer); } protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException, AMQProtocolVersionException { - requestId = EncodingUtils.readLong(buffer); - responseMark = EncodingUtils.readLong(buffer); - int reserved = EncodingUtils.readShort(buffer); // reserved, throw away - methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); + requestId = EncodingUtils.readLong(buffer); + responseMark = EncodingUtils.readLong(buffer); + int reserved = EncodingUtils.readShort(buffer); // reserved, throw away + methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); } public String toString() { - return "Req[" + requestId + " " + responseMark + "] C" + - methodPayload.getClazz() + " M" + methodPayload.getMethod(); + return "Req[" + requestId + " " + responseMark + "] C" + + methodPayload.getClazz() + " M" + methodPayload.getMethod(); } public static AMQFrame createAMQFrame(int channelId, long requestId, long responseMark, AMQMethodBody methodPayload) { AMQRequestBody requestFrame = new AMQRequestBody(requestId, responseMark, - methodPayload); + methodPayload); AMQFrame frame = new AMQFrame(); frame.channel = channelId; frame.bodyFrame = requestFrame; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java index 67fc485f48..90038da2d4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java @@ -33,9 +33,9 @@ public class AMQResponseBody extends AMQBody // Constructor public AMQResponseBody() {} public AMQResponseBody(long getResponseId, long getRequestId, - int batchOffset, AMQMethodBody methodPayload) + int batchOffset, AMQMethodBody methodPayload) { - this.responseId = responseId; + this.responseId = responseId; this.requestId = requestId; this.batchOffset = batchOffset; this.methodPayload = methodPayload; @@ -49,12 +49,12 @@ public class AMQResponseBody extends AMQBody protected byte getFrameType() { - return (byte)AmqpConstants.frameResponseAsInt(); + return (byte)AmqpConstants.frameResponseAsInt(); } protected int getSize() { - return 8 + 8 + 4 + methodPayload.getBodySize(); + return 8 + 8 + 4 + methodPayload.getBodySize(); } protected void writePayload(ByteBuffer buffer) @@ -76,15 +76,15 @@ public class AMQResponseBody extends AMQBody public String toString() { - return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" + - methodPayload.getClazz() + " M" + methodPayload.getMethod(); + return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" + + methodPayload.getClazz() + " M" + methodPayload.getMethod(); } public static AMQFrame createAMQFrame(int channelId, long responseId, long requestId, int batchOffset, AMQMethodBody methodPayload) { AMQResponseBody responseFrame = new AMQResponseBody(responseId, - requestId, batchOffset, methodPayload); + requestId, batchOffset, methodPayload); AMQFrame frame = new AMQFrame(); frame.channel = channelId; frame.bodyFrame = responseFrame; diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java new file mode 100644 index 0000000000..f779258e00 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface RequestHandler +{ + public boolean requestReceived(AMQRequestBody requestBody); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index e673a8e343..55c25151da 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -26,7 +26,7 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestManager { - private int channel; + private int channel; AMQProtocolWriter protocolSession; /** @@ -39,60 +39,60 @@ public class RequestManager /** * These keep track of the last requestId and responseId to be received. */ - private long lastReceivedResponseId; + private long lastProcessedResponseId; private Hashtable<Long, AMQResponseCallback> requestSentMap; - public RequestManager(int channel, AMQProtocolWriter protocolSession) + public RequestManager(int channel, AMQProtocolWriter protocolSession) { - this.channel = channel; + this.channel = channel; this.protocolSession = protocolSession; - requestIdCount = 1L; - lastReceivedResponseId = 0L; + requestIdCount = 1L; + lastProcessedResponseId = 0L; requestSentMap = new Hashtable<Long, AMQResponseCallback>(); } // *** Functions to originate a request *** public long sendRequest(AMQMethodBody requestMethodBody, - AMQResponseCallback responseCallback) + AMQResponseCallback responseCallback) { - long requestId = getNextRequestId(); // Get new request ID - AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, - lastReceivedResponseId, requestMethodBody); + long requestId = getNextRequestId(); // Get new request ID + AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, + lastProcessedResponseId, requestMethodBody); protocolSession.writeFrame(requestFrame); requestSentMap.put(requestId, responseCallback); return requestId; } public void responseReceived(AMQResponseBody responseBody) - throws RequestResponseMappingException + throws RequestResponseMappingException { - lastReceivedResponseId = responseBody.getResponseId(); long requestIdStart = responseBody.getRequestId(); long requestIdStop = requestIdStart + responseBody.getBatchOffset(); for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { - AMQResponseCallback responseCallback = requestSentMap.get(requestId); + AMQResponseCallback responseCallback = requestSentMap.get(requestId); if (responseCallback == null) - throw new RequestResponseMappingException(requestId, - "Failed to locate requestId " + requestId + " in requestSentMap."); + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in requestSentMap."); responseCallback.responseFrameReceived(responseBody); requestSentMap.remove(requestId); } + lastProcessedResponseId = responseBody.getResponseId(); } // *** Management functions *** public int requestsMapSize() { - return requestSentMap.size(); + return requestSentMap.size(); } // *** Private helper functions *** private long getNextRequestId() { - return requestIdCount++; + return requestIdCount++; } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index a895464a1f..280d8d562a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -28,10 +28,11 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class ResponseManager { - private int channel; + private int channel; + RequestHandler requestHandler; AMQProtocolWriter protocolSession; - /** + /** * Determines the batch behaviour of the manager. * * Responses are sent to the RequestResponseManager through sendResponse(). @@ -48,7 +49,7 @@ public class ResponseManager * MANUAL: No response is sent until it is explicitly released by calling * function xxxx(). (TODO) */ - public enum batchResponseModeEnum { NONE } + public enum batchResponseModeEnum { NONE } private batchResponseModeEnum batchResponseMode; /** @@ -70,79 +71,79 @@ public class ResponseManager private class ResponseStatus implements Comparable<ResponseStatus> { - public long requestId; + public long requestId; public AMQMethodBody responseMethodBody; public ResponseStatus(long requestId) { - this.requestId = requestId; - responseMethodBody = null; + this.requestId = requestId; + responseMethodBody = null; } public int compareTo(ResponseStatus o) { - return (int)(requestId - o.requestId); + return (int)(requestId - o.requestId); } } private Hashtable<Long, ResponseStatus> responseMap; - public ResponseManager(int channel, AMQProtocolWriter protocolSession) + public ResponseManager(int channel, RequestHandler requestHandler, + AMQProtocolWriter protocolSession) { - this.channel = channel; + this.channel = channel; + this.requestHandler = requestHandler; this.protocolSession = protocolSession; responseIdCount = 1L; lastReceivedRequestId = 0L; responseMap = new Hashtable<Long, ResponseStatus>(); } - // *** Functions to handle an incoming request *** + // *** Functions to handle an incoming request *** public void requestReceived(AMQRequestBody requestBody) { - long requestId = requestBody.getRequestId(); + long requestId = requestBody.getRequestId(); // TODO: responseMark is used in HA, but until then, ignore... long responseMark = requestBody.getResponseMark(); - lastReceivedRequestId = requestId; + lastReceivedRequestId = requestId; responseMap.put(requestId, new ResponseStatus(requestId)); - - // TODO: Initiate some action based on the MethodBody - like send to handlers, - // but how to do this in a way that will work for both client and server? + requestHandler.requestReceived(requestBody); } public void sendResponse(long requestId, AMQMethodBody responseMethodBody) - throws RequestResponseMappingException + throws RequestResponseMappingException { - ResponseStatus responseStatus = responseMap.get(requestId); + ResponseStatus responseStatus = responseMap.get(requestId); if (responseStatus == null) - throw new RequestResponseMappingException(requestId, - "Failed to locate requestId " + requestId + " in responseMap."); + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in responseMap."); if (responseStatus.responseMethodBody != null) - throw new RequestResponseMappingException(requestId, "RequestId " + - requestId + " already has a response in responseMap."); + throw new RequestResponseMappingException(requestId, "RequestId " + + requestId + " already has a response in responseMap."); responseStatus.responseMethodBody = responseMethodBody; doBatches(); } // *** Management functions *** - public batchResponseModeEnum getBatchResponseMode() + public batchResponseModeEnum getBatchResponseMode() { - return batchResponseMode; + return batchResponseMode; } public void setBatchResponseMode(batchResponseModeEnum batchResponseMode) { - if (this.batchResponseMode != batchResponseMode) + if (this.batchResponseMode != batchResponseMode) { - this.batchResponseMode = batchResponseMode; - doBatches(); + this.batchResponseMode = batchResponseMode; + doBatches(); } } public int responsesMapSize() { - return responseMap.size(); + return responseMap.size(); } /** @@ -153,12 +154,12 @@ public class ResponseManager */ public int outstandingResponses() { - int cnt = 0; + int cnt = 0; for (Long requestId : responseMap.keySet()) { - if (responseMap.get(requestId).responseMethodBody == null) - cnt++; - } + if (responseMap.get(requestId).responseMethodBody == null) + cnt++; + } return cnt; } @@ -170,12 +171,12 @@ public class ResponseManager */ public int batchedResponses() { - int cnt = 0; + int cnt = 0; for (Long requestId : responseMap.keySet()) { - if (responseMap.get(requestId).responseMethodBody != null) - cnt++; - } + if (responseMap.get(requestId).responseMethodBody != null) + cnt++; + } return cnt; } @@ -183,39 +184,39 @@ public class ResponseManager private long getNextResponseId() { - return responseIdCount++; + return responseIdCount++; } private void doBatches() { - switch (batchResponseMode) + switch (batchResponseMode) { - case NONE: - Iterator<Long> lItr = responseMap.keySet().iterator(); - while (lItr.hasNext()) + case NONE: + Iterator<Long> lItr = responseMap.keySet().iterator(); + while (lItr.hasNext()) { - long requestId = lItr.next(); - ResponseStatus responseStatus = responseMap.get(requestId); - if (responseStatus.responseMethodBody != null) + long requestId = lItr.next(); + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus.responseMethodBody != null) { - sendResponseBatch(requestId, 0, responseStatus.responseMethodBody); + sendResponseBatch(requestId, 0, responseStatus.responseMethodBody); lItr.remove(); } } - break; + break; // TODO: Add additional batch mode handlers here... - // case DELAY_FIXED: - // case MANUAL: + // case DELAY_FIXED: + // case MANUAL: } } private void sendResponseBatch(long firstRequestId, int numAdditionalRequests, - AMQMethodBody responseMethodBody) + AMQMethodBody responseMethodBody) { - long responseId = getNextResponseId(); // Get new request ID - AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, - firstRequestId, numAdditionalRequests, responseMethodBody); + long responseId = getNextResponseId(); // Get new request ID + AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, + firstRequestId, numAdditionalRequests, responseMethodBody); protocolSession.writeFrame(responseFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java new file mode 100644 index 0000000000..ab36041cb8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +import org.apache.qpid.framing.AMQMethodBody; + +/** + * An event that is passed to AMQMethodListeners describing a particular method. + * It supplies the: + * <ul><li>channel id</li> + * <li>protocol method</li> + * to listeners. This means that listeners do not need to be stateful. + * + * In the StateAwareMethodListener, other useful objects such as the protocol session + * are made available. + * + */ +public class AMQMethodEvent<M extends AMQMethodBody> +{ + private final M _method; + + private final int _channelId; + + public AMQMethodEvent(int channelId, M method) + { + _channelId = channelId; + _method = method; + } + + public M getMethod() + { + return _method; + } + + public int getChannelId() + { + return _channelId; + } + + public String toString() + { + StringBuilder buf = new StringBuilder("Method event: "); + buf.append("\nChannel id: ").append(_channelId); + buf.append("\nMethod: ").append(_method); + return buf.toString(); + } +} |
