diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 21:24:15 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 21:24:15 +0000 |
| commit | 5aaad510dc978dc09f92c774c81255b7af6b8b68 (patch) | |
| tree | 2e057027cafb429e848ca8738b2cdb0f2f95397f /java | |
| parent | 4c70be2495bd94eba1720c9603b3f0bddb99ffd9 (diff) | |
| download | qpid-python-5aaad510dc978dc09f92c774c81255b7af6b8b68.tar.gz | |
Changed the RequestManager to use AMQMethodListener instead of the old AMQResponseCallback.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496499 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 35 insertions, 64 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 99aca359fc..ee035287b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -36,7 +36,6 @@ import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQRequestBody; import org.apache.qpid.framing.AMQResponseBody; -import org.apache.qpid.framing.AMQResponseCallback; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.RequestManager; import org.apache.qpid.framing.ResponseManager; @@ -66,7 +65,6 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; public class AMQMinaProtocolSession implements AMQProtocolSession, - AMQResponseCallback, ProtocolVersionList, Managable { @@ -189,7 +187,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, null, // serverProperties (short)_major, // versionMajor (short)_minor); // versionMinor - writeRequest(0, connectionStartBody, this); + writeRequest(0, connectionStartBody, _stateManager); } catch (AMQException e) { @@ -226,11 +224,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - public void responseFrameReceived(AMQResponseBody responseBody) - { - // do nothing - } - private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws AMQException { if (_logger.isDebugEnabled()) @@ -253,12 +246,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, requestManager.responseReceived(responseBody); } - public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQResponseCallback responseCallback) + public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener) throws RequestResponseMappingException { AMQChannel channel = getChannel(channelNum); RequestManager requestManager = channel.getRequestManager(); - return requestManager.sendRequest(methodBody, responseCallback); + return requestManager.sendRequest(methodBody, methodListener); } public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index b51ae2101d..0a89d70734 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -49,7 +50,7 @@ import java.io.IOException; * the state for the connection. * */ -public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList, AMQResponseCallback +public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList { private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); @@ -153,7 +154,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } - public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception + public void exceptionCaught(IoSession protocolSession, AMQMethodListener methodListener, + Throwable throwable) throws Exception { AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); if (throwable instanceof AMQProtocolHeaderException) @@ -181,16 +183,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco 0, // methodId 200, // replyCode throwable.getMessage()); // replyText - protocolSession.writeRequest(0, closeBody, this); + session.writeRequest(0, closeBody, methodListener); _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); protocolSession.close(); } } - - public void responseFrameReceived(AMQResponseBody responseBody) - { - // do nothing - } /** * Invoked when a message is received on a particular protocol session. Note that a diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java deleted file mode 100644 index ed0e692921..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -public interface AMQResponseCallback -{ - /** - * Callback for response frames. An instance of this class must be - * passed to RequestResponseManager.sendRequest(). When a response - * is received, then this method will be invoked in the passed - * instance. - */ - public void responseFrameReceived(AMQResponseBody responseBody); -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index 900d068c13..b567aea37e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -22,6 +22,8 @@ package org.apache.qpid.framing; import java.util.Hashtable; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestManager @@ -41,7 +43,7 @@ public class RequestManager */ private long lastProcessedResponseId; - private Hashtable<Long, AMQResponseCallback> requestSentMap; + private Hashtable<Long, AMQMethodListener> requestSentMap; public RequestManager(int channel, AMQProtocolWriter protocolWriter) { @@ -49,34 +51,36 @@ public class RequestManager this.protocolWriter = protocolWriter; requestIdCount = 1L; lastProcessedResponseId = 0L; - requestSentMap = new Hashtable<Long, AMQResponseCallback>(); + requestSentMap = new Hashtable<Long, AMQMethodListener>(); } // *** Functions to originate a request *** public long sendRequest(AMQMethodBody requestMethodBody, - AMQResponseCallback responseCallback) + AMQMethodListener methodListener) { long requestId = getNextRequestId(); // Get new request ID AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, lastProcessedResponseId, requestMethodBody); protocolWriter.writeFrame(requestFrame); - requestSentMap.put(requestId, responseCallback); + requestSentMap.put(requestId, methodListener); return requestId; } public void responseReceived(AMQResponseBody responseBody) - throws RequestResponseMappingException + throws Exception { long requestIdStart = responseBody.getRequestId(); long requestIdStop = requestIdStart + responseBody.getBatchOffset(); for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { - AMQResponseCallback responseCallback = requestSentMap.get(requestId); - if (responseCallback == null) + AMQMethodListener methodListener = requestSentMap.get(requestId); + if (methodListener == null) throw new RequestResponseMappingException(requestId, "Failed to locate requestId " + requestId + " in requestSentMap."); - responseCallback.responseFrameReceived(responseBody); + AMQMethodEvent methodEvent = new AMQMethodEvent(channel, responseBody.getMethodPayload(), + requestId); + methodListener.methodReceived(methodEvent); requestSentMap.remove(requestId); } lastProcessedResponseId = responseBody.getResponseId(); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java index c42e49d7c8..c624d2e364 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java @@ -37,13 +37,13 @@ public class AMQMethodEvent<M extends AMQMethodBody> { private final M _method; private final int _channelId; - private final long _requestResponseId; + private final long _requestId; - public AMQMethodEvent(int channelId, M method, long requestResponseId) + public AMQMethodEvent(int channelId, M method, long requestId) { _channelId = channelId; _method = method; - _requestResponseId = requestResponseId; + _requestId = requestId; } public M getMethod() @@ -56,9 +56,9 @@ public class AMQMethodEvent<M extends AMQMethodBody> return _channelId; } - public long getRequestResponseId() + public long getRequestId() { - return _requestResponseId; + return _requestId; } public String toString() @@ -66,7 +66,7 @@ public class AMQMethodEvent<M extends AMQMethodBody> StringBuilder buf = new StringBuilder("Method event: \n"); buf.append("Channel id: \n").append(_channelId); buf.append("Method: \n").append(_method); - buf.append("Request/Response Id: ").append(_requestResponseId); + buf.append("Request Id: ").append(_requestId); return buf.toString(); } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java index 41e4ad68c8..5ec9b122af 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java @@ -21,6 +21,9 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.protocol.AMQMethodListener; public interface AMQProtocolWriter { @@ -29,4 +32,10 @@ public interface AMQProtocolWriter * @param frame the frame to be encoded and written */ public void writeFrame(AMQDataBlock frame); + + public long writeRequest(int channelNum, AMQMethodBody methodBody, + AMQMethodListener methodListener) throws RequestResponseMappingException; + + public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) + throws RequestResponseMappingException; } |
