diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-10 23:53:12 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-10 23:53:12 +0000 |
| commit | 259878b1ab4ed36a0e2e04cef30c2e69f1490f94 (patch) | |
| tree | 6c46bf3068f3eb6d494b92216c76fb95c1ac2d76 /java | |
| parent | 804d660fc35571d764f2a7acc87cfe82ac31a95e (diff) | |
| download | qpid-python-259878b1ab4ed36a0e2e04cef30c2e69f1490f94.tar.gz | |
Compilation fixes. The idea is to substitute the Basic methods with the new Message class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@495043 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
13 files changed, 266 insertions, 217 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 3134b0e684..8eba0160a8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,6 +20,36 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -29,10 +59,10 @@ import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageQosBody; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; @@ -43,23 +73,6 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; -import javax.jms.*; -import javax.jms.IllegalStateException; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -479,12 +492,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, + MessageQosBody.createAMQFrame(channelId, (byte)0, (byte)9, // AMQP version (major, minor) false, // global prefetchHigh, // prefetchCount 0), // prefetchSize - BasicQosOkBody.class); + MessageOkBody.class); if (transacted) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index dad84bb294..8cde5e557f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,38 +20,81 @@ */ package org.apache.qpid.client; +import java.io.Serializable; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQInvalidSelectorException; -import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ExchangeBoundBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.TxCommitBody; +import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.framing.TxRollbackBody; +import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.handler.ExchangeBoundHandler; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; -import javax.jms.*; -import javax.jms.IllegalStateException; -import java.io.Serializable; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { private static final Logger _logger = Logger.getLogger(AMQSession.class); @@ -723,7 +766,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, + _connection.getProtocolHandler().writeFrame(MessageRecoverBody.createAMQFrame(_channelId, (byte)0, (byte)9, // AMQP version (major, minor) false)); // requeue } @@ -1187,7 +1230,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, + /*AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, (byte)0, (byte)9, // AMQP version (major, minor) arguments, // arguments tag, // consumerTag @@ -1196,15 +1239,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.isNoLocal(), // noLocal nowait, // nowait queueName, // queue - 0); // ticket + 0); // ticket */ + + AMQFrame jmsConsume = MessageConsumeBody.createAMQFrame(_channelId, + (byte)0, (byte)9, // AMQP version (major, minor) + tag, // consumerTag + consumer.isExclusive(), // exclusive + arguments, // arguments in the form of a field table + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + queueName, // queue + 0); // ticket */ + /* if (nowait) { protocolHandler.writeFrame(jmsConsume); } else { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); - } + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + }*/ + + protocolHandler.syncWrite(jmsConsume,MessageOkBody.class); } catch (AMQException e) { @@ -1535,10 +1591,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - (byte)0, (byte)9, // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple + final AMQFrame ackFrame = MessageOkBody.createAMQFrame(_channelId,(byte)0, (byte)9); // AMQP version (major, minor) + //deliveryTag, // deliveryTag + //multiple); // multiple if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 5597d43cc0..b13b22316a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,31 +20,32 @@ */ package org.apache.qpid.client; +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import javax.jms.Destination; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; public class BasicMessageConsumer extends Closeable implements MessageConsumer { @@ -451,14 +452,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, + final AMQFrame cancelFrame = MessageCancelBody.createAMQFrame(_channelId, (byte)0, (byte)9, // AMQP version (major, minor) - _consumerTag, // consumerTag - false); // nowait + _consumerTag); // consumerTag try { - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + _protocolHandler.syncWrite(cancelFrame, MessageOkBody.class); } catch (AMQException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 379c0f84ca..3d1cd8ee60 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -522,7 +522,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j AbstractJMSMessage message = convertToNativeMessage(origMessage); - message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); + message.getMessageHeaders().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. @@ -542,7 +542,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } message.prepareForSending(); ByteBuffer payload = message.getData(); - BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties(); + BasicContentHeaderProperties contentHeaderProperties = message.getMessageHeaders(); if (timeToLive > 0) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java index dd82eb13c1..44ef5beeb1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java @@ -20,14 +20,13 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.client.AMQSession; import javax.jms.JMSException; public class AMQMessage { - protected ContentHeaderProperties _contentHeaderProperties; + protected MessageHeaders _messageHeaders; /** * If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required @@ -36,13 +35,13 @@ public class AMQMessage protected final long _deliveryTag; - public AMQMessage(ContentHeaderProperties properties, long deliveryTag) + public AMQMessage(MessageHeaders properties, long deliveryTag) { - _contentHeaderProperties = properties; + _messageHeaders = properties; _deliveryTag = deliveryTag; } - public AMQMessage(ContentHeaderProperties properties) + public AMQMessage(MessageHeaders properties) { this(properties, -1); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 011f7c09ab..190154a468 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; +import java.io.IOException; +import java.nio.charset.Charset; import javax.jms.JMSException; import javax.jms.MessageEOFException; -import java.io.IOException; -import java.nio.charset.Charset; + +import org.apache.mina.common.ByteBuffer; /** * @author Apache Software Foundation @@ -55,7 +53,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage AbstractBytesMessage(ByteBuffer data) { super(data); // this instanties a content header - getJmsContentHeaderProperties().setContentType(getMimeType()); + getMessageHeaders().setContentType(getMimeType()); if (_data == null) { @@ -69,13 +67,14 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage _data.setAutoExpand(true); } + /* AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); getJmsContentHeaderProperties().setContentType(getMimeType()); - } + }*/ public void clearBodyImpl() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index b941b9aee8..8621a30694 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -1,12 +1,15 @@ package org.apache.qpid.client.message;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.AMQException;
-
-import javax.jms.*;
-import java.nio.charset.Charset;
import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.mina.common.ByteBuffer;
/**
* @author Apache Software Foundation
@@ -58,12 +61,12 @@ public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage super(data); // this instanties a content header
}
-
+/*
AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
throws AMQException
{
super(messageNbr, contentHeader, data);
- }
+ }*/
protected byte readWireType() throws MessageFormatException, MessageEOFException,
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 0c29344c37..25acb60dc9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,24 +20,24 @@ */ package org.apache.qpid.client.message; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; + import org.apache.commons.collections.map.ReferenceMap; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.BasicMessageConsumer; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLSyntaxException; public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { @@ -54,7 +54,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach protected AbstractJMSMessage(ByteBuffer data) { - super(new BasicContentHeaderProperties()); + super(new MessageHeaders()); _data = data; if (_data != null) { @@ -65,7 +65,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _changedData = (data == null); } - protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException + protected AbstractJMSMessage(long deliveryTag, MessageHeaders contentHeader, ByteBuffer data) throws AMQException { this(contentHeader, deliveryTag); _data = data; @@ -77,59 +77,59 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _readableMessage = data != null; } - protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) + protected AbstractJMSMessage(MessageHeaders contentHeader, long deliveryTag) { super(contentHeader, deliveryTag); - _readableProperties = (_contentHeaderProperties != null); + _readableProperties = (_messageHeaders != null); } public String getJMSMessageID() throws JMSException { - if (getJmsContentHeaderProperties().getMessageId() == null) + if (getMessageHeaders().getMessageId() == null) { - getJmsContentHeaderProperties().setMessageId("ID:" + _deliveryTag); + getMessageHeaders().setMessageId("ID:" + _deliveryTag); } - return getJmsContentHeaderProperties().getMessageId(); + return getMessageHeaders().getMessageId(); } public void setJMSMessageID(String messageId) throws JMSException { - getJmsContentHeaderProperties().setMessageId(messageId); + getMessageHeaders().setMessageId(messageId); } public long getJMSTimestamp() throws JMSException { - return new Long(getJmsContentHeaderProperties().getTimestamp()).longValue(); + return new Long(getMessageHeaders().getTimestamp()).longValue(); } public void setJMSTimestamp(long timestamp) throws JMSException { - getJmsContentHeaderProperties().setTimestamp(timestamp); + getMessageHeaders().setTimestamp(timestamp); } public byte[] getJMSCorrelationIDAsBytes() throws JMSException { - return getJmsContentHeaderProperties().getCorrelationId().getBytes(); + return getMessageHeaders().getCorrelationId().getBytes(); } public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException { - getJmsContentHeaderProperties().setCorrelationId(new String(bytes)); + getMessageHeaders().setCorrelationId(new String(bytes)); } public void setJMSCorrelationID(String correlationId) throws JMSException { - getJmsContentHeaderProperties().setCorrelationId(correlationId); + getMessageHeaders().setCorrelationId(correlationId); } public String getJMSCorrelationID() throws JMSException { - return getJmsContentHeaderProperties().getCorrelationId(); + return getMessageHeaders().getCorrelationId(); } public Destination getJMSReplyTo() throws JMSException { - String replyToEncoding = getJmsContentHeaderProperties().getReplyTo(); + String replyToEncoding = getMessageHeaders().getReplyTo(); if (replyToEncoding == null) { return null; @@ -170,7 +170,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach final String encodedDestination = amqd.getEncodedName(); _destinationCache.put(encodedDestination, destination); - getJmsContentHeaderProperties().setReplyTo(encodedDestination); + getMessageHeaders().setReplyTo(encodedDestination); } public Destination getJMSDestination() throws JMSException @@ -185,12 +185,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public int getJMSDeliveryMode() throws JMSException { - return getJmsContentHeaderProperties().getDeliveryMode(); + return getMessageHeaders().getDeliveryMode(); } public void setJMSDeliveryMode(int i) throws JMSException { - getJmsContentHeaderProperties().setDeliveryMode((byte) i); + getMessageHeaders().setDeliveryMode((byte) i); } public boolean getJMSRedelivered() throws JMSException @@ -205,37 +205,37 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSType() throws JMSException { - return getJmsContentHeaderProperties().getType(); + return getMessageHeaders().getType(); } public void setJMSType(String string) throws JMSException { - getJmsContentHeaderProperties().setType(string); + getMessageHeaders().setType(string); } public long getJMSExpiration() throws JMSException { - return new Long(getJmsContentHeaderProperties().getExpiration()).longValue(); + return new Long(getMessageHeaders().getExpiration()).longValue(); } public void setJMSExpiration(long l) throws JMSException { - getJmsContentHeaderProperties().setExpiration(l); + getMessageHeaders().setExpiration(l); } public int getJMSPriority() throws JMSException { - return getJmsContentHeaderProperties().getPriority(); + return getMessageHeaders().getPriority(); } public void setJMSPriority(int i) throws JMSException { - getJmsContentHeaderProperties().setPriority((byte) i); + getMessageHeaders().setPriority((byte) i); } public void clearProperties() throws JMSException { - getJmsContentHeaderProperties().getJMSHeaders().clear(); + getMessageHeaders().getJMSHeaders().clear(); _readableProperties = false; } @@ -250,136 +250,136 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public boolean propertyExists(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().propertyExists(propertyName); + return getMessageHeaders().getJMSHeaders().propertyExists(propertyName); } public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getBoolean(propertyName); + return getMessageHeaders().getJMSHeaders().getBoolean(propertyName); } public byte getByteProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getByte(propertyName); + return getMessageHeaders().getJMSHeaders().getByte(propertyName); } public short getShortProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getShort(propertyName); + return getMessageHeaders().getJMSHeaders().getShort(propertyName); } public int getIntProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getInteger(propertyName); + return getMessageHeaders().getJMSHeaders().getInteger(propertyName); } public long getLongProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getLong(propertyName); + return getMessageHeaders().getJMSHeaders().getLong(propertyName); } public float getFloatProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getFloat(propertyName); + return getMessageHeaders().getJMSHeaders().getFloat(propertyName); } public double getDoubleProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getDouble(propertyName); + return getMessageHeaders().getJMSHeaders().getDouble(propertyName); } public String getStringProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getString(propertyName); + return getMessageHeaders().getJMSHeaders().getString(propertyName); } public Object getObjectProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - return getJmsContentHeaderProperties().getJMSHeaders().getObject(propertyName); + return getMessageHeaders().getJMSHeaders().getObject(propertyName); } public Enumeration getPropertyNames() throws JMSException { - return getJmsContentHeaderProperties().getJMSHeaders().getPropertyNames(); + return getMessageHeaders().getJMSHeaders().getPropertyNames(); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setBoolean(propertyName, b); + getMessageHeaders().getJMSHeaders().setBoolean(propertyName, b); } public void setByteProperty(String propertyName, byte b) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setByte(propertyName, new Byte(b)); + getMessageHeaders().getJMSHeaders().setByte(propertyName, new Byte(b)); } public void setShortProperty(String propertyName, short i) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setShort(propertyName, new Short(i)); + getMessageHeaders().getJMSHeaders().setShort(propertyName, new Short(i)); } public void setIntProperty(String propertyName, int i) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setInteger(propertyName, new Integer(i)); + getMessageHeaders().getJMSHeaders().setInteger(propertyName, new Integer(i)); } public void setLongProperty(String propertyName, long l) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setLong(propertyName, new Long(l)); + getMessageHeaders().getJMSHeaders().setLong(propertyName, new Long(l)); } public void setFloatProperty(String propertyName, float f) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setFloat(propertyName, new Float(f)); + getMessageHeaders().getJMSHeaders().setFloat(propertyName, new Float(f)); } public void setDoubleProperty(String propertyName, double v) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setDouble(propertyName, new Double(v)); + getMessageHeaders().getJMSHeaders().setDouble(propertyName, new Double(v)); } public void setStringProperty(String propertyName, String value) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setString(propertyName, value); + getMessageHeaders().getJMSHeaders().setString(propertyName, value); } public void setObjectProperty(String propertyName, Object object) throws JMSException { checkWritableProperties(); checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().setObject(propertyName, object); + getMessageHeaders().getJMSHeaders().setObject(propertyName, object); } protected void removeProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); - getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName); + getMessageHeaders().getJMSHeaders().remove(propertyName); } public void acknowledgeThis() throws JMSException @@ -436,13 +436,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); buf.append("\nAMQ message number: ").append(_deliveryTag); buf.append("\nProperties:"); - if (getJmsContentHeaderProperties().getJMSHeaders().isEmpty()) + if (getMessageHeaders().getJMSHeaders().isEmpty()) { buf.append("<NONE>"); } else { - buf.append('\n').append(getJmsContentHeaderProperties().getJMSHeaders()); + buf.append('\n').append(getMessageHeaders().getJMSHeaders()); } return buf.toString(); } @@ -455,7 +455,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties) { - getJmsContentHeaderProperties().setHeaders(messageProperties); + getMessageHeaders().setJMSHeaders(messageProperties); } private void checkPropertyName(String propertyName) @@ -470,9 +470,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - public BasicContentHeaderProperties getJmsContentHeaderProperties() + public MessageHeaders getMessageHeaders() { - return (BasicContentHeaderProperties) _contentHeaderProperties; + return (MessageHeaders) _messageHeaders; } public ByteBuffer getData() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index d769300c69..b23fcfb2bb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -20,18 +20,17 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MessageFormatException; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetEncoder; -import java.nio.charset.CharsetDecoder; -import java.nio.CharBuffer; + +import org.apache.mina.common.ByteBuffer; public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { @@ -53,11 +52,11 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag super(data); // this instanties a content header } - JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + /* JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { super(messageNbr, contentHeader, data); - } + }*/ public void reset() { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 88e78a1dad..6fb944dd06 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; +import java.nio.charset.CharacterCodingException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; import javax.jms.JMSException; import javax.jms.MessageFormatException; -import java.util.*; -import java.nio.charset.Charset; -import java.nio.charset.CharacterCodingException; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage { @@ -51,24 +52,6 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm populateMapFromData(); } - - JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) - throws AMQException - { - super(messageNbr, contentHeader, data); - try - { - populateMapFromData(); - } - catch (JMSException je) - { - throw new AMQException("Error populating MapMessage from ByteBuffer", je); - - } - - } - - public String toBodyString() throws JMSException { return _map.toString(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 35c5377f14..6e432f995b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -20,17 +20,19 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; -import java.io.*; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; + +import org.apache.mina.common.ByteBuffer; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { @@ -54,16 +56,16 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); _data.setAutoExpand(true); } - getJmsContentHeaderProperties().setContentType(MIME_TYPE); + getMessageHeaders().setContentType(MIME_TYPE); } /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException + /* JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); - } + }*/ public void clearBodyImpl() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 972a5fc8bf..327603092e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -20,13 +20,10 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; +import javax.jms.JMSException; +import javax.jms.StreamMessage; -import javax.jms.*; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; +import org.apache.mina.common.ByteBuffer; /** * @author Apache Software Foundation @@ -59,11 +56,11 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } - JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + /* JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { super(messageNbr, contentHeader, data); - } + }*/ public void reset() { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index d8394b0489..530b6be970 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -20,14 +20,13 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.AMQException; -import org.apache.mina.common.ByteBuffer; - -import javax.jms.JMSException; import java.io.UnsupportedEncodingException; -import java.nio.charset.Charset; import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { @@ -48,17 +47,17 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text JMSTextMessage(ByteBuffer data, String encoding) throws JMSException { super(data); // this instantiates a content header - getJmsContentHeaderProperties().setContentType(MIME_TYPE); - getJmsContentHeaderProperties().setEncoding(encoding); + getMessageHeaders().setContentType(MIME_TYPE); + getMessageHeaders().setEncoding(encoding); } - JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + /* JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException { super(deliveryTag, contentHeader, data); contentHeader.setContentType(MIME_TYPE); _data = data; - } + }*/ JMSTextMessage(ByteBuffer data) throws JMSException { @@ -109,13 +108,13 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text _data.limit(text.length()) ; //_data.sweep(); _data.setAutoExpand(true); - if (getJmsContentHeaderProperties().getEncoding() == null) + if (getMessageHeaders().getEncoding() == null) { _data.put(text.getBytes()); } else { - _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding())); + _data.put(text.getBytes(getMessageHeaders().getEncoding())); } _changedData=true; } @@ -147,11 +146,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { return null; } - if (getJmsContentHeaderProperties().getEncoding() != null) + if (getMessageHeaders().getEncoding() != null) { try { - _decodedValue = _data.getString(Charset.forName(getJmsContentHeaderProperties().getEncoding()).newDecoder()); + _decodedValue = _data.getString(Charset.forName(getMessageHeaders().getEncoding()).newDecoder()); } catch (CharacterCodingException e) { |
