summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-01 15:18:17 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-01 15:18:17 +0000
commit50876b8a80c5bfd4ba125f87e07fe77669520c80 (patch)
tree1e1a28291299870ac58b658270ec16e42fe5de9e /qpid/java/broker-plugins
parente8a5ab04596ff422b3609cf1c454bdc76b473399 (diff)
downloadqpid-python-50876b8a80c5bfd4ba125f87e07fe77669520c80.tar.gz
Reduce copying in 0-9 path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656312 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java33
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java39
2 files changed, 65 insertions, 7 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index f2c51d0203..1aa4ef0b3f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -486,7 +486,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
serverProperties,
mechanisms.getBytes(),
locales.getBytes());
- _sender.send(asByteBuffer(responseBody.generateFrame(0)));
+ try
+ {
+ responseBody.generateFrame(0).writePayload(_sender);
+ }
+ catch (IOException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
_sender.flush();
}
@@ -494,7 +501,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
+ try
+ {
+ new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).writePayload(_sender);
+ }
+ catch (IOException ioex)
+ {
+ throw new ServerScopedRuntimeException(ioex);
+ }
_sender.flush();
}
}
@@ -546,16 +560,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
*/
public synchronized void writeFrame(AMQDataBlock frame)
{
-
- final ByteBuffer buf = asByteBuffer(frame);
- _writtenBytes += buf.remaining();
-
if(_logger.isDebugEnabled())
{
_logger.debug("SEND: " + frame);
}
- _sender.send(buf);
+ try
+ {
+ _writtenBytes += frame.writePayload(_sender);
+ }
+ catch (IOException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+
+
final long time = System.currentTimeMillis();
_lastIoTime = time;
_lastWriteTime.set(time);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index b616aab126..4a84ccad37 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.util.GZIPUtils;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -255,6 +256,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ ByteBuffer buf = _message.getContent(_offset, _length);
+ long size = buf.remaining();
+ sender.send(buf.duplicate());
+ return size;
+ }
+
public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
{
throw new UnsupportedOperationException();
@@ -346,6 +356,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
_underlyingBody.writePayload(buffer);
}
+ public long writePayload(ByteBufferSender sender) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.writePayload(sender);
+ }
+
public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
throws AMQException
{
@@ -449,6 +468,18 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
@Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
+
+ size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+
+ size += (new AMQFrame(_channel, _contentBody)).writePayload(sender);
+
+ return size;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
@@ -490,6 +521,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
@Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
+ size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
+ return size;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();