diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-09 14:50:26 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-09 14:50:26 +0000 |
| commit | f616a17577e96442ec43de0afe87cd3e0704ee3b (patch) | |
| tree | 3dd42752e3e08ad3ee2d0cf9f765407bb4c5b6e5 /java/client/src/main | |
| parent | 92a90067b4ea4240a0303100f46130f0ad612898 (diff) | |
| download | qpid-python-f616a17577e96442ec43de0afe87cd3e0704ee3b.tar.gz | |
QPID-102 Addition of StreamMessage support
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@484987 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
6 files changed, 769 insertions, 101 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3f0aee23a0..8f90913e5c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; @@ -367,13 +368,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - checkNotClosed(); - throw new UnsupportedOperationException("Stream messages not supported"); + synchronized (_connection.getFailoverMutex()) + { + checkNotClosed(); + + try + { + return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE); + } + catch (AMQException e) + { + throw new JMSException("Unable to create text message: " + e); + } + } } public TextMessage createTextMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java new file mode 100644 index 0000000000..77b3bd7566 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageEOFException; +import javax.jms.MessageNotWriteableException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.CharacterCodingException; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesMessage extends AbstractJMSMessage +{ + + /** + * The default initial size of the buffer. The buffer expands automatically. + */ + private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; + + AbstractBytesMessage() + { + this(null); + } + + /** + * Construct a bytes message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesMessage(ByteBuffer data) + { + super(data); // this instanties a content header + getJmsContentHeaderProperties().setContentType(getMimeType()); + + if (_data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); + _data.setAutoExpand(true); + } + } + + AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + throws AMQException + { + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); + getJmsContentHeaderProperties().setContentType(getMimeType()); + } + + public void clearBodyImpl() throws JMSException + { + _data.clear(); + } + + public String toBodyString() throws JMSException + { + checkReadable(); + try + { + return getText(); + } + catch (IOException e) + { + throw new JMSException(e.toString()); + } + } + + /** + * We reset the stream before and after reading the data. This means that toString() will always output + * the entire message and also that the caller can then immediately start reading as if toString() had + * never been called. + * + * @return + * @throws IOException + */ + private String getText() throws IOException + { + // this will use the default platform encoding + if (_data == null) + { + return null; + } + int pos = _data.position(); + _data.rewind(); + // one byte left is for the end of frame marker + if (_data.remaining() == 0) + { + // this is really redundant since pos must be zero + _data.position(pos); + return null; + } + else + { + String data = _data.getString(Charset.forName("UTF8").newDecoder()); + _data.position(pos); + return data; + } + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + protected void checkAvailable(int len) throws MessageEOFException + { + if (_data.remaining() < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } + + public void reset() throws JMSException + { + super.reset(); + _data.flip(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 6e1958e40a..456d4d520c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,29 +20,21 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.AMQException; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; +import javax.jms.BytesMessage; import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; +import javax.jms.MessageFormatException; import javax.jms.MessageEOFException; -import java.io.*; -import java.nio.charset.Charset; import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; -public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage +public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { private static final String MIME_TYPE = "application/octet-stream"; - - /** - * The default initial size of the buffer. The buffer expands automatically. - */ - private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; - JMSBytesMessage() { this(null); @@ -57,71 +49,12 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt JMSBytesMessage(ByteBuffer data) { super(data); // this instanties a content header - getJmsContentHeaderProperties().setContentType(MIME_TYPE); - - if (_data == null) - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); - _data.setAutoExpand(true); - } } JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { - // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); - getJmsContentHeaderProperties().setContentType(MIME_TYPE); - } - - public void clearBodyImpl() throws JMSException - { - _data.clear(); - } - - public String toBodyString() throws JMSException - { - checkReadable(); - try - { - return getText(); - } - catch (IOException e) - { - throw new JMSException(e.toString()); - } - } - - /** - * We reset the stream before and after reading the data. This means that toString() will always output - * the entire message and also that the caller can then immediately start reading as if toString() had - * never been called. - * - * @return - * @throws IOException - */ - private String getText() throws IOException - { - // this will use the default platform encoding - if (_data == null) - { - return null; - } - int pos = _data.position(); - _data.rewind(); - // one byte left is for the end of frame marker - if (_data.remaining() == 0) - { - // this is really redundant since pos must be zero - _data.position(pos); - return null; - } - else - { - String data = _data.getString(Charset.forName("UTF8").newDecoder()); - _data.position(pos); - return data; - } + super(messageNbr, contentHeader, data); } public String getMimeType() @@ -135,21 +68,6 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt return _data.limit(); } - - /** - * Check that there is at least a certain number of bytes available to read - * - * @param len the number of bytes - * @throws MessageEOFException if there are less than len bytes available to read - */ - private void checkAvailable(int len) throws MessageEOFException - { - if (_data.remaining() < len) - { - throw new MessageEOFException("Unable to read " + len + " bytes"); - } - } - public boolean readBoolean() throws JMSException { checkReadable(); @@ -340,6 +258,8 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt try { _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must add the null terminator manually + _data.put((byte)0); } catch (CharacterCodingException e) { @@ -368,13 +288,51 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt { throw new NullPointerException("Argument must not be null"); } - _data.putObject(object); - } - - public void reset() throws JMSException - { - super.reset(); - _data.flip(); + Class clazz = object.getClass(); + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeUTF((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java new file mode 100644 index 0000000000..cc820a5623 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -0,0 +1,509 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; +import org.apache.mina.common.ByteBuffer; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.StreamMessage; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +/** + * @author Apache Software Foundation + */ +public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage +{ + public static final String MIME_TYPE="jms/stream-message"; + + private static final String[] _typeNames = { "boolean", + "byte", + "byte array", + "short", + "char", + "int", + "long", + "float", + "double", + "utf string" }; + + private static final byte BOOLEAN_TYPE = (byte) 1; + + private static final byte BYTE_TYPE = (byte) 2; + + private static final byte BYTEARRAY_TYPE = (byte) 3; + + private static final byte SHORT_TYPE = (byte) 4; + + private static final byte CHAR_TYPE = (byte) 5; + + private static final byte INT_TYPE = (byte) 6; + + private static final byte LONG_TYPE = (byte) 7; + + private static final byte FLOAT_TYPE = (byte) 8; + + private static final byte DOUBLE_TYPE = (byte) 9; + + private static final byte STRING_TYPE = (byte) 10; + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + JMSStreamMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + JMSStreamMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + throws AMQException + { + super(messageNbr, contentHeader, data); + } + + public String getMimeType() + { + return MIME_TYPE; + } + + private void readAndCheckType(byte type) throws MessageFormatException + { + if (_data.get() != type) + { + throw new MessageFormatException("Type " + _typeNames[type - 1] + " not found next in stream"); + } + } + + private void writeTypeDiscriminator(byte type) + { + _data.put(type); + } + + public boolean readBoolean() throws JMSException + { + checkReadable(); + checkAvailable(2); + readAndCheckType(BOOLEAN_TYPE); + return readBooleanImpl(); + } + + private boolean readBooleanImpl() + { + return _data.get() != 0; + } + + public byte readByte() throws JMSException + { + checkReadable(); + checkAvailable(2); + readAndCheckType(BYTE_TYPE); + return readByteImpl(); + } + + private byte readByteImpl() + { + return _data.get(); + } + + public short readShort() throws JMSException + { + checkReadable(); + checkAvailable(3); + readAndCheckType(SHORT_TYPE); + return readShortImpl(); + } + + private short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws JMSException + */ + public char readChar() throws JMSException + { + checkReadable(); + checkAvailable(3); + readAndCheckType(CHAR_TYPE); + return readCharImpl(); + } + + private char readCharImpl() + { + return _data.getChar(); + } + + public int readInt() throws JMSException + { + checkReadable(); + checkAvailable(5); + readAndCheckType(INT_TYPE); + return readIntImpl(); + } + + private int readIntImpl() + { + return _data.getInt(); + } + + public long readLong() throws JMSException + { + checkReadable(); + checkAvailable(9); + readAndCheckType(LONG_TYPE); + return readLongImpl(); + } + + private long readLongImpl() + { + return _data.getLong(); + } + + public float readFloat() throws JMSException + { + checkReadable(); + checkAvailable(5); + readAndCheckType(FLOAT_TYPE); + return readFloatImpl(); + } + + private float readFloatImpl() + { + return _data.getFloat(); + } + + public double readDouble() throws JMSException + { + checkReadable(); + checkAvailable(9); + readAndCheckType(DOUBLE_TYPE); + return readDoubleImpl(); + } + + private double readDoubleImpl() + { + return _data.getDouble(); + } + + public String readString() throws JMSException + { + checkReadable(); + // we check only for one byte plus the type byte since theoretically the string could be only a + // single byte when using UTF-8 encoding + checkAvailable(2); + readAndCheckType(STRING_TYPE); + return readStringImpl(); + } + + private String readStringImpl() throws JMSException + { + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + public int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator plus array size + checkAvailable(5); + readAndCheckType(BYTEARRAY_TYPE); + int size = _data.getInt(); + // size of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated size " + size + " but message only contains " + + _data.remaining() + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + + return readBytesImpl(bytes); + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + } + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public Object readObject() throws JMSException + { + checkReadable(); + checkAvailable(1); + byte type = _data.get(); + Object result = null; + switch (type) + { + case BOOLEAN_TYPE: + result = readBooleanImpl(); + break; + case BYTE_TYPE: + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + result = new byte[size]; + readBytesImpl(new byte[size]); + } + break; + case SHORT_TYPE: + result = readShortImpl(); + break; + case CHAR_TYPE: + result = readCharImpl(); + break; + case INT_TYPE: + result = readIntImpl(); + break; + case LONG_TYPE: + result = readLongImpl(); + break; + case FLOAT_TYPE: + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + result = readDoubleImpl(); + break; + case STRING_TYPE: + result = readStringImpl(); + break; + } + return result; + } + + public void writeBoolean(boolean b) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + public void writeByte(byte b) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + public void writeShort(short i) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + public void writeChar(char c) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + public void writeInt(int i) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(INT_TYPE); + _data.putInt(i); + } + + public void writeLong(long l) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + + public void writeFloat(float v) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); + } + + public void writeDouble(double v) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } + + public void writeString(String string) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(STRING_TYPE); + try + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte)0); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + + public void writeBytes(byte[] bytes) throws JMSException + { + checkWritable(); + writeBytes(bytes, 0, bytes == null?0:bytes.length); + } + + public void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(BYTEARRAY_TYPE); + if (bytes == null) + { + _data.putInt(-1); + } + else + { + _data.putInt(length); + _data.put(bytes, offset, length); + } + } + + public void writeObject(Object object) throws JMSException + { + checkWritable(); + if (object == null) + { + throw new NullPointerException("Argument must not be null"); + } + Class clazz = object.getClass(); + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java new file mode 100644 index 0000000000..aae9f0cdb2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + +public class JMSStreamMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws + AMQException + { + return new JMSStreamMessage(deliveryTag, contentHeader, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSStreamMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 31c9c2ed91..348988f06d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -103,6 +103,7 @@ public class MessageFactoryRegistry mf.registerFactory("text/xml", new JMSTextMessageFactory()); mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory()); mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); + mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); mf.registerFactory(null, new JMSBytesMessageFactory()); return mf; } |
