summaryrefslogtreecommitdiff
path: root/java/common/src/main
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-12 14:31:18 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-12 14:31:18 +0000
commit482ff88ac8fcaf14207db6e23d3b9981365dfd62 (patch)
tree7e4e929ba09510de4a129dd718b37a0830a91c75 /java/common/src/main
parent03f090715e8744bedc344495fbc03468f260a85a (diff)
downloadqpid-python-482ff88ac8fcaf14207db6e23d3b9981365dfd62.tar.gz
Refactored to create a common AMQMethodEvent class; Added clinet Method* handlers, removed old Basic* handlers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@495581 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestManager.java34
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java103
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java65
6 files changed, 183 insertions, 91 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
index af43ab6474..00a27a8869 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
@@ -33,11 +33,11 @@ public class AMQRequestBody extends AMQBody
// Constructor
public AMQRequestBody() {}
public AMQRequestBody(long requestId, long responseMark,
- AMQMethodBody methodPayload)
+ AMQMethodBody methodPayload)
{
- this.requestId = requestId;
- this.responseMark = responseMark;
- this.methodPayload = methodPayload;
+ this.requestId = requestId;
+ this.responseMark = responseMark;
+ this.methodPayload = methodPayload;
}
@@ -49,42 +49,42 @@ public class AMQRequestBody extends AMQBody
protected byte getFrameType()
{
- return (byte)AmqpConstants.frameRequestAsInt();
+ return (byte)AmqpConstants.frameRequestAsInt();
}
protected int getSize()
{
- return 8 + 8 + 4 + methodPayload.getBodySize();
+ return 8 + 8 + 4 + methodPayload.getBodySize();
}
protected void writePayload(ByteBuffer buffer)
{
- EncodingUtils.writeLong(buffer, requestId);
- EncodingUtils.writeLong(buffer, responseMark);
- EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0
+ EncodingUtils.writeLong(buffer, requestId);
+ EncodingUtils.writeLong(buffer, responseMark);
+ EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0
methodPayload.writePayload(buffer);
}
protected void populateFromBuffer(ByteBuffer buffer, long size)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
- requestId = EncodingUtils.readLong(buffer);
- responseMark = EncodingUtils.readLong(buffer);
- int reserved = EncodingUtils.readShort(buffer); // reserved, throw away
- methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
+ requestId = EncodingUtils.readLong(buffer);
+ responseMark = EncodingUtils.readLong(buffer);
+ int reserved = EncodingUtils.readShort(buffer); // reserved, throw away
+ methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
}
public String toString()
{
- return "Req[" + requestId + " " + responseMark + "] C" +
- methodPayload.getClazz() + " M" + methodPayload.getMethod();
+ return "Req[" + requestId + " " + responseMark + "] C" +
+ methodPayload.getClazz() + " M" + methodPayload.getMethod();
}
public static AMQFrame createAMQFrame(int channelId, long requestId,
long responseMark, AMQMethodBody methodPayload)
{
AMQRequestBody requestFrame = new AMQRequestBody(requestId, responseMark,
- methodPayload);
+ methodPayload);
AMQFrame frame = new AMQFrame();
frame.channel = channelId;
frame.bodyFrame = requestFrame;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
index 67fc485f48..90038da2d4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
@@ -33,9 +33,9 @@ public class AMQResponseBody extends AMQBody
// Constructor
public AMQResponseBody() {}
public AMQResponseBody(long getResponseId, long getRequestId,
- int batchOffset, AMQMethodBody methodPayload)
+ int batchOffset, AMQMethodBody methodPayload)
{
- this.responseId = responseId;
+ this.responseId = responseId;
this.requestId = requestId;
this.batchOffset = batchOffset;
this.methodPayload = methodPayload;
@@ -49,12 +49,12 @@ public class AMQResponseBody extends AMQBody
protected byte getFrameType()
{
- return (byte)AmqpConstants.frameResponseAsInt();
+ return (byte)AmqpConstants.frameResponseAsInt();
}
protected int getSize()
{
- return 8 + 8 + 4 + methodPayload.getBodySize();
+ return 8 + 8 + 4 + methodPayload.getBodySize();
}
protected void writePayload(ByteBuffer buffer)
@@ -76,15 +76,15 @@ public class AMQResponseBody extends AMQBody
public String toString()
{
- return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" +
- methodPayload.getClazz() + " M" + methodPayload.getMethod();
+ return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" +
+ methodPayload.getClazz() + " M" + methodPayload.getMethod();
}
public static AMQFrame createAMQFrame(int channelId, long responseId,
long requestId, int batchOffset, AMQMethodBody methodPayload)
{
AMQResponseBody responseFrame = new AMQResponseBody(responseId,
- requestId, batchOffset, methodPayload);
+ requestId, batchOffset, methodPayload);
AMQFrame frame = new AMQFrame();
frame.channel = channelId;
frame.bodyFrame = responseFrame;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java
new file mode 100644
index 0000000000..f779258e00
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface RequestHandler
+{
+ public boolean requestReceived(AMQRequestBody requestBody);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
index e673a8e343..55c25151da 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
@@ -26,7 +26,7 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public class RequestManager
{
- private int channel;
+ private int channel;
AMQProtocolWriter protocolSession;
/**
@@ -39,60 +39,60 @@ public class RequestManager
/**
* These keep track of the last requestId and responseId to be received.
*/
- private long lastReceivedResponseId;
+ private long lastProcessedResponseId;
private Hashtable<Long, AMQResponseCallback> requestSentMap;
- public RequestManager(int channel, AMQProtocolWriter protocolSession)
+ public RequestManager(int channel, AMQProtocolWriter protocolSession)
{
- this.channel = channel;
+ this.channel = channel;
this.protocolSession = protocolSession;
- requestIdCount = 1L;
- lastReceivedResponseId = 0L;
+ requestIdCount = 1L;
+ lastProcessedResponseId = 0L;
requestSentMap = new Hashtable<Long, AMQResponseCallback>();
}
// *** Functions to originate a request ***
public long sendRequest(AMQMethodBody requestMethodBody,
- AMQResponseCallback responseCallback)
+ AMQResponseCallback responseCallback)
{
- long requestId = getNextRequestId(); // Get new request ID
- AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
- lastReceivedResponseId, requestMethodBody);
+ long requestId = getNextRequestId(); // Get new request ID
+ AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
+ lastProcessedResponseId, requestMethodBody);
protocolSession.writeFrame(requestFrame);
requestSentMap.put(requestId, responseCallback);
return requestId;
}
public void responseReceived(AMQResponseBody responseBody)
- throws RequestResponseMappingException
+ throws RequestResponseMappingException
{
- lastReceivedResponseId = responseBody.getResponseId();
long requestIdStart = responseBody.getRequestId();
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
- AMQResponseCallback responseCallback = requestSentMap.get(requestId);
+ AMQResponseCallback responseCallback = requestSentMap.get(requestId);
if (responseCallback == null)
- throw new RequestResponseMappingException(requestId,
- "Failed to locate requestId " + requestId + " in requestSentMap.");
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in requestSentMap.");
responseCallback.responseFrameReceived(responseBody);
requestSentMap.remove(requestId);
}
+ lastProcessedResponseId = responseBody.getResponseId();
}
// *** Management functions ***
public int requestsMapSize()
{
- return requestSentMap.size();
+ return requestSentMap.size();
}
// *** Private helper functions ***
private long getNextRequestId()
{
- return requestIdCount++;
+ return requestIdCount++;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index a895464a1f..280d8d562a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -28,10 +28,11 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public class ResponseManager
{
- private int channel;
+ private int channel;
+ RequestHandler requestHandler;
AMQProtocolWriter protocolSession;
- /**
+ /**
* Determines the batch behaviour of the manager.
*
* Responses are sent to the RequestResponseManager through sendResponse().
@@ -48,7 +49,7 @@ public class ResponseManager
* MANUAL: No response is sent until it is explicitly released by calling
* function xxxx(). (TODO)
*/
- public enum batchResponseModeEnum { NONE }
+ public enum batchResponseModeEnum { NONE }
private batchResponseModeEnum batchResponseMode;
/**
@@ -70,79 +71,79 @@ public class ResponseManager
private class ResponseStatus implements Comparable<ResponseStatus>
{
- public long requestId;
+ public long requestId;
public AMQMethodBody responseMethodBody;
public ResponseStatus(long requestId)
{
- this.requestId = requestId;
- responseMethodBody = null;
+ this.requestId = requestId;
+ responseMethodBody = null;
}
public int compareTo(ResponseStatus o)
{
- return (int)(requestId - o.requestId);
+ return (int)(requestId - o.requestId);
}
}
private Hashtable<Long, ResponseStatus> responseMap;
- public ResponseManager(int channel, AMQProtocolWriter protocolSession)
+ public ResponseManager(int channel, RequestHandler requestHandler,
+ AMQProtocolWriter protocolSession)
{
- this.channel = channel;
+ this.channel = channel;
+ this.requestHandler = requestHandler;
this.protocolSession = protocolSession;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
responseMap = new Hashtable<Long, ResponseStatus>();
}
- // *** Functions to handle an incoming request ***
+ // *** Functions to handle an incoming request ***
public void requestReceived(AMQRequestBody requestBody)
{
- long requestId = requestBody.getRequestId();
+ long requestId = requestBody.getRequestId();
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
- lastReceivedRequestId = requestId;
+ lastReceivedRequestId = requestId;
responseMap.put(requestId, new ResponseStatus(requestId));
-
- // TODO: Initiate some action based on the MethodBody - like send to handlers,
- // but how to do this in a way that will work for both client and server?
+ requestHandler.requestReceived(requestBody);
}
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
- throws RequestResponseMappingException
+ throws RequestResponseMappingException
{
- ResponseStatus responseStatus = responseMap.get(requestId);
+ ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
- throw new RequestResponseMappingException(requestId,
- "Failed to locate requestId " + requestId + " in responseMap.");
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in responseMap.");
if (responseStatus.responseMethodBody != null)
- throw new RequestResponseMappingException(requestId, "RequestId " +
- requestId + " already has a response in responseMap.");
+ throw new RequestResponseMappingException(requestId, "RequestId " +
+ requestId + " already has a response in responseMap.");
responseStatus.responseMethodBody = responseMethodBody;
doBatches();
}
// *** Management functions ***
- public batchResponseModeEnum getBatchResponseMode()
+ public batchResponseModeEnum getBatchResponseMode()
{
- return batchResponseMode;
+ return batchResponseMode;
}
public void setBatchResponseMode(batchResponseModeEnum batchResponseMode)
{
- if (this.batchResponseMode != batchResponseMode)
+ if (this.batchResponseMode != batchResponseMode)
{
- this.batchResponseMode = batchResponseMode;
- doBatches();
+ this.batchResponseMode = batchResponseMode;
+ doBatches();
}
}
public int responsesMapSize()
{
- return responseMap.size();
+ return responseMap.size();
}
/**
@@ -153,12 +154,12 @@ public class ResponseManager
*/
public int outstandingResponses()
{
- int cnt = 0;
+ int cnt = 0;
for (Long requestId : responseMap.keySet())
{
- if (responseMap.get(requestId).responseMethodBody == null)
- cnt++;
- }
+ if (responseMap.get(requestId).responseMethodBody == null)
+ cnt++;
+ }
return cnt;
}
@@ -170,12 +171,12 @@ public class ResponseManager
*/
public int batchedResponses()
{
- int cnt = 0;
+ int cnt = 0;
for (Long requestId : responseMap.keySet())
{
- if (responseMap.get(requestId).responseMethodBody != null)
- cnt++;
- }
+ if (responseMap.get(requestId).responseMethodBody != null)
+ cnt++;
+ }
return cnt;
}
@@ -183,39 +184,39 @@ public class ResponseManager
private long getNextResponseId()
{
- return responseIdCount++;
+ return responseIdCount++;
}
private void doBatches()
{
- switch (batchResponseMode)
+ switch (batchResponseMode)
{
- case NONE:
- Iterator<Long> lItr = responseMap.keySet().iterator();
- while (lItr.hasNext())
+ case NONE:
+ Iterator<Long> lItr = responseMap.keySet().iterator();
+ while (lItr.hasNext())
{
- long requestId = lItr.next();
- ResponseStatus responseStatus = responseMap.get(requestId);
- if (responseStatus.responseMethodBody != null)
+ long requestId = lItr.next();
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus.responseMethodBody != null)
{
- sendResponseBatch(requestId, 0, responseStatus.responseMethodBody);
+ sendResponseBatch(requestId, 0, responseStatus.responseMethodBody);
lItr.remove();
}
}
- break;
+ break;
// TODO: Add additional batch mode handlers here...
- // case DELAY_FIXED:
- // case MANUAL:
+ // case DELAY_FIXED:
+ // case MANUAL:
}
}
private void sendResponseBatch(long firstRequestId, int numAdditionalRequests,
- AMQMethodBody responseMethodBody)
+ AMQMethodBody responseMethodBody)
{
- long responseId = getNextResponseId(); // Get new request ID
- AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
- firstRequestId, numAdditionalRequests, responseMethodBody);
+ long responseId = getNextResponseId(); // Get new request ID
+ AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
+ firstRequestId, numAdditionalRequests, responseMethodBody);
protocolSession.writeFrame(responseFrame);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
new file mode 100644
index 0000000000..ab36041cb8
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * An event that is passed to AMQMethodListeners describing a particular method.
+ * It supplies the:
+ * <ul><li>channel id</li>
+ * <li>protocol method</li>
+ * to listeners. This means that listeners do not need to be stateful.
+ *
+ * In the StateAwareMethodListener, other useful objects such as the protocol session
+ * are made available.
+ *
+ */
+public class AMQMethodEvent<M extends AMQMethodBody>
+{
+ private final M _method;
+
+ private final int _channelId;
+
+ public AMQMethodEvent(int channelId, M method)
+ {
+ _channelId = channelId;
+ _method = method;
+ }
+
+ public M getMethod()
+ {
+ return _method;
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: ");
+ buf.append("\nChannel id: ").append(_channelId);
+ buf.append("\nMethod: ").append(_method);
+ return buf.toString();
+ }
+}