From 5270591c7831e559925e720c6bfc0c78c514b95a Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 9 Sep 2011 17:47:22 +0000 Subject: QPID-2627 : Remove dependency on MINA git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1167311 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/exchange/headers/HeadersParser.java | 126 ---------- .../qpid/server/message/MessageMetaData.java | 72 +++--- .../amqp0_8/ProtocolOutputConverterImpl.java | 263 ++++++++++++++++----- .../amqp0_9/ProtocolOutputConverterImpl.java | 72 ++++-- .../amqp0_9_1/ProtocolOutputConverterImpl.java | 73 ++++-- .../qpid/server/protocol/AMQProtocolEngine.java | 44 +++- .../apache/qpid/server/queue/IncomingMessage.java | 4 +- .../auth/sasl/amqplain/AmqPlainSaslServer.java | 5 +- .../auth/sasl/anonymous/AnonymousSaslServer.java | 12 - .../qpid/server/store/DerbyMessageStore.java | 11 +- .../qpid/server/util/ByteBufferInputStream.java | 87 +++++++ .../qpid/server/util/ByteBufferOutputStream.java | 46 ++++ .../VirtualHostConfigRecoveryHandler.java | 12 +- .../exchange/AbstractHeadersExchangeTestBase.java | 5 +- .../qpid/server/queue/AMQQueueAlertTest.java | 9 +- .../qpid/server/queue/AMQQueueMBeanTest.java | 12 +- 16 files changed, 556 insertions(+), 297 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java index 0e3a3894fe..d76b163fa1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java @@ -274,132 +274,6 @@ public class HeadersParser } - public static void main(String[] args) throws AMQFrameDecodingException - { - - FieldTable bindingTable = new FieldTable(); - - bindingTable.setString(new AMQShortString("x-match"),"all"); - bindingTable.setInteger("a",1); - bindingTable.setVoid(new AMQShortString("b")); - bindingTable.setString("c",""); - bindingTable.setInteger("d",4); - bindingTable.setInteger("e",1); - - - - FieldTable bindingTable2 = new FieldTable(); - bindingTable2.setString(new AMQShortString("x-match"),"all"); - bindingTable2.setInteger("a",1); - bindingTable2.setVoid(new AMQShortString("b")); - bindingTable2.setString("c",""); - bindingTable2.setInteger("d",4); - bindingTable2.setInteger("e",1); - bindingTable2.setInteger("f",1); - - - FieldTable table = new FieldTable(); - table.setInteger("a",1); - table.setInteger("b",2); - table.setString("c",""); - table.setInteger("d",4); - table.setInteger("e",1); - table.setInteger("f",1); - table.setInteger("h",1); - table.setInteger("i",1); - table.setInteger("j",1); - table.setInteger("k",1); - table.setInteger("l",1); - - org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.allocate( (int) table.getEncodedSize()); - EncodingUtils.writeFieldTableBytes(buffer, table); - buffer.flip(); - - FieldTable table2 = EncodingUtils.readFieldTable(buffer); - - - - FieldTable bindingTable3 = new FieldTable(); - bindingTable3.setString(new AMQShortString("x-match"),"any"); - bindingTable3.setInteger("a",1); - bindingTable3.setInteger("b",3); - - - FieldTable bindingTable4 = new FieldTable(); - bindingTable4.setString(new AMQShortString("x-match"),"any"); - bindingTable4.setVoid(new AMQShortString("a")); - - - FieldTable bindingTable5 = new FieldTable(); - bindingTable5.setString(new AMQShortString("x-match"),"all"); - bindingTable5.setString(new AMQShortString("h"),"hello"); - - for(int i = 0; i < 100; i++) - { - printMatches(new FieldTable[] {bindingTable5} , table2); - } - - - - } - - - - private static void printMatches(final FieldTable[] bindingKeys, final FieldTable routingKey) - { - HeadersMatcherDFAState sm = null; - Map resultMap = new HashMap(); - - HeadersParser parser = new HeadersParser(); - - for(int i = 0; i < bindingKeys.length; i++) - { - HeaderMatcherResult r = new HeaderMatcherResult(); - resultMap.put(r, bindingKeys[i].toString()); - - - if(i==0) - { - sm = parser.createStateMachine(bindingKeys[i], r); - } - else - { - sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeys[i], r)); - } - } - - Collection results = null; - long beforeTime = System.currentTimeMillis(); - for(int i = 0; i < 1000000; i++) - { - routingKey.size(); - - assert sm != null; - results = sm.match(routingKey); - - } - long elapsed = System.currentTimeMillis() - beforeTime; - System.out.println("1000000 Iterations took: " + elapsed); - Collection resultStrings = new ArrayList(); - - assert results != null; - for(HeaderMatcherResult result : results) - { - resultStrings.add(resultMap.get(result)); - } - - final ArrayList nonMatches = new ArrayList(); - for(FieldTable key : bindingKeys) - { - nonMatches.add(key.toString()); - } - nonMatches.removeAll(resultStrings); - System.out.println("\""+routingKey+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches); - - - } - - public final static class KeyValuePair { public final HeaderKey _key; diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java index 66cb7ed83b..5992e42fb7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java @@ -29,7 +29,10 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.AMQException; +import org.apache.qpid.server.util.ByteBufferInputStream; +import org.apache.qpid.server.util.ByteBufferOutputStream; +import java.io.*; import java.nio.ByteBuffer; import java.util.Set; @@ -120,38 +123,38 @@ public class MessageMetaData implements StorableMessageMetaData return size; } + public int writeToBuffer(int offset, ByteBuffer dest) { - ByteBuffer src = ByteBuffer.allocate((int)getStorableSize()); - - org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(src); - EncodingUtils.writeInteger(minaSrc, _contentHeaderBody.getSize()); - _contentHeaderBody.writePayload(minaSrc); - EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getExchange()); - EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getRoutingKey()); - byte flags = 0; - if(_messagePublishInfo.isMandatory()) - { - flags |= MANDATORY_FLAG; - } - if(_messagePublishInfo.isImmediate()) + int oldPosition = dest.position(); + try { - flags |= IMMEDIATE_FLAG; + + DataOutputStream dataOutputStream = new DataOutputStream(new ByteBufferOutputStream(dest)); + EncodingUtils.writeInteger(dataOutputStream, _contentHeaderBody.getSize()); + _contentHeaderBody.writePayload(dataOutputStream); + EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getExchange()); + EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getRoutingKey()); + byte flags = 0; + if(_messagePublishInfo.isMandatory()) + { + flags |= MANDATORY_FLAG; + } + if(_messagePublishInfo.isImmediate()) + { + flags |= IMMEDIATE_FLAG; + } + dest.put(flags); + dest.putLong(_arrivalTime); + } - EncodingUtils.writeByte(minaSrc, flags); - EncodingUtils.writeLong(minaSrc,_arrivalTime); - src.position(minaSrc.position()); - src.flip(); - src.position(offset); - src = src.slice(); - if(dest.remaining() < src.limit()) + catch (IOException e) { - src.limit(dest.remaining()); + // This shouldn't happen as we are not actually using anything that can throw an IO Exception + throw new RuntimeException(e); } - dest.put(src); - - return src.limit(); + return dest.position()-oldPosition; } public int getContentSize() @@ -173,14 +176,15 @@ public class MessageMetaData implements StorableMessageMetaData { try { - org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(buf); - int size = EncodingUtils.readInteger(minaSrc); - ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(minaSrc, size); - final AMQShortString exchange = EncodingUtils.readAMQShortString(minaSrc); - final AMQShortString routingKey = EncodingUtils.readAMQShortString(minaSrc); + ByteBufferInputStream bbis = new ByteBufferInputStream(buf); + DataInputStream dais = new DataInputStream(bbis); + int size = EncodingUtils.readInteger(dais); + ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dais, size); + final AMQShortString exchange = EncodingUtils.readAMQShortString(dais); + final AMQShortString routingKey = EncodingUtils.readAMQShortString(dais); - final byte flags = EncodingUtils.readByte(minaSrc); - long arrivalTime = EncodingUtils.readLong(minaSrc); + final byte flags = EncodingUtils.readByte(dais); + long arrivalTime = EncodingUtils.readLong(dais); MessagePublishInfo publishBody = new MessagePublishInfo() @@ -216,6 +220,10 @@ public class MessageMetaData implements StorableMessageMetaData { throw new RuntimeException(e); } + catch (IOException e) + { + throw new RuntimeException(e); + } } }; diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 2cebec373e..3970e5a2d4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -26,6 +26,7 @@ */ package org.apache.qpid.server.output.amqp0_8; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.queue.QueueEntry; @@ -34,22 +35,18 @@ import org.apache.qpid.server.output.HeaderPropertiesConverter; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.AMQException; import org.apache.qpid.transport.DeliveryProperties; -import java.nio.ByteBuffer; +import java.io.DataOutputStream; +import java.io.IOException; public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER = - METHOD_REGISTRY.getProtocolVersionMethodConverter(); - public static Factory getInstanceFactory() { return new Factory() @@ -62,6 +59,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }; } + private final AMQProtocolSession _protocolSession; private ProtocolOutputConverterImpl(AMQProtocolSession session) @@ -78,10 +76,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { - AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag); - writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver); + AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag); + writeMessageDelivery(entry, channelId, deliverBody); } + private ContentHeaderBody getContentHeaderBody(QueueEntry entry) throws AMQException { @@ -93,65 +92,120 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); - ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); - chb.bodySize = message.getSize(); + ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID); + chb.bodySize = message.getSize(); return chb; } } - public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException + private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody) + throws AMQException { - AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize); - writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver); + writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody); } - private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver) + private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) throws AMQException { - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb); - + int bodySize = (int) message.getSize(); - final int bodySize = (int) message.getSize(); if(bodySize == 0) { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); + SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, + contentHeaderBody); + writeFrame(compositeBlock); } else { int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); - final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - ByteBuffer buf = ByteBuffer.allocate(capacity); - int writtenSize = 0; + int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; + + int writtenSize = capacity; + + AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)); - AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); + CompositeAMQBodyBlock + compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); writeFrame(compositeBlock); while(writtenSize < bodySize) { - buf = java.nio.ByteBuffer.allocate(capacity); - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf))); + capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; + MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); + writtenSize += capacity; + + writeFrame(new AMQFrame(channelId, body)); } + } + } + private class MessageContentSourceBody implements AMQBody + { + public static final byte TYPE = 3; + private int _length; + private MessageContentSource _message; + private int _offset; + + public MessageContentSourceBody(MessageContentSource message, int offset, int length) + { + _message = message; + _offset = offset; + _length = length; + } + + public byte getFrameType() + { + return TYPE; } + + public int getSize() + { + return _length; + } + + public void writePayload(DataOutputStream buffer) throws IOException + { + byte[] data = new byte[_length]; + + _message.getContent(java.nio.ByteBuffer.wrap(data), _offset); + + buffer.write(data); + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new UnsupportedOperationException(); + } + } + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) + { + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + contentHeaderBody); + return contentHeader; } - private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) + public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException + { + AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); + writeMessageDelivery(entry, channelId, deliver); + } + + + private AMQBody createEncodedDeliverBody(QueueEntry entry, + final long deliveryTag, + final AMQShortString consumerTag) throws AMQException { + final AMQShortString exchangeName; final AMQShortString routingKey; @@ -172,21 +226,58 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final boolean isRedelivered = entry.isRedelivered(); + final AMQBody returnBlock = new AMQBody() + { + + public AMQBody _underlyingBody; - BasicDeliverBody deliverBody = - METHOD_REGISTRY.createBasicDeliverBody(consumerTag, - deliveryTag, - isRedelivered, - exchangeName, - routingKey); + public AMQBody createAMQBody() + { + return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, + deliveryTag, + isRedelivered, + exchangeName, + routingKey); - AMQFrame deliverFrame = deliverBody.generateFrame(channelId); - return deliverFrame; + + + } + + public byte getFrameType() + { + return AMQMethodBody.TYPE; + } + + public int getSize() + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.getSize(); + } + + public void writePayload(DataOutputStream buffer) throws IOException + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + _underlyingBody.writePayload(buffer); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + throw new AMQException("This block should never be dispatched!"); + } + }; + return returnBlock; } - private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize) + private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) throws AMQException { final AMQShortString exchangeName; @@ -215,9 +306,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter exchangeName, routingKey, queueSize); - AMQFrame getOkFrame = getOkBody.generateFrame(channelId); - return getOkFrame; + return getOkBody; } public byte getProtocolMinorVersion() @@ -230,31 +320,28 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return getProtocolSession().getProtocolMajorVersion(); } - private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException + private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, + int replyCode, + AMQShortString replyText) throws AMQException { + BasicReturnBody basicReturnBody = METHOD_REGISTRY.createBasicReturnBody(replyCode, - replyText, - messagePublishInfo.getExchange(), - messagePublishInfo.getRoutingKey()); - AMQFrame returnFrame = basicReturnBody.generateFrame(channelId); + replyText, + messagePublishInfo.getExchange(), + messagePublishInfo.getRoutingKey()); - return returnFrame; + + return basicReturnBody; } - public void writeReturn(MessagePublishInfo messagePublishInfo, - ContentHeaderBody header, - MessageContentSource content, - int channelId, - int replyCode, - AMQShortString replyText) + public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) throws AMQException { - AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText); - - writeMessageDelivery(content, header, channelId, returnFrame); + AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); + writeMessageDelivery(message, header, channelId, returnFrame); } @@ -266,8 +353,68 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) { + BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); writeFrame(basicCancelOkBody.generateFrame(channelId)); } + + + public static final class CompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); + } + + public void writePayload(DataOutputStream buffer) throws IOException + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; + } + + public void writePayload(DataOutputStream buffer) throws IOException + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 319b5cc7bd..aef3483282 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -20,9 +20,6 @@ package org.apache.qpid.server.output.amqp0_9; * */ - -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.HeaderPropertiesConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -38,11 +35,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - private static final ProtocolVersionMethodConverter - PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter(); public static Factory getInstanceFactory() @@ -121,15 +120,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); - final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity); + int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - int writtenSize = 0; + int writtenSize = capacity; + AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf); CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); @@ -137,15 +133,55 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter while(writtenSize < bodySize) { - buf = java.nio.ByteBuffer.allocate(capacity); + capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; + MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); + writtenSize += capacity; - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf))); + writeFrame(new AMQFrame(channelId, body)); } } } + private class MessageContentSourceBody implements AMQBody + { + public static final byte TYPE = 3; + private int _length; + private MessageContentSource _message; + private int _offset; + + public MessageContentSourceBody(MessageContentSource message, int offset, int length) + { + _message = message; + _offset = offset; + _length = length; + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return _length; + } + + public void writePayload(DataOutputStream buffer) throws IOException + { + byte[] data = new byte[_length]; + + _message.getContent(ByteBuffer.wrap(data), _offset); + + buffer.write(data); + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new UnsupportedOperationException(); + } + } + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) { @@ -221,7 +257,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _underlyingBody.getSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { if(_underlyingBody == null) { @@ -346,7 +382,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); } @@ -374,7 +410,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java index cffbe445ee..10748298bc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java @@ -20,9 +20,6 @@ package org.apache.qpid.server.output.amqp0_9_1; * */ - -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.HeaderPropertiesConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -33,17 +30,16 @@ import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.AMQException; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import java.io.DataOutputStream; +import java.io.IOException; + public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91); - private static final ProtocolVersionMethodConverter - PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter(); - public static Factory getInstanceFactory() { @@ -121,15 +117,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); - final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity); + int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - int writtenSize = 0; + int writtenSize = capacity; - - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf); + AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); @@ -137,15 +129,54 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter while(writtenSize < bodySize) { - buf = java.nio.ByteBuffer.allocate(capacity); + capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; + MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); + writtenSize += capacity; - writtenSize += message.getContent(buf, writtenSize); - buf.flip(); - writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf))); + writeFrame(new AMQFrame(channelId, body)); } } } + private class MessageContentSourceBody implements AMQBody + { + public static final byte TYPE = 3; + private int _length; + private MessageContentSource _message; + private int _offset; + + public MessageContentSourceBody(MessageContentSource message, int offset, int length) + { + _message = message; + _offset = offset; + _length = length; + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return _length; + } + + public void writePayload(DataOutputStream buffer) throws IOException + { + byte[] data = new byte[_length]; + + _message.getContent(java.nio.ByteBuffer.wrap(data), _offset); + + buffer.write(data); + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new UnsupportedOperationException(); + } + } + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) { @@ -221,7 +252,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _underlyingBody.getSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { if(_underlyingBody == null) { @@ -346,7 +377,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); } @@ -374,7 +405,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 88022ba519..5332031362 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.protocol; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -348,7 +350,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); try { // Log incomming protocol negotiation request @@ -368,7 +370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr null, mechanisms.getBytes(), locales.getBytes()); - _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.send(asByteBuffer(responseBody.generateFrame(0))); _sender.flush(); } @@ -376,11 +378,43 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()))); _sender.flush(); } } + private ByteBuffer asByteBuffer(AMQDataBlock block) + { + final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + + try + { + block.writePayload(new DataOutputStream(new OutputStream() + { + + + @Override + public void write(int b) throws IOException + { + buf.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + buf.put(b, off, len); + } + })); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + buf.flip(); + return buf; + } + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) { final AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody); @@ -491,7 +525,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public synchronized void writeFrame(AMQDataBlock frame) { _lastSent = frame; - final ByteBuffer buf = frame.toNioByteBuffer(); + final ByteBuffer buf = asByteBuffer(frame); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); @@ -1020,7 +1054,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public void writerIdle() { - _sender.send(HeartbeatBody.FRAME.toNioByteBuffer()); + _sender.send(asByteBuffer(HeartbeatBody.FRAME)); } public void exception(Throwable throwable) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 3e3288404f..a56f5685b8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -139,7 +139,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public int addContentBodyFrame(final ContentChunk contentChunk) throws AMQException { - _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf()); + _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); @@ -263,7 +263,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes int written = 0; for(ContentChunk cb : _contentChunks) { - ByteBuffer data = cb.getData().buf(); + ByteBuffer data = ByteBuffer.wrap(cb.getData()); if(offset+written >= pos && offset < pos + data.limit()) { ByteBuffer src = data.duplicate(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java index 9f56b8521a..dee40e7069 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.security.auth.sasl.amqplain; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import javax.security.auth.callback.Callback; @@ -31,7 +33,6 @@ import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -60,7 +61,7 @@ public class AmqPlainSaslServer implements SaslServer { try { - final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length); + final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length); String username = (String) ft.getString("LOGIN"); // we do not care about the prompt but it throws if null NameCallback nameCb = new NameCallback("prompt", username); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java index b4cce15d88..52d36023c2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java @@ -20,21 +20,9 @@ */ package org.apache.qpid.server.security.auth.sasl.anonymous; -import java.io.IOException; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; public class AnonymousSaslServer implements SaslServer { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 2e694b24ea..8b099b62ce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.lang.ref.SoftReference; @@ -479,9 +480,15 @@ public class DerbyMessageStore implements MessageStore FieldTable arguments; if(dataAsBytes.length > 0) { - org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes); - arguments = new FieldTable(buffer,buffer.limit()); + try + { + arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length); + } + catch (IOException e) + { + throw new RuntimeException("IO Exception should not be thrown",e); + } } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java new file mode 100644 index 0000000000..898a667736 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class ByteBufferInputStream extends InputStream +{ + private final ByteBuffer _buffer; + + public ByteBufferInputStream(ByteBuffer buffer) + { + _buffer = buffer; + } + + @Override + public int read() throws IOException + { + return _buffer.get() & 0xFF; + } + + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + if(_buffer.remaining() < len) + { + len = _buffer.remaining(); + } + _buffer.get(b, off, len); + + return len; + } + + @Override + public void mark(int readlimit) + { + _buffer.mark(); + } + + @Override + public void reset() throws IOException + { + _buffer.reset(); + } + + @Override + public boolean markSupported() + { + return true; + } + + @Override + public long skip(long n) throws IOException + { + + _buffer.position(_buffer.position()+(int)n); + + return n; + } + + @Override + public int available() throws IOException + { + return _buffer.remaining(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java new file mode 100644 index 0000000000..ca9a41bc32 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class ByteBufferOutputStream extends OutputStream +{ + private final ByteBuffer _buffer; + + public ByteBufferOutputStream(ByteBuffer buffer) + { + _buffer = buffer; + } + + @Override + public void write(int b) + { + _buffer.put((byte)b); + } + + @Override + public void write(byte[] b, int off, int len) + { + _buffer.put(b, off, len); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 96a9ac729e..0fd31973b2 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -43,7 +43,10 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; +import org.apache.qpid.server.util.ByteBufferInputStream; +import java.io.DataInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -236,7 +239,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa FieldTable argumentsFT = null; if(buf != null) { - argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit()); + try + { + argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit()); + } + catch (IOException e) + { + throw new RuntimeException("IOException should not be thrown here", e); + } } BindingFactory bf = _virtualHost.getBindingFactory(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6db1560fb7..3b7f5f3a51 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -52,6 +52,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.InternalBrokerBaseCase; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -565,8 +566,8 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase int pos = 0; for(ContentBody body : bodies) { - storedMessage.addContent(pos, body.payload.duplicate().buf()); - pos += body.payload.limit(); + storedMessage.addContent(pos, ByteBuffer.wrap(body._payload)); + pos += body._payload.length; } _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 4272c77798..47b8b7eb18 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -312,18 +311,14 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase { messages[i].addContentBodyFrame(new ContentChunk(){ - ByteBuffer _data = ByteBuffer.allocate((int)size); - - { - _data.limit((int)size); - } + byte[] _data = new byte[(int)size]; public int getSize() { return (int) size; } - public ByteBuffer getData() + public byte[] getData() { return _data; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 365353e734..070d105805 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.mina.common.ByteBuffer; import javax.management.JMException; @@ -275,18 +274,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase msg.addContentBodyFrame(new ContentChunk() { - ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE); - - { - _data.limit((int)MESSAGE_SIZE); - } + byte[] _data = new byte[((int)MESSAGE_SIZE)]; public int getSize() { return (int) MESSAGE_SIZE; } - public ByteBuffer getData() + public byte[] getData() { return _data; } @@ -441,8 +436,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase getSession().getMethodRegistry() .getProtocolVersionMethodConverter() .convertToContentChunk( - new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), - MESSAGE_SIZE))); + new ContentBody(new byte[(int) MESSAGE_SIZE]))); AMQMessage m = new AMQMessage(currentMessage.getStoredMessage()); for(BaseQueue q : currentMessage.getDestinationQueues()) -- cgit v1.2.1