summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-09-09 17:47:22 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-09-09 17:47:22 +0000
commit5270591c7831e559925e720c6bfc0c78c514b95a (patch)
tree8ce7aabb2f5a4a87c4c53b3e8f810c62392eba11 /java/broker/src
parent282e16aab532d842dffad3935bfd1a952bc584be (diff)
downloadqpid-python-5270591c7831e559925e720c6bfc0c78c514b95a.tar.gz
QPID-2627 : Remove dependency on MINA
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1167311 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java126
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java263
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java73
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java87
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java46
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java12
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java12
16 files changed, 556 insertions, 297 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java
index 0e3a3894fe..d76b163fa1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java
@@ -274,132 +274,6 @@ public class HeadersParser
}
- public static void main(String[] args) throws AMQFrameDecodingException
- {
-
- FieldTable bindingTable = new FieldTable();
-
- bindingTable.setString(new AMQShortString("x-match"),"all");
- bindingTable.setInteger("a",1);
- bindingTable.setVoid(new AMQShortString("b"));
- bindingTable.setString("c","");
- bindingTable.setInteger("d",4);
- bindingTable.setInteger("e",1);
-
-
-
- FieldTable bindingTable2 = new FieldTable();
- bindingTable2.setString(new AMQShortString("x-match"),"all");
- bindingTable2.setInteger("a",1);
- bindingTable2.setVoid(new AMQShortString("b"));
- bindingTable2.setString("c","");
- bindingTable2.setInteger("d",4);
- bindingTable2.setInteger("e",1);
- bindingTable2.setInteger("f",1);
-
-
- FieldTable table = new FieldTable();
- table.setInteger("a",1);
- table.setInteger("b",2);
- table.setString("c","");
- table.setInteger("d",4);
- table.setInteger("e",1);
- table.setInteger("f",1);
- table.setInteger("h",1);
- table.setInteger("i",1);
- table.setInteger("j",1);
- table.setInteger("k",1);
- table.setInteger("l",1);
-
- org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.allocate( (int) table.getEncodedSize());
- EncodingUtils.writeFieldTableBytes(buffer, table);
- buffer.flip();
-
- FieldTable table2 = EncodingUtils.readFieldTable(buffer);
-
-
-
- FieldTable bindingTable3 = new FieldTable();
- bindingTable3.setString(new AMQShortString("x-match"),"any");
- bindingTable3.setInteger("a",1);
- bindingTable3.setInteger("b",3);
-
-
- FieldTable bindingTable4 = new FieldTable();
- bindingTable4.setString(new AMQShortString("x-match"),"any");
- bindingTable4.setVoid(new AMQShortString("a"));
-
-
- FieldTable bindingTable5 = new FieldTable();
- bindingTable5.setString(new AMQShortString("x-match"),"all");
- bindingTable5.setString(new AMQShortString("h"),"hello");
-
- for(int i = 0; i < 100; i++)
- {
- printMatches(new FieldTable[] {bindingTable5} , table2);
- }
-
-
-
- }
-
-
-
- private static void printMatches(final FieldTable[] bindingKeys, final FieldTable routingKey)
- {
- HeadersMatcherDFAState sm = null;
- Map<HeaderMatcherResult, String> resultMap = new HashMap<HeaderMatcherResult, String>();
-
- HeadersParser parser = new HeadersParser();
-
- for(int i = 0; i < bindingKeys.length; i++)
- {
- HeaderMatcherResult r = new HeaderMatcherResult();
- resultMap.put(r, bindingKeys[i].toString());
-
-
- if(i==0)
- {
- sm = parser.createStateMachine(bindingKeys[i], r);
- }
- else
- {
- sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeys[i], r));
- }
- }
-
- Collection<HeaderMatcherResult> results = null;
- long beforeTime = System.currentTimeMillis();
- for(int i = 0; i < 1000000; i++)
- {
- routingKey.size();
-
- assert sm != null;
- results = sm.match(routingKey);
-
- }
- long elapsed = System.currentTimeMillis() - beforeTime;
- System.out.println("1000000 Iterations took: " + elapsed);
- Collection<String> resultStrings = new ArrayList<String>();
-
- assert results != null;
- for(HeaderMatcherResult result : results)
- {
- resultStrings.add(resultMap.get(result));
- }
-
- final ArrayList<String> nonMatches = new ArrayList<String>();
- for(FieldTable key : bindingKeys)
- {
- nonMatches.add(key.toString());
- }
- nonMatches.removeAll(resultStrings);
- System.out.println("\""+routingKey+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
-
-
- }
-
-
public final static class KeyValuePair
{
public final HeaderKey _key;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
index 66cb7ed83b..5992e42fb7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
@@ -29,7 +29,10 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.util.ByteBufferInputStream;
+import org.apache.qpid.server.util.ByteBufferOutputStream;
+import java.io.*;
import java.nio.ByteBuffer;
import java.util.Set;
@@ -120,38 +123,38 @@ public class MessageMetaData implements StorableMessageMetaData
return size;
}
+
public int writeToBuffer(int offset, ByteBuffer dest)
{
- ByteBuffer src = ByteBuffer.allocate((int)getStorableSize());
-
- org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(src);
- EncodingUtils.writeInteger(minaSrc, _contentHeaderBody.getSize());
- _contentHeaderBody.writePayload(minaSrc);
- EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getExchange());
- EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getRoutingKey());
- byte flags = 0;
- if(_messagePublishInfo.isMandatory())
- {
- flags |= MANDATORY_FLAG;
- }
- if(_messagePublishInfo.isImmediate())
+ int oldPosition = dest.position();
+ try
{
- flags |= IMMEDIATE_FLAG;
+
+ DataOutputStream dataOutputStream = new DataOutputStream(new ByteBufferOutputStream(dest));
+ EncodingUtils.writeInteger(dataOutputStream, _contentHeaderBody.getSize());
+ _contentHeaderBody.writePayload(dataOutputStream);
+ EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getExchange());
+ EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getRoutingKey());
+ byte flags = 0;
+ if(_messagePublishInfo.isMandatory())
+ {
+ flags |= MANDATORY_FLAG;
+ }
+ if(_messagePublishInfo.isImmediate())
+ {
+ flags |= IMMEDIATE_FLAG;
+ }
+ dest.put(flags);
+ dest.putLong(_arrivalTime);
+
}
- EncodingUtils.writeByte(minaSrc, flags);
- EncodingUtils.writeLong(minaSrc,_arrivalTime);
- src.position(minaSrc.position());
- src.flip();
- src.position(offset);
- src = src.slice();
- if(dest.remaining() < src.limit())
+ catch (IOException e)
{
- src.limit(dest.remaining());
+ // This shouldn't happen as we are not actually using anything that can throw an IO Exception
+ throw new RuntimeException(e);
}
- dest.put(src);
-
- return src.limit();
+ return dest.position()-oldPosition;
}
public int getContentSize()
@@ -173,14 +176,15 @@ public class MessageMetaData implements StorableMessageMetaData
{
try
{
- org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(buf);
- int size = EncodingUtils.readInteger(minaSrc);
- ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(minaSrc, size);
- final AMQShortString exchange = EncodingUtils.readAMQShortString(minaSrc);
- final AMQShortString routingKey = EncodingUtils.readAMQShortString(minaSrc);
+ ByteBufferInputStream bbis = new ByteBufferInputStream(buf);
+ DataInputStream dais = new DataInputStream(bbis);
+ int size = EncodingUtils.readInteger(dais);
+ ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dais, size);
+ final AMQShortString exchange = EncodingUtils.readAMQShortString(dais);
+ final AMQShortString routingKey = EncodingUtils.readAMQShortString(dais);
- final byte flags = EncodingUtils.readByte(minaSrc);
- long arrivalTime = EncodingUtils.readLong(minaSrc);
+ final byte flags = EncodingUtils.readByte(dais);
+ long arrivalTime = EncodingUtils.readLong(dais);
MessagePublishInfo publishBody =
new MessagePublishInfo()
@@ -216,6 +220,10 @@ public class MessageMetaData implements StorableMessageMetaData
{
throw new RuntimeException(e);
}
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
};
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 2cebec373e..3970e5a2d4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -26,6 +26,7 @@
*/
package org.apache.qpid.server.output.amqp0_8;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -34,22 +35,18 @@ import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
-import java.nio.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
- METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
return new Factory()
@@ -62,6 +59,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
};
}
+
private final AMQProtocolSession _protocolSession;
private ProtocolOutputConverterImpl(AMQProtocolSession session)
@@ -78,10 +76,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
}
+
private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
throws AMQException
{
@@ -93,65 +92,120 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
+ ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
return chb;
}
}
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
{
- AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
}
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
throws AMQException
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
-
+ int bodySize = (int) message.getSize();
- final int bodySize = (int) message.getSize();
if(bodySize == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
writeFrame(compositeBlock);
}
else
{
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- ByteBuffer buf = ByteBuffer.allocate(capacity);
- int writtenSize = 0;
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
}
+ }
+ }
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
}
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
}
- private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
throws AMQException
{
+
final AMQShortString exchangeName;
final AMQShortString routingKey;
@@ -172,21 +226,58 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final boolean isRedelivered = entry.isRedelivered();
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
}
- private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
throws AMQException
{
final AMQShortString exchangeName;
@@ -215,9 +306,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
exchangeName,
routingKey,
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame;
+ return getOkBody;
}
public byte getProtocolMinorVersion()
@@ -230,31 +320,28 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
{
+
BasicReturnBody basicReturnBody =
METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
- return returnFrame;
+
+ return basicReturnBody;
}
- public void writeReturn(MessagePublishInfo messagePublishInfo,
- ContentHeaderBody header,
- MessageContentSource content,
- int channelId,
- int replyCode,
- AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
-
- writeMessageDelivery(content, header, channelId, returnFrame);
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+ writeMessageDelivery(message, header, channelId, returnFrame);
}
@@ -266,8 +353,68 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
+
BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 319b5cc7bd..aef3483282 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -20,9 +20,6 @@ package org.apache.qpid.server.output.amqp0_9;
*
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,11 +35,13 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -121,15 +120,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +133,55 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +257,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +382,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +410,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
index cffbe445ee..10748298bc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
@@ -20,9 +20,6 @@ package org.apache.qpid.server.output.amqp0_9_1;
*
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -33,17 +30,16 @@ import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
@@ -121,15 +117,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
-
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +129,54 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +252,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +377,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +405,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 88022ba519..5332031362 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -348,7 +350,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
- ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
// Log incomming protocol negotiation request
@@ -368,7 +370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
null,
mechanisms.getBytes(),
locales.getBytes());
- _sender.send(responseBody.generateFrame(0).toNioByteBuffer());
+ _sender.send(asByteBuffer(responseBody.generateFrame(0)));
_sender.flush();
}
@@ -376,11 +378,43 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+ _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
_sender.flush();
}
}
+ private ByteBuffer asByteBuffer(AMQDataBlock block)
+ {
+ final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+ try
+ {
+ block.writePayload(new DataOutputStream(new OutputStream()
+ {
+
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ buf.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ buf.put(b, off, len);
+ }
+ }));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ buf.flip();
+ return buf;
+ }
+
public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
@@ -491,7 +525,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public synchronized void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
- final ByteBuffer buf = frame.toNioByteBuffer();
+ final ByteBuffer buf = asByteBuffer(frame);
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
@@ -1020,7 +1054,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public void writerIdle()
{
- _sender.send(HeartbeatBody.FRAME.toNioByteBuffer());
+ _sender.send(asByteBuffer(HeartbeatBody.FRAME));
}
public void exception(Throwable throwable)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 3e3288404f..a56f5685b8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -139,7 +139,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public int addContentBodyFrame(final ContentChunk contentChunk)
throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
+ _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -263,7 +263,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
int written = 0;
for(ContentChunk cb : _contentChunks)
{
- ByteBuffer data = cb.getData().buf();
+ ByteBuffer data = ByteBuffer.wrap(cb.getData());
if(offset+written >= pos && offset < pos + data.limit())
{
ByteBuffer src = data.duplicate();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
index 9f56b8521a..dee40e7069 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.security.auth.sasl.amqplain;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import javax.security.auth.callback.Callback;
@@ -31,7 +33,6 @@ import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -60,7 +61,7 @@ public class AmqPlainSaslServer implements SaslServer
{
try
{
- final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
+ final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length);
String username = (String) ft.getString("LOGIN");
// we do not care about the prompt but it throws if null
NameCallback nameCb = new NameCallback("prompt", username);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
index b4cce15d88..52d36023c2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
@@ -20,21 +20,9 @@
*/
package org.apache.qpid.server.security.auth.sasl.anonymous;
-import java.io.IOException;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
public class AnonymousSaslServer implements SaslServer
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 2e694b24ea..8b099b62ce 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
@@ -479,9 +480,15 @@ public class DerbyMessageStore implements MessageStore
FieldTable arguments;
if(dataAsBytes.length > 0)
{
- org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes);
- arguments = new FieldTable(buffer,buffer.limit());
+ try
+ {
+ arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("IO Exception should not be thrown",e);
+ }
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java
new file mode 100644
index 0000000000..898a667736
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferInputStream.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class ByteBufferInputStream extends InputStream
+{
+ private final ByteBuffer _buffer;
+
+ public ByteBufferInputStream(ByteBuffer buffer)
+ {
+ _buffer = buffer;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return _buffer.get() & 0xFF;
+ }
+
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ if(_buffer.remaining() < len)
+ {
+ len = _buffer.remaining();
+ }
+ _buffer.get(b, off, len);
+
+ return len;
+ }
+
+ @Override
+ public void mark(int readlimit)
+ {
+ _buffer.mark();
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ _buffer.reset();
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return true;
+ }
+
+ @Override
+ public long skip(long n) throws IOException
+ {
+
+ _buffer.position(_buffer.position()+(int)n);
+
+ return n;
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ return _buffer.remaining();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java
new file mode 100644
index 0000000000..ca9a41bc32
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/ByteBufferOutputStream.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ByteBufferOutputStream extends OutputStream
+{
+ private final ByteBuffer _buffer;
+
+ public ByteBufferOutputStream(ByteBuffer buffer)
+ {
+ _buffer = buffer;
+ }
+
+ @Override
+ public void write(int b)
+ {
+ _buffer.put((byte)b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len)
+ {
+ _buffer.put(b, off, len);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 96a9ac729e..0fd31973b2 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -43,7 +43,10 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.util.ByteBufferInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -236,7 +239,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
FieldTable argumentsFT = null;
if(buf != null)
{
- argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
+ try
+ {
+ argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("IOException should not be thrown here", e);
+ }
}
BindingFactory bf = _virtualHost.getBindingFactory();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6db1560fb7..3b7f5f3a51 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -52,6 +52,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -565,8 +566,8 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
int pos = 0;
for(ContentBody body : bodies)
{
- storedMessage.addContent(pos, body.payload.duplicate().buf());
- pos += body.payload.limit();
+ storedMessage.addContent(pos, ByteBuffer.wrap(body._payload));
+ pos += body._payload.length;
}
_incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 4272c77798..47b8b7eb18 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -312,18 +311,14 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
{
messages[i].addContentBodyFrame(new ContentChunk(){
- ByteBuffer _data = ByteBuffer.allocate((int)size);
-
- {
- _data.limit((int)size);
- }
+ byte[] _data = new byte[(int)size];
public int getSize()
{
return (int) size;
}
- public ByteBuffer getData()
+ public byte[] getData()
{
return _data;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 365353e734..070d105805 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
@@ -275,18 +274,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
msg.addContentBodyFrame(new ContentChunk()
{
- ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
-
- {
- _data.limit((int)MESSAGE_SIZE);
- }
+ byte[] _data = new byte[((int)MESSAGE_SIZE)];
public int getSize()
{
return (int) MESSAGE_SIZE;
}
- public ByteBuffer getData()
+ public byte[] getData()
{
return _data;
}
@@ -441,8 +436,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
getSession().getMethodRegistry()
.getProtocolVersionMethodConverter()
.convertToContentChunk(
- new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
- MESSAGE_SIZE)));
+ new ContentBody(new byte[(int) MESSAGE_SIZE])));
AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
for(BaseQueue q : currentMessage.getDestinationQueues())