summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-15 19:59:11 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-15 19:59:11 +0000
commit4c70be2495bd94eba1720c9603b3f0bddb99ffd9 (patch)
treee5a9779e24aa70d9322db5695845b8f316259456 /java
parent4221f261d756db2d68376e84d6be374f1ddcb9e6 (diff)
downloadqpid-python-4c70be2495bd94eba1720c9603b3f0bddb99ffd9.tar.gz
Added a request/response id to the MethodEvent class that is used to dispatch incoming messages to the handlers. Corrected some compile errors.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496456 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java17
4 files changed, 43 insertions, 23 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 36895e065d..99aca359fc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -38,6 +38,8 @@ import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.AMQResponseCallback;
import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.RequestManager;
+import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
@@ -64,6 +66,7 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
public class AMQMinaProtocolSession implements AMQProtocolSession,
+ AMQResponseCallback,
ProtocolVersionList,
Managable
{
@@ -179,14 +182,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
+ AMQMethodBody connectionStartBody = ConnectionStartBody.createMethodBody(
(byte)_major, (byte)_minor, // AMQP version (major, minor)
locales.getBytes(), // locales
mechanisms.getBytes(), // mechanisms
null, // serverProperties
(short)_major, // versionMajor
(short)_minor); // versionMinor
- _minaProtocolSession.write(response);
+ writeRequest(0, connectionStartBody, this);
}
catch (AMQException e)
{
@@ -223,42 +226,47 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- private void requestFrameReceived(int channel, AMQRequestBody requestBody) throws AMQException
+ public void responseFrameReceived(AMQResponseBody responseBody)
+ {
+ // do nothing
+ }
+
+ private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws AMQException
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Request frame received: " + frame);
+ _logger.debug("Request frame received: " + requestBody);
}
- AMQChannel channel = getChannel(channel);
+ AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
responseManager.requestReceived(requestBody);
}
- private void responseFrameReceived(int channel, AMQResponseBody responseBody) throws AMQException
+ private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws AMQException
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Response frame received: " + frame);
+ _logger.debug("Response frame received: " + responseBody);
}
- AMQChannel channel = getChannel(channel);
+ AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
requestManager.responseReceived(responseBody);
}
- public long writeRequest(int channel, AMQMethodBody methodBody, AMQResponseCallback responseCallback)
+ public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQResponseCallback responseCallback)
throws RequestResponseMappingException
{
- AMQChannel channel = getChannel(channel);
+ AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
return requestManager.sendRequest(methodBody, responseCallback);
}
- public void writeResponse(int channel, long requestId, AMQMethodBody methodBody)
+ public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
throws RequestResponseMappingException
{
- AMQChannel channel = getChannel(channel);
+ AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
- responseManager(requestId, methodBody);
+ responseManager.sendResponse(requestId, methodBody);
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 79449a4b48..b51ae2101d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -49,7 +49,7 @@ import java.io.IOException;
* the state for the connection.
*
*/
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList
+public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList, AMQResponseCallback
{
private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
@@ -175,17 +175,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+ AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
0, // classId
0, // methodId
200, // replyCode
- throwable.getMessage() // replyText
- ));
+ throwable.getMessage()); // replyText
+ protocolSession.writeRequest(0, closeBody, this);
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
protocolSession.close();
}
}
+
+ public void responseFrameReceived(AMQResponseBody responseBody)
+ {
+ // do nothing
+ }
/**
* Invoked when a message is received on a particular protocol session. Note that a
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index bfc0eb84de..5b34c11d11 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -111,7 +111,7 @@ public class ResponseManager
lastReceivedRequestId = requestId;
responseMap.put(requestId, new ResponseStatus(requestId));
// TODO: Update MethodEvent to use the RequestBody instead of MethodBody
- AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload());
+ AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload(), requestId);
methodListener.methodReceived(methodEvent);
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
index ab36041cb8..c42e49d7c8 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
@@ -36,13 +36,14 @@ import org.apache.qpid.framing.AMQMethodBody;
public class AMQMethodEvent<M extends AMQMethodBody>
{
private final M _method;
-
private final int _channelId;
+ private final long _requestResponseId;
- public AMQMethodEvent(int channelId, M method)
+ public AMQMethodEvent(int channelId, M method, long requestResponseId)
{
_channelId = channelId;
_method = method;
+ _requestResponseId = requestResponseId;
}
public M getMethod()
@@ -55,11 +56,17 @@ public class AMQMethodEvent<M extends AMQMethodBody>
return _channelId;
}
+ public long getRequestResponseId()
+ {
+ return _requestResponseId;
+ }
+
public String toString()
{
- StringBuilder buf = new StringBuilder("Method event: ");
- buf.append("\nChannel id: ").append(_channelId);
- buf.append("\nMethod: ").append(_method);
+ StringBuilder buf = new StringBuilder("Method event: \n");
+ buf.append("Channel id: \n").append(_channelId);
+ buf.append("Method: \n").append(_method);
+ buf.append("Request/Response Id: ").append(_requestResponseId);
return buf.toString();
}
}