diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-01 15:18:17 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-01 15:18:17 +0000 |
| commit | 50876b8a80c5bfd4ba125f87e07fe77669520c80 (patch) | |
| tree | 1e1a28291299870ac58b658270ec16e42fe5de9e /qpid/java/broker-plugins | |
| parent | e8a5ab04596ff422b3609cf1c454bdc76b473399 (diff) | |
| download | qpid-python-50876b8a80c5bfd4ba125f87e07fe77669520c80.tar.gz | |
Reduce copying in 0-9 path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656312 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
2 files changed, 65 insertions, 7 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index f2c51d0203..1aa4ef0b3f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -486,7 +486,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, serverProperties, mechanisms.getBytes(), locales.getBytes()); - _sender.send(asByteBuffer(responseBody.generateFrame(0))); + try + { + responseBody.generateFrame(0).writePayload(_sender); + } + catch (IOException e) + { + throw new ServerScopedRuntimeException(e); + } _sender.flush(); } @@ -494,7 +501,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()))); + try + { + new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).writePayload(_sender); + } + catch (IOException ioex) + { + throw new ServerScopedRuntimeException(ioex); + } _sender.flush(); } } @@ -546,16 +560,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, */ public synchronized void writeFrame(AMQDataBlock frame) { - - final ByteBuffer buf = asByteBuffer(frame); - _writtenBytes += buf.remaining(); - if(_logger.isDebugEnabled()) { _logger.debug("SEND: " + frame); } - _sender.send(buf); + try + { + _writtenBytes += frame.writePayload(_sender); + } + catch (IOException e) + { + throw new ServerScopedRuntimeException(e); + } + + final long time = System.currentTimeMillis(); _lastIoTime = time; _lastWriteTime.set(time); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java index b616aab126..4a84ccad37 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.util.GZIPUtils; public class ProtocolOutputConverterImpl implements ProtocolOutputConverter @@ -255,6 +256,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + ByteBuffer buf = _message.getContent(_offset, _length); + long size = buf.remaining(); + sender.send(buf.duplicate()); + return size; + } + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException { throw new UnsupportedOperationException(); @@ -346,6 +356,15 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter _underlyingBody.writePayload(buffer); } + public long writePayload(ByteBufferSender sender) throws IOException + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.writePayload(sender); + } + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException { @@ -449,6 +468,18 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); + + size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); + + size += (new AMQFrame(_channel, _contentBody)).writePayload(sender); + + return size; + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -490,6 +521,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); + size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); + return size; + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); |
