diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-17 20:49:25 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-17 20:49:25 +0000 |
| commit | ddef18423e3b574c52a58b4b2233a3dfb59160a6 (patch) | |
| tree | f9cf7cf71d18e5a6a3c813e3a9db58eb759d1bf5 /java/client | |
| parent | 0596dd29d27b8a077beed219dc9e268277d12d94 (diff) | |
| download | qpid-python-ddef18423e3b574c52a58b4b2233a3dfb59160a6.tar.gz | |
fixed the compilation errors in the message classes and changed to use MessageHeaders instead of the old ContentHeaders class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497169 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
17 files changed, 87 insertions, 84 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ff69604baf..55cdd39931 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -238,7 +238,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - else + /*else { try { @@ -271,7 +271,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); } - } + }*/ } public void stopDispatcher() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 1f062ebb43..1f038d05ec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -501,14 +501,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_logger.isDebugEnabled()) { - _logger.debug("notifyMessage called with message number " + messageFrame.content.deliveryTag); + _logger.debug("notifyMessage called with message number " + messageFrame.content.getDestination()); } try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.content.deliveryTag, - messageFrame.content.redelivered, + AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.contentHeader.getDestination(), + messageFrame.contentHeader.getr, messageFrame.contentHeader, - messageFrame.bodies); + messageFrame.content); _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); jmsMessage.setConsumer(this); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 190154a468..f27436393b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -27,6 +27,7 @@ import javax.jms.JMSException; import javax.jms.MessageEOFException; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; /** * @author Apache Software Foundation @@ -67,14 +68,14 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage _data.setAutoExpand(true); } - /* - AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + + AbstractBytesMessage(long messageNbr, MessageHeaders contentHeader, ByteBuffer data) throws AMQException { // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); - getJmsContentHeaderProperties().setContentType(getMimeType()); - }*/ + super(messageNbr, contentHeader, data); + getMessageHeaders().setContentType(getMimeType()); + } public void clearBodyImpl() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index 8621a30694..7b97a8b065 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -10,6 +10,7 @@ import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
/**
* @author Apache Software Foundation
@@ -61,12 +62,12 @@ public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage super(data); // this instanties a content header
}
-/*
- AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
+
+ AbstractBytesTypedMessage(long messageNbr, MessageHeaders contentHeader, ByteBuffer data)
throws AMQException
{
super(messageNbr, contentHeader, data);
- }*/
+ }
protected byte readWireType() throws MessageFormatException, MessageEOFException,
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 25acb60dc9..0123a2324d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -395,7 +395,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach // we set multiple to true here since acknowledgement implies acknowledge of all previous messages // received on the session - _session.acknowledgeMessage(_deliveryTag, true); + try { + _session.acknowledgeMessage(_deliveryTag, true); + } catch (AMQException e) { + JMSException ex = new JMSException("Error trying to acknowledge"); + ex.initCause(e); + throw ex; + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index dcff8c348b..5d77b022d5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -21,8 +21,7 @@ package org.apache.qpid.client.message; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.Content; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; @@ -35,44 +34,27 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory private static final Logger _logger = Logger.getLogger(AbstractJMSMessageFactory.class); - protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, - ContentHeaderBody contentHeader) throws AMQException; + protected abstract AbstractJMSMessage createMessage(long messageNbr, + ByteBuffer data, MessageHeaders contentHeader) throws AMQException; - protected AbstractJMSMessage createMessageWithBody(long messageNbr, - ContentHeaderBody contentHeader, - List bodies) throws AMQException - { + protected AbstractJMSMessage createMessageWithBody(long messageNbr, + MessageHeaders contentHeader, Content body) throws AMQException { ByteBuffer data; - // we optimise the non-fragmented case to avoid copying - if (bodies != null && bodies.size() == 1) - { - _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize +")"); - data = ((ContentBody)bodies.get(0)).payload; - } - else - { - _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + ")"); - data = ByteBuffer.allocate((int)contentHeader.bodySize); // XXX: Is cast a problem? - final Iterator it = bodies.iterator(); - while (it.hasNext()) - { - ContentBody cb = (ContentBody) it.next(); - data.put(cb.payload); - cb.payload.release(); - } - data.flip(); - } + data = ByteBuffer.allocate(body.content.length); + data.put(body.content); + data.flip(); + _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining()); return createMessage(messageNbr, data, contentHeader); } public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, - ContentHeaderBody contentHeader, - List bodies) throws JMSException, AMQException + MessageHeaders contentHeader, + Content body) throws JMSException, AMQException { - final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, bodies); + final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, body); msg.setJMSRedelivered(redelivered); return msg; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index b23fcfb2bb..6943f277ca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -31,6 +31,7 @@ import javax.jms.JMSException; import javax.jms.MessageFormatException; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { @@ -52,11 +53,11 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag super(data); // this instanties a content header } - /* JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + JMSBytesMessage(long messageNbr, MessageHeaders contentHeader, ByteBuffer data) throws AMQException { super(messageNbr, contentHeader, data); - }*/ + } public void reset() { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java index 78f392a83f..ed8586a06b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java @@ -20,15 +20,14 @@ */ package org.apache.qpid.client.message; +import javax.jms.JMSException; + import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; - -import javax.jms.JMSException; public class JMSBytesMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, MessageHeaders contentHeader) throws AMQException { return new JMSBytesMessage(deliveryTag, contentHeader, data); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 6fb944dd06..b8b1203515 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -31,6 +31,7 @@ import javax.jms.MessageFormatException; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage { @@ -52,6 +53,21 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm populateMapFromData(); } + JMSMapMessage(long messageNbr, MessageHeaders contentHeader, ByteBuffer data)throws AMQException + { + super(messageNbr, contentHeader, data); + try + { + populateMapFromData(); + } + catch (JMSException je) + { + throw new AMQException("Error populating MapMessage from ByteBuffer", je); + + } + } + + public String toBodyString() throws JMSException { return _map.toString(); @@ -71,7 +87,6 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } - @Override public void clearBodyImpl() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java index b110f04460..072c9b0f16 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java @@ -20,12 +20,11 @@ */ package org.apache.qpid.client.message; +import javax.jms.JMSException; + import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.AMQException; -import javax.jms.JMSException; - public class JMSMapMessageFactory extends AbstractJMSMessageFactory { public AbstractJMSMessage createMessage() throws JMSException @@ -33,7 +32,7 @@ public class JMSMapMessageFactory extends AbstractJMSMessageFactory return new JMSMapMessage(); } - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, MessageHeaders contentHeader) throws AMQException { return new JMSMapMessage(deliveryTag, contentHeader, data); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 6e432f995b..f8a61b32d3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -33,6 +33,7 @@ import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { @@ -62,10 +63,10 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag /** * Creates read only message for delivery to consumers */ - /* JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException + JMSObjectMessage(long messageNbr, MessageHeaders contentHeader, ByteBuffer data) throws AMQException { - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); - }*/ + super(messageNbr, contentHeader, data); + } public void clearBodyImpl() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java index b2228a6805..6980022746 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java @@ -20,15 +20,14 @@ */ package org.apache.qpid.client.message; +import javax.jms.JMSException; + import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; - -import javax.jms.JMSException; public class JMSObjectMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, MessageHeaders contentHeader) throws AMQException { return new JMSObjectMessage(deliveryTag, contentHeader, data); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 327603092e..6be367f07a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -24,6 +24,7 @@ import javax.jms.JMSException; import javax.jms.StreamMessage; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; /** * @author Apache Software Foundation @@ -56,11 +57,11 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } - /* JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + JMSStreamMessage(long messageNbr, MessageHeaders contentHeader, ByteBuffer data) throws AMQException { super(messageNbr, contentHeader, data); - }*/ + } public void reset() { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java index aae9f0cdb2..7517fea16d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -20,15 +20,14 @@ */ package org.apache.qpid.client.message; +import javax.jms.JMSException; + import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.AMQException; -import javax.jms.JMSException; - public class JMSStreamMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, MessageHeaders contentHeader) throws AMQException { return new JMSStreamMessage(deliveryTag, contentHeader, data); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 530b6be970..a436f2b0f1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -27,6 +27,7 @@ import java.nio.charset.Charset; import javax.jms.JMSException; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { @@ -51,13 +52,13 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text getMessageHeaders().setEncoding(encoding); } - /* JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + JMSTextMessage(long deliveryTag, MessageHeaders contentHeader, ByteBuffer data) throws AMQException { super(deliveryTag, contentHeader, data); contentHeader.setContentType(MIME_TYPE); _data = data; - }*/ + } JMSTextMessage(ByteBuffer data) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java index e7ddde2790..64b1fc3892 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.client.message; +import javax.jms.JMSException; + import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; - -import javax.jms.JMSException; public class JMSTextMessageFactory extends AbstractJMSMessageFactory { @@ -35,8 +33,8 @@ public class JMSTextMessageFactory extends AbstractJMSMessageFactory return new JMSTextMessage(); } - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, MessageHeaders contentHeader) throws AMQException { - return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, data); + return new JMSTextMessage(deliveryTag, contentHeader, data); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 1a739feb35..609da9a05e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.jms.JMSException; +import javax.security.sasl.SaslClient; + +import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IdleStatus; @@ -29,7 +36,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.message.UnexpectedBodyReceivedException; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.framing.AMQDataBlock; @@ -43,12 +49,6 @@ import org.apache.qpid.framing.ResponseManager; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQProtocolWriter; -import org.apache.commons.lang.StringUtils; - -import javax.jms.JMSException; -import javax.security.sasl.SaslClient; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** * Wrapper for protocol session that provides type-safe access to session attributes. |
