From ba09630a4258cded77842e1bd5d746b8fbda0cfe Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 17 Mar 2013 16:44:47 +0000 Subject: QPID-4000 : [Java Broker] Add conversion of 0-x messages to 1-0 subscriptions git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1457482 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/message/AMQMessage.java | 4 +- .../qpid/server/message/MessageMetaData_1_0.java | 17 + .../server/protocol/v1_0/Subscription_1_0.java | 561 ++++++++++++++--- .../qpid/client/message/JMSBytesMessage.java | 36 +- .../apache/qpid/client/message/JMSMapMessage.java | 45 +- .../qpid/client/message/JMSStreamMessage.java | 163 ++++- .../qpid/client/message/TypedBytesCodes.java | 46 -- .../client/message/TypedBytesContentReader.java | 674 --------------------- .../client/message/TypedBytesContentWriter.java | 370 ----------- .../apache/qpid/typedmessage/TypedBytesCodes.java | 46 ++ .../qpid/typedmessage/TypedBytesContentReader.java | 669 ++++++++++++++++++++ .../qpid/typedmessage/TypedBytesContentWriter.java | 366 +++++++++++ .../typedmessage/TypedBytesFormatException.java | 9 + 13 files changed, 1783 insertions(+), 1223 deletions(-) delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index 6a0e4d216e..439d7aa928 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -65,7 +65,7 @@ public class AMQMessage extends AbstractServerMessageImpl { this(handle, null); } - + public AMQMessage(StoredMessage handle, WeakReference channelRef) { super(handle); @@ -93,7 +93,7 @@ public class AMQMessage extends AbstractServerMessageImpl return getStoredMessage().getMetaData(); } - public ContentHeaderBody getContentHeaderBody() throws AMQException + public ContentHeaderBody getContentHeaderBody() { return getMessageMetaData().getContentHeaderBody(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java index e01f20d54f..1d8b239733 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.Symbol; @@ -59,6 +60,22 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData private MessageHeader_1_0 _messageHeader; + public MessageMetaData_1_0(List
sections, SectionEncoder encoder) + { + this(sections, encodeSections(sections, encoder)); + } + + private static ArrayList encodeSections(final List
sections, final SectionEncoder encoder) + { + ArrayList encodedSections = new ArrayList(sections.size()); + for(Section section : sections) + { + encoder.encodeObject(section); + encodedSections.add(encoder.getEncoding().asByteBuffer()); + encoder.reset(); + } + return encodedSections; + } public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 8a3d3716c7..8bde913149 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.io.EOFException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,25 +38,46 @@ import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.DeliveryState; import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.Modified; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesFormatException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData_1_0; +import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.codec.BBDecoder; class Subscription_1_0 implements Subscription { @@ -201,150 +225,489 @@ class Subscription_1_0 implements Subscription public void send(final QueueEntry queueEntry) throws AMQException { - //TODO ServerMessage serverMessage = queueEntry.getMessage(); + Message_1_0 message; if(serverMessage instanceof Message_1_0) { - Message_1_0 message = (Message_1_0) serverMessage; - Transfer transfer = new Transfer(); - //TODO + message = (Message_1_0) serverMessage; + } + else + { + if(serverMessage instanceof AMQMessage) + { + message = new Message_1_0(convert08Message((AMQMessage)serverMessage)); + } + else if(serverMessage instanceof MessageTransferMessage) + { + message = new Message_1_0(convert010Message((MessageTransferMessage)serverMessage)); + } + else + { + return; + } + } + + Transfer transfer = new Transfer(); + //TODO - List fragments = message.getFragments(); - ByteBuffer payload; - if(fragments.size() == 1) + List fragments = message.getFragments(); + ByteBuffer payload; + if(fragments.size() == 1) + { + payload = fragments.get(0); + } + else + { + int size = 0; + for(ByteBuffer fragment : fragments) { - payload = fragments.get(0); + size += fragment.remaining(); } - else + + payload = ByteBuffer.allocate(size); + + for(ByteBuffer fragment : fragments) { - int size = 0; - for(ByteBuffer fragment : fragments) - { - size += fragment.remaining(); - } + payload.put(fragment.duplicate()); + } + + payload.flip(); + } - payload = ByteBuffer.allocate(size); + if(queueEntry.getDeliveryCount() != 0) + { + payload = payload.duplicate(); + ValueHandler valueHandler = new ValueHandler(_typeRegistry); - for(ByteBuffer fragment : fragments) + Header oldHeader = null; + try + { + ByteBuffer encodedBuf = payload.duplicate(); + Object value = valueHandler.parse(payload); + if(value instanceof Header) { - payload.put(fragment.duplicate()); + oldHeader = (Header) value; + } + else + { + payload.position(0); } - - payload.flip(); + } + catch (AmqpErrorException e) + { + //TODO + throw new RuntimeException(e); } - if(queueEntry.getDeliveryCount() != 0) + Header header = new Header(); + if(oldHeader != null) { - payload = payload.duplicate(); - ValueHandler valueHandler = new ValueHandler(_typeRegistry); + header.setDurable(oldHeader.getDurable()); + header.setPriority(oldHeader.getPriority()); + header.setTtl(oldHeader.getTtl()); + } + header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); + _sectionEncoder.reset(); + _sectionEncoder.encodeObject(header); + Binary encodedHeader = _sectionEncoder.getEncoding(); + + ByteBuffer oldPayload = payload; + payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength()); + payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); + payload.put(oldPayload); + payload.flip(); + } - Header oldHeader = null; - try + transfer.setPayload(payload); + byte[] data = new byte[8]; + ByteBuffer.wrap(data).putLong(_deliveryTag++); + final Binary tag = new Binary(data); + + transfer.setDeliveryTag(tag); + + synchronized(_link.getLock()) + { + if(_link.isAttached()) + { + if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) { - ByteBuffer encodedBuf = payload.duplicate(); - Object value = valueHandler.parse(payload); - if(value instanceof Header) - { - oldHeader = (Header) value; - } - else - { - payload.position(0); - } + transfer.setSettled(true); } - catch (AmqpErrorException e) + else { - //TODO - throw new RuntimeException(e); + UnsettledAction action = _acquires + ? new DispositionAction(tag, queueEntry) + : new DoNothingAction(tag, queueEntry); + + _link.addUnsettled(tag, action, queueEntry); } - Header header = new Header(); - if(oldHeader != null) + if(_transactionId != null) { - header.setDurable(oldHeader.getDurable()); - header.setPriority(oldHeader.getPriority()); - header.setTtl(oldHeader.getTtl()); + TransactionalState state = new TransactionalState(); + state.setTxnId(_transactionId); + transfer.setState(state); } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); - _sectionEncoder.reset(); - _sectionEncoder.encodeObject(header); - Binary encodedHeader = _sectionEncoder.getEncoding(); + // TODO - need to deal with failure here + if(_acquires && _transactionId != null) + { + ServerTransaction txn = _link.getTransaction(_transactionId); + if(txn != null) + { + txn.addPostTransactionAction(new ServerTransaction.Action(){ + + public void postCommit() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void onRollback() + { + if(queueEntry.isAcquiredBy(Subscription_1_0.this)) + { + queueEntry.release(); + _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); + + + } + } + }); + } - ByteBuffer oldPayload = payload; - payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength()); - payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); - payload.put(oldPayload); - payload.flip(); + } + getSession().getConnectionModel().registerMessageDelivered(message.getSize()); + getEndpoint().transfer(transfer); } + else + { + queueEntry.release(); + } + } - transfer.setPayload(payload); - byte[] data = new byte[8]; - ByteBuffer.wrap(data).putLong(_deliveryTag++); - final Binary tag = new Binary(data); + } - transfer.setDeliveryTag(tag); + private StoredMessage convert010Message(final MessageTransferMessage serverMessage) + { + final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); + + return convertServerMessage(metaData, serverMessage); + + } - synchronized(_link.getLock()) + private MessageMetaData_1_0 convertMetaData(final MessageTransferMessage serverMessage) + { + List
sections = new ArrayList
(3); + final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties(); + final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties(); + + Header header = new Header(); + if(deliveryProps != null) + { + header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); + if(deliveryProps.hasPriority()) { - if(_link.isAttached()) + header.setPriority(UnsignedByte.valueOf((byte)deliveryProps.getPriority().getValue())); + } + if(deliveryProps.hasTtl()) + { + header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl())); + } + sections.add(header); + } + + Properties props = new Properties(); + if(msgProps != null) + { + // props.setAbsoluteExpiryTime(); + if(msgProps.hasContentEncoding()) + { + props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding())); + } + + if(msgProps.hasCorrelationId()) + { + props.setCorrelationId(msgProps.getCorrelationId()); + } + // props.setCreationTime(); + // props.setGroupId(); + // props.setGroupSequence(); + if(msgProps.hasMessageId()) + { + props.setMessageId(msgProps.getMessageId()); + } + if(msgProps.hasReplyTo()) + { + props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey()); + } + if(msgProps.hasContentType()) + { + props.setContentType(Symbol.valueOf(msgProps.getContentType())); + + // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client + if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) { - if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) - { - transfer.setSettled(true); - } - else - { - UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); + props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + } + } + // props.setReplyToGroupId(); + props.setSubject(serverMessage.getRoutingKey()); + // props.setTo(); + if(msgProps.hasUserId()) + { + props.setUserId(new Binary(msgProps.getUserId())); + } - _link.addUnsettled(tag, action, queueEntry); - } + sections.add(props); - if(_transactionId != null) - { - TransactionalState state = new TransactionalState(); - state.setTxnId(_transactionId); - transfer.setState(state); - } - // TODO - need to deal with failure here - if(_acquires && _transactionId != null) - { - ServerTransaction txn = _link.getTransaction(_transactionId); - if(txn != null) - { - txn.addPostTransactionAction(new ServerTransaction.Action(){ + if(msgProps.getApplicationHeaders() != null) + { + sections.add(new ApplicationProperties(msgProps.getApplicationHeaders())); + } + } + return new MessageMetaData_1_0(sections, _sectionEncoder); + } - public void postCommit() - { - //To change body of implemented methods use File | Settings | File Templates. - } + private StoredMessage convert08Message(final AMQMessage serverMessage) + { + final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); - public void onRollback() - { - if(queueEntry.isAcquiredBy(Subscription_1_0.this)) - { - queueEntry.release(); - _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); + return convertServerMessage(metaData, serverMessage); - } - } - }); - } + } - } + private StoredMessage convertServerMessage(final MessageMetaData_1_0 metaData, + final ServerMessage serverMessage) + { + final String mimeType = serverMessage.getMessageHeader().getMimeType(); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + + Section bodySection = convertMessageBody(mimeType, data); - getEndpoint().transfer(transfer); + final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection); + + return new StoredMessage() + { + @Override + public MessageMetaData_1_0 getMetaData() + { + return metaData; + } + + @Override + public long getMessageNumber() + { + return serverMessage.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + int size; + if(dst.remaining() sections = new ArrayList
(3); + + Header header = new Header(); + + header.setDurable(serverMessage.isPersistent()); + + BasicContentHeaderProperties contentHeader = + (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties(); + + header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority())); + final long expiration = serverMessage.getExpiration(); + final long arrivalTime = serverMessage.getArrivalTime(); + + if(expiration > arrivalTime) + { + header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime)); + } + sections.add(header); + + + Properties props = new Properties(); + + props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString())); + + props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString())); + + // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client + if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) + { + props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + } + + final AMQShortString correlationId = contentHeader.getCorrelationId(); + if(correlationId != null) + { + props.setCorrelationId(new Binary(correlationId.getBytes())); + } + // props.setCreationTime(); + // props.setGroupId(); + // props.setGroupSequence(); + final AMQShortString messageId = contentHeader.getMessageId(); + if(messageId != null) + { + props.setMessageId(new Binary(messageId.getBytes())); + } + props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); + + // props.setReplyToGroupId(); + props.setSubject(serverMessage.getRoutingKey()); + // props.setTo(); + if(contentHeader.getUserId() != null) + { + props.setUserId(new Binary(contentHeader.getUserId().getBytes())); + } + sections.add(props); + + sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders()))); + return new MessageMetaData_1_0(sections, _sectionEncoder); } public void queueDeleted(final AMQQueue queue) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index b0320d0f4e..6ffa051ff8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -20,13 +20,16 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQException; - +import java.io.EOFException; +import java.nio.ByteBuffer; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; -import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage { @@ -100,7 +103,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM private void checkAvailable(final int i) throws MessageEOFException { - _typedBytesContentReader.checkAvailable(1); + try + { + _typedBytesContentReader.checkAvailable(1); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } } public byte readByte() throws JMSException @@ -178,7 +188,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM // we check only for one byte since theoretically the string could be only a // single byte when using UTF-8 encoding - return _typedBytesContentReader.readLengthPrefixedUTF(); + try + { + return _typedBytesContentReader.readLengthPrefixedUTF(); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readBytes(byte[] bytes) throws JMSException @@ -275,7 +292,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM public void writeUTF(String string) throws JMSException { checkWritable(); - _typedBytesContentWriter.writeLengthPrefixedUTF(string); + try + { + _typedBytesContentWriter.writeLengthPrefixedUTF(string); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public void writeBytes(byte[] bytes) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index e18ed80f6d..0b05179215 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -20,18 +20,21 @@ */ package org.apache.qpid.client.message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.AMQException; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; +import java.io.EOFException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage { @@ -455,9 +458,22 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe final int entries = reader.readIntImpl(); for (int i = 0; i < entries; i++) { - String propName = reader.readStringImpl(); - Object value = reader.readObject(); - _map.put(propName, value); + String propName = null; + try + { + propName = reader.readStringImpl(); + Object value = reader.readObject(); + _map.put(propName, value); + + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } } } else @@ -477,7 +493,14 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe { writer.writeNullTerminatedStringImpl(entry.getKey()); - writer.writeObject(entry.getValue()); + try + { + writer.writeObject(entry.getValue()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } return writer.getData(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index b1af262580..223facbb59 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -20,11 +20,16 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQException; - +import java.io.EOFException; +import java.nio.ByteBuffer; import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; import javax.jms.StreamMessage; -import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; /** * @author Apache Software Foundation @@ -95,20 +100,53 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public boolean readBoolean() throws JMSException { checkReadable(); - return _typedBytesContentReader.readBoolean(); + try + { + return _typedBytesContentReader.readBoolean(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public byte readByte() throws JMSException { checkReadable(); - return _typedBytesContentReader.readByte(); + try + { + return _typedBytesContentReader.readByte(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public short readShort() throws JMSException { checkReadable(); - return _typedBytesContentReader.readShort(); + try + { + return _typedBytesContentReader.readShort(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } /** @@ -120,37 +158,103 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public char readChar() throws JMSException { checkReadable(); - return _typedBytesContentReader.readChar(); + try + { + return _typedBytesContentReader.readChar(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readInt() throws JMSException { checkReadable(); - return _typedBytesContentReader.readInt(); + try + { + return _typedBytesContentReader.readInt(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public long readLong() throws JMSException { checkReadable(); - return _typedBytesContentReader.readLong(); + try + { + return _typedBytesContentReader.readLong(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public float readFloat() throws JMSException { checkReadable(); - return _typedBytesContentReader.readFloat(); + try + { + return _typedBytesContentReader.readFloat(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public double readDouble() throws JMSException { checkReadable(); - return _typedBytesContentReader.readDouble(); + try + { + return _typedBytesContentReader.readDouble(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public String readString() throws JMSException { checkReadable(); - return _typedBytesContentReader.readString(); + try + { + return _typedBytesContentReader.readString(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readBytes(byte[] bytes) throws JMSException @@ -161,14 +265,36 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } checkReadable(); - return _typedBytesContentReader.readBytes(bytes); + try + { + return _typedBytesContentReader.readBytes(bytes); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public Object readObject() throws JMSException { checkReadable(); - return _typedBytesContentReader.readObject(); + try + { + return _typedBytesContentReader.readObject(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public void writeBoolean(boolean b) throws JMSException @@ -240,6 +366,13 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public void writeObject(Object object) throws JMSException { checkWritable(); - _typedBytesContentWriter.writeObject(object); + try + { + _typedBytesContentWriter.writeObject(object); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java deleted file mode 100644 index 26a0b41cdc..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -public interface TypedBytesCodes -{ - static final byte BOOLEAN_TYPE = (byte) 1; - - static final byte BYTE_TYPE = (byte) 2; - - static final byte BYTEARRAY_TYPE = (byte) 3; - - static final byte SHORT_TYPE = (byte) 4; - - static final byte CHAR_TYPE = (byte) 5; - - static final byte INT_TYPE = (byte) 6; - - static final byte LONG_TYPE = (byte) 7; - - static final byte FLOAT_TYPE = (byte) 8; - - static final byte DOUBLE_TYPE = (byte) 9; - - static final byte STRING_TYPE = (byte) 10; - - static final byte NULL_STRING_TYPE = (byte) 11; -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java deleted file mode 100644 index b00ac7e34b..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java +++ /dev/null @@ -1,674 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; - -class TypedBytesContentReader implements TypedBytesCodes -{ - - private final ByteBuffer _data; - private final int _position; - private final int _limit; - - - private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); - - private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder(); - - private int _byteArrayRemaining = -1; - - - public TypedBytesContentReader(final ByteBuffer data) - { - _data = data.duplicate(); - _position = _data.position(); - _limit = _data.limit(); - } - - /** - * Check that there is at least a certain number of bytes available to read - * - * @param len the number of bytes - * @throws javax.jms.MessageEOFException if there are less than len bytes available to read - */ - protected void checkAvailable(int len) throws MessageEOFException - { - if (_data.remaining() < len) - { - throw new MessageEOFException("Unable to read " + len + " bytes"); - } - } - - protected byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException - { - checkAvailable(1); - return _data.get(); - } - - protected boolean readBoolean() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - boolean result; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - boolean readBooleanImpl() - { - return _data.get() != 0; - } - - protected byte readByte() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - byte result; - try - { - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - byte readByteImpl() - { - return _data.get(); - } - - protected short readShort() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - short result; - try - { - switch (wireType) - { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - short readShortImpl() - { - return _data.getShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws javax.jms.JMSException - */ - protected char readChar() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - try - { - if (wireType == NULL_STRING_TYPE) - { - throw new NullPointerException(); - } - - if (wireType != CHAR_TYPE) - { - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); - } - else - { - checkAvailable(2); - return readCharImpl(); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - char readCharImpl() - { - return _data.getChar(); - } - - protected int readInt() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - int result; - try - { - switch (wireType) - { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected int readIntImpl() - { - return _data.getInt(); - } - - protected long readLong() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - long result; - try - { - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - long readLongImpl() - { - return _data.getLong(); - } - - protected float readFloat() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - float result; - try - { - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - float readFloatImpl() - { - return _data.getFloat(); - } - - protected double readDouble() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - double result; - try - { - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - double readDoubleImpl() - { - return _data.getDouble(); - } - - protected String readString() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - String result; - try - { - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case NULL_STRING_TYPE: - result = null; - throw new NullPointerException("data is null"); - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected String readStringImpl() throws JMSException - { - try - { - _charsetDecoder.reset(); - ByteBuffer dup = _data.duplicate(); - int pos = _data.position(); - byte b; - while((b = _data.get()) != 0) {}; - dup.limit(_data.position()-1); - return _charsetDecoder.decode(dup).toString(); - - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - - protected int readBytes(byte[] bytes) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - // first call - if (_byteArrayRemaining == -1) - { - // type discriminator checked separately so you get a MessageFormatException rather than - // an EOF even in the case where both would be applicable - checkAvailable(1); - byte wireType = readWireType(); - if (wireType != BYTEARRAY_TYPE) - { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); - } - checkAvailable(4); - int size = _data.getInt(); - // length of -1 indicates null - if (size == -1) - { - return -1; - } - else - { - if (size > _data.remaining()) - { - throw new MessageEOFException("Byte array has stated length " - + size - + " but message only contains " - + - _data.remaining() - + " bytes"); - } - else - { - _byteArrayRemaining = size; - } - } - } - else if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - return -1; - } - - int returnedSize = readBytesImpl(bytes); - if (returnedSize < bytes.length) - { - _byteArrayRemaining = -1; - } - return returnedSize; - } - - private int readBytesImpl(byte[] bytes) - { - int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); - _byteArrayRemaining -= count; - - if (count == 0) - { - return 0; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - protected Object readObject() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - Object result = null; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { - result = null; - } - else - { - _byteArrayRemaining = size; - byte[] bytesResult = new byte[size]; - readBytesImpl(bytesResult); - result = bytesResult; - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case NULL_STRING_TYPE: - result = null; - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - public void reset() - { - _byteArrayRemaining = -1; - _data.position(_position); - _data.limit(_limit); - } - - public ByteBuffer getData() - { - ByteBuffer buf = _data.duplicate(); - buf.position(_position); - buf.limit(_limit); - return buf; - } - - public long size() - { - return _limit - _position; - } - - public int remaining() - { - return _data.remaining(); - } - - public void readRawBytes(final byte[] bytes, final int offset, final int count) - { - _data.get(bytes, offset, count); - } - - public String readLengthPrefixedUTF() throws JMSException - { - try - { - short length = readShortImpl(); - if(length == 0) - { - return ""; - } - else - { - _charsetDecoder.reset(); - ByteBuffer encodedString = _data.slice(); - encodedString.limit(length); - _data.position(_data.position()+length); - CharBuffer string = _charsetDecoder.decode(encodedString); - - return string.toString(); - } - } - catch(CharacterCodingException e) - { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java deleted file mode 100644 index 7c91db3a32..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetEncoder; - -class TypedBytesContentWriter implements TypedBytesCodes -{ - private final ByteArrayOutputStream _baos = new ByteArrayOutputStream(); - private final DataOutputStream _data = new DataOutputStream(_baos); - private static final Charset UTF8 = Charset.forName("UTF-8"); - - protected void writeTypeDiscriminator(byte type) throws JMSException - { - try - { - _data.writeByte(type); - } - catch (IOException e) - { - throw handle(e); - } - } - - private JMSException handle(final IOException e) - { - JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage()); - jmsEx.setLinkedException(e); - return jmsEx; - } - - - protected void writeBoolean(boolean b) throws JMSException - { - writeTypeDiscriminator(BOOLEAN_TYPE); - writeBooleanImpl(b); - } - - public void writeBooleanImpl(final boolean b) throws JMSException - { - try - { - _data.writeByte(b ? (byte) 1 : (byte) 0); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeByte(byte b) throws JMSException - { - writeTypeDiscriminator(BYTE_TYPE); - writeByteImpl(b); - } - - public void writeByteImpl(final byte b) throws JMSException - { - try - { - _data.writeByte(b); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeShort(short i) throws JMSException - { - writeTypeDiscriminator(SHORT_TYPE); - writeShortImpl(i); - } - - public void writeShortImpl(final short i) throws JMSException - { - try - { - _data.writeShort(i); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeChar(char c) throws JMSException - { - writeTypeDiscriminator(CHAR_TYPE); - writeCharImpl(c); - } - - public void writeCharImpl(final char c) throws JMSException - { - try - { - _data.writeChar(c); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeInt(int i) throws JMSException - { - writeTypeDiscriminator(INT_TYPE); - writeIntImpl(i); - } - - protected void writeIntImpl(int i) throws JMSException - { - try - { - _data.writeInt(i); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeLong(long l) throws JMSException - { - writeTypeDiscriminator(LONG_TYPE); - writeLongImpl(l); - } - - public void writeLongImpl(final long l) throws JMSException - { - try - { - _data.writeLong(l); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeFloat(float v) throws JMSException - { - writeTypeDiscriminator(FLOAT_TYPE); - writeFloatImpl(v); - } - - public void writeFloatImpl(final float v) throws JMSException - { - try - { - _data.writeFloat(v); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeDouble(double v) throws JMSException - { - writeTypeDiscriminator(DOUBLE_TYPE); - writeDoubleImpl(v); - } - - public void writeDoubleImpl(final double v) throws JMSException - { - try - { - _data.writeDouble(v); - } - catch (IOException e) - { - throw handle(e); - } - } - - protected void writeString(String string) throws JMSException - { - if (string == null) - { - writeTypeDiscriminator(NULL_STRING_TYPE); - } - else - { - writeTypeDiscriminator(STRING_TYPE); - writeNullTerminatedStringImpl(string); - } - } - - protected void writeNullTerminatedStringImpl(String string) - throws JMSException - { - try - { - _data.write(string.getBytes(UTF8)); - _data.writeByte((byte) 0); - } - catch (IOException e) - { - throw handle(e); - } - - } - - protected void writeBytes(byte[] bytes) throws JMSException - { - writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); - } - - protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - writeTypeDiscriminator(BYTEARRAY_TYPE); - writeBytesImpl(bytes, offset, length); - } - - public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException - { - try - { - if (bytes == null) - { - _data.writeInt(-1); - } - else - { - _data.writeInt(length); - _data.write(bytes, offset, length); - } - } - catch (IOException e) - { - throw handle(e); - } - } - - public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException - { - try - { - if (bytes != null) - { - _data.write(bytes, offset, length); - } - } - catch (IOException e) - { - throw handle(e); - } - } - - - protected void writeObject(Object object) throws JMSException - { - Class clazz; - - if (object == null) - { - // string handles the output of null values - clazz = String.class; - } - else - { - clazz = object.getClass(); - } - - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeString((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } - } - - public ByteBuffer getData() - { - return ByteBuffer.wrap(_baos.toByteArray()); - } - - public void writeLengthPrefixedUTF(final String string) throws JMSException - { - try - { - CharsetEncoder encoder = UTF8.newEncoder(); - java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); - - writeShortImpl((short) encodedString.limit()); - while(encodedString.hasRemaining()) - { - _data.writeByte(encodedString.get()); - } - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Unable to encode string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - catch (IOException e) - { - throw handle(e); - } - - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java new file mode 100644 index 0000000000..0e12ac65d8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.typedmessage; + +public interface TypedBytesCodes +{ + static final byte BOOLEAN_TYPE = (byte) 1; + + static final byte BYTE_TYPE = (byte) 2; + + static final byte BYTEARRAY_TYPE = (byte) 3; + + static final byte SHORT_TYPE = (byte) 4; + + static final byte CHAR_TYPE = (byte) 5; + + static final byte INT_TYPE = (byte) 6; + + static final byte LONG_TYPE = (byte) 7; + + static final byte FLOAT_TYPE = (byte) 8; + + static final byte DOUBLE_TYPE = (byte) 9; + + static final byte STRING_TYPE = (byte) 10; + + static final byte NULL_STRING_TYPE = (byte) 11; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java new file mode 100644 index 0000000000..0ba865f1e6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java @@ -0,0 +1,669 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.typedmessage; + + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +public class TypedBytesContentReader implements TypedBytesCodes +{ + + private final ByteBuffer _data; + private final int _position; + private final int _limit; + + + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder(); + + private int _byteArrayRemaining = -1; + + + public TypedBytesContentReader(final ByteBuffer data) + { + _data = data.duplicate(); + _position = _data.position(); + _limit = _data.limit(); + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + public void checkAvailable(int len) throws EOFException + { + if (_data.remaining() < len) + { + throw new EOFException("Unable to read " + len + " bytes"); + } + } + + public byte readWireType() throws TypedBytesFormatException, EOFException + { + checkAvailable(1); + return _data.get(); + } + + public boolean readBoolean() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public boolean readBooleanImpl() + { + return _data.get() != 0; + } + + public byte readByte() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + public byte readByteImpl() + { + return _data.get(); + } + + public short readShort() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + public short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + public char readChar() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if (wireType == NULL_STRING_TYPE) + { + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public char readCharImpl() + { + return _data.getChar(); + } + + public int readInt() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public int readIntImpl() + { + return _data.getInt(); + } + + public long readLong() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public long readLongImpl() + { + return _data.getLong(); + } + + public float readFloat() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public float readFloatImpl() + { + return _data.getFloat(); + } + + public double readDouble() throws TypedBytesFormatException, EOFException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public double readDoubleImpl() + { + return _data.getDouble(); + } + + public String readString() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public String readStringImpl() throws TypedBytesFormatException + { + try + { + _charsetDecoder.reset(); + ByteBuffer dup = _data.duplicate(); + int pos = _data.position(); + byte b; + while((b = _data.get()) != 0) {}; + dup.limit(_data.position()-1); + return _charsetDecoder.decode(dup).toString(); + + } + catch (CharacterCodingException e) + { + TypedBytesFormatException jmse = new TypedBytesFormatException("Error decoding byte stream as a UTF8 string: " + e); + jmse.initCause(e); + throw jmse; + } + } + + public int readBytes(byte[] bytes) throws EOFException, TypedBytesFormatException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new EOFException("Byte array has stated length " + + size + + " but message only contains " + + + _data.remaining() + + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public Object readObject() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public void reset() + { + _byteArrayRemaining = -1; + _data.position(_position); + _data.limit(_limit); + } + + public ByteBuffer getData() + { + ByteBuffer buf = _data.duplicate(); + buf.position(_position); + buf.limit(_limit); + return buf; + } + + public long size() + { + return _limit - _position; + } + + public int remaining() + { + return _data.remaining(); + } + + public void readRawBytes(final byte[] bytes, final int offset, final int count) + { + _data.get(bytes, offset, count); + } + + public String readLengthPrefixedUTF() throws TypedBytesFormatException + { + try + { + short length = readShortImpl(); + if(length == 0) + { + return ""; + } + else + { + _charsetDecoder.reset(); + ByteBuffer encodedString = _data.slice(); + encodedString.limit(length); + _data.position(_data.position()+length); + CharBuffer string = _charsetDecoder.decode(encodedString); + + return string.toString(); + } + } + catch(CharacterCodingException e) + { + TypedBytesFormatException jmse = new TypedBytesFormatException("Error decoding byte stream as a UTF8 string: " + e); + jmse.initCause(e); + throw jmse; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java new file mode 100644 index 0000000000..c7ca2d7df7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java @@ -0,0 +1,366 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.typedmessage; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; + +public class TypedBytesContentWriter implements TypedBytesCodes +{ + private final ByteArrayOutputStream _baos = new ByteArrayOutputStream(); + private final DataOutputStream _data = new DataOutputStream(_baos); + private static final Charset UTF8 = Charset.forName("UTF-8"); + + protected void writeTypeDiscriminator(byte type) + { + try + { + _data.writeByte(type); + } + catch (IOException e) + { + throw handle(e); + } + } + + private RuntimeException handle(final IOException e) + { + RuntimeException jmsEx = new RuntimeException("Unable to write value: " + e.getMessage()); + return jmsEx; + } + + + public void writeBoolean(boolean b) + { + writeTypeDiscriminator(BOOLEAN_TYPE); + writeBooleanImpl(b); + } + + public void writeBooleanImpl(final boolean b) + { + try + { + _data.writeByte(b ? (byte) 1 : (byte) 0); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeByte(byte b) + { + writeTypeDiscriminator(BYTE_TYPE); + writeByteImpl(b); + } + + public void writeByteImpl(final byte b) + { + try + { + _data.writeByte(b); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeShort(short i) + { + writeTypeDiscriminator(SHORT_TYPE); + writeShortImpl(i); + } + + public void writeShortImpl(final short i) + { + try + { + _data.writeShort(i); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeChar(char c) + { + writeTypeDiscriminator(CHAR_TYPE); + writeCharImpl(c); + } + + public void writeCharImpl(final char c) + { + try + { + _data.writeChar(c); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeInt(int i) + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + public void writeIntImpl(int i) + { + try + { + _data.writeInt(i); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeLong(long l) + { + writeTypeDiscriminator(LONG_TYPE); + writeLongImpl(l); + } + + public void writeLongImpl(final long l) + { + try + { + _data.writeLong(l); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeFloat(float v) + { + writeTypeDiscriminator(FLOAT_TYPE); + writeFloatImpl(v); + } + + public void writeFloatImpl(final float v) + { + try + { + _data.writeFloat(v); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeDouble(double v) + { + writeTypeDiscriminator(DOUBLE_TYPE); + writeDoubleImpl(v); + } + + public void writeDoubleImpl(final double v) + { + try + { + _data.writeDouble(v); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeString(String string) + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + writeNullTerminatedStringImpl(string); + } + } + + public void writeNullTerminatedStringImpl(String string) + + { + try + { + _data.write(string.getBytes(UTF8)); + _data.writeByte((byte) 0); + } + catch (IOException e) + { + throw handle(e); + } + + } + + public void writeBytes(byte[] bytes) + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + public void writeBytes(byte[] bytes, int offset, int length) + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + writeBytesImpl(bytes, offset, length); + } + + public void writeBytesImpl(final byte[] bytes, final int offset, final int length) + { + try + { + if (bytes == null) + { + _data.writeInt(-1); + } + else + { + _data.writeInt(length); + _data.write(bytes, offset, length); + } + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeBytesRaw(final byte[] bytes, final int offset, final int length) + { + try + { + if (bytes != null) + { + _data.write(bytes, offset, length); + } + } + catch (IOException e) + { + throw handle(e); + } + } + + + public void writeObject(Object object) throws TypedBytesFormatException + { + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new TypedBytesFormatException("Only primitives plus byte arrays and String are valid types"); + } + } + + public ByteBuffer getData() + { + return ByteBuffer.wrap(_baos.toByteArray()); + } + + public void writeLengthPrefixedUTF(final String string) throws TypedBytesFormatException + { + try + { + CharsetEncoder encoder = UTF8.newEncoder(); + java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); + + writeShortImpl((short) encodedString.limit()); + while(encodedString.hasRemaining()) + { + _data.writeByte(encodedString.get()); + } + } + catch (CharacterCodingException e) + { + TypedBytesFormatException jmse = new TypedBytesFormatException("Unable to encode string: " + e); + jmse.initCause(e); + throw jmse; + } + catch (IOException e) + { + throw handle(e); + } + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java new file mode 100644 index 0000000000..95e7ea0acc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java @@ -0,0 +1,9 @@ +package org.apache.qpid.typedmessage; + +public class TypedBytesFormatException extends Exception +{ + public TypedBytesFormatException(String s) + { + super(s); + } +} -- cgit v1.2.1