diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-01-31 20:07:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-01-31 20:07:36 +0000 |
| commit | 26eab7ed4556717fca50ad93025fdc8d112f9715 (patch) | |
| tree | c4019683f17a8ec570786ff067a0d261c5c603e0 /qpid/java/broker-plugins | |
| parent | aef6c73485912be3be3d9bc60bb9671c951368c6 (diff) | |
| download | qpid-python-26eab7ed4556717fca50ad93025fdc8d112f9715.tar.gz | |
Separate Byte and ProtocolEvent sender/receivers, add server specific 0-10 encoder
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656248 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
7 files changed, 637 insertions, 24 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 58b6a3abcb..401c6fc939 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -32,10 +32,9 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; @@ -68,7 +67,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) { @@ -88,7 +87,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _network = network; _connection.setNetworkConnection(network); - Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); + ServerDisassembler disassembler = new ServerDisassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); _connection.setSender(disassembler); _connection.addFrameSizeObserver(disassembler); // FIXME Two log messages to maintain compatibility with earlier protocol versions @@ -97,19 +96,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) + private ByteBufferSender wrapSender(final ByteBufferSender sender) { - return new Sender<ByteBuffer>() + return new ByteBufferSender() { @Override public void send(ByteBuffer msg) { _lastWriteTime = System.currentTimeMillis(); - ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]); - copy.put(msg); - copy.flip(); - sender.send(copy); - + sender.send(msg); } @Override diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java new file mode 100644 index 0000000000..a42238a40d --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java @@ -0,0 +1,248 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.FrameSizeObserver; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ProtocolDelegate; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventSender; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.SegmentType; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.codec.Encoder; +import org.apache.qpid.transport.network.Frame; + +/** + * Disassembler + */ +public final class ServerDisassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver +{ + private final ByteBufferSender _sender; + private int _maxPayload; + private final Object _sendLock = new Object(); + private final Encoder _encoder = new ServerEncoder(); + + public ServerDisassembler(ByteBufferSender sender, int maxFrame) + { + _sender = sender; + if (maxFrame <= HEADER_SIZE || maxFrame >= 64 * 1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + _maxPayload = maxFrame - HEADER_SIZE; + } + + public void send(ProtocolEvent event) + { + synchronized (_sendLock) + { + event.delegate(null, this); + } + } + + public void flush() + { + synchronized (_sendLock) + { + _sender.flush(); + } + } + + public void close() + { + synchronized (_sendLock) + { + _sender.close(); + } + } + + private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) + { + ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(5, track); + data.putShort(6, (short) channel); + + + ByteBuffer dup = buf.duplicate(); + dup.limit(dup.position() + size); + buf.position(buf.position() + size); + _sender.send(data); + _sender.send(dup); + + + } + + private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf) + { + byte typeb = (byte) type.getValue(); + byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; + + int remaining = buf.remaining(); + boolean first = true; + while (true) + { + int size = min(_maxPayload, remaining); + remaining -= size; + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (remaining == 0) + { + newflags |= LAST_FRAME; + } + + frame(newflags, typeb, track, event.getChannel(), size, buf); + + if (remaining == 0) + { + break; + } + } + } + + public void init(Void v, ProtocolHeader header) + { + _sender.send(header.toByteBuffer()); + _sender.flush(); +} + + public void control(Void v, Method method) + { + method(method, SegmentType.CONTROL); + } + + public void command(Void v, Method method) + { + method(method, SegmentType.COMMAND); + } + + private void method(Method method, SegmentType type) + { + Encoder enc = _encoder; + enc.init(); + enc.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.isSync()) + { + enc.writeUint16(0x0101); + } + else + { + enc.writeUint16(0x0100); + } + } + method.write(enc); + int methodLimit = enc.position(); + + byte flags = FIRST_SEG; + + boolean payload = method.hasPayload(); + if (!payload) + { + flags |= LAST_SEG; + } + + int headerLimit = -1; + if (payload) + { + final Header hdr = method.getHeader(); + if (hdr != null) + { + if (hdr.getDeliveryProperties() != null) + { + enc.writeStruct32(hdr.getDeliveryProperties()); + } + if (hdr.getMessageProperties() != null) + { + enc.writeStruct32(hdr.getMessageProperties()); + } + if (hdr.getNonStandardProperties() != null) + { + for (Struct st : hdr.getNonStandardProperties()) + { + enc.writeStruct32(st); + } + } + } + + headerLimit = enc.position(); + } + synchronized (_sendLock) + { + ByteBuffer buf = enc.underlyingBuffer(); + buf.position(0); + buf.limit(methodLimit); + + fragment(flags, type, method, buf.duplicate()); + if (payload) + { + ByteBuffer body = method.getBody(); + buf.limit(headerLimit); + buf.position(methodLimit); + + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate()); + if (body != null) + { + fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate()); + } + + } + } + } + + public void error(Void v, ProtocolError error) + { + throw new IllegalArgumentException(String.valueOf(error)); + } + + @Override + public void setMaxFrameSize(final int maxFrame) + { + if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + this._maxPayload = maxFrame - HEADER_SIZE; + + } +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java new file mode 100644 index 0000000000..94a444b590 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java @@ -0,0 +1,369 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.UUID; + +import org.apache.qpid.transport.codec.AbstractEncoder; + + +public final class ServerEncoder extends AbstractEncoder +{ + public static final int DEFAULT_CAPACITY = 4096; + private ByteBuffer _out; + private int _segment; + private int _initialCapacity; + + public ServerEncoder() + { + this(DEFAULT_CAPACITY); + } + + public ServerEncoder(int capacity) + { + _initialCapacity = capacity; + _out = ByteBuffer.allocate(capacity); + _segment = 0; + } + + public void init() + { + _out.position(_out.limit()); + _out.limit(_out.capacity()); + _out = _out.slice(); + if(_out.remaining() < 256) + { + _out = ByteBuffer.allocate(_initialCapacity); + } + _segment = 0; + } + + public ByteBuffer buffer() + { + int pos = _out.position(); + _out.position(_segment); + ByteBuffer slice = _out.slice(); + slice.limit(pos - _segment); + _out.position(pos); + return slice; + } + + public int position() + { + return _out.position(); + } + + public ByteBuffer underlyingBuffer() + { + return _out; + } + + private void grow(int size) + { + ByteBuffer old = _out; + int capacity = old.capacity(); + _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity)); + old.flip(); + _out.put(old); + } + + protected void doPut(byte b) + { + try + { + _out.put(b); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put(b); + } + } + + protected void doPut(ByteBuffer src) + { + try + { + _out.put(src); + } + catch (BufferOverflowException e) + { + grow(src.remaining()); + _out.put(src); + } + } + + protected void put(byte[] bytes) + { + try + { + _out.put(bytes); + } + catch (BufferOverflowException e) + { + grow(bytes.length); + _out.put(bytes); + } + } + + public void writeUint8(short b) + { + assert b < 0x100; + + try + { + _out.put((byte) b); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put((byte) b); + } + } + + public void writeUint16(int s) + { + assert s < 0x10000; + + try + { + _out.putShort((short) s); + } + catch (BufferOverflowException e) + { + grow(2); + _out.putShort((short) s); + } + } + + public void writeUint32(long i) + { + assert i < 0x100000000L; + + try + { + _out.putInt((int) i); + } + catch (BufferOverflowException e) + { + grow(4); + _out.putInt((int) i); + } + } + + public void writeUint64(long l) + { + try + { + _out.putLong(l); + } + catch (BufferOverflowException e) + { + grow(8); + _out.putLong(l); + } + } + + public int beginSize8() + { + int pos = _out.position(); + try + { + _out.put((byte) 0); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put((byte) 0); + } + return pos; + } + + public void endSize8(int pos) + { + int cur = _out.position(); + _out.put(pos, (byte) (cur - pos - 1)); + } + + public int beginSize16() + { + int pos = _out.position(); + try + { + _out.putShort((short) 0); + } + catch (BufferOverflowException e) + { + grow(2); + _out.putShort((short) 0); + } + return pos; + } + + public void endSize16(int pos) + { + int cur = _out.position(); + _out.putShort(pos, (short) (cur - pos - 2)); + } + + public int beginSize32() + { + int pos = _out.position(); + try + { + _out.putInt(0); + } + catch (BufferOverflowException e) + { + grow(4); + _out.putInt(0); + } + return pos; + + } + + public void endSize32(int pos) + { + int cur = _out.position(); + _out.putInt(pos, (cur - pos - 4)); + + } + + public void writeDouble(double aDouble) + { + try + { + _out.putDouble(aDouble); + } + catch(BufferOverflowException exception) + { + grow(8); + _out.putDouble(aDouble); + } + } + + public void writeInt16(short aShort) + { + try + { + _out.putShort(aShort); + } + catch(BufferOverflowException exception) + { + grow(2); + _out.putShort(aShort); + } + } + + public void writeInt32(int anInt) + { + try + { + _out.putInt(anInt); + } + catch(BufferOverflowException exception) + { + grow(4); + _out.putInt(anInt); + } + } + + public void writeInt64(long aLong) + { + try + { + _out.putLong(aLong); + } + catch(BufferOverflowException exception) + { + grow(8); + _out.putLong(aLong); + } + } + + public void writeInt8(byte aByte) + { + try + { + _out.put(aByte); + } + catch(BufferOverflowException exception) + { + grow(1); + _out.put(aByte); + } + } + + public void writeBin128(byte[] byteArray) + { + byteArray = (byteArray != null) ? byteArray : new byte [16]; + + assert byteArray.length == 16; + + try + { + _out.put(byteArray); + } + catch(BufferOverflowException exception) + { + grow(16); + _out.put(byteArray); + } + } + + public void writeBin128(UUID id) + { + byte[] data = new byte[16]; + + long msb = id.getMostSignificantBits(); + long lsb = id.getLeastSignificantBits(); + + assert data.length == 16; + for (int i=7; i>=0; i--) + { + data[i] = (byte)(msb & 0xff); + msb = msb >> 8; + } + + for (int i=15; i>=8; i--) + { + data[i] = (byte)(lsb & 0xff); + lsb = (lsb >> 8); + } + writeBin128(data); + } + + public void writeFloat(float aFloat) + { + try + { + _out.putFloat(aFloat); + } + catch(BufferOverflowException exception) + { + grow(4); + _out.putFloat(aFloat); + } + } + +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index cea9b0930f..f2c51d0203 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -85,7 +85,7 @@ import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -167,7 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private volatile boolean _deferFlush; private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? @@ -272,7 +272,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, setNetworkConnection(network, network.getSender()); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; _sender = sender; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 622cf32d17..0f198a8d46 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -50,7 +51,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter @@ -282,11 +283,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr private String _remoteHost = "127.0.0.1"; private String _localHost = "127.0.0.1"; private int _port = portNumber.incrementAndGet(); - private final Sender<ByteBuffer> _sender; + private final ByteBufferSender _sender; public TestNetworkConnection() { - _sender = new Sender<ByteBuffer>() + _sender = new ByteBufferSender() { public void send(ByteBuffer msg) { @@ -348,7 +349,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _sender; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index b2783a2da2..147ccd4edd 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -59,7 +59,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; @@ -116,7 +116,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private byte _revision; private PrintWriter _out; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private Connection_1_0 _connection; private volatile boolean _transportBlockedForWriting; @@ -185,7 +185,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { } - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { _network = network; _sender = sender; diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index 940e24d7cf..896a7119f7 100644 --- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.transport.AcceptingTransport; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.security.ssl.SSLUtil; @@ -240,7 +240,7 @@ class WebSocketProvider implements AcceptingTransport } } - private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer> + private class ConnectionWrapper implements NetworkConnection, ByteBufferSender { private final WebSocket.Connection _connection; private final SocketAddress _localAddress; @@ -259,7 +259,7 @@ class WebSocketProvider implements AcceptingTransport } @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return this; } |
