diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-09-09 17:47:22 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-09-09 17:47:22 +0000 |
| commit | 5270591c7831e559925e720c6bfc0c78c514b95a (patch) | |
| tree | 8ce7aabb2f5a4a87c4c53b3e8f810c62392eba11 /java/broker/src | |
| parent | 282e16aab532d842dffad3935bfd1a952bc584be (diff) | |
| download | qpid-python-5270591c7831e559925e720c6bfc0c78c514b95a.tar.gz | |
QPID-2627 : Remove dependency on MINA
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1167311 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
16 files changed, 556 insertions, 297 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java index 0e3a3894fe..d76b163fa1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java @@ -274,132 +274,6 @@ public class HeadersParser } - public static void main(String[] args) throws AMQFrameDecodingException - { - - FieldTable bindingTable = new FieldTable(); - - bindingTable.setString(new AMQShortString("x-match"),"all"); - bindingTable.setInteger("a",1); - bindingTable.setVoid(new AMQShortString("b")); - bindingTable.setString("c",""); - bindingTable.setInteger("d",4); - bindingTable.setInteger("e",1); - - - - FieldTable bindingTable2 = new FieldTable(); - bindingTable2.setString(new AMQShortString("x-match"),"all"); - bindingTable2.setInteger("a",1); - bindingTable2.setVoid(new AMQShortString("b")); - bindingTable2.setString("c",""); - bindingTable2.setInteger("d",4); - bindingTable2.setInteger("e",1); - bindingTable2.setInteger("f",1); - - - FieldTable table = new FieldTable(); - table.setInteger("a",1); - table.setInteger("b",2); - table.setString("c",""); - table.setInteger("d",4); - table.setInteger("e",1); - table.setInteger("f",1); - table.setInteger("h",1); - table.setInteger("i",1); - table.setInteger("j",1); - table.setInteger("k",1); - table.setInteger("l",1); - - org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.allocate( (int) table.getEncodedSize()); - EncodingUtils.writeFieldTableBytes(buffer, table); - buffer.flip(); - - FieldTable table2 = EncodingUtils.readFieldTable(buffer); - - - - FieldTable bindingTable3 = new FieldTable(); - bindingTable3.setString(new AMQShortString("x-match"),"any"); - bindingTable3.setInteger("a",1); - bindingTable3.setInteger("b",3); - - - FieldTable bindingTable4 = new FieldTable(); - bindingTable4.setString(new AMQShortString("x-match"),"any"); - bindingTable4.setVoid(new AMQShortString("a")); - - - FieldTable bindingTable5 = new FieldTable(); - bindingTable5.setString(new AMQShortString("x-match"),"all"); - bindingTable5.setString(new AMQShortString("h"),"hello"); - - for(int i = 0; i < 100; i++) - { - printMatches(new FieldTable[] {bindingTable5} , table2); - } - - - - } - - - - private static void printMatches(final FieldTable[] bindingKeys, final FieldTable routingKey) - { - HeadersMatcherDFAState sm = null; - Map<HeaderMatcherResult, String> resultMap = new HashMap<HeaderMatcherResult, String>(); - - HeadersParser parser = new HeadersParser(); - - for(int i = 0; i < bindingKeys.length; i++) - { - HeaderMatcherResult r = new HeaderMatcherResult(); - resultMap.put(r, bindingKeys[i].toString()); - - - if(i==0) - { - sm = parser.createStateMachine(bindingKeys[i], r); - } - else - { - sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeys[i], r)); - } - } - - Collection<HeaderMatcherResult> results = null; - long beforeTime = System.currentTimeMillis(); - for(int i = 0; i < 1000000; i++) - { - routingKey.size(); - - assert sm != null; - results = sm.match(routingKey); - - } - long elapsed = System.currentTimeMillis() - beforeTime; - System.out.println("1000000 Iterations took: " + elapsed); - Collection<String> resultStrings = new ArrayList<String>(); - - assert results != null; - for(HeaderMatcherResult result : results) - { - resultStrings.add(resultMap.get(result)); - } - - final ArrayList<String> nonMatches = new ArrayList<String>(); - for(FieldTable key : bindingKeys) - { - nonMatches.add(key.toString()); - } - nonMatches.removeAll(resultStrings); - System.out.println("\""+routingKey+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches); - - - } - - public final static class KeyValuePair { public final HeaderKey _key; diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java index 66cb7ed83b..5992e42fb7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java @@ -29,7 +29,10 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.AMQException; +import org.apache.qpid.server.util.ByteBufferInputStream; +import org.apache.qpid.server.util.ByteBufferOutputStream; +import java.io.*; import java.nio.ByteBuffer; import java.util.Set; @@ -120,38 +123,38 @@ public class MessageMetaData implements StorableMessageMetaData return size; } + public int writeToBuffer(int offset, ByteBuffer dest) { - ByteBuffer src = ByteBuffer.allocate((int)getStorableSize()); - - org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(src); - EncodingUtils.writeInteger(minaSrc, _contentHeaderBody.getSize()); - _contentHeaderBody.writePayload(minaSrc); - EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getExchange()); - EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getRoutingKey()); - byte flags = 0; - if(_messagePublishInfo.isMandatory()) - { - flags |= MANDATORY_FLAG; - } - if(_messagePublishInfo.isImmediate()) + int oldPosition = dest.position(); + try { - flags |= IMMEDIATE_FLAG; + + DataOutputStream dataOutputStream = new DataOutputStream(new ByteBufferOutputStream(dest)); + EncodingUtils.writeInteger(dataOutputStream, _contentHeaderBody.getSize()); + _contentHeaderBody.writePayload(dataOutputStream); + EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getExchange()); + EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getRoutingKey()); + byte flags = 0; + if(_messagePublishInfo.isMandatory()) + { + flags |= MANDATORY_FLAG; + } + if(_messagePublishInfo.isImmediate()) + { + flags |= IMMEDIATE_FLAG; + } + dest.put(flags); + dest.putLong(_arrivalTime); + } - EncodingUtils.writeByte(minaSrc, flags); - EncodingUtils.writeLong(minaSrc,_arrivalTime); - src.position(minaSrc.position()); - src.flip(); - src.position(offset); - src = src.slice(); - if(dest.remaining() < src.limit()) + catch (IOException e) { - src.limit(dest.remaining()); + // This shouldn't happen as we are not actually using anything that can throw an IO Exception + throw new RuntimeException(e); } - dest.put(src); - - return src.limit(); + return dest.position()-oldPosition; } public int getContentSize() @@ -173,14 +176,15 @@ public class MessageMetaData implements StorableMessageMetaData { try { - org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(buf); - int size = EncodingUtils.readInteger(minaSrc); - ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(minaSrc, size); - final AMQShortString exchange = EncodingUtils.readAMQShortString(minaSrc); - final AMQShortString routingKey = EncodingUtils.readAMQShortString(minaSrc); + ByteBufferInputStream bbis = new ByteBufferInputStream(buf); + DataInputStream dais = new DataInputStream(bbis); + int size = EncodingUtils.readInteger(dais); + ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dais, size); + final AMQShortString exchange = EncodingUtils.readAMQShortString(dais); + final AMQShortString routingKey = EncodingUtils.readAMQShortString(dais); - final byte flags = EncodingUtils.readByte(minaSrc); - long arrivalTime = EncodingUtils.readLong(minaSrc); + final byte flags = EncodingUtils.readByte(dais); + long arrivalTime = EncodingUtils.readLong(dais); MessagePublishInfo publishBody = new MessagePublishInfo() @@ -216,6 +220,10 @@ public class MessageMetaData implements StorableMessageMetaData { throw new RuntimeException(e); } + catch (IOException e) + { + throw new RuntimeException(e); + } } }; diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 2cebec373e..3970e5a2d4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -26,6 +26,7 @@ */
package org.apache.qpid.server.output.amqp0_8;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -34,22 +35,18 @@ import org.apache.qpid.server.output.HeaderPropertiesConverter; import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
-import java.nio.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
- METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
return new Factory()
@@ -62,6 +59,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter };
}
+
private final AMQProtocolSession _protocolSession;
private ProtocolOutputConverterImpl(AMQProtocolSession session)
@@ -78,10 +76,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
}
+
private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
throws AMQException
{
@@ -93,65 +92,120 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter {
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
+ ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
return chb;
}
}
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
{
- AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
}
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
throws AMQException
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
-
+ int bodySize = (int) message.getSize();
- final int bodySize = (int) message.getSize();
if(bodySize == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
writeFrame(compositeBlock);
}
else
{
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- ByteBuffer buf = ByteBuffer.allocate(capacity);
- int writtenSize = 0;
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
}
+ }
+ }
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
}
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
}
- private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
throws AMQException
{
+
final AMQShortString exchangeName;
final AMQShortString routingKey;
@@ -172,21 +226,58 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final boolean isRedelivered = entry.isRedelivered();
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
}
- private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
throws AMQException
{
final AMQShortString exchangeName;
@@ -215,9 +306,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter exchangeName,
routingKey,
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame;
+ return getOkBody;
}
public byte getProtocolMinorVersion()
@@ -230,31 +320,28 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
{
+
BasicReturnBody basicReturnBody =
METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
- return returnFrame;
+
+ return basicReturnBody;
}
- public void writeReturn(MessagePublishInfo messagePublishInfo,
- ContentHeaderBody header,
- MessageContentSource content,
- int channelId,
- int replyCode,
- AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
-
- writeMessageDelivery(content, header, channelId, returnFrame);
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+ writeMessageDelivery(message, header, channelId, returnFrame);
}
@@ -266,8 +353,68 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
+
BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 319b5cc7bd..aef3483282 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -20,9 +20,6 @@ package org.apache.qpid.server.output.amqp0_9; *
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,11 +35,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -121,15 +120,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +133,55 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +257,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +382,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +410,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java index cffbe445ee..10748298bc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java @@ -20,9 +20,6 @@ package org.apache.qpid.server.output.amqp0_9_1; *
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -33,17 +30,16 @@ import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
@@ -121,15 +117,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
-
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +129,54 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +252,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +377,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +405,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 88022ba519..5332031362 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.protocol; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -348,7 +350,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); try { // Log incomming protocol negotiation request @@ -368,7 +370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr null, mechanisms.getBytes(), locales.getBytes()); - _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.send(asByteBuffer(responseBody.generateFrame(0))); _sender.flush(); } @@ -376,11 +378,43 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()))); _sender.flush(); } } + private ByteBuffer asByteBuffer(AMQDataBlock block) + { + final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + + try + { + block.writePayload(new DataOutputStream(new OutputStream() + { + + + @Override + public void write(int b) throws IOException + { + buf.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + buf.put(b, off, len); + } + })); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + buf.flip(); + return buf; + } + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) { final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); @@ -491,7 +525,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public synchronized void writeFrame(AMQDataBlock frame) { _lastSent = frame; - final ByteBuffer buf = frame.toNioByteBuffer(); + final ByteBuffer buf = asByteBuffer(frame); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); @@ -1020,7 +1054,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public void writerIdle() { - _sender.send(HeartbeatBody.FRAME.toNioByteBuffer()); + _sender.send(asByteBuffer(HeartbeatBody.FRAME)); } public void exception(Throwable throwable) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 3e3288404f..a56f5685b8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -139,7 +139,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public int addContentBodyFrame(final ContentChunk contentChunk) throws AMQException { - _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf()); + _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); @@ -263,7 +263,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes int written = 0; for(ContentChunk cb : _contentChunks) { - ByteBuffer data = cb.getData().buf(); + ByteBuffer data = ByteBuffer.wrap(cb.getData()); if(offset+written >= pos && offset < pos + data.limit()) { ByteBuffer src = data.duplicate(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java index 9f56b8521a..dee40e7069 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.security.auth.sasl.amqplain; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import javax.security.auth.callback.Callback; @@ -31,7 +33,6 @@ import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -60,7 +61,7 @@ public class AmqPlainSaslServer implements SaslServer { try { - final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length); + final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length); String username = (String) ft.getString("LOGIN"); // we do not care about the prompt but it throws if null NameCallback nameCb = new NameCallback("prompt", username); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java index b4cce15d88..52d36023c2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java @@ -20,21 +20,9 @@ */ package org.apache.qpid.server.security.auth.sasl.anonymous; -import java.io.IOException; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; public class AnonymousSaslServer implements SaslServer { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 2e694b24ea..8b099b62ce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.lang.ref.SoftReference; @@ -479,9 +480,15 @@ public class DerbyMessageStore implements MessageStore FieldTable arguments; if(dataAsBytes.length > 0) { - org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes); - arguments = new FieldTable(buffer,buffer.limit()); + try + { + arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length); + } + catch (IOException e) + { + throw new RuntimeException("IO Exception should not be thrown",e); + } } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java new file mode 100644 index 0000000000..898a667736 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class ByteBufferInputStream extends InputStream +{ + private final ByteBuffer _buffer; + + public ByteBufferInputStream(ByteBuffer buffer) + { + _buffer = buffer; + } + + @Override + public int read() throws IOException + { + return _buffer.get() & 0xFF; + } + + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + if(_buffer.remaining() < len) + { + len = _buffer.remaining(); + } + _buffer.get(b, off, len); + + return len; + } + + @Override + public void mark(int readlimit) + { + _buffer.mark(); + } + + @Override + public void reset() throws IOException + { + _buffer.reset(); + } + + @Override + public boolean markSupported() + { + return true; + } + + @Override + public long skip(long n) throws IOException + { + + _buffer.position(_buffer.position()+(int)n); + + return n; + } + + @Override + public int available() throws IOException + { + return _buffer.remaining(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java new file mode 100644 index 0000000000..ca9a41bc32 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class ByteBufferOutputStream extends OutputStream +{ + private final ByteBuffer _buffer; + + public ByteBufferOutputStream(ByteBuffer buffer) + { + _buffer = buffer; + } + + @Override + public void write(int b) + { + _buffer.put((byte)b); + } + + @Override + public void write(byte[] b, int off, int len) + { + _buffer.put(b, off, len); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 96a9ac729e..0fd31973b2 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -43,7 +43,10 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; +import org.apache.qpid.server.util.ByteBufferInputStream; +import java.io.DataInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -236,7 +239,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa FieldTable argumentsFT = null; if(buf != null) { - argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit()); + try + { + argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit()); + } + catch (IOException e) + { + throw new RuntimeException("IOException should not be thrown here", e); + } } BindingFactory bf = _virtualHost.getBindingFactory(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6db1560fb7..3b7f5f3a51 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -52,6 +52,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.InternalBrokerBaseCase; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -565,8 +566,8 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase int pos = 0; for(ContentBody body : bodies) { - storedMessage.addContent(pos, body.payload.duplicate().buf()); - pos += body.payload.limit(); + storedMessage.addContent(pos, ByteBuffer.wrap(body._payload)); + pos += body._payload.length; } _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 4272c77798..47b8b7eb18 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -312,18 +311,14 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase { messages[i].addContentBodyFrame(new ContentChunk(){ - ByteBuffer _data = ByteBuffer.allocate((int)size); - - { - _data.limit((int)size); - } + byte[] _data = new byte[(int)size]; public int getSize() { return (int) size; } - public ByteBuffer getData() + public byte[] getData() { return _data; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 365353e734..070d105805 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.mina.common.ByteBuffer; import javax.management.JMException; @@ -275,18 +274,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase msg.addContentBodyFrame(new ContentChunk() { - ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE); - - { - _data.limit((int)MESSAGE_SIZE); - } + byte[] _data = new byte[((int)MESSAGE_SIZE)]; public int getSize() { return (int) MESSAGE_SIZE; } - public ByteBuffer getData() + public byte[] getData() { return _data; } @@ -441,8 +436,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase getSession().getMethodRegistry() .getProtocolVersionMethodConverter() .convertToContentChunk( - new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), - MESSAGE_SIZE))); + new ContentBody(new byte[(int) MESSAGE_SIZE]))); AMQMessage m = new AMQMessage(currentMessage.getStoredMessage()); for(BaseQueue q : currentMessage.getDestinationQueues()) |
