diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-17 16:20:02 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-17 16:20:02 +0000 |
| commit | bbafa7f994e1d5592e984414e915bdc76a445c2e (patch) | |
| tree | 24f978f832b200184b3ec56a9f34310d78c6f7c2 | |
| parent | 41ef372b003840d21339e26300faedb95e627426 (diff) | |
| download | qpid-python-bbafa7f994e1d5592e984414e915bdc76a445c2e.tar.gz | |
Filled in the MessageTransferMethodHandler and added a few fields to the MessageHeaders class.
Minor modifications to AMQProtocolSession to directly fire the unprocessed message to AMQSession instead of strong in a map.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497062 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 87 insertions, 53 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index f1a9823e38..73dfdd90e5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -221,13 +221,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void dispatchMessage(UnprocessedMessage message) { - if (message.deliverBody != null) + if (message.content != null) { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.content.consumerTag); if (consumer == null) { - _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring..."); + _logger.warn("Received a message from queue " + message.content.consumerTag + " without a handler - ignoring..."); _logger.warn("Consumers that exist: " + _consumers); _logger.warn("Session hashcode: " + System.identityHashCode(this)); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 55cb28be42..1f062ebb43 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -501,12 +501,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_logger.isDebugEnabled()) { - _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag); + _logger.debug("notifyMessage called with message number " + messageFrame.content.deliveryTag); } try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag, - messageFrame.deliverBody.redelivered, + AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.content.deliveryTag, + messageFrame.content.redelivered, messageFrame.contentHeader, messageFrame.bodies); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java index 25bb9652ac..71252dca8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java @@ -20,9 +20,12 @@ */ package org.apache.qpid.client.handler; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.message.MessageHeaders; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; @@ -30,7 +33,8 @@ import org.apache.qpid.client.state.StateAwareMethodListener; public class MessageTransferMethodHandler implements StateAwareMethodListener { private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler(); - + private static final Logger _logger = Logger.getLogger(MessageTransferMethodHandler.class); + public static MessageTransferMethodHandler getInstance() { return _instance; @@ -44,7 +48,33 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener AMQMethodEvent evt) throws AMQException { - // TODO + final UnprocessedMessage msg = new UnprocessedMessage(); + MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod(); + msg.content = transferBody.getBody(); + msg.channelId = evt.getChannelId(); + _logger.debug("New JmsDeliver method received"); + + MessageHeaders messageHeaders = new MessageHeaders(); + messageHeaders.setMessageId(transferBody.getMessageId()); + messageHeaders.setAppId(transferBody.getAppId()); + messageHeaders.setContentType(transferBody.getContentType()); + messageHeaders.setEncoding(transferBody.getContentEncoding()); + messageHeaders.setCorrelationId(transferBody.getCorrelationId()); + messageHeaders.setDestination(transferBody.getDestination()); + messageHeaders.setExchange(transferBody.getExchange()); + messageHeaders.setExpiration(transferBody.getExpiration()); + messageHeaders.setReplyTo(transferBody.getReplyTo()); + messageHeaders.setRoutingKey(transferBody.getRoutingKey()); + messageHeaders.setTransactionId(transferBody.getTransactionId()); + messageHeaders.setUserId(transferBody.getUserId()); + messageHeaders.setPriority(transferBody.getPriority()); + messageHeaders.setDeliveryMode(transferBody.getDeliveryMode()); + messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders()); + + msg.contentHeader = messageHeaders; + + protocolSession.unprocessedMessageReceived(msg); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java index cb260eaf8f..f3a78bd904 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java @@ -34,16 +34,20 @@ import java.util.Enumeration; public class MessageHeaders { private static final Logger _logger = Logger.getLogger(MessageHeaders.class); - + private String _contentType; private String _encoding; + + private String _destination; + + private String _exchange; private FieldTable _jmsHeaders; - private byte _deliveryMode; + private short _deliveryMode; - private byte _priority; + private short _priority; private String _correlationId; @@ -63,6 +67,8 @@ public class MessageHeaders private String _transactionId; + private String _routingKey; + public MessageHeaders() { } @@ -108,22 +114,22 @@ public class MessageHeaders } - public byte getDeliveryMode() + public short getDeliveryMode() { return _deliveryMode; } - public void setDeliveryMode(byte deliveryMode) + public void setDeliveryMode(short deliveryMode) { _deliveryMode = deliveryMode; } - public byte getPriority() + public short getPriority() { return _priority; } - public void setPriority(byte priority) + public void setPriority(short priority) { _priority = priority; } @@ -161,12 +167,12 @@ public class MessageHeaders public String getMessageId() { - return _messageId == null ? null : _messageId.toString(); + return _messageId; } public void setMessageId(String messageId) { - _messageId = messageId == null ? null : new String(messageId); + _messageId = messageId; } public long getTimestamp() @@ -621,13 +627,37 @@ public class MessageHeaders } - public String get_transactionId() { + public String getTransactionId() { return _transactionId; } - public void set_transactionId(String id) { + public void setTransactionId(String id) { _transactionId = id; } + + public String getDestination() { + return _destination; + } + + public void setDestination(String destination) { + this._destination = destination; + } + + public String getExchange() { + return _exchange; + } + + public void setExchange(String exchange) { + this._exchange = exchange; + } + + public String getRoutingKey() { + return _routingKey; + } + + public void setRoutingKey(String routingKey) { + this._routingKey = routingKey; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 7d20c32b66..38cfb31eac 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -38,27 +38,8 @@ public class UnprocessedMessage { private long _bytesReceived = 0; - public BasicDeliverBody deliverBody; - public BasicReturnBody bounceBody; // TODO: check change (gustavo) + public Content content; public int channelId; - public ContentHeaderBody contentHeader; - - /** - * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general - */ - public List bodies = new LinkedList(); - - public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException - { - bodies.add(body); - if (body.payload != null) - { - _bytesReceived += body.payload.remaining(); - } - } - - public boolean isAllBodyDataReceived() - { - return _bytesReceived == contentHeader.bodySize; - } + public MessageHeaders contentHeader; + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 735af586c7..44be521197 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -309,7 +309,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody); if (frame.bodyFrame instanceof AMQRequestBody) - { + { _protocolSession.messageRequestBodyReceived(frame.channel, (AMQRequestBody)frame.bodyFrame); } else if (frame.bodyFrame instanceof AMQResponseBody) @@ -327,15 +327,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter // try // { // boolean wasAnyoneInterested = false; -// while (it.hasNext()) -// { -// final AMQMethodListener listener = (AMQMethodListener) it.next(); -// wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; -// } -// if (!wasAnyoneInterested) -// { -// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); -// } +// Q // } // catch (AMQException e) // { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index d1c86dde45..ef738058d4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -240,7 +240,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis */ public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException { - _channelId2UnprocessedMsgMap.put(message.channelId, message); + //_channelId2UnprocessedMsgMap.put(message.channelId, message); + deliverMessageToAMQSession(message.channelId, message); } public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception @@ -323,7 +324,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId); session.messageReceived(msg); - _channelId2UnprocessedMsgMap.remove(channelId); + //_channelId2UnprocessedMsgMap.remove(channelId); } public long writeRequest(int channelNum, AMQMethodBody methodBody, |
