summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-01-17 16:20:02 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-01-17 16:20:02 +0000
commitbbafa7f994e1d5592e984414e915bdc76a445c2e (patch)
tree24f978f832b200184b3ec56a9f34310d78c6f7c2
parent41ef372b003840d21339e26300faedb95e627426 (diff)
downloadqpid-python-bbafa7f994e1d5592e984414e915bdc76a445c2e.tar.gz
Filled in the MessageTransferMethodHandler and added a few fields to the MessageHeaders class.
Minor modifications to AMQProtocolSession to directly fire the unprocessed message to AMQSession instead of strong in a map. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497062 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java52
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java5
7 files changed, 87 insertions, 53 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index f1a9823e38..73dfdd90e5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -221,13 +221,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void dispatchMessage(UnprocessedMessage message)
{
- if (message.deliverBody != null)
+ if (message.content != null)
{
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.content.consumerTag);
if (consumer == null)
{
- _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
+ _logger.warn("Received a message from queue " + message.content.consumerTag + " without a handler - ignoring...");
_logger.warn("Consumers that exist: " + _consumers);
_logger.warn("Session hashcode: " + System.identityHashCode(this));
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 55cb28be42..1f062ebb43 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -501,12 +501,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (_logger.isDebugEnabled())
{
- _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
+ _logger.debug("notifyMessage called with message number " + messageFrame.content.deliveryTag);
}
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
- messageFrame.deliverBody.redelivered,
+ AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.content.deliveryTag,
+ messageFrame.content.redelivered,
messageFrame.contentHeader,
messageFrame.bodies);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
index 25bb9652ac..71252dca8c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
@@ -20,9 +20,12 @@
*/
package org.apache.qpid.client.handler;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.message.MessageHeaders;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -30,7 +33,8 @@ import org.apache.qpid.client.state.StateAwareMethodListener;
public class MessageTransferMethodHandler implements StateAwareMethodListener
{
private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
-
+ private static final Logger _logger = Logger.getLogger(MessageTransferMethodHandler.class);
+
public static MessageTransferMethodHandler getInstance()
{
return _instance;
@@ -44,7 +48,33 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener
AMQMethodEvent evt)
throws AMQException
{
- // TODO
+ final UnprocessedMessage msg = new UnprocessedMessage();
+ MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
+ msg.content = transferBody.getBody();
+ msg.channelId = evt.getChannelId();
+ _logger.debug("New JmsDeliver method received");
+
+ MessageHeaders messageHeaders = new MessageHeaders();
+ messageHeaders.setMessageId(transferBody.getMessageId());
+ messageHeaders.setAppId(transferBody.getAppId());
+ messageHeaders.setContentType(transferBody.getContentType());
+ messageHeaders.setEncoding(transferBody.getContentEncoding());
+ messageHeaders.setCorrelationId(transferBody.getCorrelationId());
+ messageHeaders.setDestination(transferBody.getDestination());
+ messageHeaders.setExchange(transferBody.getExchange());
+ messageHeaders.setExpiration(transferBody.getExpiration());
+ messageHeaders.setReplyTo(transferBody.getReplyTo());
+ messageHeaders.setRoutingKey(transferBody.getRoutingKey());
+ messageHeaders.setTransactionId(transferBody.getTransactionId());
+ messageHeaders.setUserId(transferBody.getUserId());
+ messageHeaders.setPriority(transferBody.getPriority());
+ messageHeaders.setDeliveryMode(transferBody.getDeliveryMode());
+ messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders());
+
+ msg.contentHeader = messageHeaders;
+
+ protocolSession.unprocessedMessageReceived(msg);
+
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
index cb260eaf8f..f3a78bd904 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
@@ -34,16 +34,20 @@ import java.util.Enumeration;
public class MessageHeaders
{
private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
-
+
private String _contentType;
private String _encoding;
+
+ private String _destination;
+
+ private String _exchange;
private FieldTable _jmsHeaders;
- private byte _deliveryMode;
+ private short _deliveryMode;
- private byte _priority;
+ private short _priority;
private String _correlationId;
@@ -63,6 +67,8 @@ public class MessageHeaders
private String _transactionId;
+ private String _routingKey;
+
public MessageHeaders()
{
}
@@ -108,22 +114,22 @@ public class MessageHeaders
}
- public byte getDeliveryMode()
+ public short getDeliveryMode()
{
return _deliveryMode;
}
- public void setDeliveryMode(byte deliveryMode)
+ public void setDeliveryMode(short deliveryMode)
{
_deliveryMode = deliveryMode;
}
- public byte getPriority()
+ public short getPriority()
{
return _priority;
}
- public void setPriority(byte priority)
+ public void setPriority(short priority)
{
_priority = priority;
}
@@ -161,12 +167,12 @@ public class MessageHeaders
public String getMessageId()
{
- return _messageId == null ? null : _messageId.toString();
+ return _messageId;
}
public void setMessageId(String messageId)
{
- _messageId = messageId == null ? null : new String(messageId);
+ _messageId = messageId;
}
public long getTimestamp()
@@ -621,13 +627,37 @@ public class MessageHeaders
}
- public String get_transactionId() {
+ public String getTransactionId() {
return _transactionId;
}
- public void set_transactionId(String id) {
+ public void setTransactionId(String id) {
_transactionId = id;
}
+
+ public String getDestination() {
+ return _destination;
+ }
+
+ public void setDestination(String destination) {
+ this._destination = destination;
+ }
+
+ public String getExchange() {
+ return _exchange;
+ }
+
+ public void setExchange(String exchange) {
+ this._exchange = exchange;
+ }
+
+ public String getRoutingKey() {
+ return _routingKey;
+ }
+
+ public void setRoutingKey(String routingKey) {
+ this._routingKey = routingKey;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 7d20c32b66..38cfb31eac 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -38,27 +38,8 @@ public class UnprocessedMessage
{
private long _bytesReceived = 0;
- public BasicDeliverBody deliverBody;
- public BasicReturnBody bounceBody; // TODO: check change (gustavo)
+ public Content content;
public int channelId;
- public ContentHeaderBody contentHeader;
-
- /**
- * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
- */
- public List bodies = new LinkedList();
-
- public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException
- {
- bodies.add(body);
- if (body.payload != null)
- {
- _bytesReceived += body.payload.remaining();
- }
- }
-
- public boolean isAllBodyDataReceived()
- {
- return _bytesReceived == contentHeader.bodySize;
- }
+ public MessageHeaders contentHeader;
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 735af586c7..44be521197 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -309,7 +309,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
if (frame.bodyFrame instanceof AMQRequestBody)
- {
+ {
_protocolSession.messageRequestBodyReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
}
else if (frame.bodyFrame instanceof AMQResponseBody)
@@ -327,15 +327,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// try
// {
// boolean wasAnyoneInterested = false;
-// while (it.hasNext())
-// {
-// final AMQMethodListener listener = (AMQMethodListener) it.next();
-// wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
-// }
-// if (!wasAnyoneInterested)
-// {
-// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
-// }
+// Q
// }
// catch (AMQException e)
// {
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index d1c86dde45..ef738058d4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -240,7 +240,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
*/
public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.channelId, message);
+ //_channelId2UnprocessedMsgMap.put(message.channelId, message);
+ deliverMessageToAMQSession(message.channelId, message);
}
public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
@@ -323,7 +324,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
session.messageReceived(msg);
- _channelId2UnprocessedMsgMap.remove(channelId);
+ //_channelId2UnprocessedMsgMap.remove(channelId);
}
public long writeRequest(int channelNum, AMQMethodBody methodBody,