diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-18 22:54:31 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-18 22:54:31 +0000 |
| commit | 3c0e0c3bb319af15035f1c6596fd1a5c7568c293 (patch) | |
| tree | e35bcb0d6316e1cf460c9045b938d105f8328f65 /java/client/src/main | |
| parent | 8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd (diff) | |
| download | qpid-python-3c0e0c3bb319af15035f1c6596fd1a5c7568c293.tar.gz | |
implemented the logic for MessageTransfer and MessageAppend
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497616 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
9 files changed, 105 insertions, 90 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java index eb4a036bbf..152a86ed63 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.handler; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageAppendBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -30,6 +31,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener; public class MessageAppendMethodHandler implements StateAwareMethodListener { private static MessageAppendMethodHandler _instance = new MessageAppendMethodHandler(); + private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); public static MessageAppendMethodHandler getInstance() { @@ -44,7 +46,11 @@ public class MessageAppendMethodHandler implements StateAwareMethodListener AMQMethodEvent evt) throws AMQException { - // TODO + try { + protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod()); + } catch (Exception e) { + _logger.error("Unable to add data from MessageAppendBody",e); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java index 4ca6d63d6e..c7d1c60585 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.handler; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageCloseBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -30,7 +31,8 @@ import org.apache.qpid.client.state.StateAwareMethodListener; public class MessageCloseMethodHandler implements StateAwareMethodListener { private static MessageCloseMethodHandler _instance = new MessageCloseMethodHandler(); - + private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); + public static MessageCloseMethodHandler getInstance() { return _instance; @@ -44,7 +46,10 @@ public class MessageCloseMethodHandler implements StateAwareMethodListener AMQMethodEvent evt) throws AMQException { - // TODO + MessageCloseBody body = (MessageCloseBody)evt.getMethod(); + String referenceId = new String(body.getReference()); + protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId); + _logger.debug("Method Close Body received, notify session to accept unprocessed message"); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java index 055fe6e940..fa31bc8056 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java @@ -22,13 +22,14 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.message.MessageHeaders; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.protocol.AMQMethodEvent; public class MessageTransferMethodHandler implements StateAwareMethodListener { @@ -50,7 +51,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener { final UnprocessedMessage msg = new UnprocessedMessage(); MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod(); - msg.content = transferBody.getBody(); + msg.channelId = evt.getChannelId(); msg.deliveryTag = evt.getRequestId(); _logger.debug("New JmsDeliver method received"); @@ -74,7 +75,16 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener msg.contentHeader = messageHeaders; - protocolSession.unprocessedMessageReceived(msg); + if(transferBody.getBody().contentType == Content.ContentTypeEnum.CONTENT_TYPE_INLINE) + { + msg.addContent(transferBody.getBody().getContentAsByteArray()); + protocolSession.deliverMessageToAMQSession(evt.getChannelId(), msg); + } + else + { + String referenceId = new String(transferBody.getBody().getContentAsByteArray()); + protocolSession.deliverMessageToAMQSession(evt.getChannelId(),referenceId); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index f1ce5796d5..73901c5cf2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.Content; -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; - -import javax.jms.JMSException; import java.util.Iterator; import java.util.List; +import javax.jms.JMSException; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; + public abstract class AbstractJMSMessageFactory implements MessageFactory { private static final Logger _logger = Logger.getLogger(AbstractJMSMessageFactory.class); @@ -38,11 +38,14 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory ByteBuffer data, MessageHeaders contentHeader) throws AMQException; protected AbstractJMSMessage createMessageWithBody(long messageNbr, - MessageHeaders contentHeader, Content body) throws AMQException { - ByteBuffer data; - - data = ByteBuffer.allocate(body.content.remaining()); - data.put(body.content); + MessageHeaders contentHeader, List contents) throws AMQException { + + ByteBuffer data = ByteBuffer.allocate((int)contentHeader.getSize()); + for (final Iterator it = contents.iterator();it.hasNext();) + { + byte[] bytes = (byte[]) it.next(); + data.put(bytes); + } data.flip(); _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining()); @@ -52,9 +55,9 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, MessageHeaders contentHeader, - Content body) throws JMSException, AMQException + List contents) throws JMSException, AMQException { - final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, body); + final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, contents); msg.setJMSRedelivered(redelivered); return msg; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 64584788ef..17c735fd0e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -20,8 +20,9 @@ */ package org.apache.qpid.client.message; +import java.util.List; + import org.apache.qpid.AMQException; -import org.apache.qpid.framing.Content; import javax.jms.JMSException; @@ -30,7 +31,7 @@ public interface MessageFactory { AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, MessageHeaders contentHeader, - Content body) + List contents) throws JMSException, AMQException; AbstractJMSMessage createMessage() throws JMSException; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 108f041f78..c7a7cd892b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.message; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.jms.JMSException; @@ -59,7 +60,7 @@ public class MessageFactoryRegistry */ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, MessageHeaders contentHeader, - Content body) throws AMQException, JMSException + List contents) throws AMQException, JMSException { MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(contentHeader.getContentType()); if (mf == null) @@ -68,7 +69,7 @@ public class MessageFactoryRegistry } else { - return mf.createMessage(deliveryTag, redelivered, contentHeader, body); + return mf.createMessage(deliveryTag, redelivered, contentHeader, contents); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java index f3a78bd904..72e920ffd3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java @@ -69,7 +69,17 @@ public class MessageHeaders private String _routingKey; - public MessageHeaders() + private int _size; + + public int getSize() { + return _size; + } + + public void setSize(int size) { + this._size = size; + } + + public MessageHeaders() { } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 5a528e6fea..ea1c3c02c8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -20,10 +20,8 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.framing.*; - -import java.util.List; import java.util.LinkedList; +import java.util.List; /** * This class contains everything needed to process a JMS message. It assembles the @@ -34,13 +32,20 @@ import java.util.LinkedList; * the MINA dispatcher thread. * */ -public class UnprocessedMessage -{ - private long _bytesReceived = 0; - - public Content content; - public int channelId; - public long deliveryTag; - public MessageHeaders contentHeader; - +public class UnprocessedMessage { + public int bytesReceived = 0; + + public List contents = new LinkedList(); + + public int channelId; + + public long deliveryTag; + + public MessageHeaders contentHeader; + + public void addContent(byte[] content) { + contents.add(content); + bytesReceived = bytesReceived + content.length; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index e59b528e8b..52cc70a37a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -42,6 +42,7 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQRequestBody; import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.MessageAppendBody; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.RequestManager; @@ -94,7 +95,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis * Maps from a channel id to an unprocessed message. This is used to tie together the * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. */ - protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + protected ConcurrentMap _referenceId2UnprocessedMsgMap = new ConcurrentHashMap(); protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap(); protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap(); @@ -244,15 +245,21 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis } /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. + * This is involed from MessageTransferMethodHandler if type is CONTENT_TYPE_REFERENCE * This is invoked on the MINA dispatcher thread. * @param message * @throws AMQException if this was not expected */ - public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException + public void unprocessedMessageReceived(String referenceId, UnprocessedMessage message) throws AMQException { - //_channelId2UnprocessedMsgMap.put(message.channelId, message); - deliverMessageToAMQSession(message.channelId, message); + _referenceId2UnprocessedMsgMap.put(referenceId, message); + } + + public void messageAppendBodyReceived(MessageAppendBody appendBody) throws Exception + { + String referenceId = new String(appendBody.getReference()); + UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId); + msg.addContent(appendBody.bytes); } public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception @@ -279,63 +286,30 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis requestManager.responseReceived(responseBody); } -// public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) -// throws AMQException -// { -// UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); -// if (msg == null) -// { -// throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); -// } -// if (msg.contentHeader != null) -// { -// throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); -// } -// msg.contentHeader = contentHeader; -// if (contentHeader.bodySize == 0) -// { -// deliverMessageToAMQSession(channelId, msg); -// } -// } -// -// public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException -// { -// UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); -// if (msg == null) -// { -// throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); -// } -// if (msg.contentHeader == null) -// { -// _channelId2UnprocessedMsgMap.remove(channelId); -// throw new AMQException("Error: received content body without having received a ContentHeader frame first"); -// } -// try -// { -// msg.receiveBody(contentBody); -// } -// catch (UnexpectedBodyReceivedException e) -// { -// _channelId2UnprocessedMsgMap.remove(channelId); -// throw e; -// } -// if (msg.isAllBodyDataReceived()) -// { -// deliverMessageToAMQSession(channelId, msg); -// } -// } - /** + * This is involed from MessageTransferMethodHandler if type is CONTENT_TYPE_INLINE * Deliver a message to the appropriate session, removing the unprocessed message * from our map * @param channelId the channel id the message should be delivered to * @param msg the message */ - private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) + public void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) { AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId); + msg.contentHeader.setSize(msg.bytesReceived); session.messageReceived(msg); - //_channelId2UnprocessedMsgMap.remove(channelId); + } + + /** + * This is involed from MessageCloseMethodHandler if type is CONTENT_TYPE_REFERENCE + * In this case we use the reference id to obtain the unprocessed message + * The channel id is used to retrive a session + */ + public void deliverMessageToAMQSession(int channelId, String referenceId) + { + UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId); + deliverMessageToAMQSession(channelId,msg); + _referenceId2UnprocessedMsgMap.remove(referenceId); } public long writeRequest(int channelNum, AMQMethodBody methodBody, |
