diff options
Diffstat (limited to 'qpid/java')
52 files changed, 928 insertions, 253 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 3c25e0934c..eb7599c5cc 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -37,7 +37,7 @@ import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ProtocolEngineCreator; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; public class MultiVersionProtocolEngine implements ServerProtocolEngine @@ -54,7 +54,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private String _fqdn; private final Broker<?> _broker; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private final Protocol _defaultSupportedReply; private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); @@ -171,7 +171,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; SocketAddress address = _network.getLocalAddress(); @@ -253,7 +253,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { } @@ -494,7 +494,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 58b6a3abcb..401c6fc939 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -32,10 +32,9 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; @@ -68,7 +67,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) { @@ -88,7 +87,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _network = network; _connection.setNetworkConnection(network); - Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); + ServerDisassembler disassembler = new ServerDisassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); _connection.setSender(disassembler); _connection.addFrameSizeObserver(disassembler); // FIXME Two log messages to maintain compatibility with earlier protocol versions @@ -97,19 +96,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) + private ByteBufferSender wrapSender(final ByteBufferSender sender) { - return new Sender<ByteBuffer>() + return new ByteBufferSender() { @Override public void send(ByteBuffer msg) { _lastWriteTime = System.currentTimeMillis(); - ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]); - copy.put(msg); - copy.flip(); - sender.send(copy); - + sender.send(msg); } @Override diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java new file mode 100644 index 0000000000..a42238a40d --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java @@ -0,0 +1,248 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.FrameSizeObserver; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ProtocolDelegate; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventSender; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.SegmentType; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.codec.Encoder; +import org.apache.qpid.transport.network.Frame; + +/** + * Disassembler + */ +public final class ServerDisassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver +{ + private final ByteBufferSender _sender; + private int _maxPayload; + private final Object _sendLock = new Object(); + private final Encoder _encoder = new ServerEncoder(); + + public ServerDisassembler(ByteBufferSender sender, int maxFrame) + { + _sender = sender; + if (maxFrame <= HEADER_SIZE || maxFrame >= 64 * 1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + _maxPayload = maxFrame - HEADER_SIZE; + } + + public void send(ProtocolEvent event) + { + synchronized (_sendLock) + { + event.delegate(null, this); + } + } + + public void flush() + { + synchronized (_sendLock) + { + _sender.flush(); + } + } + + public void close() + { + synchronized (_sendLock) + { + _sender.close(); + } + } + + private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) + { + ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(5, track); + data.putShort(6, (short) channel); + + + ByteBuffer dup = buf.duplicate(); + dup.limit(dup.position() + size); + buf.position(buf.position() + size); + _sender.send(data); + _sender.send(dup); + + + } + + private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf) + { + byte typeb = (byte) type.getValue(); + byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; + + int remaining = buf.remaining(); + boolean first = true; + while (true) + { + int size = min(_maxPayload, remaining); + remaining -= size; + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (remaining == 0) + { + newflags |= LAST_FRAME; + } + + frame(newflags, typeb, track, event.getChannel(), size, buf); + + if (remaining == 0) + { + break; + } + } + } + + public void init(Void v, ProtocolHeader header) + { + _sender.send(header.toByteBuffer()); + _sender.flush(); +} + + public void control(Void v, Method method) + { + method(method, SegmentType.CONTROL); + } + + public void command(Void v, Method method) + { + method(method, SegmentType.COMMAND); + } + + private void method(Method method, SegmentType type) + { + Encoder enc = _encoder; + enc.init(); + enc.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.isSync()) + { + enc.writeUint16(0x0101); + } + else + { + enc.writeUint16(0x0100); + } + } + method.write(enc); + int methodLimit = enc.position(); + + byte flags = FIRST_SEG; + + boolean payload = method.hasPayload(); + if (!payload) + { + flags |= LAST_SEG; + } + + int headerLimit = -1; + if (payload) + { + final Header hdr = method.getHeader(); + if (hdr != null) + { + if (hdr.getDeliveryProperties() != null) + { + enc.writeStruct32(hdr.getDeliveryProperties()); + } + if (hdr.getMessageProperties() != null) + { + enc.writeStruct32(hdr.getMessageProperties()); + } + if (hdr.getNonStandardProperties() != null) + { + for (Struct st : hdr.getNonStandardProperties()) + { + enc.writeStruct32(st); + } + } + } + + headerLimit = enc.position(); + } + synchronized (_sendLock) + { + ByteBuffer buf = enc.underlyingBuffer(); + buf.position(0); + buf.limit(methodLimit); + + fragment(flags, type, method, buf.duplicate()); + if (payload) + { + ByteBuffer body = method.getBody(); + buf.limit(headerLimit); + buf.position(methodLimit); + + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate()); + if (body != null) + { + fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate()); + } + + } + } + } + + public void error(Void v, ProtocolError error) + { + throw new IllegalArgumentException(String.valueOf(error)); + } + + @Override + public void setMaxFrameSize(final int maxFrame) + { + if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + this._maxPayload = maxFrame - HEADER_SIZE; + + } +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java new file mode 100644 index 0000000000..94a444b590 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java @@ -0,0 +1,369 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.UUID; + +import org.apache.qpid.transport.codec.AbstractEncoder; + + +public final class ServerEncoder extends AbstractEncoder +{ + public static final int DEFAULT_CAPACITY = 4096; + private ByteBuffer _out; + private int _segment; + private int _initialCapacity; + + public ServerEncoder() + { + this(DEFAULT_CAPACITY); + } + + public ServerEncoder(int capacity) + { + _initialCapacity = capacity; + _out = ByteBuffer.allocate(capacity); + _segment = 0; + } + + public void init() + { + _out.position(_out.limit()); + _out.limit(_out.capacity()); + _out = _out.slice(); + if(_out.remaining() < 256) + { + _out = ByteBuffer.allocate(_initialCapacity); + } + _segment = 0; + } + + public ByteBuffer buffer() + { + int pos = _out.position(); + _out.position(_segment); + ByteBuffer slice = _out.slice(); + slice.limit(pos - _segment); + _out.position(pos); + return slice; + } + + public int position() + { + return _out.position(); + } + + public ByteBuffer underlyingBuffer() + { + return _out; + } + + private void grow(int size) + { + ByteBuffer old = _out; + int capacity = old.capacity(); + _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity)); + old.flip(); + _out.put(old); + } + + protected void doPut(byte b) + { + try + { + _out.put(b); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put(b); + } + } + + protected void doPut(ByteBuffer src) + { + try + { + _out.put(src); + } + catch (BufferOverflowException e) + { + grow(src.remaining()); + _out.put(src); + } + } + + protected void put(byte[] bytes) + { + try + { + _out.put(bytes); + } + catch (BufferOverflowException e) + { + grow(bytes.length); + _out.put(bytes); + } + } + + public void writeUint8(short b) + { + assert b < 0x100; + + try + { + _out.put((byte) b); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put((byte) b); + } + } + + public void writeUint16(int s) + { + assert s < 0x10000; + + try + { + _out.putShort((short) s); + } + catch (BufferOverflowException e) + { + grow(2); + _out.putShort((short) s); + } + } + + public void writeUint32(long i) + { + assert i < 0x100000000L; + + try + { + _out.putInt((int) i); + } + catch (BufferOverflowException e) + { + grow(4); + _out.putInt((int) i); + } + } + + public void writeUint64(long l) + { + try + { + _out.putLong(l); + } + catch (BufferOverflowException e) + { + grow(8); + _out.putLong(l); + } + } + + public int beginSize8() + { + int pos = _out.position(); + try + { + _out.put((byte) 0); + } + catch (BufferOverflowException e) + { + grow(1); + _out.put((byte) 0); + } + return pos; + } + + public void endSize8(int pos) + { + int cur = _out.position(); + _out.put(pos, (byte) (cur - pos - 1)); + } + + public int beginSize16() + { + int pos = _out.position(); + try + { + _out.putShort((short) 0); + } + catch (BufferOverflowException e) + { + grow(2); + _out.putShort((short) 0); + } + return pos; + } + + public void endSize16(int pos) + { + int cur = _out.position(); + _out.putShort(pos, (short) (cur - pos - 2)); + } + + public int beginSize32() + { + int pos = _out.position(); + try + { + _out.putInt(0); + } + catch (BufferOverflowException e) + { + grow(4); + _out.putInt(0); + } + return pos; + + } + + public void endSize32(int pos) + { + int cur = _out.position(); + _out.putInt(pos, (cur - pos - 4)); + + } + + public void writeDouble(double aDouble) + { + try + { + _out.putDouble(aDouble); + } + catch(BufferOverflowException exception) + { + grow(8); + _out.putDouble(aDouble); + } + } + + public void writeInt16(short aShort) + { + try + { + _out.putShort(aShort); + } + catch(BufferOverflowException exception) + { + grow(2); + _out.putShort(aShort); + } + } + + public void writeInt32(int anInt) + { + try + { + _out.putInt(anInt); + } + catch(BufferOverflowException exception) + { + grow(4); + _out.putInt(anInt); + } + } + + public void writeInt64(long aLong) + { + try + { + _out.putLong(aLong); + } + catch(BufferOverflowException exception) + { + grow(8); + _out.putLong(aLong); + } + } + + public void writeInt8(byte aByte) + { + try + { + _out.put(aByte); + } + catch(BufferOverflowException exception) + { + grow(1); + _out.put(aByte); + } + } + + public void writeBin128(byte[] byteArray) + { + byteArray = (byteArray != null) ? byteArray : new byte [16]; + + assert byteArray.length == 16; + + try + { + _out.put(byteArray); + } + catch(BufferOverflowException exception) + { + grow(16); + _out.put(byteArray); + } + } + + public void writeBin128(UUID id) + { + byte[] data = new byte[16]; + + long msb = id.getMostSignificantBits(); + long lsb = id.getLeastSignificantBits(); + + assert data.length == 16; + for (int i=7; i>=0; i--) + { + data[i] = (byte)(msb & 0xff); + msb = msb >> 8; + } + + for (int i=15; i>=8; i--) + { + data[i] = (byte)(lsb & 0xff); + lsb = (lsb >> 8); + } + writeBin128(data); + } + + public void writeFloat(float aFloat) + { + try + { + _out.putFloat(aFloat); + } + catch(BufferOverflowException exception) + { + grow(4); + _out.putFloat(aFloat); + } + } + +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index cea9b0930f..f2c51d0203 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -85,7 +85,7 @@ import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -167,7 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private volatile boolean _deferFlush; private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? @@ -272,7 +272,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, setNetworkConnection(network, network.getSender()); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; _sender = sender; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 622cf32d17..0f198a8d46 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -50,7 +51,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter @@ -282,11 +283,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr private String _remoteHost = "127.0.0.1"; private String _localHost = "127.0.0.1"; private int _port = portNumber.incrementAndGet(); - private final Sender<ByteBuffer> _sender; + private final ByteBufferSender _sender; public TestNetworkConnection() { - _sender = new Sender<ByteBuffer>() + _sender = new ByteBufferSender() { public void send(ByteBuffer msg) { @@ -348,7 +349,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _sender; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index b2783a2da2..147ccd4edd 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -59,7 +59,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; @@ -116,7 +116,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private byte _revision; private PrintWriter _out; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private Connection_1_0 _connection; private volatile boolean _transportBlockedForWriting; @@ -185,7 +185,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { } - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { _network = network; _sender = sender; diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index 940e24d7cf..896a7119f7 100644 --- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.transport.AcceptingTransport; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.security.ssl.SSLUtil; @@ -240,7 +240,7 @@ class WebSocketProvider implements AcceptingTransport } } - private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer> + private class ConnectionWrapper implements NetworkConnection, ByteBufferSender { private final WebSocket.Connection _connection; private final SocketAddress _localAddress; @@ -259,7 +259,7 @@ class WebSocketProvider implements AcceptingTransport } @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return this; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 35582d92b7..d0d9d28398 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.XASession; -import org.apache.qpid.transport.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +59,7 @@ import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.Session; import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; @@ -527,12 +527,12 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } - private static class ReceiverClosedWaiter implements Receiver<ByteBuffer> + private static class ReceiverClosedWaiter implements ByteBufferReceiver { private final CountDownLatch _closedWatcher; - private final Receiver<ByteBuffer> _receiver; + private final ByteBufferReceiver _receiver; - public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver) + public ReceiverClosedWaiter(ByteBufferReceiver receiver) { _receiver = receiver; _closedWatcher = new CountDownLatch(1); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index f038fc6e4f..17b0fe1abb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client.handler; -import java.nio.ByteBuffer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +33,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.TransportException; public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> @@ -95,7 +93,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co } finally { - Sender<ByteBuffer> sender = session.getSender(); + ByteBufferSender sender = session.getSender(); if (error != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d5e3027601..f50447b930 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -66,8 +66,8 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; @@ -179,7 +179,7 @@ public class AMQProtocolHandler implements ProtocolEngine private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private long _lastReadTime = System.currentTimeMillis(); private long _lastWriteTime = System.currentTimeMillis(); private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; @@ -905,7 +905,7 @@ public class AMQProtocolHandler implements ProtocolEngine setNetworkConnection(network, network.getSender()); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; _sender = sender; @@ -923,7 +923,7 @@ public class AMQProtocolHandler implements ProtocolEngine return _lastWriteTime; } - protected Sender<ByteBuffer> getSender() + protected ByteBufferSender getSender() { return _sender; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index e5765ee00f..9b0b21f06e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.client.protocol; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -52,8 +51,8 @@ import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; /** @@ -382,7 +381,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _protocolHandler.getSender(); } @@ -476,7 +475,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _protocolHandler.propagateExceptionToAllWaiters(error); } - public void setSender(Sender<java.nio.ByteBuffer> sender) + public void setSender(ByteBufferSender sender) { // No-op, interface munging } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index c7dee5b985..2543c5b500 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -666,7 +666,7 @@ public class AMQSession_0_10Test extends QpidTestCase } } - class MockSender implements Sender<ProtocolEvent> + class MockSender implements ProtocolEventSender { private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java index 11b34d3dff..ee6704bb39 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java @@ -22,9 +22,9 @@ package org.apache.qpid.client.transport; import java.nio.ByteBuffer; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; -public class MockSender implements Sender<ByteBuffer> +public class MockSender implements ByteBufferSender { public void send(ByteBuffer msg) diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java index c9af1de6a7..cdfa83571b 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java @@ -20,18 +20,18 @@ */ package org.apache.qpid.client.transport; -import java.security.Principal; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; - import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.Principal; + +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.network.NetworkConnection; /** * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, @@ -147,7 +147,7 @@ public class TestNetworkConnection implements NetworkConnection _remoteAddress = address; } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _sender; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 0c643f6322..73c8653677 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.transport.Sender; - -import java.nio.ByteBuffer; +import org.apache.qpid.transport.ByteBufferSender; /** @@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException; - public void setSender(Sender<ByteBuffer> sender); + public void setSender(ByteBufferSender sender); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index cad5461d83..f73f6d931a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -21,10 +21,9 @@ package org.apache.qpid.protocol; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; @@ -32,7 +31,7 @@ import org.apache.qpid.transport.network.TransportActivity; * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. */ -public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity +public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity { // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); @@ -58,6 +57,6 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport void encryptedTransport(); - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java index 8418c42189..f703c01567 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java @@ -26,11 +26,11 @@ package org.apache.qpid.transport; * */ -public interface Binding<E,T> +public interface Binding<E> { - E endpoint(Sender<T> sender); + E endpoint(ByteBufferSender sender); - Receiver<T> receiver(E endpoint); + ByteBufferReceiver receiver(E endpoint); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java new file mode 100644 index 0000000000..1015f061c8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +public interface ByteBufferReceiver +{ + void received(ByteBuffer msg); + + void exception(Throwable t); + + void closed(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java new file mode 100644 index 0000000000..7dcaf61a26 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +public interface ByteBufferSender +{ + void send(ByteBuffer msg); + + void flush(); + + void close(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 92ccdb84af..39f27b0fe0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -27,7 +27,6 @@ import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings; */ public class Connection extends ConnectionInvoker - implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> + implements ProtocolEventReceiver, ProtocolEventSender { protected static final Logger log = Logger.get(Connection.class); @@ -113,7 +112,7 @@ public class Connection extends ConnectionInvoker private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY; private ConnectionDelegate delegate; - private Sender<ProtocolEvent> sender; + private ProtocolEventSender sender; final private Map<Binary,Session> sessions = new HashMap<Binary,Session>(); final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>(); @@ -151,12 +150,12 @@ public class Connection extends ConnectionInvoker listeners.add(listener); } - public Sender<ProtocolEvent> getSender() + public ProtocolEventSender getSender() { return sender; } - public void setSender(Sender<ProtocolEvent> sender) + public void setSender(ProtocolEventSender sender) { this.sender = sender; } @@ -234,7 +233,7 @@ public class Connection extends ConnectionInvoker OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); final InputHandler inputHandler = new InputHandler(new Assembler(this)); addFrameSizeObserver(inputHandler); - Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler); + ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler); if(secureReceiver instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureReceiver); @@ -246,7 +245,7 @@ public class Connection extends ConnectionInvoker setRemoteAddress(_networkConnection.getRemoteAddress()); setLocalAddress(_networkConnection.getLocalAddress()); - final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender()); + final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender()); if(secureSender instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureSender); @@ -411,7 +410,7 @@ public class Connection extends ConnectionInvoker { log.debug("SEND: [%s] %s", this, event); } - Sender<ProtocolEvent> s = sender; + ProtocolEventSender s = sender; if (s == null) { throw new ConnectionException("connection closed"); @@ -425,7 +424,7 @@ public class Connection extends ConnectionInvoker { log.debug("FLUSH: [%s]", this); } - final Sender<ProtocolEvent> theSender = sender; + final ProtocolEventSender theSender = sender; if(theSender != null) { theSender.flush(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java new file mode 100644 index 0000000000..8b9c3f4f83 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import org.apache.qpid.transport.network.NetworkEvent; + +public interface NetworkEventReceiver +{ + void received(NetworkEvent msg); + + void exception(Throwable t); + + void closed(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java index 2a994580dc..e4ab540ce9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java @@ -20,19 +20,11 @@ */ package org.apache.qpid.transport; - -/** - * Receiver - * - */ - -public interface Receiver<T> +public interface ProtocolEventReceiver { - - void received(T msg); + void received(ProtocolEvent msg); void exception(Throwable t); void closed(); - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java index 9a6f675d7f..418f31b42a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java @@ -20,19 +20,11 @@ */ package org.apache.qpid.transport; - -/** - * Sender - * - */ - -public interface Sender<T> +public interface ProtocolEventSender { - - void send(T msg); + void send(ProtocolEvent msg); void flush(); void close(); - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index 2b93697bfc..070621db9b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.transport.codec; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; -import org.apache.qpid.transport.Type; - -import org.apache.qpid.transport.Xid; import static org.apache.qpid.transport.util.Functions.lsb; import java.io.UnsupportedEncodingException; @@ -36,6 +30,12 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.Type; +import org.apache.qpid.transport.Xid; + /** * AbstractEncoder @@ -43,7 +43,7 @@ import java.util.UUID; * @author Rafael H. Schloming */ -abstract class AbstractEncoder implements Encoder +public abstract class AbstractEncoder implements Encoder { private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java index d9150bed65..407df71824 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java @@ -360,8 +360,4 @@ public final class BBEncoder extends AbstractEncoder } } - public void writeMagicNumber() - { - out.put("AM2".getBytes()); - } -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java index a9eea13104..b5ab29cdcf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.transport.codec; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; - +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; + /** * Encoder interface. @@ -274,9 +275,10 @@ public interface Encoder * @param bytes the bytes array to be encoded. */ void writeBin128(byte [] bytes); - - /** - * Encodes the AMQP magic number. - */ - void writeMagicNumber(); -}
\ No newline at end of file + + int position(); + + ByteBuffer underlyingBuffer(); + + void init(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index a80b988cea..a7e96167c2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,28 +20,29 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.NetworkEventReceiver; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventReceiver; import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBDecoder; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * Assembler * */ -public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate +public class Assembler implements NetworkEventReceiver, NetworkDelegate { // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1. @@ -49,7 +50,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>(); - private final Receiver<ProtocolEvent> receiver; + private final ProtocolEventReceiver receiver; private final Map<Integer,List<Frame>> segments; private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>() { @@ -59,7 +60,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } }; - public Assembler(Receiver<ProtocolEvent> receiver) + public Assembler(ProtocolEventReceiver receiver) { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 26e8f1850b..5463cd2587 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; - import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; @@ -38,10 +36,10 @@ import org.apache.qpid.transport.network.security.sasl.SASLSender; */ public abstract class ConnectionBinding - implements Binding<Connection,ByteBuffer> + implements Binding<Connection> { - public static Binding<Connection,ByteBuffer> get(final Connection connection) + public static Binding<Connection> get(final Connection connection) { return new ConnectionBinding() { @@ -52,7 +50,7 @@ public abstract class ConnectionBinding }; } - public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate) + public static Binding<Connection> get(final ConnectionDelegate delegate) { return new ConnectionBinding() { @@ -69,7 +67,7 @@ public abstract class ConnectionBinding public abstract Connection connection(); - public Connection endpoint(Sender<ByteBuffer> sender) + public Connection endpoint(ByteBufferSender sender) { Connection conn = connection(); @@ -87,7 +85,7 @@ public abstract class ConnectionBinding return conn; } - public Receiver<ByteBuffer> receiver(Connection conn) + public ByteBufferReceiver receiver(Connection conn) { final InputHandler inputHandler = new InputHandler(new Assembler(conn)); conn.addFrameSizeObserver(inputHandler); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 81a4c781a4..c45b2049a1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -30,27 +30,29 @@ import static org.apache.qpid.transport.network.Frame.LAST_SEG; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.FrameSizeObserver; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventSender; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.SegmentType; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Encoder; /** * Disassembler */ -public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver +public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver { - private final Sender<ByteBuffer> sender; + private final ByteBufferSender sender; private int maxPayload; private final Object sendlock = new Object(); - private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>() + private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>() { public BBEncoder initialValue() { @@ -58,7 +60,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega } }; - public Disassembler(Sender<ByteBuffer> sender, int maxFrame) + public Disassembler(ByteBufferSender sender, int maxFrame) { this.sender = sender; if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) @@ -174,7 +176,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega private void method(Method method, SegmentType type) { - BBEncoder enc = _encoder.get(); + Encoder enc = _encoder.get(); enc.init(); enc.writeUint16(method.getEncodedType()); if (type == SegmentType.COMMAND) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 758c2e1eda..a58bed5877 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -29,11 +29,12 @@ import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.FrameSizeObserver; +import org.apache.qpid.transport.NetworkEventReceiver; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.SegmentType; @@ -43,7 +44,7 @@ import org.apache.qpid.transport.SegmentType; * @author Rafael H. Schloming */ -public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver +public class InputHandler implements ByteBufferReceiver, FrameSizeObserver { private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE; @@ -56,7 +57,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver ERROR } - private final Receiver<NetworkEvent> receiver; + private final NetworkEventReceiver receiver; private State state; private ByteBuffer input = null; private int needed; @@ -66,7 +67,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver private byte track; private int channel; - public InputHandler(Receiver<NetworkEvent> receiver, State state) + public InputHandler(NetworkEventReceiver receiver, State state) { this.receiver = receiver; this.state = state; @@ -82,7 +83,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver } } - public InputHandler(Receiver<NetworkEvent> receiver) + public InputHandler(NetworkEventReceiver receiver) { this(receiver, PROTO_HDR); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 2810e7a9e1..bef266f214 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -21,13 +21,13 @@ package org.apache.qpid.transport.network; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.Principal; -import org.apache.qpid.transport.Sender; + +import org.apache.qpid.transport.ByteBufferSender; public interface NetworkConnection { - Sender<ByteBuffer> getSender(); + ByteBufferSender getSender(); void start(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index 45231aa05d..f2735f1800 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.transport.network; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; - -import java.nio.ByteBuffer; public interface OutgoingNetworkTransport extends NetworkTransport { public NetworkConnection getConnection(); public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, + ByteBufferReceiver delegate, TransportActivity transportActivity); -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java index 9d0fe5ddf6..8d19c5a2ce 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java @@ -26,7 +26,6 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.nio.ByteBuffer; import java.util.Set; import javax.net.ssl.SSLContext; @@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.configuration.CommonProperties; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkConnection; @@ -62,7 +61,7 @@ public abstract class AbstractNetworkTransport implements OutgoingNetworkTranspo private AcceptingThread _acceptor; public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, + ByteBufferReceiver delegate, TransportActivity transportActivity) { int sendBufferSize = settings.getWriteBufferSize(); @@ -159,7 +158,7 @@ public abstract class AbstractNetworkTransport implements OutgoingNetworkTranspo } protected abstract NetworkConnection createNetworkConnection(Socket socket, - Receiver<ByteBuffer> engine, + ByteBufferReceiver engine, Integer sendBufferSize, Integer receiveBufferSize, int timeout, diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 5c3124c2ec..5008849ef3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -22,7 +22,6 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.Principal; import javax.net.ssl.SSLPeerUnverifiedException; @@ -31,8 +30,8 @@ import javax.net.ssl.SSLSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; @@ -49,7 +48,7 @@ public class IoNetworkConnection implements NetworkConnection private boolean _principalChecked; private final Object _lock = new Object(); - public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, + public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate, int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) { _socket = socket; @@ -70,7 +69,7 @@ public class IoNetworkConnection implements NetworkConnection _ioReceiver.initiate(); } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _ioSender; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index f33f626601..ccab1d93cf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -21,9 +21,8 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; -import java.nio.ByteBuffer; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.ByteBufferReceiver; public class IoNetworkTransport extends AbstractNetworkTransport { @@ -31,7 +30,7 @@ public class IoNetworkTransport extends AbstractNetworkTransport @Override protected IoNetworkConnection createNetworkConnection(final Socket socket, - final Receiver<ByteBuffer> engine, + final ByteBufferReceiver engine, final Integer sendBufferSize, final Integer receiveBufferSize, final int timeout, diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 467115c76f..790583e92b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLSocket; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.util.Logger; @@ -47,7 +47,7 @@ final class IoReceiver implements Runnable private static final Logger log = Logger.get(IoReceiver.class); - private final Receiver<ByteBuffer> receiver; + private final ByteBufferReceiver receiver; private final int bufferSize; private final Socket socket; private final long timeout; @@ -61,7 +61,7 @@ final class IoReceiver implements Runnable shutdownBroken = SystemUtils.isWindows(); } - public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) + public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout) { this.receiver = receiver; this.bufferSize = bufferSize; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 79e99287c4..61beae4c25 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -27,14 +27,14 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; -public final class IoSender implements Runnable, Sender<ByteBuffer> +public final class IoSender implements Runnable, ByteBufferSender { private static final Logger log = Logger.get(IoSender.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java index 68670d1a9d..fe6e707f7e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java @@ -21,7 +21,6 @@ package org.apache.qpid.transport.network.io; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.security.Principal; import java.util.Set; @@ -32,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportEncryption; @@ -88,7 +87,7 @@ public class NonBlockingConnection implements NetworkConnection { } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _nonBlockingSenderReceiver; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index 347a41ee07..02099dee15 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -40,7 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.network.Ticker; @@ -48,7 +48,7 @@ import org.apache.qpid.transport.network.TransportEncryption; import org.apache.qpid.transport.network.security.ssl.SSLUtil; import org.apache.qpid.util.SystemUtils; -public class NonBlockingSenderReceiver implements Sender<ByteBuffer> +public class NonBlockingSenderReceiver implements ByteBufferSender { private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java index 51ef266ee9..271135f411 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.transport.network.security; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; - -import java.nio.ByteBuffer; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; public interface SecurityLayer { - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate); - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate); + public ByteBufferSender sender(ByteBufferSender delegate); + public ByteBufferReceiver receiver(ByteBufferReceiver delegate); public String getUserID(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java index 2a2f3d8362..d25e97ffe4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.transport.network.security; -import java.nio.ByteBuffer; - import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; @@ -110,14 +108,14 @@ public class SecurityLayerFactory } - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate) + public ByteBufferSender sender(ByteBufferSender delegate) { SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus); sender.setHostname(_hostname); return sender; } - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate) + public ByteBufferReceiver receiver(ByteBufferReceiver delegate) { SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus); receiver.setHostname(_hostname); @@ -141,13 +139,13 @@ public class SecurityLayerFactory _layer = layer; } - public SASLSender sender(Sender<ByteBuffer> delegate) + public SASLSender sender(ByteBufferSender delegate) { SASLSender sender = new SASLSender(_layer.sender(delegate)); return sender; } - public SASLReceiver receiver(Receiver<ByteBuffer> delegate) + public SASLReceiver receiver(ByteBufferReceiver delegate) { SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate)); return receiver; @@ -169,12 +167,12 @@ public class SecurityLayerFactory { } - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate) + public ByteBufferSender sender(ByteBufferSender delegate) { return delegate; } - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate) + public ByteBufferReceiver receiver(ByteBufferReceiver delegate) { return delegate; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java index 59e9453454..983e3bdf90 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java @@ -21,20 +21,21 @@ package org.apache.qpid.transport.network.security.sasl; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; import javax.security.sasl.SaslException; -import java.nio.ByteBuffer; -public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> { +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + +public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver { - private Receiver<ByteBuffer> delegate; + private ByteBufferReceiver delegate; private byte[] netData; private static final Logger log = Logger.get(SASLReceiver.class); - public SASLReceiver(Receiver<ByteBuffer> delegate) + public SASLReceiver(ByteBufferReceiver delegate) { this.delegate = delegate; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index fa1801cb65..335f8992ca 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -21,22 +21,24 @@ package org.apache.qpid.transport.network.security.sasl; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.util.Logger; - -import javax.security.sasl.SaslException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; -public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { +import javax.security.sasl.SaslException; + +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + +public class SASLSender extends SASLEncryptor implements ByteBufferSender +{ - private Sender<ByteBuffer> delegate; + private ByteBufferSender delegate; private byte[] appData; private final AtomicBoolean closed = new AtomicBoolean(false); private static final Logger log = Logger.get(SASLSender.class); - public SASLSender(Sender<ByteBuffer> delegate) + public SASLSender(ByteBufferSender delegate) { this.delegate = delegate; log.debug("SASL Sender enabled"); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java index 1bbf166d82..ce3bace9e8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java @@ -28,16 +28,16 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.security.SSLStatus; import org.apache.qpid.transport.util.Logger; -public class SSLReceiver implements Receiver<ByteBuffer> +public class SSLReceiver implements ByteBufferReceiver { private static final Logger log = Logger.get(SSLReceiver.class); - private final Receiver<ByteBuffer> delegate; + private final ByteBufferReceiver delegate; private final SSLEngine engine; private final int sslBufSize; private final ByteBuffer localBuffer; @@ -47,7 +47,7 @@ public class SSLReceiver implements Receiver<ByteBuffer> private String _hostname; - public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus) + public SSLReceiver(final SSLEngine engine, final ByteBufferReceiver delegate, final SSLStatus sslStatus) { this.engine = engine; this.delegate = delegate; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java index 7d64012fea..755f7430ba 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java @@ -19,24 +19,25 @@ */ package org.apache.qpid.transport.network.security.ssl; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -public class SSLSender implements Sender<ByteBuffer> +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.util.Logger; + +public class SSLSender implements ByteBufferSender { private static final Logger log = Logger.get(SSLSender.class); - private final Sender<ByteBuffer> delegate; + private final ByteBufferSender delegate; private final SSLEngine engine; private final int sslBufSize; private final ByteBuffer netData; @@ -48,7 +49,7 @@ public class SSLSender implements Sender<ByteBuffer> private final AtomicBoolean closed = new AtomicBoolean(false); - public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) + public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus) { this.engine = engine; this.delegate = delegate; diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java index 865a3603f5..3da2a03f42 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; import java.util.Set; import javax.net.ssl.SSLContext; @@ -29,9 +28,9 @@ import javax.net.ssl.SSLContext; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.io.IoNetworkTransport; @@ -130,7 +129,7 @@ public class TransportTest extends QpidTestCase } public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, + ByteBufferReceiver delegate, TransportActivity transportActivity) { throw new UnsupportedOperationException(); diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java index a445cff0a7..69724438ec 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java @@ -21,14 +21,12 @@ package org.apache.qpid.transport.network.io; -import junit.framework.TestCase; - import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.Principal; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.Sender; +import junit.framework.TestCase; + +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; @@ -193,7 +191,7 @@ public class IdleTimeoutTickerTest extends TestCase implements TransportActivity //------------------------------------------------------------------------- @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return null; } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java index bb864cd434..67d360fa9e 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -20,16 +20,15 @@ */ package org.apache.qpid.transport.network.io; -import org.apache.log4j.Logger; -import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.TransportException; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; -import java.nio.ByteBuffer; + +import org.apache.log4j.Logger; + +import org.apache.qpid.transport.Binding; /** @@ -44,9 +43,9 @@ public class IoAcceptor<E> extends Thread private volatile boolean _closed = false; private ServerSocket socket; - private Binding<E,ByteBuffer> binding; + private Binding<E> binding; - public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding) + public IoAcceptor(SocketAddress address, Binding<E> binding) throws IOException { socket = new ServerSocket(); @@ -70,7 +69,7 @@ public class IoAcceptor<E> extends Thread } } - public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding) + public IoAcceptor(String host, int port, Binding<E> binding) throws IOException { this(new InetSocketAddress(host, port), binding); diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java index f74051aa32..4b5b4448ee 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -20,10 +20,9 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; -import java.nio.ByteBuffer; import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.util.Logger; /** @@ -48,18 +47,18 @@ public final class IoTransport<E> ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); private Socket socket; - private Sender<ByteBuffer> sender; + private ByteBufferSender sender; private E endpoint; private IoReceiver receiver; private long timeout = 60000; - IoTransport(Socket socket, Binding<E,ByteBuffer> binding) + IoTransport(Socket socket, Binding<E> binding) { this.socket = socket; setupTransport(socket, binding); } - private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding) + private void setupTransport(Socket socket, Binding<E> binding) { IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout); ios.initiate(); @@ -73,7 +72,7 @@ public final class IoTransport<E> ios.setReceiver(this.receiver); } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return sender; } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 9a8d021828..84eb761899 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -41,7 +41,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase @@ -230,11 +230,11 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase private String _remoteHost = "127.0.0.1"; private String _localHost = "127.0.0.1"; private int _port = 1; - private final Sender<ByteBuffer> _sender; + private final ByteBufferSender _sender; public TestNetworkConnection() { - _sender = new Sender<ByteBuffer>() + _sender = new ByteBufferSender() { public void send(ByteBuffer msg) { @@ -296,7 +296,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase } @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _sender; } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 347bf933ad..54e5a5c9bc 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -32,7 +32,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolSessionTest extends QpidBrokerTestCase @@ -107,11 +107,11 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase private String _localHost = "127.0.0.1"; private int _port = 1; private SocketAddress _localAddress = null; - private final Sender<ByteBuffer> _sender; + private final ByteBufferSender _sender; public TestNetworkConnection() { - _sender = new Sender<ByteBuffer>() + _sender = new ByteBufferSender() { public void send(ByteBuffer msg) @@ -181,7 +181,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase _localAddress = address; } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _sender; } |
