summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java10
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java17
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java248
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java369
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java9
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java6
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java7
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java17
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java)12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java)12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java14
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java14
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java19
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java5
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java10
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java15
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java11
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java8
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java8
52 files changed, 928 insertions, 253 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 3c25e0934c..eb7599c5cc 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
@@ -54,7 +54,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private String _fqdn;
private final Broker<?> _broker;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private final Protocol _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
@@ -171,7 +171,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
SocketAddress address = _network.getLocalAddress();
@@ -253,7 +253,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
}
@@ -494,7 +494,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 58b6a3abcb..401c6fc939 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -32,10 +32,9 @@ import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -68,7 +67,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
{
@@ -88,7 +87,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
_network = network;
_connection.setNetworkConnection(network);
- Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
+ ServerDisassembler disassembler = new ServerDisassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
_connection.setSender(disassembler);
_connection.addFrameSizeObserver(disassembler);
// FIXME Two log messages to maintain compatibility with earlier protocol versions
@@ -97,19 +96,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
- private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+ private ByteBufferSender wrapSender(final ByteBufferSender sender)
{
- return new Sender<ByteBuffer>()
+ return new ByteBufferSender()
{
@Override
public void send(ByteBuffer msg)
{
_lastWriteTime = System.currentTimeMillis();
- ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]);
- copy.put(msg);
- copy.flip();
- sender.send(copy);
-
+ sender.send(msg);
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
new file mode 100644
index 0000000000..a42238a40d
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolDelegate;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.SegmentType;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.codec.Encoder;
+import org.apache.qpid.transport.network.Frame;
+
+/**
+ * Disassembler
+ */
+public final class ServerDisassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
+{
+ private final ByteBufferSender _sender;
+ private int _maxPayload;
+ private final Object _sendLock = new Object();
+ private final Encoder _encoder = new ServerEncoder();
+
+ public ServerDisassembler(ByteBufferSender sender, int maxFrame)
+ {
+ _sender = sender;
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64 * 1024)
+ {
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ _maxPayload = maxFrame - HEADER_SIZE;
+ }
+
+ public void send(ProtocolEvent event)
+ {
+ synchronized (_sendLock)
+ {
+ event.delegate(null, this);
+ }
+ }
+
+ public void flush()
+ {
+ synchronized (_sendLock)
+ {
+ _sender.flush();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (_sendLock)
+ {
+ _sender.close();
+ }
+ }
+
+ private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+ {
+ ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+
+
+ ByteBuffer dup = buf.duplicate();
+ dup.limit(dup.position() + size);
+ buf.position(buf.position() + size);
+ _sender.send(data);
+ _sender.send(dup);
+
+
+ }
+
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
+ {
+ byte typeb = (byte) type.getValue();
+ byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+ int remaining = buf.remaining();
+ boolean first = true;
+ while (true)
+ {
+ int size = min(_maxPayload, remaining);
+ remaining -= size;
+
+ byte newflags = flags;
+ if (first)
+ {
+ newflags |= FIRST_FRAME;
+ first = false;
+ }
+ if (remaining == 0)
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ frame(newflags, typeb, track, event.getChannel(), size, buf);
+
+ if (remaining == 0)
+ {
+ break;
+ }
+ }
+ }
+
+ public void init(Void v, ProtocolHeader header)
+ {
+ _sender.send(header.toByteBuffer());
+ _sender.flush();
+}
+
+ public void control(Void v, Method method)
+ {
+ method(method, SegmentType.CONTROL);
+ }
+
+ public void command(Void v, Method method)
+ {
+ method(method, SegmentType.COMMAND);
+ }
+
+ private void method(Method method, SegmentType type)
+ {
+ Encoder enc = _encoder;
+ enc.init();
+ enc.writeUint16(method.getEncodedType());
+ if (type == SegmentType.COMMAND)
+ {
+ if (method.isSync())
+ {
+ enc.writeUint16(0x0101);
+ }
+ else
+ {
+ enc.writeUint16(0x0100);
+ }
+ }
+ method.write(enc);
+ int methodLimit = enc.position();
+
+ byte flags = FIRST_SEG;
+
+ boolean payload = method.hasPayload();
+ if (!payload)
+ {
+ flags |= LAST_SEG;
+ }
+
+ int headerLimit = -1;
+ if (payload)
+ {
+ final Header hdr = method.getHeader();
+ if (hdr != null)
+ {
+ if (hdr.getDeliveryProperties() != null)
+ {
+ enc.writeStruct32(hdr.getDeliveryProperties());
+ }
+ if (hdr.getMessageProperties() != null)
+ {
+ enc.writeStruct32(hdr.getMessageProperties());
+ }
+ if (hdr.getNonStandardProperties() != null)
+ {
+ for (Struct st : hdr.getNonStandardProperties())
+ {
+ enc.writeStruct32(st);
+ }
+ }
+ }
+
+ headerLimit = enc.position();
+ }
+ synchronized (_sendLock)
+ {
+ ByteBuffer buf = enc.underlyingBuffer();
+ buf.position(0);
+ buf.limit(methodLimit);
+
+ fragment(flags, type, method, buf.duplicate());
+ if (payload)
+ {
+ ByteBuffer body = method.getBody();
+ buf.limit(headerLimit);
+ buf.position(methodLimit);
+
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate());
+ if (body != null)
+ {
+ fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
+ }
+
+ }
+ }
+ }
+
+ public void error(Void v, ProtocolError error)
+ {
+ throw new IllegalArgumentException(String.valueOf(error));
+ }
+
+ @Override
+ public void setMaxFrameSize(final int maxFrame)
+ {
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+ {
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ this._maxPayload = maxFrame - HEADER_SIZE;
+
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
new file mode 100644
index 0000000000..94a444b590
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
@@ -0,0 +1,369 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.qpid.transport.codec.AbstractEncoder;
+
+
+public final class ServerEncoder extends AbstractEncoder
+{
+ public static final int DEFAULT_CAPACITY = 4096;
+ private ByteBuffer _out;
+ private int _segment;
+ private int _initialCapacity;
+
+ public ServerEncoder()
+ {
+ this(DEFAULT_CAPACITY);
+ }
+
+ public ServerEncoder(int capacity)
+ {
+ _initialCapacity = capacity;
+ _out = ByteBuffer.allocate(capacity);
+ _segment = 0;
+ }
+
+ public void init()
+ {
+ _out.position(_out.limit());
+ _out.limit(_out.capacity());
+ _out = _out.slice();
+ if(_out.remaining() < 256)
+ {
+ _out = ByteBuffer.allocate(_initialCapacity);
+ }
+ _segment = 0;
+ }
+
+ public ByteBuffer buffer()
+ {
+ int pos = _out.position();
+ _out.position(_segment);
+ ByteBuffer slice = _out.slice();
+ slice.limit(pos - _segment);
+ _out.position(pos);
+ return slice;
+ }
+
+ public int position()
+ {
+ return _out.position();
+ }
+
+ public ByteBuffer underlyingBuffer()
+ {
+ return _out;
+ }
+
+ private void grow(int size)
+ {
+ ByteBuffer old = _out;
+ int capacity = old.capacity();
+ _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
+ old.flip();
+ _out.put(old);
+ }
+
+ protected void doPut(byte b)
+ {
+ try
+ {
+ _out.put(b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ _out.put(b);
+ }
+ }
+
+ protected void doPut(ByteBuffer src)
+ {
+ try
+ {
+ _out.put(src);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(src.remaining());
+ _out.put(src);
+ }
+ }
+
+ protected void put(byte[] bytes)
+ {
+ try
+ {
+ _out.put(bytes);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(bytes.length);
+ _out.put(bytes);
+ }
+ }
+
+ public void writeUint8(short b)
+ {
+ assert b < 0x100;
+
+ try
+ {
+ _out.put((byte) b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ _out.put((byte) b);
+ }
+ }
+
+ public void writeUint16(int s)
+ {
+ assert s < 0x10000;
+
+ try
+ {
+ _out.putShort((short) s);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ _out.putShort((short) s);
+ }
+ }
+
+ public void writeUint32(long i)
+ {
+ assert i < 0x100000000L;
+
+ try
+ {
+ _out.putInt((int) i);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ _out.putInt((int) i);
+ }
+ }
+
+ public void writeUint64(long l)
+ {
+ try
+ {
+ _out.putLong(l);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(8);
+ _out.putLong(l);
+ }
+ }
+
+ public int beginSize8()
+ {
+ int pos = _out.position();
+ try
+ {
+ _out.put((byte) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ _out.put((byte) 0);
+ }
+ return pos;
+ }
+
+ public void endSize8(int pos)
+ {
+ int cur = _out.position();
+ _out.put(pos, (byte) (cur - pos - 1));
+ }
+
+ public int beginSize16()
+ {
+ int pos = _out.position();
+ try
+ {
+ _out.putShort((short) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ _out.putShort((short) 0);
+ }
+ return pos;
+ }
+
+ public void endSize16(int pos)
+ {
+ int cur = _out.position();
+ _out.putShort(pos, (short) (cur - pos - 2));
+ }
+
+ public int beginSize32()
+ {
+ int pos = _out.position();
+ try
+ {
+ _out.putInt(0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ _out.putInt(0);
+ }
+ return pos;
+
+ }
+
+ public void endSize32(int pos)
+ {
+ int cur = _out.position();
+ _out.putInt(pos, (cur - pos - 4));
+
+ }
+
+ public void writeDouble(double aDouble)
+ {
+ try
+ {
+ _out.putDouble(aDouble);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(8);
+ _out.putDouble(aDouble);
+ }
+ }
+
+ public void writeInt16(short aShort)
+ {
+ try
+ {
+ _out.putShort(aShort);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(2);
+ _out.putShort(aShort);
+ }
+ }
+
+ public void writeInt32(int anInt)
+ {
+ try
+ {
+ _out.putInt(anInt);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(4);
+ _out.putInt(anInt);
+ }
+ }
+
+ public void writeInt64(long aLong)
+ {
+ try
+ {
+ _out.putLong(aLong);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(8);
+ _out.putLong(aLong);
+ }
+ }
+
+ public void writeInt8(byte aByte)
+ {
+ try
+ {
+ _out.put(aByte);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(1);
+ _out.put(aByte);
+ }
+ }
+
+ public void writeBin128(byte[] byteArray)
+ {
+ byteArray = (byteArray != null) ? byteArray : new byte [16];
+
+ assert byteArray.length == 16;
+
+ try
+ {
+ _out.put(byteArray);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(16);
+ _out.put(byteArray);
+ }
+ }
+
+ public void writeBin128(UUID id)
+ {
+ byte[] data = new byte[16];
+
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
+
+ assert data.length == 16;
+ for (int i=7; i>=0; i--)
+ {
+ data[i] = (byte)(msb & 0xff);
+ msb = msb >> 8;
+ }
+
+ for (int i=15; i>=8; i--)
+ {
+ data[i] = (byte)(lsb & 0xff);
+ lsb = (lsb >> 8);
+ }
+ writeBin128(data);
+ }
+
+ public void writeFloat(float aFloat)
+ {
+ try
+ {
+ _out.putFloat(aFloat);
+ }
+ catch(BufferOverflowException exception)
+ {
+ grow(4);
+ _out.putFloat(aFloat);
+ }
+ }
+
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index cea9b0930f..f2c51d0203 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -85,7 +85,7 @@ import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -167,7 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private volatile boolean _deferFlush;
private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want?
@@ -272,7 +272,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
setNetworkConnection(network, network.getSender());
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
_sender = sender;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 622cf32d17..0f198a8d46 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -50,7 +51,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
@@ -282,11 +283,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
private String _remoteHost = "127.0.0.1";
private String _localHost = "127.0.0.1";
private int _port = portNumber.incrementAndGet();
- private final Sender<ByteBuffer> _sender;
+ private final ByteBufferSender _sender;
public TestNetworkConnection()
{
- _sender = new Sender<ByteBuffer>()
+ _sender = new ByteBufferSender()
{
public void send(ByteBuffer msg)
{
@@ -348,7 +349,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _sender;
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index b2783a2da2..147ccd4edd 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -59,7 +59,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -116,7 +116,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private byte _revision;
private PrintWriter _out;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private Connection_1_0 _connection;
private volatile boolean _transportBlockedForWriting;
@@ -185,7 +185,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
}
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
_network = network;
_sender = sender;
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index 940e24d7cf..896a7119f7 100644
--- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
@@ -240,7 +240,7 @@ class WebSocketProvider implements AcceptingTransport
}
}
- private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer>
+ private class ConnectionWrapper implements NetworkConnection, ByteBufferSender
{
private final WebSocket.Connection _connection;
private final SocketAddress _localAddress;
@@ -259,7 +259,7 @@ class WebSocketProvider implements AcceptingTransport
}
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return this;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 35582d92b7..d0d9d28398 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.XASession;
-import org.apache.qpid.transport.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +59,7 @@ import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
@@ -527,12 +527,12 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
- private static class ReceiverClosedWaiter implements Receiver<ByteBuffer>
+ private static class ReceiverClosedWaiter implements ByteBufferReceiver
{
private final CountDownLatch _closedWatcher;
- private final Receiver<ByteBuffer> _receiver;
+ private final ByteBufferReceiver _receiver;
- public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver)
+ public ReceiverClosedWaiter(ByteBufferReceiver receiver)
{
_receiver = receiver;
_closedWatcher = new CountDownLatch(1);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index f038fc6e4f..17b0fe1abb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client.handler;
-import java.nio.ByteBuffer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +33,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
@@ -95,7 +93,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
}
finally
{
- Sender<ByteBuffer> sender = session.getSender();
+ ByteBufferSender sender = session.getSender();
if (error != null)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index d5e3027601..f50447b930 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -66,8 +66,8 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
@@ -179,7 +179,7 @@ public class AMQProtocolHandler implements ProtocolEngine
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private long _lastReadTime = System.currentTimeMillis();
private long _lastWriteTime = System.currentTimeMillis();
private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
@@ -905,7 +905,7 @@ public class AMQProtocolHandler implements ProtocolEngine
setNetworkConnection(network, network.getSender());
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
_sender = sender;
@@ -923,7 +923,7 @@ public class AMQProtocolHandler implements ProtocolEngine
return _lastWriteTime;
}
- protected Sender<ByteBuffer> getSender()
+ protected ByteBufferSender getSender()
{
return _sender;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index e5765ee00f..9b0b21f06e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -52,8 +51,8 @@ import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
/**
@@ -382,7 +381,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _protocolHandler.getSender();
}
@@ -476,7 +475,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_protocolHandler.propagateExceptionToAllWaiters(error);
}
- public void setSender(Sender<java.nio.ByteBuffer> sender)
+ public void setSender(ByteBufferSender sender)
{
// No-op, interface munging
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index c7dee5b985..2543c5b500 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -666,7 +666,7 @@ public class AMQSession_0_10Test extends QpidTestCase
}
}
- class MockSender implements Sender<ProtocolEvent>
+ class MockSender implements ProtocolEventSender
{
private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
index 11b34d3dff..ee6704bb39 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
@@ -22,9 +22,9 @@ package org.apache.qpid.client.transport;
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
-public class MockSender implements Sender<ByteBuffer>
+public class MockSender implements ByteBufferSender
{
public void send(ByteBuffer msg)
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
index c9af1de6a7..cdfa83571b 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
@@ -20,18 +20,18 @@
*/
package org.apache.qpid.client.transport;
-import java.security.Principal;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.Principal;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.NetworkConnection;
/**
* Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
@@ -147,7 +147,7 @@ public class TestNetworkConnection implements NetworkConnection
_remoteAddress = address;
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _sender;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 0c643f6322..73c8653677 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
/**
@@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
- public void setSender(Sender<ByteBuffer> sender);
+ public void setSender(ByteBufferSender sender);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index cad5461d83..f73f6d931a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -21,10 +21,9 @@
package org.apache.qpid.protocol;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
@@ -32,7 +31,7 @@ import org.apache.qpid.transport.network.TransportActivity;
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
*/
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
+public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
{
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
@@ -58,6 +57,6 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport
void encryptedTransport();
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
index 8418c42189..f703c01567 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
@@ -26,11 +26,11 @@ package org.apache.qpid.transport;
*
*/
-public interface Binding<E,T>
+public interface Binding<E>
{
- E endpoint(Sender<T> sender);
+ E endpoint(ByteBufferSender sender);
- Receiver<T> receiver(E endpoint);
+ ByteBufferReceiver receiver(E endpoint);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
new file mode 100644
index 0000000000..1015f061c8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferReceiver
+{
+ void received(ByteBuffer msg);
+
+ void exception(Throwable t);
+
+ void closed();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
new file mode 100644
index 0000000000..7dcaf61a26
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferSender
+{
+ void send(ByteBuffer msg);
+
+ void flush();
+
+ void close();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 92ccdb84af..39f27b0fe0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -27,7 +27,6 @@ import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings;
*/
public class Connection extends ConnectionInvoker
- implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
+ implements ProtocolEventReceiver, ProtocolEventSender
{
protected static final Logger log = Logger.get(Connection.class);
@@ -113,7 +112,7 @@ public class Connection extends ConnectionInvoker
private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
private ConnectionDelegate delegate;
- private Sender<ProtocolEvent> sender;
+ private ProtocolEventSender sender;
final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>();
@@ -151,12 +150,12 @@ public class Connection extends ConnectionInvoker
listeners.add(listener);
}
- public Sender<ProtocolEvent> getSender()
+ public ProtocolEventSender getSender()
{
return sender;
}
- public void setSender(Sender<ProtocolEvent> sender)
+ public void setSender(ProtocolEventSender sender)
{
this.sender = sender;
}
@@ -234,7 +233,7 @@ public class Connection extends ConnectionInvoker
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
final InputHandler inputHandler = new InputHandler(new Assembler(this));
addFrameSizeObserver(inputHandler);
- Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
+ ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureReceiver);
@@ -246,7 +245,7 @@ public class Connection extends ConnectionInvoker
setRemoteAddress(_networkConnection.getRemoteAddress());
setLocalAddress(_networkConnection.getLocalAddress());
- final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
+ final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
if(secureSender instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureSender);
@@ -411,7 +410,7 @@ public class Connection extends ConnectionInvoker
{
log.debug("SEND: [%s] %s", this, event);
}
- Sender<ProtocolEvent> s = sender;
+ ProtocolEventSender s = sender;
if (s == null)
{
throw new ConnectionException("connection closed");
@@ -425,7 +424,7 @@ public class Connection extends ConnectionInvoker
{
log.debug("FLUSH: [%s]", this);
}
- final Sender<ProtocolEvent> theSender = sender;
+ final ProtocolEventSender theSender = sender;
if(theSender != null)
{
theSender.flush();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
new file mode 100644
index 0000000000..8b9c3f4f83
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import org.apache.qpid.transport.network.NetworkEvent;
+
+public interface NetworkEventReceiver
+{
+ void received(NetworkEvent msg);
+
+ void exception(Throwable t);
+
+ void closed();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
index 2a994580dc..e4ab540ce9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
@@ -20,19 +20,11 @@
*/
package org.apache.qpid.transport;
-
-/**
- * Receiver
- *
- */
-
-public interface Receiver<T>
+public interface ProtocolEventReceiver
{
-
- void received(T msg);
+ void received(ProtocolEvent msg);
void exception(Throwable t);
void closed();
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
index 9a6f675d7f..418f31b42a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
@@ -20,19 +20,11 @@
*/
package org.apache.qpid.transport;
-
-/**
- * Sender
- *
- */
-
-public interface Sender<T>
+public interface ProtocolEventSender
{
-
- void send(T msg);
+ void send(ProtocolEvent msg);
void flush();
void close();
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
index 2b93697bfc..070621db9b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.transport.codec;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.Type;
-
-import org.apache.qpid.transport.Xid;
import static org.apache.qpid.transport.util.Functions.lsb;
import java.io.UnsupportedEncodingException;
@@ -36,6 +30,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
+import org.apache.qpid.transport.Xid;
+
/**
* AbstractEncoder
@@ -43,7 +43,7 @@ import java.util.UUID;
* @author Rafael H. Schloming
*/
-abstract class AbstractEncoder implements Encoder
+public abstract class AbstractEncoder implements Encoder
{
private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
index d9150bed65..407df71824 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
@@ -360,8 +360,4 @@ public final class BBEncoder extends AbstractEncoder
}
}
- public void writeMagicNumber()
- {
- out.put("AM2".getBytes());
- }
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
index a9eea13104..b5ab29cdcf 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.transport.codec;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+
/**
* Encoder interface.
@@ -274,9 +275,10 @@ public interface Encoder
* @param bytes the bytes array to be encoded.
*/
void writeBin128(byte [] bytes);
-
- /**
- * Encodes the AMQP magic number.
- */
- void writeMagicNumber();
-} \ No newline at end of file
+
+ int position();
+
+ ByteBuffer underlyingBuffer();
+
+ void init();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index a80b988cea..a7e96167c2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,28 +20,29 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventReceiver;
import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBDecoder;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Assembler
*
*/
-public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+public class Assembler implements NetworkEventReceiver, NetworkDelegate
{
// Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
// array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
@@ -49,7 +50,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
- private final Receiver<ProtocolEvent> receiver;
+ private final ProtocolEventReceiver receiver;
private final Map<Integer,List<Frame>> segments;
private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
{
@@ -59,7 +60,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
};
- public Assembler(Receiver<ProtocolEvent> receiver)
+ public Assembler(ProtocolEventReceiver receiver)
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 26e8f1850b..5463cd2587 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -38,10 +36,10 @@ import org.apache.qpid.transport.network.security.sasl.SASLSender;
*/
public abstract class ConnectionBinding
- implements Binding<Connection,ByteBuffer>
+ implements Binding<Connection>
{
- public static Binding<Connection,ByteBuffer> get(final Connection connection)
+ public static Binding<Connection> get(final Connection connection)
{
return new ConnectionBinding()
{
@@ -52,7 +50,7 @@ public abstract class ConnectionBinding
};
}
- public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
+ public static Binding<Connection> get(final ConnectionDelegate delegate)
{
return new ConnectionBinding()
{
@@ -69,7 +67,7 @@ public abstract class ConnectionBinding
public abstract Connection connection();
- public Connection endpoint(Sender<ByteBuffer> sender)
+ public Connection endpoint(ByteBufferSender sender)
{
Connection conn = connection();
@@ -87,7 +85,7 @@ public abstract class ConnectionBinding
return conn;
}
- public Receiver<ByteBuffer> receiver(Connection conn)
+ public ByteBufferReceiver receiver(Connection conn)
{
final InputHandler inputHandler = new InputHandler(new Assembler(conn));
conn.addFrameSizeObserver(inputHandler);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 81a4c781a4..c45b2049a1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -30,27 +30,29 @@ import static org.apache.qpid.transport.network.Frame.LAST_SEG;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Encoder;
/**
* Disassembler
*/
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
{
- private final Sender<ByteBuffer> sender;
+ private final ByteBufferSender sender;
private int maxPayload;
private final Object sendlock = new Object();
- private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
+ private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>()
{
public BBEncoder initialValue()
{
@@ -58,7 +60,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
}
};
- public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+ public Disassembler(ByteBufferSender sender, int maxFrame)
{
this.sender = sender;
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
@@ -174,7 +176,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
private void method(Method method, SegmentType type)
{
- BBEncoder enc = _encoder.get();
+ Encoder enc = _encoder.get();
enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
index 758c2e1eda..a58bed5877 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -29,11 +29,12 @@ import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.SegmentType;
@@ -43,7 +44,7 @@ import org.apache.qpid.transport.SegmentType;
* @author Rafael H. Schloming
*/
-public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
+public class InputHandler implements ByteBufferReceiver, FrameSizeObserver
{
private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
@@ -56,7 +57,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
ERROR
}
- private final Receiver<NetworkEvent> receiver;
+ private final NetworkEventReceiver receiver;
private State state;
private ByteBuffer input = null;
private int needed;
@@ -66,7 +67,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
private byte track;
private int channel;
- public InputHandler(Receiver<NetworkEvent> receiver, State state)
+ public InputHandler(NetworkEventReceiver receiver, State state)
{
this.receiver = receiver;
this.state = state;
@@ -82,7 +83,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
}
}
- public InputHandler(Receiver<NetworkEvent> receiver)
+ public InputHandler(NetworkEventReceiver receiver)
{
this(receiver, PROTO_HDR);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 2810e7a9e1..bef266f214 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -21,13 +21,13 @@
package org.apache.qpid.transport.network;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
-import org.apache.qpid.transport.Sender;
+
+import org.apache.qpid.transport.ByteBufferSender;
public interface NetworkConnection
{
- Sender<ByteBuffer> getSender();
+ ByteBufferSender getSender();
void start();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
index 45231aa05d..f2735f1800 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.transport.network;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-
-import java.nio.ByteBuffer;
public interface OutgoingNetworkTransport extends NetworkTransport
{
public NetworkConnection getConnection();
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
+ ByteBufferReceiver delegate,
TransportActivity transportActivity);
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
index 9d0fe5ddf6..8d19c5a2ce 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
@@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
-import java.nio.ByteBuffer;
import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -62,7 +61,7 @@ public abstract class AbstractNetworkTransport implements OutgoingNetworkTranspo
private AcceptingThread _acceptor;
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
+ ByteBufferReceiver delegate,
TransportActivity transportActivity)
{
int sendBufferSize = settings.getWriteBufferSize();
@@ -159,7 +158,7 @@ public abstract class AbstractNetworkTransport implements OutgoingNetworkTranspo
}
protected abstract NetworkConnection createNetworkConnection(Socket socket,
- Receiver<ByteBuffer> engine,
+ ByteBufferReceiver engine,
Integer sendBufferSize,
Integer receiveBufferSize,
int timeout,
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index 5c3124c2ec..5008849ef3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -22,7 +22,6 @@ package org.apache.qpid.transport.network.io;
import java.net.Socket;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -31,8 +30,8 @@ import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
@@ -49,7 +48,7 @@ public class IoNetworkConnection implements NetworkConnection
private boolean _principalChecked;
private final Object _lock = new Object();
- public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+ public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate,
int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
{
_socket = socket;
@@ -70,7 +69,7 @@ public class IoNetworkConnection implements NetworkConnection
_ioReceiver.initiate();
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _ioSender;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index f33f626601..ccab1d93cf 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -21,9 +21,8 @@
package org.apache.qpid.transport.network.io;
import java.net.Socket;
-import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
public class IoNetworkTransport extends AbstractNetworkTransport
{
@@ -31,7 +30,7 @@ public class IoNetworkTransport extends AbstractNetworkTransport
@Override
protected IoNetworkConnection createNetworkConnection(final Socket socket,
- final Receiver<ByteBuffer> engine,
+ final ByteBufferReceiver engine,
final Integer sendBufferSize,
final Integer receiveBufferSize,
final int timeout,
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 467115c76f..790583e92b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLSocket;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Logger;
@@ -47,7 +47,7 @@ final class IoReceiver implements Runnable
private static final Logger log = Logger.get(IoReceiver.class);
- private final Receiver<ByteBuffer> receiver;
+ private final ByteBufferReceiver receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
@@ -61,7 +61,7 @@ final class IoReceiver implements Runnable
shutdownBroken = SystemUtils.isWindows();
}
- public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+ public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout)
{
this.receiver = receiver;
this.bufferSize = bufferSize;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 79e99287c4..61beae4c25 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -27,14 +27,14 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
-public final class IoSender implements Runnable, Sender<ByteBuffer>
+public final class IoSender implements Runnable, ByteBufferSender
{
private static final Logger log = Logger.get(IoSender.class);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index 68670d1a9d..fe6e707f7e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -21,7 +21,6 @@
package org.apache.qpid.transport.network.io;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.util.Set;
@@ -32,7 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
@@ -88,7 +87,7 @@ public class NonBlockingConnection implements NetworkConnection
{
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _nonBlockingSenderReceiver;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 347a41ee07..02099dee15 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -40,7 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.Ticker;
@@ -48,7 +48,7 @@ import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
import org.apache.qpid.util.SystemUtils;
-public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
+public class NonBlockingSenderReceiver implements ByteBufferSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index 51ef266ee9..271135f411 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.transport.network.security;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
public interface SecurityLayer
{
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
+ public ByteBufferSender sender(ByteBufferSender delegate);
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate);
public String getUserID();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
index 2a2f3d8362..d25e97ffe4 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.transport.network.security;
-import java.nio.ByteBuffer;
-
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -110,14 +108,14 @@ public class SecurityLayerFactory
}
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ public ByteBufferSender sender(ByteBufferSender delegate)
{
SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus);
sender.setHostname(_hostname);
return sender;
}
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
{
SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus);
receiver.setHostname(_hostname);
@@ -141,13 +139,13 @@ public class SecurityLayerFactory
_layer = layer;
}
- public SASLSender sender(Sender<ByteBuffer> delegate)
+ public SASLSender sender(ByteBufferSender delegate)
{
SASLSender sender = new SASLSender(_layer.sender(delegate));
return sender;
}
- public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+ public SASLReceiver receiver(ByteBufferReceiver delegate)
{
SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate));
return receiver;
@@ -169,12 +167,12 @@ public class SecurityLayerFactory
{
}
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ public ByteBufferSender sender(ByteBufferSender delegate)
{
return delegate;
}
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
{
return delegate;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
index 59e9453454..983e3bdf90 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
@@ -21,20 +21,21 @@
package org.apache.qpid.transport.network.security.sasl;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
import javax.security.sasl.SaslException;
-import java.nio.ByteBuffer;
-public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> {
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver {
- private Receiver<ByteBuffer> delegate;
+ private ByteBufferReceiver delegate;
private byte[] netData;
private static final Logger log = Logger.get(SASLReceiver.class);
- public SASLReceiver(Receiver<ByteBuffer> delegate)
+ public SASLReceiver(ByteBufferReceiver delegate)
{
this.delegate = delegate;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index fa1801cb65..335f8992ca 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -21,22 +21,24 @@
package org.apache.qpid.transport.network.security.sasl;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
-
-import javax.security.sasl.SaslException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
-public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLSender extends SASLEncryptor implements ByteBufferSender
+{
- private Sender<ByteBuffer> delegate;
+ private ByteBufferSender delegate;
private byte[] appData;
private final AtomicBoolean closed = new AtomicBoolean(false);
private static final Logger log = Logger.get(SASLSender.class);
- public SASLSender(Sender<ByteBuffer> delegate)
+ public SASLSender(ByteBufferSender delegate)
{
this.delegate = delegate;
log.debug("SASL Sender enabled");
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index 1bbf166d82..ce3bace9e8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -28,16 +28,16 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.util.Logger;
-public class SSLReceiver implements Receiver<ByteBuffer>
+public class SSLReceiver implements ByteBufferReceiver
{
private static final Logger log = Logger.get(SSLReceiver.class);
- private final Receiver<ByteBuffer> delegate;
+ private final ByteBufferReceiver delegate;
private final SSLEngine engine;
private final int sslBufSize;
private final ByteBuffer localBuffer;
@@ -47,7 +47,7 @@ public class SSLReceiver implements Receiver<ByteBuffer>
private String _hostname;
- public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus)
+ public SSLReceiver(final SSLEngine engine, final ByteBufferReceiver delegate, final SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 7d64012fea..755f7430ba 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -19,24 +19,25 @@
*/
package org.apache.qpid.transport.network.security.ssl;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-public class SSLSender implements Sender<ByteBuffer>
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLSender implements ByteBufferSender
{
private static final Logger log = Logger.get(SSLSender.class);
- private final Sender<ByteBuffer> delegate;
+ private final ByteBufferSender delegate;
private final SSLEngine engine;
private final int sslBufSize;
private final ByteBuffer netData;
@@ -48,7 +49,7 @@ public class SSLSender implements Sender<ByteBuffer>
private final AtomicBoolean closed = new AtomicBoolean(false);
- public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
+ public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
index 865a3603f5..3da2a03f42 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
@@ -21,7 +21,6 @@
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -29,9 +28,9 @@ import javax.net.ssl.SSLContext;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.io.IoNetworkTransport;
@@ -130,7 +129,7 @@ public class TransportTest extends QpidTestCase
}
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
+ ByteBufferReceiver delegate,
TransportActivity transportActivity)
{
throw new UnsupportedOperationException();
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
index a445cff0a7..69724438ec 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
@@ -21,14 +21,12 @@
package org.apache.qpid.transport.network.io;
-import junit.framework.TestCase;
-
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.Sender;
+import junit.framework.TestCase;
+
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
@@ -193,7 +191,7 @@ public class IdleTimeoutTickerTest extends TestCase implements TransportActivity
//-------------------------------------------------------------------------
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return null;
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
index bb864cd434..67d360fa9e 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
@@ -20,16 +20,15 @@
*/
package org.apache.qpid.transport.network.io;
-import org.apache.log4j.Logger;
-import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.TransportException;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.transport.Binding;
/**
@@ -44,9 +43,9 @@ public class IoAcceptor<E> extends Thread
private volatile boolean _closed = false;
private ServerSocket socket;
- private Binding<E,ByteBuffer> binding;
+ private Binding<E> binding;
- public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding)
+ public IoAcceptor(SocketAddress address, Binding<E> binding)
throws IOException
{
socket = new ServerSocket();
@@ -70,7 +69,7 @@ public class IoAcceptor<E> extends Thread
}
}
- public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding)
+ public IoAcceptor(String host, int port, Binding<E> binding)
throws IOException
{
this(new InetSocketAddress(host, port), binding);
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
index f74051aa32..4b5b4448ee 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -20,10 +20,9 @@
package org.apache.qpid.transport.network.io;
import java.net.Socket;
-import java.nio.ByteBuffer;
import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.util.Logger;
/**
@@ -48,18 +47,18 @@ public final class IoTransport<E>
("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
private Socket socket;
- private Sender<ByteBuffer> sender;
+ private ByteBufferSender sender;
private E endpoint;
private IoReceiver receiver;
private long timeout = 60000;
- IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
+ IoTransport(Socket socket, Binding<E> binding)
{
this.socket = socket;
setupTransport(socket, binding);
}
- private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding)
+ private void setupTransport(Socket socket, Binding<E> binding)
{
IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
ios.initiate();
@@ -73,7 +72,7 @@ public final class IoTransport<E>
ios.setReceiver(this.receiver);
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return sender;
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index 9a8d021828..84eb761899 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -41,7 +41,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
@@ -230,11 +230,11 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
private String _remoteHost = "127.0.0.1";
private String _localHost = "127.0.0.1";
private int _port = 1;
- private final Sender<ByteBuffer> _sender;
+ private final ByteBufferSender _sender;
public TestNetworkConnection()
{
- _sender = new Sender<ByteBuffer>()
+ _sender = new ByteBufferSender()
{
public void send(ByteBuffer msg)
{
@@ -296,7 +296,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
}
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _sender;
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 347bf933ad..54e5a5c9bc 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -32,7 +32,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolSessionTest extends QpidBrokerTestCase
@@ -107,11 +107,11 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase
private String _localHost = "127.0.0.1";
private int _port = 1;
private SocketAddress _localAddress = null;
- private final Sender<ByteBuffer> _sender;
+ private final ByteBufferSender _sender;
public TestNetworkConnection()
{
- _sender = new Sender<ByteBuffer>()
+ _sender = new ByteBufferSender()
{
public void send(ByteBuffer msg)
@@ -181,7 +181,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase
_localAddress = address;
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _sender;
}