diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 19:59:11 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 19:59:11 +0000 |
| commit | 4c70be2495bd94eba1720c9603b3f0bddb99ffd9 (patch) | |
| tree | e5a9779e24aa70d9322db5695845b8f316259456 /java | |
| parent | 4221f261d756db2d68376e84d6be374f1ddcb9e6 (diff) | |
| download | qpid-python-4c70be2495bd94eba1720c9603b3f0bddb99ffd9.tar.gz | |
Added a request/response id to the MethodEvent class that is used to dispatch incoming messages to the handlers. Corrected some compile errors.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496456 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 43 insertions, 23 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 36895e065d..99aca359fc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -38,6 +38,8 @@ import org.apache.qpid.framing.AMQRequestBody; import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.framing.AMQResponseCallback; import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.RequestManager; +import org.apache.qpid.framing.ResponseManager; import org.apache.qpid.framing.RequestResponseMappingException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; @@ -64,6 +66,7 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; public class AMQMinaProtocolSession implements AMQProtocolSession, + AMQResponseCallback, ProtocolVersionList, Managable { @@ -179,14 +182,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); String locales = "en_US"; // Interfacing with generated code - be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, + AMQMethodBody connectionStartBody = ConnectionStartBody.createMethodBody( (byte)_major, (byte)_minor, // AMQP version (major, minor) locales.getBytes(), // locales mechanisms.getBytes(), // mechanisms null, // serverProperties (short)_major, // versionMajor (short)_minor); // versionMinor - _minaProtocolSession.write(response); + writeRequest(0, connectionStartBody, this); } catch (AMQException e) { @@ -223,42 +226,47 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void requestFrameReceived(int channel, AMQRequestBody requestBody) throws AMQException + public void responseFrameReceived(AMQResponseBody responseBody) + { + // do nothing + } + + private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws AMQException { if (_logger.isDebugEnabled()) { - _logger.debug("Request frame received: " + frame); + _logger.debug("Request frame received: " + requestBody); } - AMQChannel channel = getChannel(channel); + AMQChannel channel = getChannel(channelNum); ResponseManager responseManager = channel.getResponseManager(); responseManager.requestReceived(requestBody); } - private void responseFrameReceived(int channel, AMQResponseBody responseBody) throws AMQException + private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws AMQException { if (_logger.isDebugEnabled()) { - _logger.debug("Response frame received: " + frame); + _logger.debug("Response frame received: " + responseBody); } - AMQChannel channel = getChannel(channel); + AMQChannel channel = getChannel(channelNum); RequestManager requestManager = channel.getRequestManager(); requestManager.responseReceived(responseBody); } - public long writeRequest(int channel, AMQMethodBody methodBody, AMQResponseCallback responseCallback) + public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQResponseCallback responseCallback) throws RequestResponseMappingException { - AMQChannel channel = getChannel(channel); + AMQChannel channel = getChannel(channelNum); RequestManager requestManager = channel.getRequestManager(); return requestManager.sendRequest(methodBody, responseCallback); } - public void writeResponse(int channel, long requestId, AMQMethodBody methodBody) + public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) throws RequestResponseMappingException { - AMQChannel channel = getChannel(channel); + AMQChannel channel = getChannel(channelNum); ResponseManager responseManager = channel.getResponseManager(); - responseManager(requestId, methodBody); + responseManager.sendResponse(requestId, methodBody); } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 79449a4b48..b51ae2101d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -49,7 +49,7 @@ import java.io.IOException; * the state for the connection. * */ -public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList +public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList, AMQResponseCallback { private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); @@ -175,17 +175,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.write(ConnectionCloseBody.createAMQFrame(0, + AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody( (byte)0, (byte)9, // AMQP version (major, minor) 0, // classId 0, // methodId 200, // replyCode - throwable.getMessage() // replyText - )); + throwable.getMessage()); // replyText + protocolSession.writeRequest(0, closeBody, this); _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); protocolSession.close(); } } + + public void responseFrameReceived(AMQResponseBody responseBody) + { + // do nothing + } /** * Invoked when a message is received on a particular protocol session. Note that a diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index bfc0eb84de..5b34c11d11 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -111,7 +111,7 @@ public class ResponseManager lastReceivedRequestId = requestId; responseMap.put(requestId, new ResponseStatus(requestId)); // TODO: Update MethodEvent to use the RequestBody instead of MethodBody - AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload()); + AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload(), requestId); methodListener.methodReceived(methodEvent); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java index ab36041cb8..c42e49d7c8 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java @@ -36,13 +36,14 @@ import org.apache.qpid.framing.AMQMethodBody; public class AMQMethodEvent<M extends AMQMethodBody> { private final M _method; - private final int _channelId; + private final long _requestResponseId; - public AMQMethodEvent(int channelId, M method) + public AMQMethodEvent(int channelId, M method, long requestResponseId) { _channelId = channelId; _method = method; + _requestResponseId = requestResponseId; } public M getMethod() @@ -55,11 +56,17 @@ public class AMQMethodEvent<M extends AMQMethodBody> return _channelId; } + public long getRequestResponseId() + { + return _requestResponseId; + } + public String toString() { - StringBuilder buf = new StringBuilder("Method event: "); - buf.append("\nChannel id: ").append(_channelId); - buf.append("\nMethod: ").append(_method); + StringBuilder buf = new StringBuilder("Method event: \n"); + buf.append("Channel id: \n").append(_channelId); + buf.append("Method: \n").append(_method); + buf.append("Request/Response Id: ").append(_requestResponseId); return buf.toString(); } } |
