summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-08-15 03:40:49 +0000
committerRafael H. Schloming <rhs@apache.org>2008-08-15 03:40:49 +0000
commitb6a376a4797e4988cdae48e0e5395a9b1f4e9f85 (patch)
treedc41b190202b592d35579af35cc8b18bb1f1b702 /java/common
parentc521097d6d6f44e437e2ce67f5a8ae66706e4476 (diff)
downloadqpid-python-b6a376a4797e4988cdae48e0e5395a9b1f4e9f85.tar.gz
updated qpid.0-10/java to match trunk/qpid/java@686097
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@686136 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/Composite.tpl73
-rw-r--r--java/common/Constant.tpl2
-rw-r--r--java/common/Enum.tpl2
-rw-r--r--java/common/Invoker.tpl13
-rw-r--r--java/common/MethodDelegate.tpl2
-rw-r--r--java/common/Option.tpl5
-rw-r--r--java/common/StructFactory.tpl2
-rw-r--r--java/common/Type.tpl2
-rwxr-xr-xjava/common/bin/qpid-run48
-rwxr-xr-xjava/common/codegen2
-rw-r--r--java/common/genutil.py17
-rw-r--r--java/common/pom.xml2
-rw-r--r--java/common/src/main/java/log4j.properties3
-rw-r--r--java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java63
-rw-r--r--java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java18
-rw-r--r--java/common/src/main/java/org/apache/qpid/BrokerDetails.java (renamed from java/common/src/main/java/org/apache/qpidity/BrokerDetails.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/BrokerDetailsImpl.java (renamed from java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/ConsoleOutput.java (renamed from java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java)11
-rw-r--r--java/common/src/main/java/org/apache/qpid/ErrorCode.java (renamed from java/common/src/main/java/org/apache/qpidity/ErrorCode.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/QpidConfig.java (renamed from java/common/src/main/java/org/apache/qpidity/QpidConfig.java)8
-rw-r--r--java/common/src/main/java/org/apache/qpid/QpidException.java (renamed from java/common/src/main/java/org/apache/qpidity/QpidException.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/SecurityHelper.java (renamed from java/common/src/main/java/org/apache/qpidity/SecurityHelper.java)6
-rw-r--r--java/common/src/main/java/org/apache/qpid/ToyBroker.java (renamed from java/common/src/main/java/org/apache/qpidity/ToyBroker.java)143
-rw-r--r--java/common/src/main/java/org/apache/qpid/ToyClient.java (renamed from java/common/src/main/java/org/apache/qpidity/ToyClient.java)40
-rw-r--r--java/common/src/main/java/org/apache/qpid/ToyExchange.java (renamed from java/common/src/main/java/org/apache/qpidity/ToyExchange.java)39
-rw-r--r--java/common/src/main/java/org/apache/qpid/api/Message.java (renamed from java/common/src/main/java/org/apache/qpidity/api/Message.java)8
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/ClientProperties.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java (renamed from java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java)8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java67
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java105
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java432
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/security/AMQPCallbackHandler.java (renamed from java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/security/CallbackHandlerRegistry.java (renamed from java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java)4
-rw-r--r--java/common/src/main/java/org/apache/qpid/security/DynamicSaslRegistrar.java (renamed from java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java)4
-rw-r--r--java/common/src/main/java/org/apache/qpid/security/JCAProvider.java (renamed from java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/security/UsernamePasswordCallbackHandler.java (renamed from java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClient.java (renamed from java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClientFactory.java (renamed from java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Binary.java129
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Binding.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Binding.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Channel.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Channel.java)76
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java)7
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Connection.java)34
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java)46
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java45
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Echo.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Echo.java)31
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Field.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Field.java)6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Future.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Future.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Header.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Header.java)51
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Method.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Method.java)97
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java)6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ProtocolError.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java)19
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ProtocolEvent.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java)6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java)23
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java)30
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Range.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Range.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/RangeSet.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java)7
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Receiver.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Receiver.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Result.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Result.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Sender.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Sender.java)4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Session.java)156
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java39
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java)8
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionException.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/SessionException.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Sink.java137
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Struct.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/Struct.java)8
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/TransportException.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/TransportException.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java)78
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java)171
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java)23
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java232
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java)6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/Encodable.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/codec/Encodable.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java)6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java226
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java)55
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java236
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Frame.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java)52
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java204
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/NetworkDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java)6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/NetworkEvent.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java109
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java81
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java129
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java291
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java174
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java)61
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java)11
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java)18
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java)9
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/util/Functions.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/util/Logger.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/util/Logger.java)7
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java (renamed from java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java)2
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/QpidURL.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java (renamed from java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java)45
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java39
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java228
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/Strings.java82
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/UUIDGen.java36
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/UUIDs.java59
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolException.java36
-rw-r--r--java/common/src/main/java/org/apache/qpidity/exchange/ExchangeDefaults.java51
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Data.java108
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java29
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java112
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java134
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java76
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Validator.java177
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java202
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java222
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java277
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java116
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java181
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java111
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (renamed from java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java)16
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java (renamed from java/common/src/test/java/org/apache/qpidity/transport/RangeSetTest.java)2
128 files changed, 3863 insertions, 2973 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl
index 46a45b0b91..283fa24641 100644
--- a/java/common/Composite.tpl
+++ b/java/common/Composite.tpl
@@ -1,28 +1,30 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.transport.codec.Decoder;
-import org.apache.qpidity.transport.codec.Encodable;
-import org.apache.qpidity.transport.codec.Encoder;
-import org.apache.qpidity.transport.codec.Validator;
+import org.apache.qpid.transport.codec.Decoder;
+import org.apache.qpid.transport.codec.Encodable;
+import org.apache.qpid.transport.codec.Encoder;
-import org.apache.qpidity.transport.network.Frame;
+import org.apache.qpid.transport.network.Frame;
${
from genutil import *
cls = klass(type)["@name"]
+segments = type["segments"]
+
if type.name in ("control", "command"):
base = "Method"
size = 0
pack = 2
- if type["segments"]:
+ if segments:
payload = "true"
else:
payload = "false"
@@ -80,12 +82,16 @@ if pack > 0:
out(" private $(PACK_TYPES[pack]) packing_flags = 0;\n");
fields = get_fields(type)
-params = get_parameters(fields)
+params = get_parameters(type, fields)
options = get_options(fields)
for f in fields:
if not f.empty:
out(" private $(f.type) $(f.name);\n")
+
+if segments:
+ out(" private Header header;\n")
+ out(" private ByteBuffer body;\n")
}
${
@@ -99,7 +105,11 @@ for f in fields:
if f.option: continue
out(" $(f.set)($(f.name));\n")
-if options:
+if segments:
+ out(" setHeader(header);\n")
+ out(" setBody(body);\n")
+
+if options or base == "Method":
out("""
for (int i=0; i < _options.length; i++) {
switch (_options[i]) {
@@ -108,7 +118,11 @@ if options:
for f in options:
out(" case $(f.option): packing_flags |= $(f.flag_mask(pack)); break;\n")
- out(""" case NO_OPTION: break;
+ if base == "Method":
+ out(""" case SYNC: this.setSync(true); break;
+ case BATCH: this.setBatch(true); break;
+""")
+ out(""" case NONE: break;
default: throw new IllegalArgumentException("invalid option: " + _options[i]);
}
}
@@ -150,7 +164,6 @@ else:
}
public final $name $(f.set)($(f.type) value) {
- $(f.check)
${
if not f.empty:
out(" this.$(f.name) = value;")
@@ -169,6 +182,44 @@ if pack > 0:
""")
}
+${
+if segments:
+ out(""" public final Header getHeader() {
+ return this.header;
+ }
+
+ public final void setHeader(Header header) {
+ this.header = header;
+ }
+
+ public final $name header(Header header) {
+ setHeader(header);
+ return this;
+ }
+
+ public final ByteBuffer getBody() {
+ if (this.body == null)
+ {
+ return null;
+ }
+ else
+ {
+ return this.body.slice();
+ }
+ }
+
+ public final void setBody(ByteBuffer body) {
+ this.body = body;
+ }
+
+ public final $name body(ByteBuffer body)
+ {
+ setBody(body);
+ return this;
+ }
+""")
+}
+
public void write(Encoder enc)
{
${
diff --git a/java/common/Constant.tpl b/java/common/Constant.tpl
index 695812ea75..7194a61dfc 100644
--- a/java/common/Constant.tpl
+++ b/java/common/Constant.tpl
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
${from genutil import *}
diff --git a/java/common/Enum.tpl b/java/common/Enum.tpl
index 337feb7065..2ec1d22522 100644
--- a/java/common/Enum.tpl
+++ b/java/common/Enum.tpl
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
public enum $name {
${
diff --git a/java/common/Invoker.tpl b/java/common/Invoker.tpl
index d9905c71a0..9158922df7 100644
--- a/java/common/Invoker.tpl
+++ b/java/common/Invoker.tpl
@@ -1,5 +1,6 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -15,8 +16,8 @@ from genutil import *
for c in composites:
name = cname(c)
fields = get_fields(c)
- params = get_parameters(fields)
- args = get_arguments(fields)
+ params = get_parameters(c, fields)
+ args = get_arguments(c, fields)
result = c["result"]
if result:
if not result["@type"]:
@@ -32,9 +33,9 @@ for c in composites:
jclass = ""
out("""
- public $jresult $(dromedary(name))($(", ".join(params))) {
- $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
- }
+ public final $jresult $(dromedary(name))($(", ".join(params))) {
+ $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
+ }
""")
}
diff --git a/java/common/MethodDelegate.tpl b/java/common/MethodDelegate.tpl
index e5ab1ae1e7..84fa0e43da 100644
--- a/java/common/MethodDelegate.tpl
+++ b/java/common/MethodDelegate.tpl
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
public abstract class MethodDelegate<C> {
diff --git a/java/common/Option.tpl b/java/common/Option.tpl
index 5fa2b95b9f..d45c004f6f 100644
--- a/java/common/Option.tpl
+++ b/java/common/Option.tpl
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
public enum Option {
@@ -15,5 +15,6 @@ for c in composites:
if not options.has_key(option):
options[option] = None
out(" $option,\n")}
- NO_OPTION
+ BATCH,
+ NONE
}
diff --git a/java/common/StructFactory.tpl b/java/common/StructFactory.tpl
index b27621b1d2..f3dcbbd68a 100644
--- a/java/common/StructFactory.tpl
+++ b/java/common/StructFactory.tpl
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
class StructFactory {
diff --git a/java/common/Type.tpl b/java/common/Type.tpl
index c869934538..c58e08a342 100644
--- a/java/common/Type.tpl
+++ b/java/common/Type.tpl
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
${from genutil import *}
diff --git a/java/common/bin/qpid-run b/java/common/bin/qpid-run
index c9e37b21a1..1de0048f48 100755
--- a/java/common/bin/qpid-run
+++ b/java/common/bin/qpid-run
@@ -37,19 +37,39 @@ die() {
exit 1
}
+OFF=0
+WARN=1
+INFO=2
+
+if [ -z "$QPID_RUN_LOG" ]; then
+ QPID_RUN_LOG=$OFF
+fi
+
+log() {
+ if [ "$1" -le "$QPID_RUN_LOG" ]; then
+ shift
+ echo "$@"
+ fi
+}
+
if [ -z $AMQJ_LOGGING_LEVEL ]; then
export AMQJ_LOGGING_LEVEL=info
fi
if [ -z "$QPID_HOME" ]; then
- die "QPID_HOME must be set"
+ export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
+ export PATH=${PATH}:${QPID_HOME}/bin
fi
if [ -z "$QPID_WORK" ]; then
- echo Setting QPID_WORK to $HOME as default
+ log $INFO Setting QPID_WORK to $HOME as default
QPID_WORK=$HOME
fi
+if [ -z "$JAVA" ]; then
+ JAVA=java
+fi
+
if $cygwin; then
QPID_HOME=$(cygpath -w $QPID_HOME)
QPID_WORK=$(cygpath -w $QPID_WORK)
@@ -64,10 +84,10 @@ SYSTEM_PROPS="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -D
#Using X character to avoid probs with empty strings
if [ -n "$QPID_LOG_PREFIX" ]; then
if [ "X$QPID_LOG_PREFIX" = "XPID" ]; then
- echo Using pid in qpid log name prefix
+ log $INFO Using pid in qpid log name prefix
LOG_PREFIX=" -Dlogprefix=$$"
else
- echo Using qpid logprefix property
+ log $INFO Using qpid logprefix property
LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX"
fi
SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_PREFIX}"
@@ -75,16 +95,16 @@ fi
if [ -n "$QPID_LOG_SUFFIX" ]; then
if [ "X$QPID_LOG_SUFFIX" = "XPID" ]; then
- echo Using pid in qpid log name suffix
+ log $INFO Using pid in qpid log name suffix
LOG_SUFFIX=" -Dlogsuffix=$$"
else
- echo Using qpig logsuffix property
+ log $INFO Using qpig logsuffix property
LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX"
fi
SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_SUFFIX}"
fi
-echo System Properties set to $SYSTEM_PROPS
+log $INFO System Properties set to $SYSTEM_PROPS
program=$(basename $0)
sourced=${BASH_SOURCE[0]}
@@ -109,26 +129,26 @@ unset CLASSPATH
#Use QPID_CLASSPATH if set
if [ -n "$QPID_CLASSPATH" ]; then
export CLASSPATH=$QPID_CLASSPATH
- echo "Using QPID_CLASSPATH" $QPID_CLASSPATH
+ log $INFO "Using QPID_CLASSPATH" $QPID_CLASSPATH
else
- echo "Warning: Qpid classpath not set. CLASSPATH must include qpid jars."
+ log $WARN "Warning: Qpid classpath not set. CLASSPATH must include qpid jars."
fi
#Use QPID_JAVA_GC if set
if [ -n "$QPID_JAVA_GC" ]; then
export JAVA_GC=$QPID_JAVA_GC
- echo "Using QPID_JAVA_GC setting" $QPID_JAVA_GC
+ log $INFO "Using QPID_JAVA_GC setting" $QPID_JAVA_GC
else
- echo "Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC" $JAVA_GC
+ log $INFO "Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC" $JAVA_GC
fi
#Use QPID_JAVA_MEM if set
if [ -n "$QPID_JAVA_MEM" ]; then
export JAVA_MEM=$QPID_JAVA_MEM
- echo "Using QPID_JAVA_MEM setting" $QPID_JAVA_MEM
+ log $INFO "Using QPID_JAVA_MEM setting" $QPID_JAVA_MEM
else
- echo "Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM" $JAVA_MEM
+ log $INFO "Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM" $JAVA_MEM
fi
declare -a RUN_ARGS JAVA_ARGS
@@ -172,7 +192,7 @@ for arg in "${RUN_ARGS[@]}"; do
;;
-run:jpda)
#USAGE: adds debugging options to the java command, use
-#USAGE: JDPA_TRANSPORT and JPDA_ADDRESS to customize the debugging
+#USAGE: JPDA_TRANSPORT and JPDA_ADDRESS to customize the debugging
#USAGE: behavior and use JPDA_OPTS to override it entirely
if [ -z "$JPDA_OPTS" ]; then
JPDA_OPTS="-Xdebug -Xrunjdwp:transport=${JPDA_TRANSPORT:-dt_socket},address=${JPDA_ADDRESS:-8000},server=y,suspend=n"
diff --git a/java/common/codegen b/java/common/codegen
index ab1ab1c542..6cd51565ea 100755
--- a/java/common/codegen
+++ b/java/common/codegen
@@ -7,7 +7,7 @@ from genutil import *
out_dir = sys.argv[1]
spec_file = sys.argv[2]
tpl_dir = sys.argv[3]
-pkg_dir = os.path.join(out_dir, "org/apache/qpidity/transport")
+pkg_dir = os.path.join(out_dir, "org/apache/qpid/transport")
if not os.path.exists(pkg_dir):
os.makedirs(pkg_dir)
diff --git a/java/common/genutil.py b/java/common/genutil.py
index 9636a91cc3..f8f234548c 100644
--- a/java/common/genutil.py
+++ b/java/common/genutil.py
@@ -170,18 +170,15 @@ class Field:
if self.type_node.name == "struct":
self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname)
self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name)
- self.check = ""
self.coder = "Struct"
elif self.type_node.name == "domain":
self.coder = camel(0, self.prim_type["@name"])
self.read = "%s.get(dec.read%s())" % (tname, self.coder)
self.write = "enc.write%s(check(struct).%s.getValue())" % (self.coder, self.name)
- self.check = ""
else:
self.coder = camel(0, self.type_node["@name"])
self.read = "dec.read%s()" % self.coder
self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name)
- self.check = "Validator.check%s(value);" % self.coder
self.type = jtype(self.type_node)
self.default = DEFAULTS.get(self.type, "null")
self.has = camel(1, "has", self.name)
@@ -206,7 +203,7 @@ def get_fields(nd):
index += 1
return fields
-def get_parameters(fields):
+def get_parameters(type, fields):
params = []
options = False
for f in fields:
@@ -214,11 +211,14 @@ def get_parameters(fields):
options = True
else:
params.append("%s %s" % (f.type, f.name))
- if options:
+ if type["segments"]:
+ params.append("Header header")
+ params.append("ByteBuffer body")
+ if options or type.name in ("control", "command"):
params.append("Option ... _options")
return params
-def get_arguments(fields):
+def get_arguments(type, fields):
args = []
options = False
for f in fields:
@@ -226,7 +226,10 @@ def get_arguments(fields):
options = True
else:
args.append(f.name)
- if options:
+ if type["segments"]:
+ args.append("header")
+ args.append("body")
+ if options or type.name in ("control", "command"):
args.append("_options")
return args
diff --git a/java/common/pom.xml b/java/common/pom.xml
index 714087d843..894ca26710 100644
--- a/java/common/pom.xml
+++ b/java/common/pom.xml
@@ -61,7 +61,7 @@
<!-- <exec executable="python">
<arg line="generate"/>
<arg line="${generated.path}"/>
- <arg line="org.apache.qpidity"/>
+ <arg line="org.apache.qpid"/>
<arg line="${specs.dir}/amqp-transitional.0-10.xml"/>
</exec> -->
</tasks>
diff --git a/java/common/src/main/java/log4j.properties b/java/common/src/main/java/log4j.properties
index 6d596d1d19..44f89dc805 100644
--- a/java/common/src/main/java/log4j.properties
+++ b/java/common/src/main/java/log4j.properties
@@ -19,9 +19,12 @@
log4j.rootLogger=${root.logging.level}
+log4j.logger.qpid.protocol=${amqj.protocol.logging.level}, console
+log4j.additivity.qpid.protocol=false
log4j.logger.org.apache.qpid=${amqj.logging.level}, console
log4j.additivity.org.apache.qpid=false
+
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=all
log4j.appender.console.layout=org.apache.log4j.PatternLayout
diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
index bed80d5954..0c311b6645 100644
--- a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
+++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
@@ -62,7 +62,6 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
private static final class FixedSizeByteBuffer extends ByteBuffer
{
private java.nio.ByteBuffer buf;
- private int refCount = 1;
private int mark = -1;
@@ -70,36 +69,14 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
{
this.buf = buf;
buf.order( ByteOrder.BIG_ENDIAN );
- refCount = 1;
}
public synchronized void acquire()
{
- if( refCount <= 0 )
- {
- throw new IllegalStateException( "Already released buffer." );
- }
-
- refCount ++;
}
public void release()
{
- synchronized( this )
- {
- if( refCount <= 0 )
- {
- refCount = 0;
- throw new IllegalStateException(
- "Already released buffer. You released the buffer too many times." );
- }
-
- refCount --;
- if( refCount > 0)
- {
- return;
- }
- }
}
public java.nio.ByteBuffer buf()
@@ -157,50 +134,12 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
{
if( newCapacity > capacity() )
{
- // Allocate a new buffer and transfer all settings to it.
- int pos = position();
- int limit = limit();
- ByteOrder bo = order();
-
- capacity0( newCapacity );
- buf.limit( limit );
- if( mark >= 0 )
- {
- buf.position( mark );
- buf.mark();
- }
- buf.position( pos );
- buf.order( bo );
+ throw new IllegalArgumentException();
}
return this;
}
- protected void capacity0( int requestedCapacity )
- {
- int newCapacity = MINIMUM_CAPACITY;
- while( newCapacity < requestedCapacity )
- {
- newCapacity <<= 1;
- }
-
- java.nio.ByteBuffer oldBuf = this.buf;
- java.nio.ByteBuffer newBuf;
- if( isDirect() )
- {
- newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity );
- }
- else
- {
- newBuf = java.nio.ByteBuffer.allocate( newCapacity );
- }
-
- newBuf.clear();
- oldBuf.clear();
- newBuf.put( oldBuf );
- this.buf = newBuf;
- }
-
public boolean isAutoExpand()
diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
index c515263317..4fd28c4eb5 100644
--- a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
+++ b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
@@ -31,7 +31,6 @@ import java.util.Iterator;
* A default implementation of {@link org.apache.mina.common.IoFuture}.
*
* @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 440259 $, $Date: 2006-09-05 14:01:47 +0900 (í™”, 05 9ì›” 2006) $
*/
public class DefaultIoFuture implements IoFuture
{
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index f6f596da95..ef9420ba87 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -54,6 +54,6 @@ public class AMQChannelException extends AMQException
public AMQFrame getCloseFrame(int channel)
{
MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
- return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
+ return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
index 6cdd57d6f2..fa69f7f91b 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
@@ -21,6 +21,10 @@
package org.apache.qpid;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
import org.apache.qpid.protocol.AMQConstant;
/**
@@ -35,6 +39,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQConnectionFailureException extends AMQException
{
+ Collection<Exception> _exceptions;
+
public AMQConnectionFailureException(String message, Throwable cause)
{
super(null, message, cause);
@@ -44,4 +50,16 @@ public class AMQConnectionFailureException extends AMQException
{
super(errorCode, message, cause);
}
+
+ public AMQConnectionFailureException(String message, Collection<Exception> exceptions)
+ {
+ // Blah, I hate ? but java won't let super() be anything other than the first thing, sorry...
+ super (null, message, exceptions.isEmpty() ? null : exceptions.iterator().next());
+ this._exceptions = exceptions;
+ }
+
+ public Collection<Exception> getLinkedExceptions()
+ {
+ return _exceptions;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/BrokerDetails.java b/java/common/src/main/java/org/apache/qpid/BrokerDetails.java
index 29a4f5a9c0..63f67a7857 100644
--- a/java/common/src/main/java/org/apache/qpidity/BrokerDetails.java
+++ b/java/common/src/main/java/org/apache/qpid/BrokerDetails.java
@@ -15,7 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpidity;
+package org.apache.qpid;
import java.util.Map;
diff --git a/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java b/java/common/src/main/java/org/apache/qpid/BrokerDetailsImpl.java
index 6de6055f1c..201d43e21f 100644
--- a/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java
+++ b/java/common/src/main/java/org/apache/qpid/BrokerDetailsImpl.java
@@ -15,7 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpidity;
+package org.apache.qpid;
import java.util.HashMap;
import java.util.Map;
diff --git a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
index 4e05aa574c..f17782ebf4 100644
--- a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
+++ b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
@@ -18,13 +18,13 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpid;
import java.nio.ByteBuffer;
-import org.apache.qpidity.transport.Sender;
+import org.apache.qpid.transport.Sender;
-import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.transport.util.Functions.*;
/**
@@ -41,6 +41,11 @@ public class ConsoleOutput implements Sender<ByteBuffer>
System.out.println(str(buf));
}
+ public void flush()
+ {
+ // pass
+ }
+
public void close()
{
System.out.println("CLOSED");
diff --git a/java/common/src/main/java/org/apache/qpidity/ErrorCode.java b/java/common/src/main/java/org/apache/qpid/ErrorCode.java
index 4b18c46d16..be1ad16dd5 100644
--- a/java/common/src/main/java/org/apache/qpidity/ErrorCode.java
+++ b/java/common/src/main/java/org/apache/qpid/ErrorCode.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity;
+package org.apache.qpid;
public enum ErrorCode
{
diff --git a/java/common/src/main/java/org/apache/qpidity/QpidConfig.java b/java/common/src/main/java/org/apache/qpid/QpidConfig.java
index b5aad12f10..e8d42fdf83 100644
--- a/java/common/src/main/java/org/apache/qpidity/QpidConfig.java
+++ b/java/common/src/main/java/org/apache/qpid/QpidConfig.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity;
+package org.apache.qpid;
/**
* API to configure the Security parameters of the client.
@@ -11,11 +11,11 @@ public class QpidConfig
private static QpidConfig _instance = new QpidConfig();
private SecurityMechanism[] securityMechanisms =
- new SecurityMechanism[]{new SecurityMechanism("PLAIN","org.apache.qpidity.security.UsernamePasswordCallbackHandler"),
- new SecurityMechanism("CRAM_MD5","org.apache.qpidity.security.UsernamePasswordCallbackHandler")};
+ new SecurityMechanism[]{new SecurityMechanism("PLAIN","org.apache.qpid.security.UsernamePasswordCallbackHandler"),
+ new SecurityMechanism("CRAM_MD5","org.apache.qpid.security.UsernamePasswordCallbackHandler")};
private SaslClientFactory[] saslClientFactories =
- new SaslClientFactory[]{new SaslClientFactory("AMQPLAIN","org.apache.qpidity.security.amqplain.AmqPlainSaslClientFactory")};
+ new SaslClientFactory[]{new SaslClientFactory("AMQPLAIN","org.apache.qpid.security.amqplain.AmqPlainSaslClientFactory")};
private QpidConfig(){}
diff --git a/java/common/src/main/java/org/apache/qpidity/QpidException.java b/java/common/src/main/java/org/apache/qpid/QpidException.java
index 81e9145282..8503adaef8 100644
--- a/java/common/src/main/java/org/apache/qpidity/QpidException.java
+++ b/java/common/src/main/java/org/apache/qpid/QpidException.java
@@ -15,7 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpidity;
+package org.apache.qpid;
public class QpidException extends Exception
{
diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpid/SecurityHelper.java
index a72997813a..dda5a6506d 100644
--- a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
+++ b/java/common/src/main/java/org/apache/qpid/SecurityHelper.java
@@ -18,15 +18,15 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpid;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.List;
import java.util.StringTokenizer;
-import org.apache.qpidity.security.AMQPCallbackHandler;
-import org.apache.qpidity.security.CallbackHandlerRegistry;
+import org.apache.qpid.security.AMQPCallbackHandler;
+import org.apache.qpid.security.CallbackHandlerRegistry;
public class SecurityHelper
{
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpid/ToyBroker.java
index 5f9917e30a..83d434b20a 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpid/ToyBroker.java
@@ -18,12 +18,12 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpid;
-import org.apache.qpidity.transport.*;
-import org.apache.qpidity.transport.network.mina.MinaHandler;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.mina.MinaHandler;
-import static org.apache.qpidity.transport.util.Functions.str;
+import static org.apache.qpid.transport.util.Functions.str;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -45,10 +45,6 @@ class ToyBroker extends SessionDelegate
{
private ToyExchange exchange;
- private MessageTransfer xfr = null;
- private DeliveryProperties props = null;
- private Header header = null;
- private List<Data> body = null;
private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
public ToyBroker(ToyExchange exchange)
@@ -103,22 +99,10 @@ class ToyBroker extends SessionDelegate
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- this.xfr = xfr;
- body = new ArrayList<Data>();
- System.out.println("received transfer " + xfr.getDestination());
- }
-
- @Override public void header(Session ssn, Header header)
- {
- if (xfr == null || body == null)
- {
- ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR,
- "no method segment");
- ssn.close();
- return;
- }
-
- props = header.get(DeliveryProperties.class);
+ String dest = xfr.getDestination();
+ System.out.println("received transfer " + dest);
+ Header header = xfr.getHeader();
+ DeliveryProperties props = header.get(DeliveryProperties.class);
if (props != null)
{
System.out.println("received headers routing_key " + props.getRoutingKey());
@@ -130,70 +114,31 @@ class ToyBroker extends SessionDelegate
System.out.println(mp.getApplicationHeaders());
}
- this.header = header;
- }
-
- @Override public void data(Session ssn, Data data)
- {
- if (xfr == null || body == null)
+ if (exchange.route(dest,props.getRoutingKey(),xfr))
{
- ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment");
- ssn.close();
- return;
+ System.out.println("queued " + xfr);
+ dispatchMessages(ssn);
}
-
- body.add(data);
-
- if (data.isLast())
+ else
{
- String dest = xfr.getDestination();
- Message m = new Message(header, body);
- if (exchange.route(dest,props.getRoutingKey(),m))
- {
- System.out.println("queued " + m);
- dispatchMessages(ssn);
- }
- else
+ if (props == null || !props.getDiscardUnroutable())
{
-
- reject(ssn);
+ RangeSet ranges = new RangeSet();
+ ranges.add(xfr.getId());
+ ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
+ "no such destination");
}
- ssn.processed(xfr);
- xfr = null;
- body = null;
- }
- }
-
- private void reject(Session ssn)
- {
- if (props != null && props.getDiscardUnroutable())
- {
- return;
- }
- else
- {
- RangeSet ranges = new RangeSet();
- ranges.add(xfr.getId());
- ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
- "no such destination");
}
+ ssn.processed(xfr);
}
- private void transferMessageToPeer(Session ssn,String dest, Message m)
+ private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
- ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(m.header);
- for (Data d : m.body)
- {
- for (ByteBuffer b : d.getFragments())
- {
- ssn.data(b);
- }
- }
- ssn.endData();
+ ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ m.getHeader(), m.getBody());
}
private void dispatchMessages(Session ssn)
@@ -207,8 +152,8 @@ class ToyBroker extends SessionDelegate
private void checkAndSendMessagesToConsumer(Session ssn,String dest)
{
Consumer c = consumers.get(dest);
- LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
- Message m = queue.poll();
+ LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
+ MessageTransfer m = queue.poll();
while (m != null && c._credit>0)
{
transferMessageToPeer(ssn,dest,m);
@@ -217,46 +162,6 @@ class ToyBroker extends SessionDelegate
}
}
- class Message
- {
- private final Header header;
- private final List<Data> body;
-
- public Message(Header header, List<Data> body)
- {
- this.header = header;
- this.body = body;
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
-
- if (header != null)
- {
- boolean first = true;
- for (Struct st : header.getStructs())
- {
- if (first) { first = false; }
- else { sb.append(" "); }
- sb.append(st);
- }
- }
-
- for (Data d : body)
- {
- for (ByteBuffer b : d.getFragments())
- {
- sb.append(" | ");
- sb.append(str(b));
- }
- }
-
- return sb.toString();
- }
-
- }
-
// ugly, but who cares :)
// assumes unit is always no of messages, not bytes
// assumes it's credit mode and not window
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java
index a3233afcbe..cb10859c9f 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java
@@ -18,12 +18,13 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpid;
+import java.nio.*;
import java.util.*;
-import org.apache.qpidity.transport.*;
-import org.apache.qpidity.transport.network.mina.MinaHandler;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.mina.MinaHandler;
/**
@@ -47,17 +48,9 @@ class ToyClient extends SessionDelegate
}
}
- @Override public void header(Session ssn, Header header)
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- for (Struct st : header.getStructs())
- {
- System.out.println("header: " + st);
- }
- }
-
- @Override public void data(Session ssn, Data data)
- {
- System.out.println("got data: " + data);
+ System.out.println("msg: " + xfr);
}
public static final void main(String[] args)
@@ -75,9 +68,8 @@ class ToyClient extends SessionDelegate
}
public void closed() {}
});
- conn.send(new ConnectionEvent(0, new ProtocolHeader(1,
- TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor())));
+ conn.send(new ProtocolHeader
+ (1, 0, 10));
Channel ch = conn.getChannel(0);
Session ssn = new Session("my-session".getBytes());
@@ -112,16 +104,16 @@ class ToyClient extends SessionDelegate
map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(new DeliveryProperties(),
- new MessageProperties().setApplicationHeaders(map));
- ssn.data("this is the data");
- ssn.endData();
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties(),
+ new MessageProperties()
+ .setApplicationHeaders(map)),
+ "this is the data");
ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("this should be rejected");
- ssn.endData();
+ MessageAcquireMode.PRE_ACQUIRED,
+ null,
+ "this should be rejected");
ssn.sync();
Future<QueueQueryResult> future = ssn.queueQuery("asdf");
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpid/ToyExchange.java
index eab5f6c078..c638679596 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
+++ b/java/common/src/main/java/org/apache/qpid/ToyExchange.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity;
+package org.apache.qpid;
import java.util.ArrayList;
import java.util.HashMap;
@@ -9,42 +9,43 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.qpidity.ToyBroker.Message;
+import org.apache.qpid.transport.MessageTransfer;
+
public class ToyExchange
{
final static String DIRECT = "amq.direct";
final static String TOPIC = "amq.topic";
- private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
- private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
- private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>();
+ private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+ private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+ private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>();
public void createQueue(String name)
{
- queues.put(name, new LinkedBlockingQueue<Message>());
+ queues.put(name, new LinkedBlockingQueue<MessageTransfer>());
}
- public LinkedBlockingQueue<Message> getQueue(String name)
+ public LinkedBlockingQueue<MessageTransfer> getQueue(String name)
{
return queues.get(name);
}
public void bindQueue(String type,String binding,String queueName)
{
- LinkedBlockingQueue<Message> queue = queues.get(queueName);
+ LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName);
binding = normalizeKey(binding);
if(DIRECT.equals(type))
{
if (directEx.containsKey(binding))
{
- List<LinkedBlockingQueue<Message>> list = directEx.get(binding);
+ List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding);
list.add(queue);
}
else
{
- List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
list.add(queue);
directEx.put(binding,list);
}
@@ -53,21 +54,21 @@ public class ToyExchange
{
if (topicEx.containsKey(binding))
{
- List<LinkedBlockingQueue<Message>> list = topicEx.get(binding);
+ List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding);
list.add(queue);
}
else
{
- List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
list.add(queue);
topicEx.put(binding,list);
}
}
}
- public boolean route(String dest,String routingKey,Message msg)
+ public boolean route(String dest, String routingKey, MessageTransfer msg)
{
- List<LinkedBlockingQueue<Message>> queues;
+ List<LinkedBlockingQueue<MessageTransfer>> queues;
if(DIRECT.equals(dest))
{
queues = directEx.get(routingKey);
@@ -101,9 +102,9 @@ public class ToyExchange
}
}
- private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey)
+ private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey)
{
- List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>();
+ List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>();
for(String key: topicEx.keySet())
{
@@ -111,7 +112,7 @@ public class ToyExchange
Matcher m = p.matcher(routingKey);
if (m.find())
{
- for(LinkedBlockingQueue<Message> queue : topicEx.get(key))
+ for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key))
{
selected.add(queue);
}
@@ -121,9 +122,9 @@ public class ToyExchange
return selected;
}
- private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected)
+ private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected)
{
- for(LinkedBlockingQueue<Message> queue : selected)
+ for(LinkedBlockingQueue<MessageTransfer> queue : selected)
{
queue.offer(msg);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpid/api/Message.java
index f5488fde52..df6f279026 100644
--- a/java/common/src/main/java/org/apache/qpidity/api/Message.java
+++ b/java/common/src/main/java/org/apache/qpid/api/Message.java
@@ -1,11 +1,11 @@
-package org.apache.qpidity.api;
+package org.apache.qpid.api;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpidity.transport.MessageProperties;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.Header;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
/*
* Licensed to the Apache Software Foundation (ASF) under one
diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
index 67f16e6a87..7371c12519 100644
--- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.common;
+import org.apache.qpid.framing.AMQShortString;
+
/**
* Specifies the available client property types that different clients can use to identify themselves with.
*
@@ -30,8 +32,21 @@ package org.apache.qpid.common;
*/
public enum ClientProperties
{
- instance,
- product,
- version,
- platform
+ instance("instance"),
+ product("product"),
+ version("version"),
+ platform("platform");
+
+ private final AMQShortString _amqShortString;
+
+ private ClientProperties(String name)
+ {
+ _amqShortString = new AMQShortString(name);
+ }
+
+
+ public AMQShortString toAMQShortString()
+ {
+ return _amqShortString;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java b/java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java
index 89d7e7917f..49effc2dae 100644
--- a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java
+++ b/java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java
@@ -15,11 +15,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpidity.dtx;
+package org.apache.qpid.dtx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpidity.QpidException;
+import org.apache.qpid.QpidException;
import javax.transaction.xa.Xid;
@@ -241,9 +241,9 @@ public class XidImpl implements Xid
* @return The String representation of this Xid
* @throws QpidException In case of problem when converting this Xid into a string.
*/
- public static org.apache.qpidity.transport.Xid convert(Xid xid) throws QpidException
+ public static org.apache.qpid.transport.Xid convert(Xid xid) throws QpidException
{
- return new org.apache.qpidity.transport.Xid(xid.getFormatId(),
+ return new org.apache.qpid.transport.Xid(xid.getFormatId(),
xid.getGlobalTransactionId(),
xid.getBranchQualifier());
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
index 903b5bfa7a..a2fc3a03ef 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
@@ -50,4 +50,14 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock
return buffer;
}
+ public java.nio.ByteBuffer toNioByteBuffer()
+ {
+ final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
+
+ ByteBuffer buf = ByteBuffer.wrap(buffer);
+ writePayload(buf);
+ buffer.flip();
+ return buffer;
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index a747aaeda7..a8e7f47db0 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -111,6 +111,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private final byte[] _data;
private final int _offset;
private int _hashCode;
+ private String _asString = null;
+
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -137,7 +139,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
-
+ _asString = data;
}
public AMQShortString(char[] data)
@@ -224,7 +226,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
}
-
/**
* Get the length of the short string
* @return length of the underlying byte array
@@ -419,9 +420,14 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return chars;
}
+
public String asString()
{
- return new String(asChars());
+ if (_asString == null)
+ {
+ _asString = new String(asChars());
+ }
+ return _asString;
}
public boolean equals(Object o)
@@ -464,13 +470,49 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return false;
}
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ final int hashCode = _hashCode;
+
+ final int otherHashCode = otherString._hashCode;
+
+ if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
+ {
+ return false;
+ }
+
+ final int length = _length;
+
+ if(length != otherString._length)
{
return false;
}
- return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
- || Arrays.equals(getBytes(),otherString.getBytes());
+
+ final byte[] data = _data;
+
+ final byte[] otherData = otherString._data;
+
+ final int offset = _offset;
+
+ final int otherOffset = otherString._offset;
+
+ if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+ {
+ return Arrays.equals(data, otherData);
+ }
+ else
+ {
+ int thisIdx = offset;
+ int otherIdx = otherOffset;
+ for(int i = length; i-- != 0; )
+ {
+ if(!(data[thisIdx++] == otherData[otherIdx++]))
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
}
@@ -718,4 +760,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return false; //To change body of created methods use File | Settings | File Templates.
}
+
+ public static void main(String args[])
+ {
+ AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k");
+ AMQShortString s2 = s.substring(2, 7);
+
+ AMQShortStringTokenizer t = s2.tokenize((byte) '.');
+ while(t.hasMoreTokens())
+ {
+ System.err.println(t.nextToken());
+ }
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index d6359baa0f..1ff39ca790 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -93,4 +93,24 @@ public class AMQTypedValue
{
return "[" + getType() + ": " + getValue() + "]";
}
+
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof AMQTypedValue)
+ {
+ AMQTypedValue other = (AMQTypedValue) o;
+ return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 9ba9b53b13..ed01c91804 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -74,7 +74,7 @@ public class FieldTable
buffer.skip((int) length);
}
- private AMQTypedValue getProperty(AMQShortString string)
+ public AMQTypedValue getProperty(AMQShortString string)
{
checkPropertyName(string);
@@ -891,6 +891,20 @@ public class FieldTable
return keys;
}
+ public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
+ {
+ if(_encodedForm != null)
+ {
+ return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
+ }
+ else
+ {
+ initMapIfNecessary();
+ return _properties.entrySet().iterator();
+ }
+ }
+
+
public Object get(AMQShortString key)
{
@@ -1050,6 +1064,95 @@ public class FieldTable
}
}
+ private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
+ {
+ private final AMQTypedValue _value;
+ private final AMQShortString _key;
+
+ public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
+ {
+ _key = key;
+ _value = value;
+ }
+
+ public AMQShortString getKey()
+ {
+ return _key;
+ }
+
+ public AMQTypedValue getValue()
+ {
+ return _value;
+ }
+
+ public AMQTypedValue setValue(final AMQTypedValue value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof FieldTableEntry)
+ {
+ FieldTableEntry other = (FieldTableEntry) o;
+ return (_key == null ? other._key == null : _key.equals(other._key))
+ && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return (getKey()==null ? 0 : getKey().hashCode())
+ ^ (getValue()==null ? 0 : getValue().hashCode());
+ }
+
+ }
+
+
+ private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
+ {
+
+ private final ByteBuffer _buffer;
+ private int _expectedRemaining;
+
+ public FieldTableIterator(ByteBuffer buffer, int length)
+ {
+ _buffer = buffer;
+ _expectedRemaining = buffer.remaining() - length;
+ }
+
+ public boolean hasNext()
+ {
+ return (_buffer.remaining() > _expectedRemaining);
+ }
+
+ public Map.Entry<AMQShortString, AMQTypedValue> next()
+ {
+ if(hasNext())
+ {
+ final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
+ return new FieldTableEntry(key, value);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+
+
public int hashCode()
{
initMapIfNecessary();
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
index b2a09ac592..00da005515 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -50,7 +50,7 @@ import org.apache.mina.common.IoSession;
*
* @todo For better re-usability could make the completion handler optional. Only run it when one is set.
*/
-public class Job implements Runnable
+public class Job implements ReadWriteRunnable
{
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
@@ -67,18 +67,22 @@ public class Job implements Runnable
/** Holds the completion continuation, called upon completion of a run of the job. */
private final JobCompletionHandler _completionHandler;
+ private final boolean _readJob;
+
/**
* Creates a new job that aggregates many continuations together.
*
* @param session The Mina session.
* @param completionHandler The per job run, terminal continuation.
* @param maxEvents The maximum number of aggregated continuations to process per run of the job.
+ * @param readJob
*/
- Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
+ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
{
_session = session;
_completionHandler = completionHandler;
_maxEvents = maxEvents;
+ _readJob = readJob;
}
/**
@@ -157,6 +161,22 @@ public class Job implements Runnable
}
}
+ public boolean isReadJob()
+ {
+ return _readJob;
+ }
+
+ public boolean isRead()
+ {
+ return _readJob;
+ }
+
+ public boolean isWrite()
+ {
+ return !_readJob;
+ }
+
+
/**
* Another interface for a continuation.
*
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index 2912e54662..a080cc7e04 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -60,24 +60,6 @@ import java.util.concurrent.ExecutorService;
* <td> {@link Job}, {@link Job.JobCompletionHandler}
* </table>
*
- * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events.
- * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
- * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
- * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case?
- * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these
- * stages.
- *
- * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
- * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run
- * and trips off another batch of 10 until they are all done. Why not just have a straight forward
- * consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10
- * in them, just have one queue of events and worker threads taking the next event. There will be coordination
- * between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same
- * amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker
- * pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be
- * better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily
- * be substituted in.
- *
* @todo The static helper methods are pointless. Could just call new.
*/
public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
@@ -96,17 +78,20 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
private final int _maxEvents;
+ private final boolean _readFilter;
+
/**
* Creates a named pooling filter, on the specified shared thread pool.
*
* @param refCountingPool The thread pool reference.
* @param name The identifying name of the filter type.
*/
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
{
_poolReference = refCountingPool;
_name = name;
_maxEvents = maxEvents;
+ _readFilter = readFilter;
}
/**
@@ -167,7 +152,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
void fireAsynchEvent(Job job, Event event)
{
- // job.acquire(); //prevents this job being removed from _jobs
job.add(event);
final ExecutorService pool = _poolReference.getPool();
@@ -201,7 +185,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void createNewJobForSession(IoSession session)
{
- Job job = new Job(session, this, MAX_JOB_EVENTS);
+ Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
session.setAttribute(_name, job);
}
@@ -433,7 +417,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
}
/**
@@ -476,7 +460,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
}
/**
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
new file mode 100644
index 0000000000..8de0f93ce9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
@@ -0,0 +1,432 @@
+package org.apache.qpid.pool;
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
+{
+
+ private final AtomicInteger _count = new AtomicInteger(0);
+
+ private final ReentrantLock _takeLock = new ReentrantLock();
+
+ private final Condition _notEmpty = _takeLock.newCondition();
+
+ private final ReentrantLock _putLock = new ReentrantLock();
+
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
+
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
+
+
+ private class ReadWriteJobIterator implements Iterator<Runnable>
+ {
+
+ private boolean _onReads;
+ private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
+
+ public boolean hasNext()
+ {
+ if(!_iter.hasNext())
+ {
+ if(_onReads)
+ {
+ _iter = _readJobQueue.iterator();
+ _onReads = true;
+ return _iter.hasNext();
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public Runnable next()
+ {
+ if(_iter.hasNext())
+ {
+ return _iter.next();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ _takeLock.lock();
+ try
+ {
+ _iter.remove();
+ _count.decrementAndGet();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+ }
+
+ public Iterator<Runnable> iterator()
+ {
+ return new ReadWriteJobIterator();
+ }
+
+ public int size()
+ {
+ return _count.get();
+ }
+
+ public boolean offer(final Runnable runnable)
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+ return true;
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+ }
+
+ public void put(final Runnable runnable) throws InterruptedException
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+ }
+
+
+
+ public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+
+ return true;
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+
+ }
+
+ public Runnable take() throws InterruptedException
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lockInterruptibly();
+ try
+ {
+ try
+ {
+ while (_count.get() == 0)
+ {
+ _notEmpty.await();
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+
+ ReadWriteRunnable job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ int c = _count.getAndDecrement();
+ if (c > 1)
+ {
+ _notEmpty.signal();
+ }
+ return job;
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+
+ }
+
+ public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ final ReentrantLock takeLock = _takeLock;
+ final AtomicInteger count = _count;
+ long nanos = unit.toNanos(timeout);
+ takeLock.lockInterruptibly();
+ ReadWriteRunnable job = null;
+ try
+ {
+
+ for (;;)
+ {
+ if (count.get() > 0)
+ {
+ job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ int c = count.getAndDecrement();
+ if (c > 1)
+ {
+ _notEmpty.signal();
+ }
+ break;
+ }
+ if (nanos <= 0)
+ {
+ return null;
+ }
+ try
+ {
+ nanos = _notEmpty.awaitNanos(nanos);
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+ }
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+ return job;
+ }
+
+ public int remainingCapacity()
+ {
+ return Integer.MAX_VALUE;
+ }
+
+ public int drainTo(final Collection<? super Runnable> c)
+ {
+ int total = 0;
+
+ _putLock.lock();
+ _takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job;
+ while((job = _writeJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _writeJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ while((job = _readJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _readJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ }
+ finally
+ {
+ _takeLock.unlock();
+ _putLock.unlock();
+ }
+ return total;
+ }
+
+ public int drainTo(final Collection<? super Runnable> c, final int maxElements)
+ {
+ int total = 0;
+
+ _putLock.lock();
+ _takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job;
+ while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _writeJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ while(total<=maxElements && (job = _readJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _readJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ }
+ finally
+ {
+ _takeLock.unlock();
+ _putLock.unlock();
+ }
+ return total;
+
+ }
+
+ public Runnable poll()
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lock();
+ try
+ {
+ if(_count.get() > 0)
+ {
+ ReadWriteRunnable job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ _count.decrementAndGet();
+ return job;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+ }
+
+ public Runnable peek()
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job = _writeJobQueue.peek();
+ if(job == null)
+ {
+ job = _readJobQueue.peek();
+ }
+ return job;
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
new file mode 100644
index 0000000000..ad04a923e1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
@@ -0,0 +1,27 @@
+package org.apache.qpid.pool;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface ReadWriteRunnable extends Runnable
+{
+ boolean isRead();
+ boolean isWrite();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index 84c9e1f465..ce9c6ae4cb 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -22,6 +22,9 @@ package org.apache.qpid.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
@@ -84,6 +87,8 @@ public class ReferenceCountingExecutorService
/** Holds the number of executor threads to create. */
private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+ private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
/**
* Retrieves the singleton instance of this reference counter.
*
@@ -105,15 +110,28 @@ public class ReferenceCountingExecutorService
*
* @return An executor service.
*/
- ExecutorService acquireExecutorService()
+ public ExecutorService acquireExecutorService()
{
synchronized (_lock)
{
if (_refCount++ == 0)
{
- _pool = Executors.newFixedThreadPool(_poolSize);
+// _pool = Executors.newFixedThreadPool(_poolSize);
+
+ // Use a job queue that biases to writes
+ if(_useBiasedPool)
+ {
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new ReadWriteJobQueue());
+ }
+ else
+ {
+ _pool = Executors.newFixedThreadPool(_poolSize);
+ }
}
+
return _pool;
}
}
@@ -122,7 +140,7 @@ public class ReferenceCountingExecutorService
* Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
* to zero, the executor service is shut down.
*/
- void releaseExecutorService()
+ public void releaseExecutorService()
{
synchronized (_lock)
{
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 59003225b7..b58e7d01dc 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -21,8 +21,12 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.*;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.AMQException;
+import java.nio.ByteBuffer;
+
+
/**
* AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
* callers of the version of the AMQP protocol that they are able to work with.
@@ -54,4 +58,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+ public void setSender(Sender<ByteBuffer> sender);
+ public void init();
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java b/java/common/src/main/java/org/apache/qpid/security/AMQPCallbackHandler.java
index 2e7afa1b87..a3dad9acdc 100644
--- a/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java
+++ b/java/common/src/main/java/org/apache/qpid/security/AMQPCallbackHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.security;
+package org.apache.qpid.security;
import javax.security.auth.callback.CallbackHandler;
diff --git a/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java b/java/common/src/main/java/org/apache/qpid/security/CallbackHandlerRegistry.java
index b23fea7e4a..8c80a1b5b7 100644
--- a/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java
+++ b/java/common/src/main/java/org/apache/qpid/security/CallbackHandlerRegistry.java
@@ -18,12 +18,12 @@
* under the License.
*
*/
-package org.apache.qpidity.security;
+package org.apache.qpid.security;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpidity.QpidConfig;
+import org.apache.qpid.QpidConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java b/java/common/src/main/java/org/apache/qpid/security/DynamicSaslRegistrar.java
index 1798f0c210..9f48ac96a3 100644
--- a/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java
+++ b/java/common/src/main/java/org/apache/qpid/security/DynamicSaslRegistrar.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.security;
+package org.apache.qpid.security;
import java.security.Security;
import java.util.Map;
@@ -26,7 +26,7 @@ import java.util.TreeMap;
import javax.security.sasl.SaslClientFactory;
-import org.apache.qpidity.QpidConfig;
+import org.apache.qpid.QpidConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java b/java/common/src/main/java/org/apache/qpid/security/JCAProvider.java
index c775171a5f..033deb550c 100644
--- a/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java
+++ b/java/common/src/main/java/org/apache/qpid/security/JCAProvider.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.security;
+package org.apache.qpid.security;
import java.security.Provider;
import java.security.Security;
diff --git a/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java b/java/common/src/main/java/org/apache/qpid/security/UsernamePasswordCallbackHandler.java
index 0fd647e015..89a63abeab 100644
--- a/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java
+++ b/java/common/src/main/java/org/apache/qpid/security/UsernamePasswordCallbackHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.security;
+package org.apache.qpid.security;
import java.io.IOException;
diff --git a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java b/java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClient.java
index 6e4a0218d2..81acc66de4 100644
--- a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java
+++ b/java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClient.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.security.amqplain;
+package org.apache.qpid.security.amqplain;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
diff --git a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java b/java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClientFactory.java
index abc881f433..6c66c87f4c 100644
--- a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClientFactory.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.security.amqplain;
+package org.apache.qpid.security.amqplain;
import javax.security.sasl.SaslClientFactory;
import javax.security.sasl.SaslClient;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/java/common/src/main/java/org/apache/qpid/transport/Binary.java
new file mode 100644
index 0000000000..e6dedc536f
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/Binary.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * Binary
+ *
+ */
+
+public final class Binary
+{
+
+ private byte[] bytes;
+ private int offset;
+ private int size;
+ private int hash = 0;
+
+ public Binary(byte[] bytes, int offset, int size)
+ {
+ if (offset + size > bytes.length)
+ {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ this.bytes = bytes;
+ this.offset = offset;
+ this.size = size;
+ }
+
+ public Binary(byte[] bytes)
+ {
+ this(bytes, 0, bytes.length);
+ }
+
+ public final byte[] array()
+ {
+ return bytes;
+ }
+
+ public final int offset()
+ {
+ return offset;
+ }
+
+ public final int size()
+ {
+ return size;
+ }
+
+ public final Binary slice(int low, int high)
+ {
+ int sz;
+
+ if (high < 0)
+ {
+ sz = size + high;
+ }
+ else
+ {
+ sz = high - low;
+ }
+
+ if (sz < 0)
+ {
+ sz = 0;
+ }
+
+ return new Binary(bytes, offset + low, sz);
+ }
+
+ public final int hashCode()
+ {
+ if (hash == 0)
+ {
+ int hc = 0;
+ for (int i = 0; i < size; i++)
+ {
+ hc = 31*hc + (0xFF & bytes[offset + i]);
+ }
+ hash = hc;
+ }
+
+ return hash;
+ }
+
+ public final boolean equals(Object o)
+ {
+ if (!(o instanceof Binary))
+ {
+ return false;
+ }
+
+ Binary buf = (Binary) o;
+ if (this.size != buf.size)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < size; i++)
+ {
+ if (bytes[offset + i] != buf.bytes[buf.offset + i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Binding.java b/java/common/src/main/java/org/apache/qpid/transport/Binding.java
index 18ed97098d..8418c42189 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Binding.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Binding.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpid/transport/Channel.java
index fb8918eb7b..d6b015930b 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Channel.java
@@ -18,10 +18,10 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.network.Frame;
-import org.apache.qpidity.transport.util.Logger;
+import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.transport.util.Logger;
import java.nio.ByteBuffer;
@@ -30,8 +30,8 @@ import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.qpidity.transport.network.Frame.*;
-import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.transport.network.Frame.*;
+import static org.apache.qpid.transport.util.Functions.*;
/**
@@ -53,10 +53,6 @@ public class Channel extends Invoker
// session may be null
private Session session;
- private Lock commandLock = new ReentrantLock();
- private boolean first = true;
- private ByteBuffer data = null;
-
public Channel(Connection connection, int channel, SessionDelegate delegate)
{
this.connection = connection;
@@ -104,16 +100,6 @@ public class Channel extends Invoker
method.delegate(session, sessionDelegate);
}
- public void header(Void v, Header header)
- {
- header.delegate(session, sessionDelegate);
- }
-
- public void data(Void v, Data data)
- {
- data.delegate(session, sessionDelegate);
- }
-
public void error(Void v, ProtocolError error)
{
throw new RuntimeException(error.getMessage());
@@ -148,58 +134,28 @@ public class Channel extends Invoker
this.session = session;
}
- private void emit(ProtocolEvent event)
- {
- connection.send(new ConnectionEvent(channel, event));
- }
-
- public void method(Method m)
+ void closeCode(ConnectionClose close)
{
- if (m.getEncodedTrack() == Frame.L4)
- {
- commandLock.lock();
- }
-
- emit(m);
-
- if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
+ if (session != null)
{
- commandLock.unlock();
+ session.closeCode(close);
}
}
- public void header(Header header)
+ private void emit(ProtocolEvent event)
{
- emit(header);
+ event.setChannel(channel);
+ connection.send(event);
}
- public void data(ByteBuffer buf)
+ public void method(Method m)
{
- if (data != null)
+ emit(m);
+
+ if (!m.isBatch())
{
- emit(new Data(data, first, false));
- first = false;
+ connection.flush();
}
-
- data = buf;
- }
-
- public void data(String str)
- {
- data(str.getBytes());
- }
-
- public void data(byte[] bytes)
- {
- data(ByteBuffer.wrap(bytes));
- }
-
- public void end()
- {
- emit(new Data(data, first, true));
- first = true;
- data = null;
- commandLock.unlock();
}
protected void invoke(Method m)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java
index 96578ffeb8..8475fbf174 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import java.util.UUID;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 699854fb3b..bbdadfadb9 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
@@ -31,10 +31,9 @@ public abstract class ClientDelegate extends ConnectionDelegate
public void init(Channel ch, ProtocolHeader hdr)
{
- if (hdr.getMajor() != TransportConstants.getVersionMajor() &&
- hdr.getMinor() != TransportConstants.getVersionMinor())
+ if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
{
- throw new RuntimeException("version missmatch: " + hdr);
+ throw new ProtocolVersionException(hdr.getMajor(), hdr.getMinor());
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 9829343491..68b9b209bb 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -18,9 +18,9 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.util.Logger;
+import org.apache.qpid.transport.util.Logger;
import java.util.ArrayList;
import java.util.HashMap;
@@ -40,14 +40,13 @@ import java.nio.ByteBuffer;
* short instead of Short
*/
-// RA making this public until we sort out the package issues
public class Connection
- implements Receiver<ConnectionEvent>, Sender<ConnectionEvent>
+ implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
private static final Logger log = Logger.get(Connection.class);
- final private Sender<ConnectionEvent> sender;
+ final private Sender<ProtocolEvent> sender;
final private ConnectionDelegate delegate;
private int channelMax = 1;
// want to make this final
@@ -55,7 +54,7 @@ public class Connection
final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
- public Connection(Sender<ConnectionEvent> sender,
+ public Connection(Sender<ProtocolEvent> sender,
ConnectionDelegate delegate)
{
this.sender = sender;
@@ -77,19 +76,25 @@ public class Connection
return delegate;
}
- public void received(ConnectionEvent event)
+ public void received(ProtocolEvent event)
{
log.debug("RECV: [%s] %s", this, event);
Channel channel = getChannel(event.getChannel());
- channel.received(event.getProtocolEvent());
+ channel.received(event);
}
- public void send(ConnectionEvent event)
+ public void send(ProtocolEvent event)
{
log.debug("SEND: [%s] %s", this, event);
sender.send(event);
}
+ public void flush()
+ {
+ log.debug("FLUSH: [%s]", this);
+ sender.flush();
+ }
+
public int getChannelMax()
{
return channelMax;
@@ -143,6 +148,17 @@ public class Connection
delegate.exception(t);
}
+ void closeCode(ConnectionClose close)
+ {
+ synchronized (channels)
+ {
+ for (Channel ch : channels.values())
+ {
+ ch.closeCode(close);
+ }
+ }
+ }
+
public void closed()
{
log.debug("connection closed: %s", this);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 14344991c6..2aa1db7b28 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -18,12 +18,12 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.util.Logger;
+import org.apache.qpid.transport.util.Logger;
-import org.apache.qpidity.SecurityHelper;
-import org.apache.qpidity.QpidException;
+import org.apache.qpid.SecurityHelper;
+import org.apache.qpid.QpidException;
import java.io.UnsupportedEncodingException;
@@ -82,28 +82,12 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
public void init(Channel ch, ProtocolHeader hdr)
{
- ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader
- (1,
- TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor())));
- if (hdr.getMajor() != TransportConstants.getVersionMajor() &&
- hdr.getMinor() != TransportConstants.getVersionMinor())
- {
- // XXX
- ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader
- (1,
- TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor())));
- ch.getConnection().close();
- }
- else
- {
- List<Object> plain = new ArrayList<Object>();
- plain.add("PLAIN");
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- ch.connectionStart(null, plain, utf8);
- }
+ ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor()));
+ List<Object> plain = new ArrayList<Object>();
+ plain.add("PLAIN");
+ List<Object> utf8 = new ArrayList<Object>();
+ utf8.add("utf8");
+ ch.connectionStart(null, plain, utf8);
}
// ----------------------------------------------
@@ -264,6 +248,12 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
context.connectionOpenOk(hosts);
}
+ @Override public void connectionClose(Channel ch, ConnectionClose close)
+ {
+ ch.getConnection().closeCode(close);
+ ch.connectionCloseOk();
+ }
+
public String getPassword()
{
return _password;
@@ -294,8 +284,4 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
_virtualHost = host;
}
- public String getUnsupportedProtocol()
- {
- return null;
- }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
new file mode 100644
index 0000000000..c3239ef684
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * ConnectionException
+ *
+ */
+
+public class ConnectionException extends RuntimeException
+{
+
+ private ConnectionClose close;
+
+ public ConnectionException(ConnectionClose close)
+ {
+ super(close.getReplyText());
+ this.close = close;
+ }
+
+ public ConnectionClose getClose()
+ {
+ return close;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index 03d0d3e161..b2be32331a 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -18,12 +18,13 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpidity.transport.network.mina.MinaHandler;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
/**
@@ -34,31 +35,9 @@ import org.apache.qpidity.transport.network.mina.MinaHandler;
public class Echo extends SessionDelegate
{
- private MessageTransfer xfr = null;
-
public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- this.xfr = xfr;
ssn.invoke(xfr);
- }
-
- public void header(Session ssn, Header hdr)
- {
- ssn.header(hdr);
- }
-
- public void data(Session ssn, Data data)
- {
- for (ByteBuffer buf : data.getFragments())
- {
- ssn.data(buf);
- }
- if (data.isLast())
- {
- ssn.endData();
- }
-
- // XXX: should be able to get command-id from any segment
ssn.processed(xfr);
}
@@ -81,7 +60,9 @@ public class Echo extends SessionDelegate
delegate.setUsername("guest");
delegate.setPassword("guest");
- MinaHandler.accept("0.0.0.0", 5672, delegate);
+ IoAcceptor ioa = new IoAcceptor
+ ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ioa.start();
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Field.java b/java/common/src/main/java/org/apache/qpid/transport/Field.java
index ebbd59288b..bc6bf10041 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Field.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Field.java
@@ -18,10 +18,10 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.codec.Decoder;
-import org.apache.qpidity.transport.codec.Encoder;
+import org.apache.qpid.transport.codec.Decoder;
+import org.apache.qpid.transport.codec.Encoder;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Future.java b/java/common/src/main/java/org/apache/qpid/transport/Future.java
index 8936f06831..d8cde61af5 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Future.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Future.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Header.java b/java/common/src/main/java/org/apache/qpid/transport/Header.java
index ae11bb0c69..9439e5e0de 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Header.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Header.java
@@ -18,11 +18,14 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.network.Frame;
+import org.apache.qpid.transport.network.Frame;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.LinkedHashMap;
import java.nio.ByteBuffer;
@@ -32,65 +35,43 @@ import java.nio.ByteBuffer;
* @author Rafael H. Schloming
*/
-public class Header implements ProtocolEvent {
+public class Header {
- private final List<Struct> structs;
- private ByteBuffer _buf;
- private boolean _noPayload;
+ private final Struct[] structs;
- public Header(List<Struct> structs, boolean lastframe)
+ public Header(List<Struct> structs)
{
- this.structs = structs;
- _noPayload= lastframe;
+ this(structs.toArray(new Struct[structs.size()]));
}
- public List<Struct> getStructs()
+ public Header(Struct ... structs)
{
- return structs;
+ this.structs = structs;
}
- public void setBuf(ByteBuffer buf)
+ public Struct[] getStructs()
{
- _buf = buf;
+ return structs;
}
- public ByteBuffer getBuf()
- {
- return _buf;
- }
+
public <T> T get(Class<T> klass)
{
for (Struct st : structs)
{
if (klass.isInstance(st))
{
- return klass.cast(st);
+ return (T) st;
}
}
return null;
}
- public byte getEncodedTrack()
- {
- return Frame.L4;
- }
-
- public <C> void delegate(C context, ProtocolDelegate<C> delegate)
- {
- delegate.header(context, this);
- }
-
- public boolean hasNoPayload()
- {
- return _noPayload;
- }
-
-
public String toString()
{
StringBuffer str = new StringBuffer();
- str.append("Header(");
+ str.append(" Header(");
boolean first = true;
for (Struct s : structs)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpid/transport/Method.java
index f72ebd570c..6b99f6d5d3 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -18,9 +18,13 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.network.Frame;
+import org.apache.qpid.transport.network.Frame;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.qpid.transport.util.Functions.*;
/**
* Method
@@ -40,8 +44,10 @@ public abstract class Method extends Struct implements ProtocolEvent
// XXX: command subclass?
private int id;
+ private int channel;
private boolean idSet = false;
private boolean sync = false;
+ private boolean batch = false;
public final int getId()
{
@@ -54,18 +60,58 @@ public abstract class Method extends Struct implements ProtocolEvent
this.idSet = true;
}
+ public final int getChannel()
+ {
+ return channel;
+ }
+
+ public final void setChannel(int channel)
+ {
+ this.channel = channel;
+ }
+
public final boolean isSync()
{
return sync;
}
- void setSync(boolean value)
+ final void setSync(boolean value)
{
this.sync = value;
}
+ public final boolean isBatch()
+ {
+ return batch;
+ }
+
+ final void setBatch(boolean value)
+ {
+ this.batch = value;
+ }
+
public abstract boolean hasPayload();
+ public Header getHeader()
+ {
+ return null;
+ }
+
+ public void setHeader(Header header)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer getBody()
+ {
+ return null;
+ }
+
+ public void setBody(ByteBuffer body)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public abstract byte getEncodedTrack();
public abstract <C> void dispatch(C context, MethodDelegate<C> delegate);
@@ -84,33 +130,50 @@ public abstract class Method extends Struct implements ProtocolEvent
public String toString()
{
- if (getEncodedTrack() != Frame.L4)
- {
- return super.toString();
- }
-
StringBuilder str = new StringBuilder();
- if (idSet)
+ str.append("ch=");
+ str.append(channel);
+
+ if (getEncodedTrack() == Frame.L4 && idSet)
{
- str.append("id=");
+ str.append(" id=");
str.append(id);
}
- if (sync)
+ if (sync || batch)
{
- if (str.length() > 0)
+ str.append(" ");
+ str.append("[");
+ if (sync)
{
- str.append(" ");
+ str.append("S");
}
- str.append(" [sync]");
+ if (batch)
+ {
+ str.append("B");
+ }
+ str.append("]");
}
- if (str.length() > 0)
+ str.append(" ");
+ str.append(super.toString());
+ Header hdr = getHeader();
+ if (hdr != null)
{
- str.append(" ");
+ for (Struct st : hdr.getStructs())
+ {
+ str.append("\n ");
+ str.append(st);
+ }
}
- str.append(super.toString());
+ ByteBuffer body = getBody();
+ if (body != null)
+ {
+ str.append("\n body=");
+ str.append(str(body, 64));
+ }
+
return str.toString();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
index 028d570416..a90948fc1d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
@@ -35,10 +35,6 @@ public interface ProtocolDelegate<C>
void command(C context, Method command);
- void header(C context, Header header);
-
- void data(C context, Data data);
-
void error(C context, ProtocolError error);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolError.java
index 59fc3d2552..bd6ab81997 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolError.java
@@ -18,10 +18,10 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.network.NetworkDelegate;
-import org.apache.qpidity.transport.network.NetworkEvent;
+import org.apache.qpid.transport.network.NetworkDelegate;
+import org.apache.qpid.transport.network.NetworkEvent;
/**
@@ -30,9 +30,10 @@ import org.apache.qpidity.transport.network.NetworkEvent;
* @author Rafael H. Schloming
*/
-public class ProtocolError implements NetworkEvent, ProtocolEvent
+public final class ProtocolError implements NetworkEvent, ProtocolEvent
{
+ private int channel;
private final byte track;
private final String format;
private final Object[] args;
@@ -44,6 +45,16 @@ public class ProtocolError implements NetworkEvent, ProtocolEvent
this.args = args;
}
+ public int getChannel()
+ {
+ return channel;
+ }
+
+ public void setChannel(int channel)
+ {
+ this.channel = channel;
+ }
+
public byte getEncodedTrack()
{
return track;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolEvent.java
index 0b38dc6f28..60234c1537 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolEvent.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
@@ -29,6 +29,10 @@ package org.apache.qpidity.transport;
public interface ProtocolEvent
{
+ int getChannel();
+
+ void setChannel(int channel);
+
byte getEncodedTrack();
<C> void delegate(C context, ProtocolDelegate<C> delegate);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
index f9cd6f3947..fa0c1e9c63 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
@@ -18,13 +18,13 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import java.nio.ByteBuffer;
-import org.apache.qpidity.transport.network.NetworkDelegate;
-import org.apache.qpidity.transport.network.NetworkEvent;
-import org.apache.qpidity.transport.network.Frame;
+import org.apache.qpid.transport.network.NetworkDelegate;
+import org.apache.qpid.transport.network.NetworkEvent;
+import org.apache.qpid.transport.network.Frame;
/**
@@ -33,9 +33,7 @@ import org.apache.qpidity.transport.network.Frame;
* @author Rafael H. Schloming
*/
-//RA making this public until we sort out the package issues
-
-public class ProtocolHeader implements NetworkEvent, ProtocolEvent
+public final class ProtocolHeader implements NetworkEvent, ProtocolEvent
{
private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
@@ -44,6 +42,7 @@ public class ProtocolHeader implements NetworkEvent, ProtocolEvent
final private byte instance;
final private byte major;
final private byte minor;
+ private int channel;
public ProtocolHeader(byte instance, byte major, byte minor)
{
@@ -72,6 +71,16 @@ public class ProtocolHeader implements NetworkEvent, ProtocolEvent
return minor;
}
+ public int getChannel()
+ {
+ return channel;
+ }
+
+ public void setChannel(int channel)
+ {
+ this.channel = channel;
+ }
+
public byte getEncodedTrack()
{
return Frame.L1;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
index 62d8f4d99d..2de0c169a5 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
@@ -18,39 +18,35 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
- * ConnectionEvent
+ * ProtocolVersionException
*
*/
-public class ConnectionEvent
+public final class ProtocolVersionException extends TransportException
{
- private final int channel;
- private final ProtocolEvent event;
+ private final byte major;
+ private final byte minor;
- public ConnectionEvent(int channel, ProtocolEvent event)
+ public ProtocolVersionException(byte major, byte minor)
{
- this.channel = channel;
- this.event = event;
+ super(String.format("version missmatch: %s-%s", major, minor));
+ this.major = major;
+ this.minor = minor;
}
- public int getChannel()
+ public byte getMajor()
{
- return channel;
+ return this.major;
}
- public ProtocolEvent getProtocolEvent()
+ public byte getMinor()
{
- return event;
- }
-
- public String toString()
- {
- return String.format("[%d] %s", channel, event);
+ return this.minor;
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Range.java b/java/common/src/main/java/org/apache/qpid/transport/Range.java
index 14522cd45f..f4335dc8a6 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Range.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Range.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import java.util.ArrayList;
import java.util.List;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
index e05e399262..9b2744ee8b 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import java.util.Iterator;
import java.util.ListIterator;
@@ -47,6 +47,11 @@ public final class RangeSet implements Iterable<Range>
return ranges.iterator();
}
+ public Range getFirst()
+ {
+ return ranges.getFirst();
+ }
+
public boolean includes(Range range)
{
for (Range r : this)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java b/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
index 8952ebf2a5..2a994580dc 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Result.java b/java/common/src/main/java/org/apache/qpid/transport/Result.java
index 2126a76a53..1116492a8d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Result.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Result.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Sender.java b/java/common/src/main/java/org/apache/qpid/transport/Sender.java
index 6da8358bd6..9a6f675d7f 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Sender.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
@@ -31,6 +31,8 @@ public interface Sender<T>
void send(T msg);
+ void flush();
+
void close();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 988ac4788f..10ca6cfb0a 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -18,12 +18,12 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.network.Frame;
+import org.apache.qpid.transport.network.Frame;
-import org.apache.qpidity.transport.util.Logger;
+import org.apache.qpid.transport.util.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -33,9 +33,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.qpidity.transport.Option.*;
-import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.transport.Option.*;
+import static org.apache.qpid.transport.util.Functions.*;
import static org.apache.qpid.util.Serial.*;
+import static org.apache.qpid.util.Strings.*;
/**
* Session
@@ -75,7 +76,8 @@ public class Session extends Invoker
// completed incoming commands
private final Object processedLock = new Object();
private RangeSet processed = new RangeSet();
- private Range syncPoint = null;
+ private int maxProcessed = commandsIn - 1;
+ private int syncPoint = maxProcessed;
// outgoing command count
private int commandsOut = 0;
@@ -127,10 +129,16 @@ public class Session extends Invoker
{
int id = nextCommandId();
cmd.setId(id);
- log.debug("ID: [%s] %s", this.channel, id);
- if ((id % 65536) == 0)
+
+ if(log.isDebugEnabled())
+ {
+ log.debug("ID: [%s] %s", this.channel, id);
+ }
+
+ //if ((id % 65536) == 0)
+ if ((id & 0xff) == 0)
{
- flushProcessed(true);
+ flushProcessed(TIMELY_REPLY);
}
}
@@ -158,7 +166,16 @@ public class Session extends Invoker
synchronized (processedLock)
{
processed.add(range);
- flush = syncPoint != null && processed.includes(syncPoint);
+ Range first = processed.getFirst();
+ int lower = first.getLower();
+ int upper = first.getUpper();
+ int old = maxProcessed;
+ if (le(lower, maxProcessed + 1))
+ {
+ maxProcessed = max(maxProcessed, upper);
+ }
+ flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint);
+ syncPoint = maxProcessed;
}
if (flush)
{
@@ -166,19 +183,14 @@ public class Session extends Invoker
}
}
- public void flushProcessed()
- {
- flushProcessed(false);
- }
-
- private void flushProcessed(boolean timely_reply)
+ public void flushProcessed(Option ... options)
{
RangeSet copy;
synchronized (processedLock)
{
copy = processed.copy();
}
- sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION);
+ sessionCompleted(copy, options);
}
void knownComplete(RangeSet kc)
@@ -204,15 +216,11 @@ public class Session extends Invoker
{
int id = getCommandsIn() - 1;
log.debug("%s synced to %d", this, id);
- Range range = new Range(0, id - 1);
boolean flush;
synchronized (processedLock)
{
- flush = processed.includes(range);
- if (!flush)
- {
- syncPoint = range;
- }
+ syncPoint = id;
+ flush = ge(maxProcessed, syncPoint);
}
if (flush)
{
@@ -236,7 +244,11 @@ public class Session extends Invoker
boolean complete(int lower, int upper)
{
- log.debug("%s complete(%d, %d)", this, lower, upper);
+ //avoid autoboxing
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s complete(%d, %d)", this, lower, upper);
+ }
synchronized (commands)
{
int old = maxComplete;
@@ -254,8 +266,25 @@ public class Session extends Invoker
}
}
- protected void invoke(Method m)
+ public void invoke(Method m)
{
+ if (closed.get())
+ {
+ List<ExecutionException> exc = getExceptions();
+ if (!exc.isEmpty())
+ {
+ throw new SessionException(exc);
+ }
+ else if (close != null)
+ {
+ throw new ConnectionException(close);
+ }
+ else
+ {
+ throw new SessionClosedException();
+ }
+ }
+
if (m.getEncodedTrack() == Frame.L4)
{
synchronized (commands)
@@ -276,7 +305,7 @@ public class Session extends Invoker
}
needSync = !m.isSync();
channel.method(m);
- if (autoSync && !m.hasPayload())
+ if (autoSync)
{
sync();
}
@@ -295,50 +324,6 @@ public class Session extends Invoker
}
}
- public void header(Header header)
- {
- channel.header(header);
- }
-
- public Header header(List<Struct> structs)
- {
- Header res = new Header(structs, false);
- header(res);
- return res;
- }
-
- public Header header(Struct ... structs)
- {
- return header(Arrays.asList(structs));
- }
-
- public void data(ByteBuffer buf)
- {
- channel.data(buf);
- }
-
- public void data(String str)
- {
- channel.data(str);
- }
-
- public void data(byte[] bytes)
- {
- channel.data(bytes);
- }
-
- public void endData()
- {
- channel.end();
- synchronized (commands)
- {
- if (autoSync)
- {
- sync();
- }
- }
- }
-
public void sync()
{
sync(timeout);
@@ -353,9 +338,7 @@ public class Session extends Invoker
if (needSync && lt(maxComplete, point))
{
- ExecutionSync sync = new ExecutionSync();
- sync.setSync(true);
- invoke(sync);
+ executionSync(SYNC);
}
long start = System.currentTimeMillis();
@@ -413,6 +396,13 @@ public class Session extends Invoker
}
}
+ private ConnectionClose close = null;
+
+ void closeCode(ConnectionClose close)
+ {
+ this.close = close;
+ }
+
List<ExecutionException> getExceptions()
{
synchronized (exceptions)
@@ -508,6 +498,26 @@ public class Session extends Invoker
}
+ public final void messageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ byte[] body,
+ Option ... _options) {
+ messageTransfer(destination, acceptMode, acquireMode, header,
+ ByteBuffer.wrap(body), _options);
+ }
+
+ public final void messageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ String body,
+ Option ... _options) {
+ messageTransfer(destination, acceptMode, acquireMode, header,
+ toUTF8(body), _options);
+ }
+
public void close()
{
sessionRequestTimeout(0);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
new file mode 100644
index 0000000000..d2c54cf339
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.Collections;
+
+
+/**
+ * SessionClosedException
+ *
+ */
+
+public class SessionClosedException extends SessionException
+{
+
+ public SessionClosedException()
+ {
+ super(Collections.EMPTY_LIST);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 0e289c54e9..b91763509c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -18,9 +18,9 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
-import org.apache.qpidity.transport.network.Frame;
+import org.apache.qpid.transport.network.Frame;
/**
@@ -48,10 +48,6 @@ public abstract class SessionDelegate
}
}
- public void header(Session ssn, Header header) { }
-
- public void data(Session ssn, Data data) { }
-
public void error(Session ssn, ProtocolError error) { }
@Override public void executionResult(Session ssn, ExecutionResult result)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
index fc866f3694..dc294b2206 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/SessionException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import java.util.List;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/java/common/src/main/java/org/apache/qpid/transport/Sink.java
new file mode 100644
index 0000000000..8653acedbe
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/Sink.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
+
+/**
+ * Sink
+ *
+ */
+
+public class Sink extends SessionDelegate
+{
+
+ private static final String FORMAT_HDR = "%-12s %-18s %-18s %-18s";
+ private static final String FORMAT_ROW = "SSN#%-8X %-18s %-18s %-18s";
+
+ private long interval = 100000;
+ private long start = System.currentTimeMillis();
+ private long count = 0;
+ private long bytes = 0;
+ private long interval_start = start;
+ private long bytes_start = bytes;
+ private long time = start;
+ private int id = System.identityHashCode(this);
+
+ public Sink()
+ {
+ }
+
+ private double msg_rate()
+ {
+ return 1000 * (double) count / (double) (time - start);
+ }
+
+ private double byte_rate()
+ {
+ return (1000 * (double) bytes / (double) (time - start)) / (1024*1024);
+ }
+
+ private double msg_interval_rate()
+ {
+ return 1000 * (double) interval / (double) (time - interval_start);
+ }
+
+ private double byte_interval_rate()
+ {
+ return (1000 * (double) (bytes - bytes_start) / (double) (time - interval_start)) / (1024*1024);
+ }
+
+ private String rates()
+ {
+ return String.format("%.2f/%.2f", msg_rate(), byte_rate());
+ }
+
+ private String interval_rates()
+ {
+ return String.format("%.2f/%.2f", msg_interval_rate(), byte_interval_rate());
+ }
+
+ private String counts()
+ {
+ return String.format("%d/%.2f", count, ((double) bytes)/(1024*1024));
+ }
+
+ public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ count++;
+ bytes += xfr.getBody().remaining();
+ if ((count % interval) == 0)
+ {
+ time = System.currentTimeMillis();
+ System.out.println
+ (String.format
+ (FORMAT_ROW, id, counts(), rates(), interval_rates()));
+ interval_start = time;
+ bytes_start = bytes;
+ }
+ ssn.processed(xfr);
+ }
+
+ public static final void main(String[] args) throws IOException
+ {
+ ConnectionDelegate delegate = new ConnectionDelegate()
+ {
+
+ public SessionDelegate getSessionDelegate()
+ {
+ return new Sink();
+ }
+
+ public void exception(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ public void closed() {}
+ };
+
+ //hack
+ delegate.setUsername("guest");
+ delegate.setPassword("guest");
+
+ IoAcceptor ioa = new IoAcceptor
+ ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ System.out.println
+ (String.format
+ (FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate"));
+ System.out.println
+ (String.format
+ (FORMAT_HDR, "-------", "------------", "---------------", "-------------"));
+ ioa.start();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/java/common/src/main/java/org/apache/qpid/transport/Struct.java
index a15d0a1fb8..22bd9f34ad 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Struct.java
@@ -18,14 +18,14 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import java.util.List;
import java.util.Map;
-import org.apache.qpidity.transport.codec.Decoder;
-import org.apache.qpidity.transport.codec.Encodable;
-import org.apache.qpidity.transport.codec.Encoder;
+import org.apache.qpid.transport.codec.Decoder;
+import org.apache.qpid.transport.codec.Encodable;
+import org.apache.qpid.transport.codec.Encoder;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportException.java b/java/common/src/main/java/org/apache/qpid/transport/TransportException.java
index 593209df82..5ef15154fc 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/TransportException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/TransportException.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
index ebfc6b120f..a8a4997ae7 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.codec;
+package org.apache.qpid.transport.codec;
import java.io.UnsupportedEncodingException;
@@ -29,11 +29,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Struct;
-import org.apache.qpidity.transport.Type;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
-import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.transport.util.Functions.*;
/**
@@ -45,6 +46,14 @@ import static org.apache.qpidity.transport.util.Functions.*;
abstract class AbstractDecoder implements Decoder
{
+ private final Map<Binary,String> str8cache = new LinkedHashMap<Binary,String>()
+ {
+ @Override protected boolean removeEldestEntry(Map.Entry<Binary,String> me)
+ {
+ return size() > 4*1024;
+ }
+ };
+
protected abstract byte doGet();
protected abstract void doGet(byte[] bytes);
@@ -59,6 +68,13 @@ abstract class AbstractDecoder implements Decoder
doGet(bytes);
}
+ protected Binary get(int size)
+ {
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return new Binary(bytes);
+ }
+
protected short uget()
{
return (short) (0xFF & get());
@@ -105,11 +121,11 @@ abstract class AbstractDecoder implements Decoder
return readUint64();
}
- private static final String decode(byte[] bytes, String charset)
+ private static final String decode(byte[] bytes, int offset, int length, String charset)
{
try
{
- return new String(bytes, charset);
+ return new String(bytes, offset, length, charset);
}
catch (UnsupportedEncodingException e)
{
@@ -117,13 +133,22 @@ abstract class AbstractDecoder implements Decoder
}
}
+ private static final String decode(byte[] bytes, String charset)
+ {
+ return decode(bytes, 0, bytes.length, charset);
+ }
public String readStr8()
{
short size = readUint8();
- byte[] bytes = new byte[size];
- get(bytes);
- return decode(bytes, "UTF-8");
+ Binary bin = get(size);
+ String str = str8cache.get(bin);
+ if (str == null)
+ {
+ str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8");
+ str8cache.put(bin, str);
+ }
+ return str;
}
public String readStr16()
@@ -233,7 +258,19 @@ abstract class AbstractDecoder implements Decoder
public Map<String,Object> readMap()
{
long size = readUint32();
+
+ if (size == 0)
+ {
+ return null;
+ }
+
long count = readUint32();
+
+ if (count == 0)
+ {
+ return Collections.EMPTY_MAP;
+ }
+
Map<String,Object> result = new LinkedHashMap();
for (int i = 0; i < count; i++)
{
@@ -243,13 +280,26 @@ abstract class AbstractDecoder implements Decoder
Object value = read(t);
result.put(key, value);
}
+
return result;
}
public List<Object> readList()
{
long size = readUint32();
+
+ if (size == 0)
+ {
+ return null;
+ }
+
long count = readUint32();
+
+ if (count == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
List<Object> result = new ArrayList();
for (int i = 0; i < count; i++)
{
@@ -264,15 +314,21 @@ abstract class AbstractDecoder implements Decoder
public List<Object> readArray()
{
long size = readUint32();
+
if (size == 0)
{
- return Collections.EMPTY_LIST;
+ return null;
}
byte code = get();
Type t = getType(code);
long count = readUint32();
+ if (count == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
List<Object> result = new ArrayList<Object>();
for (int i = 0; i < count; i++)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
index aa90627943..908d14a307 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.codec;
+package org.apache.qpid.transport.codec;
import java.io.UnsupportedEncodingException;
@@ -26,16 +26,17 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.transport.Range;
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Struct;
-import org.apache.qpidity.transport.Type;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
-import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.transport.util.Functions.*;
/**
@@ -64,10 +65,13 @@ abstract class AbstractEncoder implements Encoder
ENCODINGS.put(byte[].class, Type.VBIN32);
}
- protected Sizer sizer()
+ private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
{
- return new SizeEncoder();
- }
+ @Override protected boolean removeEldestEntry(Map.Entry<String,byte[]> me)
+ {
+ return size() > 4*1024;
+ }
+ };
protected abstract void doPut(byte b);
@@ -88,6 +92,15 @@ abstract class AbstractEncoder implements Encoder
put(ByteBuffer.wrap(bytes));
}
+ protected abstract int beginSize8();
+ protected abstract void endSize8(int pos);
+
+ protected abstract int beginSize16();
+ protected abstract void endSize16(int pos);
+
+ protected abstract int beginSize32();
+ protected abstract void endSize32(int pos);
+
public void writeUint8(short b)
{
assert b < 0x100;
@@ -132,23 +145,6 @@ abstract class AbstractEncoder implements Encoder
writeUint64(l);
}
- private static final String checkLength(String s, int n)
- {
- if (s == null)
- {
- return "";
- }
-
- if (s.length() > n)
- {
- throw new IllegalArgumentException("string too long: " + s);
- }
- else
- {
- return s;
- }
- }
-
private static final byte[] encode(String s, String charset)
{
try
@@ -163,16 +159,31 @@ abstract class AbstractEncoder implements Encoder
public void writeStr8(String s)
{
- s = checkLength(s, 255);
- writeUint8((short) s.length());
- put(ByteBuffer.wrap(encode(s, "UTF-8")));
+ if (s == null)
+ {
+ s = "";
+ }
+
+ byte[] bytes = str8cache.get(s);
+ if (bytes == null)
+ {
+ bytes = encode(s, "UTF-8");
+ str8cache.put(s, bytes);
+ }
+ writeUint8((short) bytes.length);
+ put(bytes);
}
public void writeStr16(String s)
{
- s = checkLength(s, 65535);
- writeUint16(s.length());
- put(ByteBuffer.wrap(encode(s, "UTF-8")));
+ if (s == null)
+ {
+ s = "";
+ }
+
+ byte[] bytes = encode(s, "UTF-8");
+ writeUint16(bytes.length);
+ put(bytes);
}
public void writeVbin8(byte[] bytes)
@@ -245,18 +256,10 @@ abstract class AbstractEncoder implements Encoder
}
int width = s.getSizeWidth();
+ int pos = -1;
if (width > 0)
{
- if (empty)
- {
- writeSize(width, 0);
- }
- else
- {
- Sizer sizer = sizer();
- s.write(sizer);
- writeSize(width, sizer.size());
- }
+ pos = beginSize(width);
}
if (type > 0)
@@ -265,6 +268,11 @@ abstract class AbstractEncoder implements Encoder
}
s.write(this);
+
+ if (width > 0)
+ {
+ endSize(width, pos);
+ }
}
public void writeStruct32(Struct s)
@@ -275,12 +283,10 @@ abstract class AbstractEncoder implements Encoder
}
else
{
- Sizer sizer = sizer();
- sizer.writeUint16(s.getEncodedType());
- s.write(sizer);
- writeUint32(sizer.size());
+ int pos = beginSize32();
writeUint16(s.getEncodedType());
s.write(this);
+ endSize32(pos);
}
}
@@ -338,18 +344,13 @@ abstract class AbstractEncoder implements Encoder
public void writeMap(Map<String,Object> map)
{
- if (map == null)
+ int pos = beginSize32();
+ if (map != null)
{
- writeUint32(0);
- return;
+ writeUint32(map.size());
+ writeMapEntries(map);
}
-
- Sizer sizer = sizer();
- sizer.writeMap(map);
- // XXX: - 4
- writeUint32(sizer.size() - 4);
- writeUint32(map.size());
- writeMapEntries(map);
+ endSize32(pos);
}
protected void writeMapEntries(Map<String,Object> map)
@@ -367,12 +368,13 @@ abstract class AbstractEncoder implements Encoder
public void writeList(List<Object> list)
{
- Sizer sizer = sizer();
- sizer.writeList(list);
- // XXX: - 4
- writeUint32(sizer.size() - 4);
- writeUint32(list.size());
- writeListEntries(list);
+ int pos = beginSize32();
+ if (list != null)
+ {
+ writeUint32(list.size());
+ writeListEntries(list);
+ }
+ endSize32(pos);
}
protected void writeListEntries(List<Object> list)
@@ -387,16 +389,12 @@ abstract class AbstractEncoder implements Encoder
public void writeArray(List<Object> array)
{
- if (array == null)
+ int pos = beginSize32();
+ if (array != null)
{
- array = Collections.EMPTY_LIST;
+ writeArrayEntries(array);
}
-
- Sizer sizer = sizer();
- sizer.writeArray(array);
- // XXX: -4
- writeUint32(sizer.size() - 4);
- writeArrayEntries(array);
+ endSize32(pos);
}
protected void writeArrayEntries(List<Object> array)
@@ -458,6 +456,39 @@ abstract class AbstractEncoder implements Encoder
}
}
+ private int beginSize(int width)
+ {
+ switch (width)
+ {
+ case 1:
+ return beginSize8();
+ case 2:
+ return beginSize16();
+ case 4:
+ return beginSize32();
+ default:
+ throw new IllegalStateException("illegal width: " + width);
+ }
+ }
+
+ private void endSize(int width, int pos)
+ {
+ switch (width)
+ {
+ case 1:
+ endSize8(pos);
+ break;
+ case 2:
+ endSize16(pos);
+ break;
+ case 4:
+ endSize32(pos);
+ break;
+ default:
+ throw new IllegalStateException("illegal width: " + width);
+ }
+ }
+
private void writeBytes(Type t, byte[] bytes)
{
writeSize(t, bytes.length);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
index cf40cef8bf..dd634eb94a 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
@@ -18,11 +18,13 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.codec;
+package org.apache.qpid.transport.codec;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.Binary;
+
/**
* BBDecoder
@@ -33,9 +35,9 @@ import java.nio.ByteOrder;
public final class BBDecoder extends AbstractDecoder
{
- private final ByteBuffer in;
+ private ByteBuffer in;
- public BBDecoder(ByteBuffer in)
+ public void init(ByteBuffer in)
{
this.in = in;
this.in.order(ByteOrder.BIG_ENDIAN);
@@ -51,6 +53,21 @@ public final class BBDecoder extends AbstractDecoder
in.get(bytes);
}
+ protected Binary get(int size)
+ {
+ if (in.hasArray())
+ {
+ byte[] bytes = in.array();
+ Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size);
+ in.position(in.position() + size);
+ return bin;
+ }
+ else
+ {
+ return super.get(size);
+ }
+ }
+
public boolean hasRemaining()
{
return in.hasRemaining();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
new file mode 100644
index 0000000000..390de881ab
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
@@ -0,0 +1,232 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.codec;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+
+/**
+ * BBEncoder
+ *
+ * @author Rafael H. Schloming
+ */
+
+public final class BBEncoder extends AbstractEncoder
+{
+
+ private ByteBuffer out;
+ private int segment;
+
+ public BBEncoder(int capacity) {
+ out = ByteBuffer.allocate(capacity);
+ out.order(ByteOrder.BIG_ENDIAN);
+ segment = 0;
+ }
+
+ public void init()
+ {
+ out.clear();
+ segment = 0;
+ }
+
+ public ByteBuffer segment()
+ {
+ int pos = out.position();
+ out.position(segment);
+ ByteBuffer slice = out.slice();
+ slice.limit(pos - segment);
+ out.position(pos);
+ segment = pos;
+ return slice;
+ }
+
+ private void grow(int size)
+ {
+ ByteBuffer old = out;
+ int capacity = old.capacity();
+ out = ByteBuffer.allocate(Math.max(capacity + size, 2*capacity));
+ out.order(ByteOrder.BIG_ENDIAN);
+ out.put(old);
+ }
+
+ protected void doPut(byte b)
+ {
+ try
+ {
+ out.put(b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put(b);
+ }
+ }
+
+ protected void doPut(ByteBuffer src)
+ {
+ try
+ {
+ out.put(src);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(src.remaining());
+ out.put(src);
+ }
+ }
+
+ protected void put(byte[] bytes)
+ {
+ try
+ {
+ out.put(bytes);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(bytes.length);
+ out.put(bytes);
+ }
+ }
+
+ public void writeUint8(short b)
+ {
+ assert b < 0x100;
+
+ try
+ {
+ out.put((byte) b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put((byte) b);
+ }
+ }
+
+ public void writeUint16(int s)
+ {
+ assert s < 0x10000;
+
+ try
+ {
+ out.putShort((short) s);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ out.putShort((short) s);
+ }
+ }
+
+ public void writeUint32(long i)
+ {
+ assert i < 0x100000000L;
+
+ try
+ {
+ out.putInt((int) i);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ out.putInt((int) i);
+ }
+ }
+
+ public void writeUint64(long l)
+ {
+ try
+ {
+ out.putLong(l);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(8);
+ out.putLong(l);
+ }
+ }
+
+ public int beginSize8()
+ {
+ int pos = out.position();
+ try
+ {
+ out.put((byte) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put((byte) 0);
+ }
+ return pos;
+ }
+
+ public void endSize8(int pos)
+ {
+ int cur = out.position();
+ out.put(pos, (byte) (cur - pos - 1));
+ }
+
+ public int beginSize16()
+ {
+ int pos = out.position();
+ try
+ {
+ out.putShort((short) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ out.putShort((short) 0);
+ }
+ return pos;
+ }
+
+ public void endSize16(int pos)
+ {
+ int cur = out.position();
+ out.putShort(pos, (short) (cur - pos - 2));
+ }
+
+ public int beginSize32()
+ {
+ int pos = out.position();
+ try
+ {
+ out.putInt(0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ out.putInt(0);
+ }
+ return pos;
+ }
+
+ public void endSize32(int pos)
+ {
+ int cur = out.position();
+ out.putInt(pos, (cur - pos - 4));
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java
index dec901748d..50e787ccb2 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java
@@ -18,14 +18,14 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.codec;
+package org.apache.qpid.transport.codec;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Struct;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encodable.java b/java/common/src/main/java/org/apache/qpid/transport/codec/Encodable.java
index 60c2ea97b8..2aefafd19c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encodable.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/Encodable.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.codec;
+package org.apache.qpid.transport.codec;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
index 9d7a1a695d..2d8d13e80a 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
@@ -18,14 +18,14 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.codec;
+package org.apache.qpid.transport.codec;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Struct;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
/**
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
new file mode 100644
index 0000000000..33d552b91e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -0,0 +1,226 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.Decoder;
+
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SegmentType;
+import org.apache.qpid.transport.Struct;
+
+
+/**
+ * Assembler
+ *
+ */
+
+public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+{
+
+ private final Receiver<ProtocolEvent> receiver;
+ private final Map<Integer,List<Frame>> segments;
+ private final Method[] incomplete;
+ private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
+ {
+ public BBDecoder initialValue()
+ {
+ return new BBDecoder();
+ }
+ };
+
+ public Assembler(Receiver<ProtocolEvent> receiver)
+ {
+ this.receiver = receiver;
+ segments = new HashMap<Integer,List<Frame>>();
+ incomplete = new Method[64*1024];
+ }
+
+ private int segmentKey(Frame frame)
+ {
+ return (frame.getTrack() + 1) * frame.getChannel();
+ }
+
+ private List<Frame> getSegment(Frame frame)
+ {
+ return segments.get(segmentKey(frame));
+ }
+
+ private void setSegment(Frame frame, List<Frame> segment)
+ {
+ int key = segmentKey(frame);
+ if (segments.containsKey(key))
+ {
+ error(new ProtocolError(Frame.L2, "segment in progress: %s",
+ frame));
+ }
+ segments.put(segmentKey(frame), segment);
+ }
+
+ private void clearSegment(Frame frame)
+ {
+ segments.remove(segmentKey(frame));
+ }
+
+ private void emit(int channel, ProtocolEvent event)
+ {
+ event.setChannel(channel);
+ receiver.received(event);
+ }
+
+ public void received(NetworkEvent event)
+ {
+ event.delegate(this);
+ }
+
+ public void exception(Throwable t)
+ {
+ this.receiver.exception(t);
+ }
+
+ public void closed()
+ {
+ this.receiver.closed();
+ }
+
+ public void init(ProtocolHeader header)
+ {
+ emit(0, header);
+ }
+
+ public void error(ProtocolError error)
+ {
+ emit(0, error);
+ }
+
+ public void frame(Frame frame)
+ {
+ ByteBuffer segment;
+ if (frame.isFirstFrame() && frame.isLastFrame())
+ {
+ segment = frame.getBody();
+ assemble(frame, segment);
+ }
+ else
+ {
+ List<Frame> frames;
+ if (frame.isFirstFrame())
+ {
+ frames = new ArrayList<Frame>();
+ setSegment(frame, frames);
+ }
+ else
+ {
+ frames = getSegment(frame);
+ }
+
+ frames.add(frame);
+
+ if (frame.isLastFrame())
+ {
+ clearSegment(frame);
+
+ int size = 0;
+ for (Frame f : frames)
+ {
+ size += f.getSize();
+ }
+ segment = ByteBuffer.allocate(size);
+ for (Frame f : frames)
+ {
+ segment.put(f.getBody());
+ }
+ segment.flip();
+ assemble(frame, segment);
+ }
+ }
+
+ }
+
+ private void assemble(Frame frame, ByteBuffer segment)
+ {
+ BBDecoder dec = decoder.get();
+ dec.init(segment);
+
+ int channel = frame.getChannel();
+ Method command;
+
+ switch (frame.getType())
+ {
+ case CONTROL:
+ int controlType = dec.readUint16();
+ Method control = Method.create(controlType);
+ control.read(dec);
+ emit(channel, control);
+ break;
+ case COMMAND:
+ int commandType = dec.readUint16();
+ // read in the session header, right now we don't use it
+ dec.readUint16();
+ command = Method.create(commandType);
+ command.read(dec);
+ if (command.hasPayload())
+ {
+ incomplete[channel] = command;
+ }
+ else
+ {
+ emit(channel, command);
+ }
+ break;
+ case HEADER:
+ command = incomplete[channel];
+ List<Struct> structs = new ArrayList(2);
+ while (dec.hasRemaining())
+ {
+ structs.add(dec.readStruct32());
+ }
+ command.setHeader(new Header(structs));
+ if (frame.isLastSegment())
+ {
+ incomplete[channel] = null;
+ emit(channel, command);
+ }
+ break;
+ case BODY:
+ command = incomplete[channel];
+ command.setBody(segment);
+ incomplete[channel] = null;
+ emit(channel, command);
+ break;
+ default:
+ throw new IllegalStateException("unknown frame type: " + frame.getType());
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 2e7b41bf42..6886cb3a5a 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -18,62 +18,43 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.codec;
+package org.apache.qpid.transport.network;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
/**
- * BBEncoder
+ * ConnectionBinding
*
- * @author Rafael H. Schloming
*/
-public final class BBEncoder extends AbstractEncoder
+public class ConnectionBinding implements Binding<Connection,ByteBuffer>
{
- private final ByteBuffer out;
+ private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
- public BBEncoder(ByteBuffer out) {
- this.out = out;
- this.out.order(ByteOrder.BIG_ENDIAN);
- }
-
- protected void doPut(byte b)
- {
- out.put(b);
- }
+ private final ConnectionDelegate delegate;
- protected void doPut(ByteBuffer src)
+ public ConnectionBinding(ConnectionDelegate delegate)
{
- out.put(src);
+ this.delegate = delegate;
}
- public void writeUint8(short b)
+ public Connection endpoint(Sender<ByteBuffer> sender)
{
- assert b < 0x100;
-
- out.put((byte) b);
- }
-
- public void writeUint16(int s)
- {
- assert s < 0x10000;
-
- out.putShort((short) s);
- }
-
- public void writeUint32(long i)
- {
- assert i < 0x100000000L;
-
- out.putInt((int) i);
+ // XXX: hardcoded max-frame
+ return new Connection
+ (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
}
- public void writeUint64(long l)
+ public Receiver<ByteBuffer> receiver(Connection conn)
{
- out.putLong(l);
+ return new InputHandler(new Assembler(conn));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
new file mode 100644
index 0000000000..bb7d2506e3
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -0,0 +1,236 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import org.apache.qpid.transport.codec.BBEncoder;
+
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolDelegate;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.SegmentType;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.Struct;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.qpid.transport.network.Frame.*;
+
+import static java.lang.Math.*;
+
+
+/**
+ * Disassembler
+ *
+ */
+
+public final class Disassembler implements Sender<ProtocolEvent>,
+ ProtocolDelegate<Void>
+{
+
+ private final Sender<ByteBuffer> sender;
+ private final int maxPayload;
+ private final ByteBuffer header;
+ private final Object sendlock = new Object();
+ private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
+ {
+ public BBEncoder initialValue()
+ {
+ return new BBEncoder(4*1024);
+ }
+ };
+
+ public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+ {
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+ {
+ throw new IllegalArgumentException
+ ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ this.sender = sender;
+ this.maxPayload = maxFrame - HEADER_SIZE;
+ this.header = ByteBuffer.allocate(HEADER_SIZE);
+ this.header.order(ByteOrder.BIG_ENDIAN);
+
+ }
+
+ public void send(ProtocolEvent event)
+ {
+ event.delegate(null, this);
+ }
+
+ public void flush()
+ {
+ synchronized (sendlock)
+ {
+ sender.flush();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (sendlock)
+ {
+ sender.close();
+ }
+ }
+
+ private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+ {
+ synchronized (sendlock)
+ {
+ header.put(0, flags);
+ header.put(1, type);
+ header.putShort(2, (short) (size + HEADER_SIZE));
+ header.put(5, track);
+ header.putShort(6, (short) channel);
+
+ header.rewind();
+
+ sender.send(header);
+
+ int limit = buf.limit();
+ buf.limit(buf.position() + size);
+ sender.send(buf);
+ buf.limit(limit);
+ }
+ }
+
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event,
+ ByteBuffer buf)
+ {
+ byte typeb = (byte) type.getValue();
+ byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+ int remaining = buf.remaining();
+ boolean first = true;
+ while (true)
+ {
+ int size = min(maxPayload, remaining);
+ remaining -= size;
+
+ byte newflags = flags;
+ if (first)
+ {
+ newflags |= FIRST_FRAME;
+ first = false;
+ }
+ if (remaining == 0)
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ frame(newflags, typeb, track, event.getChannel(), size, buf);
+
+ if (remaining == 0)
+ {
+ break;
+ }
+ }
+ }
+
+ public void init(Void v, ProtocolHeader header)
+ {
+ synchronized (sendlock)
+ {
+ sender.send(header.toByteBuffer());
+ sender.flush();
+ }
+ }
+
+ public void control(Void v, Method method)
+ {
+ method(method, SegmentType.CONTROL);
+ }
+
+ public void command(Void v, Method method)
+ {
+ method(method, SegmentType.COMMAND);
+ }
+
+ private ByteBuffer copy(ByteBuffer src)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(src.remaining());
+ buf.put(src);
+ buf.flip();
+ return buf;
+ }
+
+ private void method(Method method, SegmentType type)
+ {
+ BBEncoder enc = encoder.get();
+ enc.init();
+ enc.writeUint16(method.getEncodedType());
+ if (type == SegmentType.COMMAND)
+ {
+ if (method.isSync())
+ {
+ enc.writeUint16(0x0101);
+ }
+ else
+ {
+ enc.writeUint16(0x0100);
+ }
+ }
+ method.write(enc);
+ ByteBuffer methodSeg = enc.segment();
+
+ byte flags = FIRST_SEG;
+
+ boolean payload = method.hasPayload();
+ if (!payload)
+ {
+ flags |= LAST_SEG;
+ }
+
+ ByteBuffer headerSeg = null;
+ if (payload)
+ {
+ final Header hdr = method.getHeader();
+ final Struct[] structs = hdr.getStructs();
+
+ for (Struct st : structs)
+ {
+ enc.writeStruct32(st);
+ }
+ headerSeg = enc.segment();
+ }
+
+ synchronized (sendlock)
+ {
+ fragment(flags, type, method, methodSeg);
+ if (payload)
+ {
+ fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg);
+ fragment(LAST_SEG, SegmentType.BODY, method, method.getBody());
+ }
+ }
+ }
+
+ public void error(Void v, ProtocolError error)
+ {
+ throw new IllegalArgumentException("" + error);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
index 2abac382e6..849355276e 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
@@ -18,10 +18,10 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.network;
+package org.apache.qpid.transport.network;
-import org.apache.qpidity.transport.SegmentType;
-import org.apache.qpidity.transport.util.SliceIterator;
+import org.apache.qpid.transport.SegmentType;
+import org.apache.qpid.transport.util.SliceIterator;
import java.nio.ByteBuffer;
@@ -29,7 +29,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
-import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.transport.util.Functions.*;
/**
@@ -38,7 +38,7 @@ import static org.apache.qpidity.transport.util.Functions.*;
* @author Rafael H. Schloming
*/
-public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
+public final class Frame implements NetworkEvent
{
public static final int HEADER_SIZE = 12;
@@ -61,23 +61,21 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
final private SegmentType type;
final private byte track;
final private int channel;
- final private List<ByteBuffer> fragments;
- private int size;
+ final private ByteBuffer body;
- public Frame(byte flags, SegmentType type, byte track, int channel)
+ public Frame(byte flags, SegmentType type, byte track, int channel,
+ ByteBuffer body)
{
this.flags = flags;
this.type = type;
this.track = track;
this.channel = channel;
- this.size = 0;
- this.fragments = new ArrayList<ByteBuffer>();
+ this.body = body;
}
- public void addFragment(ByteBuffer fragment)
+ public ByteBuffer getBody()
{
- fragments.add(fragment);
- size += fragment.remaining();
+ return body.slice();
}
public byte getFlags()
@@ -92,7 +90,7 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
public int getSize()
{
- return size;
+ return body.remaining();
}
public SegmentType getType()
@@ -130,16 +128,6 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
return flag(LAST_FRAME);
}
- public Iterator<ByteBuffer> getFragments()
- {
- return new SliceIterator(fragments.iterator());
- }
-
- public Iterator<ByteBuffer> iterator()
- {
- return getFragments();
- }
-
public void delegate(NetworkDelegate delegate)
{
delegate.frame(this);
@@ -148,26 +136,14 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
public String toString()
{
StringBuilder str = new StringBuilder();
+
str.append(String.format
("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(),
getTrack(), getType(),
isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));
- boolean first = true;
- for (ByteBuffer buf : this)
- {
- if (first)
- {
- first = false;
- }
- else
- {
- str.append(" | ");
- }
-
- str.append(str(buf));
- }
+ str.append(str(body));
return str.toString();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
new file mode 100644
index 0000000000..408c95e075
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SegmentType;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
+import static org.apache.qpid.transport.network.InputHandler.State.*;
+
+
+/**
+ * InputHandler
+ *
+ * @author Rafael H. Schloming
+ */
+
+public final class InputHandler implements Receiver<ByteBuffer>
+{
+
+ public enum State
+ {
+ PROTO_HDR,
+ FRAME_HDR,
+ FRAME_BODY,
+ ERROR;
+ }
+
+ private final Receiver<NetworkEvent> receiver;
+ private State state;
+ private ByteBuffer input = null;
+ private int needed;
+
+ private byte flags;
+ private SegmentType type;
+ private byte track;
+ private int channel;
+
+ public InputHandler(Receiver<NetworkEvent> receiver, State state)
+ {
+ this.receiver = receiver;
+ this.state = state;
+
+ switch (state)
+ {
+ case PROTO_HDR:
+ needed = 8;
+ break;
+ case FRAME_HDR:
+ needed = Frame.HEADER_SIZE;
+ break;
+ }
+ }
+
+ public InputHandler(Receiver<NetworkEvent> receiver)
+ {
+ this(receiver, PROTO_HDR);
+ }
+
+ private void error(String fmt, Object ... args)
+ {
+ receiver.received(new ProtocolError(Frame.L1, fmt, args));
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ int limit = buf.limit();
+ int remaining = buf.remaining();
+ while (remaining > 0)
+ {
+ if (remaining >= needed)
+ {
+ int consumed = needed;
+ int pos = buf.position();
+ if (input == null)
+ {
+ buf.limit(pos + needed);
+ input = buf;
+ state = next(pos);
+ buf.limit(limit);
+ buf.position(pos + consumed);
+ }
+ else
+ {
+ buf.limit(pos + needed);
+ input.put(buf);
+ buf.limit(limit);
+ input.flip();
+ state = next(0);
+ }
+
+ remaining -= consumed;
+ input = null;
+ }
+ else
+ {
+ if (input == null)
+ {
+ input = ByteBuffer.allocate(needed);
+ }
+ input.put(buf);
+ needed -= remaining;
+ remaining = 0;
+ }
+ }
+ }
+
+ private State next(int pos)
+ {
+ input.order(ByteOrder.BIG_ENDIAN);
+
+ switch (state) {
+ case PROTO_HDR:
+ if (input.get(pos) != 'A' &&
+ input.get(pos + 1) != 'M' &&
+ input.get(pos + 2) != 'Q' &&
+ input.get(pos + 3) != 'P')
+ {
+ error("bad protocol header: %s", str(input));
+ return ERROR;
+ }
+
+ byte instance = input.get(pos + 5);
+ byte major = input.get(pos + 6);
+ byte minor = input.get(pos + 7);
+ receiver.received(new ProtocolHeader(instance, major, minor));
+ needed = Frame.HEADER_SIZE;
+ return FRAME_HDR;
+ case FRAME_HDR:
+ flags = input.get(pos);
+ type = SegmentType.get(input.get(pos + 1));
+ int size = (0xFFFF & input.getShort(pos + 2));
+ size -= Frame.HEADER_SIZE;
+ if (size < 0 || size > (64*1024 - 12))
+ {
+ error("bad frame size: %d", size);
+ return ERROR;
+ }
+ byte b = input.get(pos + 5);
+ if ((b & 0xF0) != 0) {
+ error("non-zero reserved bits in upper nibble of " +
+ "frame header byte 5: '%x'", b);
+ return ERROR;
+ } else {
+ track = (byte) (b & 0xF);
+ }
+ channel = (0xFFFF & input.getShort(pos + 6));
+ if (size == 0)
+ {
+ Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0));
+ receiver.received(frame);
+ needed = Frame.HEADER_SIZE;
+ return FRAME_HDR;
+ }
+ else
+ {
+ needed = size;
+ return FRAME_BODY;
+ }
+ case FRAME_BODY:
+ Frame frame = new Frame(flags, type, track, channel, input.slice());
+ receiver.received(frame);
+ needed = Frame.HEADER_SIZE;
+ return FRAME_HDR;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ public void exception(Throwable t)
+ {
+ receiver.exception(t);
+ }
+
+ public void closed()
+ {
+ receiver.closed();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkDelegate.java
index 48655edd0c..fbdfe6e84c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkDelegate.java
@@ -18,10 +18,10 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.network;
+package org.apache.qpid.transport.network;
-import org.apache.qpidity.transport.ProtocolError;
-import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolHeader;
/**
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkEvent.java
index 080efee704..91314cd4ad 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkEvent.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.network;
+package org.apache.qpid.transport.network;
/**
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
new file mode 100644
index 0000000000..b63020913b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
@@ -0,0 +1,109 @@
+package org.apache.qpid.transport.network.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentBodyFactory;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderBodyFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.HeartbeatBodyFactory;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Receiver;
+
+public class InputHandler_0_9 implements Receiver<ByteBuffer>
+{
+
+ private AMQVersionAwareProtocolSession _session;
+ private MethodRegistry _registry;
+ private BodyFactory bodyFactory;
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+
+ static
+ {
+ _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+ _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+ _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
+
+ public InputHandler_0_9(AMQVersionAwareProtocolSession session)
+ {
+ _session = session;
+ _registry = _session.getMethodRegistry();
+ }
+
+ public void closed()
+ {
+ // AS FIXME: implement
+ }
+
+ public void exception(Throwable t)
+ {
+ // TODO: propogate exception to things
+ t.printStackTrace();
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf);
+ try
+ {
+ final byte type = in.get();
+ if (type == AMQMethodBody.TYPE)
+ {
+ bodyFactory = new AMQMethodBodyFactory(_session);
+ }
+ else
+ {
+ bodyFactory = _bodiesSupported[type];
+ }
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+ }
+
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
+ {
+ throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize, null);
+ }
+
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.get();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type, null);
+ }
+
+ try
+ {
+ frame.getBodyFrame().handle(frame.getChannel(), _session);
+ }
+ catch (AMQException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
new file mode 100644
index 0000000000..c4559ae6b4
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.TransportException;
+
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * IoAcceptor
+ *
+ */
+
+public class IoAcceptor<E> extends Thread
+{
+
+
+ private ServerSocket socket;
+ private Binding<E,ByteBuffer> binding;
+
+ public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding)
+ throws IOException
+ {
+ socket = new ServerSocket();
+ socket.setReuseAddress(true);
+ socket.bind(address);
+ this.binding = binding;
+
+ setName(String.format("IoAcceptor - %s", socket.getInetAddress()));
+ }
+
+ public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding)
+ throws IOException
+ {
+ this(new InetSocketAddress(host, port), binding);
+ }
+
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ Socket sock = socket.accept();
+ IoTransport<E> transport = new IoTransport<E>(sock, binding);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
new file mode 100644
index 0000000000..d6d1df573c
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * IoReceiver
+ *
+ */
+
+final class IoReceiver extends Thread
+{
+
+ private static final Logger log = Logger.get(IoReceiver.class);
+
+ private final IoTransport transport;
+ private final Receiver<ByteBuffer> receiver;
+ private final int bufferSize;
+ private final Socket socket;
+ private final long timeout;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
+ int bufferSize, long timeout)
+ {
+ this.transport = transport;
+ this.receiver = receiver;
+ this.bufferSize = bufferSize;
+ this.socket = transport.getSocket();
+ this.timeout = timeout;
+
+ setDaemon(true);
+ setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ start();
+ }
+
+ void close()
+ {
+ if (!closed.getAndSet(true))
+ {
+ try
+ {
+ socket.shutdownInput();
+ if (Thread.currentThread() != this)
+ {
+ join(timeout);
+ if (isAlive())
+ {
+ throw new TransportException("join timed out");
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new TransportException(e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+ public void run()
+ {
+ final int threshold = bufferSize / 2;
+
+ // I set the read buffer size simillar to SO_RCVBUF
+ // Haven't tested with a lower value to see if it's better or worse
+ byte[] buffer = new byte[bufferSize];
+ try
+ {
+ InputStream in = socket.getInputStream();
+ int read = 0;
+ int offset = 0;
+ while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ {
+ if (read > 0)
+ {
+ ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
+ receiver.received(b);
+ offset+=read;
+ if (offset > threshold)
+ {
+ offset = 0;
+ buffer = new byte[bufferSize];
+ }
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ receiver.exception(t);
+ }
+ finally
+ {
+ receiver.closed();
+ transport.getSender().close();
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
new file mode 100644
index 0000000000..ef892744ab
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
+
+public final class IoSender extends Thread implements Sender<ByteBuffer>
+{
+
+ private static final Logger log = Logger.get(IoSender.class);
+
+ // by starting here, we ensure that we always test the wraparound
+ // case, we should probably make this configurable somehow so that
+ // we can test other cases as well
+ private final static int START = Integer.MAX_VALUE - 10;
+
+ private final IoTransport transport;
+ private final long timeout;
+ private final Socket socket;
+ private final OutputStream out;
+
+ private final byte[] buffer;
+ private volatile int head = START;
+ private volatile int tail = START;
+ private volatile boolean idle = true;
+ private final Object notFull = new Object();
+ private final Object notEmpty = new Object();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private volatile Throwable exception = null;
+
+
+ public IoSender(IoTransport transport, int bufferSize, long timeout)
+ {
+ this.transport = transport;
+ this.socket = transport.getSocket();
+ this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
+ this.timeout = timeout;
+
+ try
+ {
+ out = socket.getOutputStream();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error getting output stream for socket", e);
+ }
+
+ setDaemon(true);
+ setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ start();
+ }
+
+ private static final int pof2(int n)
+ {
+ int result = 1;
+ while (result < n)
+ {
+ result *= 2;
+ }
+ return result;
+ }
+
+ private static final int mod(int n, int m)
+ {
+ int r = n % m;
+ return r < 0 ? m + r : r;
+ }
+
+ public void send(ByteBuffer buf)
+ {
+ if (closed.get())
+ {
+ throw new TransportException("sender is closed", exception);
+ }
+
+ final int size = buffer.length;
+ int remaining = buf.remaining();
+
+ while (remaining > 0)
+ {
+ final int hd = head;
+ final int tl = tail;
+
+ if (hd - tl >= size)
+ {
+ flush();
+ synchronized (notFull)
+ {
+ long start = System.currentTimeMillis();
+ long elapsed = 0;
+ while (!closed.get() && head - tail >= size && elapsed < timeout)
+ {
+ try
+ {
+ notFull.wait(timeout - elapsed);
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ elapsed += System.currentTimeMillis() - start;
+ }
+
+ if (closed.get())
+ {
+ throw new TransportException("sender is closed", exception);
+ }
+
+ if (head - tail >= size)
+ {
+ throw new TransportException(String.format("write timed out: %s, %s", head, tail));
+ }
+ }
+ continue;
+ }
+
+ final int hd_idx = mod(hd, size);
+ final int tl_idx = mod(tl, size);
+ final int length;
+
+ if (tl_idx > hd_idx)
+ {
+ length = Math.min(tl_idx - hd_idx, remaining);
+ }
+ else
+ {
+ length = Math.min(size - hd_idx, remaining);
+ }
+
+ buf.get(buffer, hd_idx, length);
+ head += length;
+ remaining -= length;
+ }
+ }
+
+ public void flush()
+ {
+ if (idle)
+ {
+ synchronized (notEmpty)
+ {
+ notEmpty.notify();
+ }
+ }
+ }
+
+ public void close()
+ {
+ close(true);
+ }
+
+ void close(boolean reportException)
+ {
+ if (!closed.getAndSet(true))
+ {
+ synchronized (notEmpty)
+ {
+ notEmpty.notify();
+ }
+
+ try
+ {
+ if (Thread.currentThread() != this)
+ {
+ join(timeout);
+ if (isAlive())
+ {
+ throw new TransportException("join timed out");
+ }
+ }
+ transport.getReceiver().close();
+ socket.close();
+ }
+ catch (InterruptedException e)
+ {
+ throw new TransportException(e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+
+ if (reportException && exception != null)
+ {
+ throw new TransportException(exception);
+ }
+ }
+ }
+
+ public void run()
+ {
+ final int size = buffer.length;
+
+ while (true)
+ {
+ final int hd = head;
+ final int tl = tail;
+
+ if (hd == tl)
+ {
+ if (closed.get())
+ {
+ break;
+ }
+
+ idle = true;
+
+ synchronized (notEmpty)
+ {
+ while (head == tail && !closed.get())
+ {
+ try
+ {
+ notEmpty.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ }
+ }
+
+ idle = false;
+
+ continue;
+ }
+
+ final int hd_idx = mod(hd, size);
+ final int tl_idx = mod(tl, size);
+
+ final int length;
+ if (tl_idx < hd_idx)
+ {
+ length = hd_idx - tl_idx;
+ }
+ else
+ {
+ length = size - tl_idx;
+ }
+
+ try
+ {
+ out.write(buffer, tl_idx, length);
+ }
+ catch (IOException e)
+ {
+ log.error(e, "error in write thread");
+ exception = e;
+ close(false);
+ break;
+ }
+ tail += length;
+ if (head - tl >= size)
+ {
+ synchronized (notFull)
+ {
+ notFull.notify();
+ }
+ }
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
new file mode 100644
index 0000000000..70fd8a3c06
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * This class provides a socket based transport using the java.io
+ * classes.
+ *
+ * The following params are configurable via JVM arguments
+ * TCP_NO_DELAY - amqj.tcpNoDelay
+ * SO_RCVBUF - amqj.receiveBufferSize
+ * SO_SNDBUF - amqj.sendBufferSize
+ */
+public final class IoTransport<E>
+{
+
+ static
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator
+ (new org.apache.mina.common.SimpleByteBufferAllocator());
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+ (Boolean.getBoolean("amqj.enableDirectBuffers"));
+ }
+
+ private static final Logger log = Logger.get(IoTransport.class);
+
+ private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+ private static int readBufferSize = Integer.getInteger
+ ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
+ private static int writeBufferSize = Integer.getInteger
+ ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
+
+ private Socket socket;
+ private IoSender sender;
+ private E endpoint;
+ private IoReceiver receiver;
+ private long timeout = 60000;
+
+ IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
+ {
+ this.socket = socket;
+ this.sender = new IoSender(this, 2*writeBufferSize, timeout);
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(this, binding.receiver(endpoint),
+ 2*readBufferSize, timeout);
+ }
+
+ IoSender getSender()
+ {
+ return sender;
+ }
+
+ IoReceiver getReceiver()
+ {
+ return receiver;
+ }
+
+ Socket getSocket()
+ {
+ return socket;
+ }
+
+ public static final <E> E connect(String host, int port,
+ Binding<E,ByteBuffer> binding)
+ {
+ Socket socket = createSocket(host, port);
+ IoTransport<E> transport = new IoTransport<E>(socket, binding);
+ return transport.endpoint;
+ }
+
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ return connect(host, port, new ConnectionBinding(delegate));
+ }
+
+ public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port)
+ {
+ connect(host, port, new Binding_0_9(session));
+ }
+
+ private static class Binding_0_9
+ implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
+ {
+
+ private AMQVersionAwareProtocolSession session;
+
+ Binding_0_9(AMQVersionAwareProtocolSession session)
+ {
+ this.session = session;
+ }
+
+ public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender)
+ {
+ session.setSender(sender);
+ return session;
+ }
+
+ public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn)
+ {
+ return new InputHandler_0_9(ssn);
+ }
+
+ }
+
+ private static Socket createSocket(String host, int port)
+ {
+ try
+ {
+ InetAddress address = InetAddress.getByName(host);
+ Socket socket = new Socket();
+ socket.setReuseAddress(true);
+ socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+
+ log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.setSendBufferSize(writeBufferSize);
+ socket.setReceiveBufferSize(readBufferSize);
+
+ log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.connect(new InetSocketAddress(address, port));
+ return socket;
+ }
+ catch (SocketException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
index c6855e3d48..f8dbec3c3d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.network.mina;
+package org.apache.qpid.transport.network.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -33,20 +33,20 @@ import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.qpidity.transport.Binding;
-import org.apache.qpidity.transport.Connection;
-import org.apache.qpidity.transport.ConnectionDelegate;
-import org.apache.qpidity.transport.Receiver;
-import org.apache.qpidity.transport.Sender;
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpidity.transport.util.Logger;
+import org.apache.qpid.transport.util.Logger;
-import org.apache.qpidity.transport.network.Assembler;
-import org.apache.qpidity.transport.network.Disassembler;
-import org.apache.qpidity.transport.network.InputHandler;
-import org.apache.qpidity.transport.network.OutputHandler;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
-import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.transport.util.Functions.*;
/**
* MinaHandler
@@ -56,7 +56,6 @@ import static org.apache.qpidity.transport.util.Functions.*;
//RA making this public until we sort out the package issues
public class MinaHandler<E> implements IoHandler
{
- private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
/** Default buffer size for pending messages reads */
private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
/** Default buffer size for pending messages writes */
@@ -202,7 +201,7 @@ public class MinaHandler<E> implements IoHandler
IoAcceptor acceptor = new SocketAcceptor();
acceptor.bind(address, new MinaHandler<E>(binding));
}
-
+
public static final <E> E connect(String host, int port,
Binding<E,java.nio.ByteBuffer> binding)
{
@@ -263,43 +262,13 @@ public class MinaHandler<E> implements IoHandler
ConnectionDelegate delegate)
throws IOException
{
- accept(host, port, new ConnectionBinding
- (delegate, InputHandler.State.PROTO_HDR));
+ accept(host, port, new ConnectionBinding(delegate));
}
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding
- (delegate, InputHandler.State.PROTO_HDR));
- }
-
- private static class ConnectionBinding
- implements Binding<Connection,java.nio.ByteBuffer>
- {
-
- private final ConnectionDelegate delegate;
- private final InputHandler.State state;
-
- ConnectionBinding(ConnectionDelegate delegate,
- InputHandler.State state)
- {
- this.delegate = delegate;
- this.state = state;
- }
-
- public Connection endpoint(Sender<java.nio.ByteBuffer> sender)
- {
- // XXX: hardcoded max-frame
- return new Connection
- (new Disassembler(new OutputHandler(sender), MAX_FRAME_SIZE), delegate);
- }
-
- public Receiver<java.nio.ByteBuffer> receiver(Connection conn)
- {
- return new InputHandler(new Assembler(conn), state);
- }
-
+ return connect(host, port, new ConnectionBinding(delegate));
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
index f0f5731037..69d4061e0c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
@@ -18,15 +18,15 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.network.mina;
+package org.apache.qpid.transport.network.mina;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
-import org.apache.qpidity.transport.Sender;
-import org.apache.qpidity.transport.TransportException;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
/**
@@ -58,6 +58,11 @@ public class MinaSender implements Sender<java.nio.ByteBuffer>
}
}
+ public void flush()
+ {
+ // pass
+ }
+
public synchronized void close()
{
// MINA will sometimes throw away in-progress writes when you
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
index a7339427c7..51e41b26f7 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport.network.nio;
+package org.apache.qpid.transport.network.nio;
import java.io.EOFException;
import java.io.IOException;
@@ -11,13 +11,12 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.qpidity.transport.Connection;
-import org.apache.qpidity.transport.ConnectionDelegate;
-import org.apache.qpidity.transport.Receiver;
-import org.apache.qpidity.transport.network.Assembler;
-import org.apache.qpidity.transport.network.Disassembler;
-import org.apache.qpidity.transport.network.InputHandler;
-import org.apache.qpidity.transport.network.OutputHandler;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
public class NioHandler implements Runnable
{
@@ -68,8 +67,7 @@ public class NioHandler implements Runnable
NioSender sender = new NioSender(_ch);
Connection con = new Connection
- (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
- delegate);
+ (new Disassembler(sender, 64*1024 - 1), delegate);
con.setConnectionId(_count.incrementAndGet());
_handlers.put(con.getConnectionId(),sender);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
index 2cfe6c2089..33e888cc56 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
@@ -1,9 +1,9 @@
-package org.apache.qpidity.transport.network.nio;
+package org.apache.qpid.transport.network.nio;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import org.apache.qpidity.transport.Sender;
+import org.apache.qpid.transport.Sender;
public class NioSender implements Sender<java.nio.ByteBuffer>
{
@@ -47,6 +47,11 @@ public class NioSender implements Sender<java.nio.ByteBuffer>
}
}
+ public void flush()
+ {
+ // pass
+ }
+
private void write(java.nio.ByteBuffer buf)
{
synchronized (lock)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
index 0038cdf118..2c6984e302 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.util;
+package org.apache.qpid.transport.util;
import java.nio.ByteBuffer;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Logger.java b/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java
index 0159036a34..8c4818df92 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/util/Logger.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.util;
+package org.apache.qpid.transport.util;
import org.slf4j.LoggerFactory;
@@ -42,6 +42,11 @@ public final class Logger
this.log = log;
}
+ public boolean isDebugEnabled()
+ {
+ return log.isDebugEnabled();
+ }
+
public void debug(String message, Object ... args)
{
if (log.isDebugEnabled())
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java b/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java
index 32392a3561..3db29847b2 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport.util;
+package org.apache.qpid.transport.util;
import java.nio.ByteBuffer;
diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java b/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java
index 2cf035f601..f12fb2cff2 100644
--- a/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java
+++ b/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java
@@ -17,7 +17,7 @@
*/
package org.apache.qpid.url;
-import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
diff --git a/java/common/src/main/java/org/apache/qpid/url/QpidURL.java b/java/common/src/main/java/org/apache/qpid/url/QpidURL.java
index 1d94b31de2..5ab4425323 100644
--- a/java/common/src/main/java/org/apache/qpid/url/QpidURL.java
+++ b/java/common/src/main/java/org/apache/qpid/url/QpidURL.java
@@ -17,7 +17,7 @@
*/
package org.apache.qpid.url;
-import org.apache.qpidity.BrokerDetails;
+import org.apache.qpid.BrokerDetails;
import java.util.List;
diff --git a/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java b/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java
index b4a55e2bf4..f92934db7f 100644
--- a/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java
+++ b/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java
@@ -17,8 +17,8 @@
*/
package org.apache.qpid.url;
-import org.apache.qpidity.BrokerDetails;
-import org.apache.qpidity.BrokerDetailsImpl;
+import org.apache.qpid.BrokerDetails;
+import org.apache.qpid.BrokerDetailsImpl;
import java.net.MalformedURLException;
import java.util.ArrayList;
diff --git a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java b/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java
index c2862a755b..e764c8536b 100644
--- a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java
+++ b/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java
@@ -20,25 +20,40 @@
*/
package org.apache.qpid.util;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+
/**
- * Wraps a checked exception that occurs when {@link ReflectionUtils} encounters checked exceptions using standard
- * Java reflection methods.
+ * NameUUIDGen
*
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Wrap a checked reflection exception.
- * </table>
*/
-public class ReflectionUtilsException extends RuntimeException
+
+public final class NameUUIDGen implements UUIDGen
{
- /**
- * Creates a runtime reflection exception, from a checked one.
- *
- * @param message The message.
- * @param cause The causing exception.
- */
- public ReflectionUtilsException(String message, Throwable cause)
+
+ private static final int WIDTH = 8;
+
+ final private byte[] seed;
+ final private ByteBuffer seedBuf;
+ private long counter;
+
+ public NameUUIDGen()
{
- super(message, cause);
+ String namespace = UUID.randomUUID().toString();
+ this.seed = new byte[namespace.length() + WIDTH];
+ for (int i = WIDTH; i < seed.length; i++)
+ {
+ seed[i] = (byte) namespace.charAt(i - WIDTH);
+ }
+ this.seedBuf = ByteBuffer.wrap(seed);
+ this.counter = 0;
}
+
+ public UUID generate()
+ {
+ seedBuf.putLong(0, counter++);
+ return UUID.nameUUIDFromBytes(seed);
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java b/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java
new file mode 100644
index 0000000000..60b402a105
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.UUID;
+
+
+/**
+ * RandomUUIDGen
+ *
+ */
+
+public final class RandomUUIDGen implements UUIDGen
+{
+
+ public UUID generate()
+ {
+ return UUID.randomUUID();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java b/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java
deleted file mode 100644
index 28fb2e0c8a..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.util;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-/**
- * Provides helper methods for operating on classes and methods using reflection. Reflection methods tend to return
- * a lot of checked exception so writing code to use them can be tedious and harder to read, especially when such errors
- * are not expected to occur. This class always works with {@link ReflectionUtilsException}, which is a runtime exception,
- * to wrap the checked exceptions raised by the standard Java reflection methods. Code using it does not normally
- * expect these errors to occur, usually does not have a recovery mechanism for them when they do, but is cleaner,
- * quicker to write and easier to read in the majority of cases.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Look up Classes by name.
- * <tr><td> Instantiate Classes by no-arg constructor.
- * </table>
- */
-public class ReflectionUtils
-{
- /**
- * Gets the Class object for a named class.
- *
- * @param className The class to get the Class object for.
- *
- * @return The Class object for the named class.
- */
- public static Class<?> forName(String className)
- {
- try
- {
- return Class.forName(className);
- }
- catch (ClassNotFoundException e)
- {
- throw new ReflectionUtilsException("ClassNotFoundException whilst finding class.", e);
- }
- }
-
- /**
- * Creates an instance of a Class, instantiated through its no-args constructor.
- *
- * @param cls The Class to instantiate.
- * @param <T> The Class type.
- *
- * @return An instance of the class.
- */
- public static <T> T newInstance(Class<? extends T> cls)
- {
- try
- {
- return cls.newInstance();
- }
- catch (InstantiationException e)
- {
- throw new ReflectionUtilsException("InstantiationException whilst instantiating class.", e);
- }
- catch (IllegalAccessException e)
- {
- throw new ReflectionUtilsException("IllegalAccessException whilst instantiating class.", e);
- }
- }
-
- /**
- * Calls a named method on an object with a specified set of parameters, any Java access modifier are overridden.
- *
- * @param o The object to call.
- * @param method The method name to call.
- * @param params The parameters to pass.
- * @param paramClasses The argument types.
- *
- * @return The return value from the method call.
- */
- public static Object callMethodOverridingIllegalAccess(Object o, String method, Object[] params, Class[] paramClasses)
- {
- // Get the objects class.
- Class cls = o.getClass();
-
- // Get the classes of the parameters.
- /*Class[] paramClasses = new Class[params.length];
-
- for (int i = 0; i < params.length; i++)
- {
- paramClasses[i] = params[i].getClass();
- }*/
-
- try
- {
- // Try to find the matching method on the class.
- Method m = cls.getDeclaredMethod(method, paramClasses);
-
- // Make it accessible.
- m.setAccessible(true);
-
- // Invoke it with the parameters.
- return m.invoke(o, params);
- }
- catch (NoSuchMethodException e)
- {
- throw new ReflectionUtilsException("NoSuchMethodException.", e);
- }
- catch (IllegalAccessException e)
- {
- throw new ReflectionUtilsException("IllegalAccessException.", e);
- }
- catch (InvocationTargetException e)
- {
- throw new ReflectionUtilsException("InvocationTargetException", e);
- }
- }
-
- /**
- * Calls a named method on an object with a specified set of parameters.
- *
- * @param o The object to call.
- * @param method The method name to call.
- * @param params The parameters to pass.
- *
- * @return The return value from the method call.
- */
- public static Object callMethod(Object o, String method, Object[] params)
- {
- // Get the objects class.
- Class cls = o.getClass();
-
- // Get the classes of the parameters.
- Class[] paramClasses = new Class[params.length];
-
- for (int i = 0; i < params.length; i++)
- {
- paramClasses[i] = params[i].getClass();
- }
-
- try
- {
- // Try to find the matching method on the class.
- Method m = cls.getMethod(method, paramClasses);
-
- // Invoke it with the parameters.
- return m.invoke(o, params);
- }
- catch (NoSuchMethodException e)
- {
- throw new ReflectionUtilsException("NoSuchMethodException.", e);
- }
- catch (IllegalAccessException e)
- {
- throw new ReflectionUtilsException("IllegalAccessException", e);
- }
- catch (InvocationTargetException e)
- {
- throw new ReflectionUtilsException("InvocationTargetException", e);
- }
- }
-
- /**
- * Calls a constuctor witht the specified arguments.
- *
- * @param constructor The constructor.
- * @param args The arguments.
- * @param <T> The Class type.
- *
- * @return An instance of the class that the constructor is for.
- */
- public static <T> T newInstance(Constructor<T> constructor, Object[] args)
- {
- try
- {
- return constructor.newInstance(args);
- }
- catch (InstantiationException e)
- {
- throw new ReflectionUtilsException("InstantiationException", e);
- }
- catch (IllegalAccessException e)
- {
- throw new ReflectionUtilsException("IllegalAccessException", e);
- }
- catch (InvocationTargetException e)
- {
- throw new ReflectionUtilsException("InvocationTargetException", e);
- }
- }
-
- /**
- * Gets the constructor of a class that takes the specified set of arguments if any matches. If no matching
- * constructor is found then a runtime exception is raised.
- *
- * @param cls The class to get a constructor from.
- * @param args The arguments to match.
- * @param <T> The class type.
- *
- * @return The constructor.
- */
- public static <T> Constructor<T> getConstructor(Class<T> cls, Class[] args)
- {
- try
- {
- return cls.getConstructor(args);
- }
- catch (NoSuchMethodException e)
- {
- throw new ReflectionUtilsException("NoSuchMethodException", e);
- }
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/Strings.java b/java/common/src/main/java/org/apache/qpid/util/Strings.java
new file mode 100644
index 0000000000..4b199bafe6
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/util/Strings.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.UnsupportedEncodingException;
+
+
+/**
+ * Strings
+ *
+ */
+
+public final class Strings
+{
+
+ private static final byte[] EMPTY = new byte[0];
+
+ private static final ThreadLocal<char[]> charbuf = new ThreadLocal()
+ {
+ public char[] initialValue()
+ {
+ return new char[4096];
+ }
+ };
+
+ public static final byte[] toUTF8(String str)
+ {
+ if (str == null)
+ {
+ return EMPTY;
+ }
+ else
+ {
+ final int size = str.length();
+ char[] chars = charbuf.get();
+ if (size > chars.length)
+ {
+ chars = new char[Math.max(size, 2*chars.length)];
+ charbuf.set(chars);
+ }
+
+ str.getChars(0, size, chars, 0);
+ final byte[] bytes = new byte[size];
+ for (int i = 0; i < size; i++)
+ {
+ if (chars[i] > 127)
+ {
+ try
+ {
+ return str.getBytes("UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ bytes[i] = (byte) chars[i];
+ }
+ return bytes;
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java b/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java
new file mode 100644
index 0000000000..3cfe5afdac
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+
+import java.util.UUID;
+
+/**
+ * UUIDGen
+ *
+ */
+
+public interface UUIDGen
+{
+
+ public UUID generate();
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/util/UUIDs.java b/java/common/src/main/java/org/apache/qpid/util/UUIDs.java
new file mode 100644
index 0000000000..4bf6b7f0a2
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/util/UUIDs.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+
+/**
+ * UUIDs
+ *
+ */
+
+public final class UUIDs
+{
+
+ public static final UUIDGen newGenerator()
+ {
+ return newGenerator(System.getProperty("qpid.uuid.generator",
+ NameUUIDGen.class.getName()));
+ }
+
+ public static UUIDGen newGenerator(String name)
+ {
+ try
+ {
+ Class cls = Class.forName(name);
+ return (UUIDGen) cls.newInstance();
+ }
+ catch (InstantiationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolException.java b/java/common/src/main/java/org/apache/qpidity/ProtocolException.java
deleted file mode 100644
index 596143a1b9..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/ProtocolException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpidity;
-
-public class ProtocolException extends QpidException
-{
- /**
- * Constructor for a Ptotocol Exception.
- * <p> This is the only provided constructor and the parameters have to be set to null when
- * they are unknown.
- * @param message A description of the reason of this exception.
- * @param errorCode A string specifyin the error code of this exception.
- * @param cause The linked Execption.
- *
- */
- public ProtocolException(String message, ErrorCode errorCode, Throwable cause)
- {
- super(message, errorCode, cause);
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpidity/exchange/ExchangeDefaults.java
deleted file mode 100644
index a99ea56d69..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/exchange/ExchangeDefaults.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.exchange;
-
-/**
- * Default exchange names
- */
-public class ExchangeDefaults
-{
- /** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */
- public static final String DEFAULT_EXCHANGE_NAME = "<<default>>";
-
- /** The pre-defined topic exchange, the broker SHOULD provide this. */
- public static final String TOPIC_EXCHANGE_NAME = "amq.topic";
-
- /** Defines the identifying type name of topic exchanges. */
- public static final String TOPIC_EXCHANGE_CLASS = "topic";
-
- /** The pre-defined direct exchange, the broker MUST provide this. */
- public static final String DIRECT_EXCHANGE_NAME = "amq.direct";
-
- /** Defines the identifying type name of direct exchanges. */
- public static final String DIRECT_EXCHANGE_CLASS = "direct";
-
- /** The pre-defined headers exchange, the specification does not say this needs to be provided. */
- public static final String HEADERS_EXCHANGE_NAME = "amq.match";
-
- /** Defines the identifying type name of headers exchanges. */
- public static final String HEADERS_EXCHANGE_CLASS = "headers";
-
- /** The pre-defined fanout exchange, the boker MUST provide this. */
- public static final String FANOUT_EXCHANGE_NAME = "amq.fanout";
-
- /** Defines the identifying type name of fanout exchanges. */
- public static final String FANOUT_EXCHANGE_CLASS = "fanout";
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
deleted file mode 100644
index 4f61380809..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport;
-
-import org.apache.qpidity.transport.network.Frame;
-
-import java.nio.ByteBuffer;
-
-import java.util.Collections;
-
-import static org.apache.qpidity.transport.util.Functions.*;
-
-
-/**
- * Data
- *
- */
-
-public class Data implements ProtocolEvent
-{
-
- private final Iterable<ByteBuffer> fragments;
- private final boolean first;
- private final boolean last;
-
- public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last)
- {
- this.fragments = fragments;
- this.first = first;
- this.last = last;
- }
-
- public Data(ByteBuffer buf, boolean first, boolean last)
- {
- this(Collections.singletonList(buf), first, last);
- }
-
- public Iterable<ByteBuffer> getFragments()
- {
- return fragments;
- }
-
- public boolean isFirst()
- {
- return first;
- }
-
- public boolean isLast()
- {
- return last;
- }
-
- public byte getEncodedTrack()
- {
- return Frame.L4;
- }
-
- public <C> void delegate(C context, ProtocolDelegate<C> delegate)
- {
- delegate.data(context, this);
- }
-
- public String toString()
- {
- StringBuffer str = new StringBuffer();
- str.append("Data(");
- boolean first = true;
- int left = 64;
- for (ByteBuffer buf : getFragments())
- {
- if (first)
- {
- first = false;
- }
- else
- {
- str.append(" | ");
- }
- str.append(str(buf, left));
- left -= buf.remaining();
- if (left < 0)
- {
- break;
- }
- }
- str.append(")");
- return str.toString();
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
deleted file mode 100644
index e9a0705de0..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.qpidity.transport;
-
-public class TransportConstants
-{
-
- private static byte _protocol_version_minor = 10;
- private static byte _protocol_version_major = 0;
-
- public static void setVersionMajor(byte value)
- {
- _protocol_version_major = value;
- }
-
- public static void setVersionMinor(byte value)
- {
- _protocol_version_minor = value;
- }
-
- public static byte getVersionMajor()
- {
- return _protocol_version_major;
- }
-
- public static byte getVersionMinor()
- {
- return _protocol_version_minor;
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java
deleted file mode 100644
index 474211ced2..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.codec;
-
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-
-import java.util.Iterator;
-
-import static java.lang.Math.*;
-
-
-/**
- * FragmentDecoder
- *
- * @author Rafael H. Schloming
- */
-
-public class FragmentDecoder extends AbstractDecoder
-{
-
- private final Iterator<ByteBuffer> fragments;
- private ByteBuffer current;
-
- public FragmentDecoder(Iterator<ByteBuffer> fragments)
- {
- this.fragments = fragments;
- this.current = null;
- }
-
- public boolean hasRemaining()
- {
- advance();
- return current != null || fragments.hasNext();
- }
-
- private void advance()
- {
- while (current == null && fragments.hasNext())
- {
- current = fragments.next();
- if (current.hasRemaining())
- {
- break;
- }
- else
- {
- current = null;
- }
- }
- }
-
- private void preRead()
- {
- advance();
-
- if (current == null)
- {
- throw new BufferUnderflowException();
- }
- }
-
- private void postRead()
- {
- if (current.remaining() == 0)
- {
- current = null;
- }
- }
-
- protected byte doGet()
- {
- preRead();
- byte b = current.get();
- postRead();
- return b;
- }
-
- protected void doGet(byte[] bytes)
- {
- int remaining = bytes.length;
- int offset = 0;
- while (remaining > 0)
- {
- preRead();
- int size = min(remaining, current.remaining());
- current.get(bytes, offset, size);
- offset += size;
- remaining -= size;
- postRead();
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
deleted file mode 100644
index 2e7e883a0b..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.codec;
-
-import java.nio.ByteBuffer;
-
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpidity.transport.RangeSet;
-
-
-/**
- * SizeEncoder
- *
- * @author Rafael H. Schloming
- */
-
-public class SizeEncoder extends AbstractEncoder implements Sizer
-{
-
- private int size;
-
- public SizeEncoder() {
- this(0);
- }
-
- public SizeEncoder(int size) {
- this.size = size;
- }
-
- protected Sizer sizer()
- {
- return Sizer.NULL;
- }
-
- public int getSize() {
- return size;
- }
-
- public void setSize(int size) {
- this.size = size;
- }
-
- public int size()
- {
- return getSize();
- }
-
- protected void doPut(byte b)
- {
- size += 1;
- }
-
- protected void doPut(ByteBuffer src)
- {
- size += src.remaining();
- }
-
- public void writeUint8(short b)
- {
- size += 1;
- }
-
- public void writeUint16(int s)
- {
- size += 2;
- }
-
- public void writeUint32(long i)
- {
- size += 4;
- }
-
- public void writeUint64(long l)
- {
- size += 8;
- }
-
- public void writeDatetime(long l)
- {
- size += 8;
- }
-
- public void writeUuid(UUID uuid)
- {
- size += 16;
- }
-
- public void writeSequenceNo(int s)
- {
- size += 4;
- }
-
- public void writeSequenceSet(RangeSet ranges)
- {
- size += 2 + 8*ranges.size();
- }
-
- //void writeByteRanges(RangeSet ranges); // XXX
-
- //void writeStr8(String s);
- //void writeStr16(String s);
-
- //void writeVbin8(byte[] bytes);
- //void writeVbin16(byte[] bytes);
- //void writeVbin32(byte[] bytes);
-
- //void writeStruct32(Struct s);
- //void writeMap(Map<String,Object> map);
- //void writeList(List<Object> list);
- //void writeArray(List<Object> array);
-
- //void writeStruct(int type, Struct s);
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
deleted file mode 100644
index d386987d64..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.codec;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Struct;
-
-
-/**
- * Sizer
- *
- */
-
-public interface Sizer extends Encoder
-{
-
- public static final Sizer NULL = new Sizer()
- {
- public void writeUint8(short b) {}
- public void writeUint16(int s) {}
- public void writeUint32(long i) {}
- public void writeUint64(long l) {}
-
- public void writeDatetime(long l) {}
- public void writeUuid(UUID uuid) {}
-
- public void writeSequenceNo(int s) {}
- public void writeSequenceSet(RangeSet ranges) {} // XXX
- public void writeByteRanges(RangeSet ranges) {} // XXX
-
- public void writeStr8(String s) {}
- public void writeStr16(String s) {}
-
- public void writeVbin8(byte[] bytes) {}
- public void writeVbin16(byte[] bytes) {}
- public void writeVbin32(byte[] bytes) {}
-
- public void writeStruct32(Struct s) {}
- public void writeMap(Map<String,Object> map) {}
- public void writeList(List<Object> list) {}
- public void writeArray(List<Object> array) {}
-
- public void writeStruct(int type, Struct s) {}
-
- public int getSize() { return 0; }
-
- public int size() { return 0; }
- };
-
- int getSize();
-
- int size();
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Validator.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Validator.java
deleted file mode 100644
index 743e9f3cae..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Validator.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.codec;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Struct;
-
-
-/**
- * Validator
- *
- */
-
-public class Validator
-{
-
- public static final void checkBit(boolean b)
- {
- // no illegal values
- }
-
- public static final void checkUint8(short s)
- {
- if (s > 0xFF || s < 0)
- {
- throw new IllegalArgumentException("" + s);
- }
- }
-
- public static final void checkUint16(int i)
- {
- if (i > 0xFFFF || i < 0)
- {
- throw new IllegalArgumentException("" + i);
- }
- }
-
- public static final void checkUint32(long l)
- {
- // XXX: we can't currently validate this because we do thinks
- // like pass in -1 for 0xFFFFFFFF
- // if (l > 0xFFFFFFFFL || l < 0)
- // {
- // throw new IllegalArgumentException("" + l);
- // }
- }
-
- public static final void checkSequenceNo(int s)
- {
- // no illegal values
- }
-
- public static final void checkUint64(long l)
- {
- // no illegal values
- }
-
- public static final void checkDatetime(long l)
- {
- // no illegal values
- }
-
- public static final void checkUuid(UUID u)
- {
- // no illegal values
- }
-
- public static final void checkStr8(String value)
- {
- if (value != null && value.length() > 255)
- {
- throw new IllegalArgumentException("" + value);
- }
- }
-
- public static final void checkStr16(String value)
- {
- if (value != null && value.length() > 0xFFFF)
- {
- throw new IllegalArgumentException("" + value);
- }
- }
-
- public static final void checkVbin8(byte[] value)
- {
- if (value != null && value.length > 255)
- {
- throw new IllegalArgumentException("" + value);
- }
- }
-
- public static final void checkVbin16(byte[] value)
- {
- if (value != null && value.length > 0xFFFF)
- {
- throw new IllegalArgumentException("" + value);
- }
- }
-
- public static final void checkByteRanges(RangeSet r)
- {
- // no illegal values
- }
-
- public static final void checkSequenceSet(RangeSet r)
- {
- // no illegal values
- }
-
- public static final void checkVbin32(byte[] value)
- {
- // no illegal values
- }
-
- public static final void checkStruct32(Struct s)
- {
- // no illegal values
- }
-
- public static final void checkArray(List<Object> array)
- {
- if (array == null)
- {
- return;
- }
-
- for (Object o : array)
- {
- checkObject(o);
- }
- }
-
- public static final void checkMap(Map<String,Object> map)
- {
- if (map == null)
- {
- return;
- }
-
- for (Map.Entry<String,Object> entry : map.entrySet())
- {
- checkStr8(entry.getKey());
- checkObject(entry.getValue());
- }
- }
-
- public static final void checkObject(Object o)
- {
- if (o != null && AbstractEncoder.resolve(o.getClass()) == null)
- {
- throw new IllegalArgumentException("cannot encode " + o.getClass());
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
deleted file mode 100644
index 3a7a550573..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.network;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpidity.transport.codec.BBDecoder;
-import org.apache.qpidity.transport.codec.Decoder;
-import org.apache.qpidity.transport.codec.FragmentDecoder;
-
-import org.apache.qpidity.transport.ConnectionEvent;
-import org.apache.qpidity.transport.Data;
-import org.apache.qpidity.transport.Header;
-import org.apache.qpidity.transport.Method;
-import org.apache.qpidity.transport.ProtocolError;
-import org.apache.qpidity.transport.ProtocolEvent;
-import org.apache.qpidity.transport.ProtocolHeader;
-import org.apache.qpidity.transport.Receiver;
-import org.apache.qpidity.transport.SegmentType;
-import org.apache.qpidity.transport.Struct;
-
-
-/**
- * Assembler
- *
- */
-
-public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
-{
-
- private final Receiver<ConnectionEvent> receiver;
- private final Map<Integer,List<ByteBuffer>> segments;
-
- public Assembler(Receiver<ConnectionEvent> receiver)
- {
- this.receiver = receiver;
- segments = new HashMap<Integer,List<ByteBuffer>>();
- }
-
- private int segmentKey(Frame frame)
- {
- // XXX: can this overflow?
- return (frame.getTrack() + 1) * frame.getChannel();
- }
-
- private List<ByteBuffer> getSegment(Frame frame)
- {
- return segments.get(segmentKey(frame));
- }
-
- private void setSegment(Frame frame, List<ByteBuffer> segment)
- {
- int key = segmentKey(frame);
- if (segments.containsKey(key))
- {
- error(new ProtocolError(Frame.L2, "segment in progress: %s",
- frame));
- }
- segments.put(segmentKey(frame), segment);
- }
-
- private void clearSegment(Frame frame)
- {
- segments.remove(segmentKey(frame));
- }
-
- private void emit(int channel, ProtocolEvent event)
- {
- receiver.received(new ConnectionEvent(channel, event));
- }
-
- private void emit(Frame frame, ProtocolEvent event)
- {
- emit(frame.getChannel(), event);
- }
-
- public void received(NetworkEvent event)
- {
- event.delegate(this);
- }
-
- public void exception(Throwable t)
- {
- this.receiver.exception(t);
- }
-
- public void closed()
- {
- this.receiver.closed();
- }
-
- public void init(ProtocolHeader header)
- {
- emit(0, header);
- }
-
- public void frame(Frame frame)
- {
- switch (frame.getType())
- {
- case BODY:
- emit(frame, new Data(frame, frame.isFirstFrame(),
- frame.isLastFrame()));
- break;
- default:
- assemble(frame);
- break;
- }
- }
-
- public void error(ProtocolError error)
- {
- emit(0, error);
- }
-
- private void assemble(Frame frame)
- {
- List<ByteBuffer> segment;
- if (frame.isFirstFrame())
- {
- segment = new ArrayList<ByteBuffer>();
- setSegment(frame, segment);
- }
- else
- {
- segment = getSegment(frame);
- }
-
- for (ByteBuffer buf : frame)
- {
- segment.add(buf);
- }
-
- if (frame.isLastFrame())
- {
- clearSegment(frame);
- emit(frame, decode(frame, frame.getType(), segment));
- }
- }
-
- private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment)
- {
- Decoder dec;
- if (segment.size() == 1)
- {
- dec = new BBDecoder(segment.get(0));
- }
- else
- {
- dec = new FragmentDecoder(segment.iterator());
- }
-
- switch (type)
- {
- case CONTROL:
- int controlType = dec.readUint16();
- Method control = Method.create(controlType);
- control.read(dec);
- return control;
- case COMMAND:
- int commandType = dec.readUint16();
- // read in the session header, right now we don't use it
- dec.readUint16();
- Method command = Method.create(commandType);
- command.read(dec);
- return command;
- case HEADER:
- List<Struct> structs = new ArrayList();
- while (dec.hasRemaining())
- {
- structs.add(dec.readStruct32());
- }
- return new Header(structs,frame.isLastFrame() && frame.isLastSegment());
- default:
- throw new IllegalStateException("unknown frame type: " + type);
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
deleted file mode 100644
index da9ba84ab0..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.network;
-
-import org.apache.qpidity.transport.codec.BBEncoder;
-import org.apache.qpidity.transport.codec.SizeEncoder;
-
-import org.apache.qpidity.transport.ConnectionEvent;
-import org.apache.qpidity.transport.Data;
-import org.apache.qpidity.transport.Header;
-import org.apache.qpidity.transport.Method;
-import org.apache.qpidity.transport.ProtocolDelegate;
-import org.apache.qpidity.transport.ProtocolError;
-import org.apache.qpidity.transport.ProtocolEvent;
-import org.apache.qpidity.transport.ProtocolHeader;
-import org.apache.qpidity.transport.SegmentType;
-import org.apache.qpidity.transport.Sender;
-import org.apache.qpidity.transport.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import static org.apache.qpidity.transport.network.Frame.*;
-
-import static java.lang.Math.*;
-
-
-/**
- * Disassembler
- *
- */
-
-public class Disassembler implements Sender<ConnectionEvent>,
- ProtocolDelegate<ConnectionEvent>
-{
-
- private final Sender<NetworkEvent> sender;
- private final int maxPayload;
-
- public Disassembler(Sender<NetworkEvent> sender, int maxFrame)
- {
- if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
- {
- throw new IllegalArgumentException
- ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
- }
- this.sender = sender;
- this.maxPayload = maxFrame - HEADER_SIZE;
-
- }
-
- public void send(ConnectionEvent event)
- {
- event.getProtocolEvent().delegate(event, this);
- }
-
- public void close()
- {
- sender.close();
- }
-
- private void fragment(byte flags, SegmentType type, ConnectionEvent event,
- ByteBuffer buf, boolean first, boolean last)
- {
- byte track = event.getProtocolEvent().getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
-
- if(!buf.hasRemaining())
- {
- //empty data
- byte nflags = flags;
- if (first)
- {
- nflags |= FIRST_FRAME;
- first = false;
- }
- nflags |= LAST_FRAME;
- Frame frame = new Frame(nflags, type, track, event.getChannel());
- // frame.addFragment(buf);
- sender.send(frame);
- }
- else
- {
- while (buf.hasRemaining())
- {
- ByteBuffer slice = buf.slice();
- slice.limit(min(maxPayload, slice.remaining()));
- buf.position(buf.position() + slice.remaining());
-
- byte newflags = flags;
- if (first)
- {
- newflags |= FIRST_FRAME;
- first = false;
- }
- if (last && !buf.hasRemaining())
- {
- newflags |= LAST_FRAME;
- }
-
- Frame frame = new Frame(newflags, type, track, event.getChannel());
- frame.addFragment(slice);
- sender.send(frame);
- }
- }
- }
-
- public void init(ConnectionEvent event, ProtocolHeader header)
- {
- sender.send(header);
- }
-
- public void control(ConnectionEvent event, Method method)
- {
- method(event, method, SegmentType.CONTROL);
- }
-
- public void command(ConnectionEvent event, Method method)
- {
- method(event, method, SegmentType.COMMAND);
- }
-
- private void method(ConnectionEvent event, Method method, SegmentType type)
- {
- SizeEncoder sizer = new SizeEncoder();
- sizer.writeUint16(method.getEncodedType());
- if (type == SegmentType.COMMAND)
- {
- sizer.writeUint16(0);
- }
- method.write(sizer);
-
- ByteBuffer buf = ByteBuffer.allocate(sizer.size());
- BBEncoder enc = new BBEncoder(buf);
- enc.writeUint16(method.getEncodedType());
- if (type == SegmentType.COMMAND)
- {
- if (method.isSync())
- {
- enc.writeUint16(0x0101);
- }
- else
- {
- enc.writeUint16(0x0100);
- }
- }
- method.write(enc);
- buf.flip();
-
- byte flags = FIRST_SEG;
-
- if (!method.hasPayload())
- {
- flags |= LAST_SEG;
- }
-
- fragment(flags, type, event, buf, true, true);
- }
-
- public void header(ConnectionEvent event, Header header)
- {
- ByteBuffer buf;
- if( header.getBuf() == null)
- {
- SizeEncoder sizer = new SizeEncoder();
- for (Struct st : header.getStructs())
- {
- sizer.writeStruct32(st);
- }
-
- buf = ByteBuffer.allocate(sizer.size());
- BBEncoder enc = new BBEncoder(buf);
- for (Struct st : header.getStructs())
- {
- enc.writeStruct32(st);
- }
- header.setBuf(buf);
- }
- else
- {
- buf = header.getBuf();
- }
- buf.flip();
- fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true);
- }
-
- public void data(ConnectionEvent event, Data data)
- {
- boolean first = data.isFirst();
- for (Iterator<ByteBuffer> it = data.getFragments().iterator();
- it.hasNext(); )
- {
- ByteBuffer buf = it.next();
- boolean last = data.isLast() && !it.hasNext();
- fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last);
- first = false;
- }
- }
-
- public void error(ConnectionEvent event, ProtocolError error)
- {
- sender.send(error);
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
deleted file mode 100644
index d1c03348b4..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.network;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpidity.transport.ProtocolError;
-import org.apache.qpidity.transport.ProtocolHeader;
-import org.apache.qpidity.transport.Receiver;
-import org.apache.qpidity.transport.SegmentType;
-
-import static org.apache.qpidity.transport.util.Functions.*;
-
-import static org.apache.qpidity.transport.network.InputHandler.State.*;
-
-
-/**
- * InputHandler
- *
- * @author Rafael H. Schloming
- */
-
-public class InputHandler implements Receiver<ByteBuffer>
-{
-
- public enum State
- {
- PROTO_HDR,
- PROTO_HDR_M,
- PROTO_HDR_Q,
- PROTO_HDR_P,
- PROTO_HDR_CLASS,
- PROTO_HDR_INSTANCE,
- PROTO_HDR_MAJOR,
- PROTO_HDR_MINOR,
- FRAME_HDR,
- FRAME_HDR_TYPE,
- FRAME_HDR_SIZE1,
- FRAME_HDR_SIZE2,
- FRAME_HDR_RSVD1,
- FRAME_HDR_TRACK,
- FRAME_HDR_CH1,
- FRAME_HDR_CH2,
- FRAME_HDR_RSVD2,
- FRAME_HDR_RSVD3,
- FRAME_HDR_RSVD4,
- FRAME_HDR_RSVD5,
- FRAME_FRAGMENT,
- ERROR;
- }
-
- private final Receiver<NetworkEvent> receiver;
- private State state;
-
- private byte instance;
- private byte major;
- private byte minor;
-
- private byte flags;
- private SegmentType type;
- private byte track;
- private int channel;
- private int size;
- private Frame frame;
-
- public InputHandler(Receiver<NetworkEvent> receiver, State state)
- {
- this.receiver = receiver;
- this.state = state;
- }
-
- public InputHandler(Receiver<NetworkEvent> receiver)
- {
- this(receiver, PROTO_HDR);
- }
-
- private void init()
- {
- receiver.received(new ProtocolHeader(instance, major, minor));
- }
-
- private void frame()
- {
- assert size == frame.getSize();
- receiver.received(frame);
- frame = null;
- }
-
- private void error(String fmt, Object ... args)
- {
- receiver.received(new ProtocolError(Frame.L1, fmt, args));
- }
-
- public void received(ByteBuffer buf)
- {
- while (buf.hasRemaining())
- {
- state = next(buf);
- }
- }
-
- private State next(ByteBuffer buf)
- {
- switch (state) {
- case PROTO_HDR:
- return expect(buf, 'A', PROTO_HDR_M);
- case PROTO_HDR_M:
- return expect(buf, 'M', PROTO_HDR_Q);
- case PROTO_HDR_Q:
- return expect(buf, 'Q', PROTO_HDR_P);
- case PROTO_HDR_P:
- return expect(buf, 'P', PROTO_HDR_CLASS);
- case PROTO_HDR_CLASS:
- return expect(buf, 1, PROTO_HDR_INSTANCE);
- case PROTO_HDR_INSTANCE:
- instance = buf.get();
- return PROTO_HDR_MAJOR;
- case PROTO_HDR_MAJOR:
- major = buf.get();
- return PROTO_HDR_MINOR;
- case PROTO_HDR_MINOR:
- minor = buf.get();
- init();
- return FRAME_HDR;
- case FRAME_HDR:
- flags = buf.get();
- return FRAME_HDR_TYPE;
- case FRAME_HDR_TYPE:
- type = SegmentType.get(buf.get());
- return FRAME_HDR_SIZE1;
- case FRAME_HDR_SIZE1:
- size = (0xFF & buf.get()) << 8;
- return FRAME_HDR_SIZE2;
- case FRAME_HDR_SIZE2:
- size += 0xFF & buf.get();
- size -= 12;
- if (size < 0 || size > (64*1024 - 12))
- {
- error("bad frame size: %d", size);
- return ERROR;
- }
- else
- {
- return FRAME_HDR_RSVD1;
- }
- case FRAME_HDR_RSVD1:
- return expect(buf, 0, FRAME_HDR_TRACK);
- case FRAME_HDR_TRACK:
- byte b = buf.get();
- if ((b & 0xF0) != 0) {
- error("non-zero reserved bits in upper nibble of " +
- "frame header byte 5: '%x'", b);
- return ERROR;
- } else {
- track = (byte) (b & 0xF);
- return FRAME_HDR_CH1;
- }
- case FRAME_HDR_CH1:
- channel = (0xFF & buf.get()) << 8;
- return FRAME_HDR_CH2;
- case FRAME_HDR_CH2:
- channel += 0xFF & buf.get();
- return FRAME_HDR_RSVD2;
- case FRAME_HDR_RSVD2:
- return expect(buf, 0, FRAME_HDR_RSVD3);
- case FRAME_HDR_RSVD3:
- return expect(buf, 0, FRAME_HDR_RSVD4);
- case FRAME_HDR_RSVD4:
- return expect(buf, 0, FRAME_HDR_RSVD5);
- case FRAME_HDR_RSVD5:
- if (!expect(buf, 0))
- {
- return ERROR;
- }
-
- frame = new Frame(flags, type, track, channel);
- if (size > buf.remaining()) {
- frame.addFragment(buf.slice());
- buf.position(buf.limit());
- return FRAME_FRAGMENT;
- } else {
- ByteBuffer payload = buf.slice();
- payload.limit(size);
- buf.position(buf.position() + size);
- frame.addFragment(payload);
- frame();
- return FRAME_HDR;
- }
- case FRAME_FRAGMENT:
- int delta = size - frame.getSize();
- if (delta > buf.remaining()) {
- frame.addFragment(buf.slice());
- buf.position(buf.limit());
- return FRAME_FRAGMENT;
- } else {
- ByteBuffer fragment = buf.slice();
- fragment.limit(delta);
- buf.position(buf.position() + delta);
- frame.addFragment(fragment);
- frame();
- return FRAME_HDR;
- }
- default:
- throw new IllegalStateException();
- }
- }
-
- private State expect(ByteBuffer buf, int expected, State next)
- {
- return expect(buf, (byte) expected, next);
- }
-
- private State expect(ByteBuffer buf, char expected, State next)
- {
- return expect(buf, (byte) expected, next);
- }
-
- private State expect(ByteBuffer buf, byte expected, State next)
- {
- if (expect(buf, expected))
- {
- return next;
- }
- else
- {
- return ERROR;
- }
- }
-
- private boolean expect(ByteBuffer buf, int expected)
- {
- return expect(buf, (byte) expected);
- }
-
- private boolean expect(ByteBuffer buf, byte expected)
- {
- byte b = buf.get();
- if (b == expected)
- {
- return true;
- }
- else
- {
- error("expecting '%x', got '%x'", expected, b);
- return false;
- }
- }
-
- public void exception(Throwable t)
- {
- receiver.exception(t);
- }
-
- public void closed()
- {
- receiver.closed();
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
deleted file mode 100644
index b749332fa3..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.network;
-
-import java.nio.ByteBuffer;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpidity.transport.Constant;
-import org.apache.qpidity.transport.ProtocolError;
-import org.apache.qpidity.transport.ProtocolHeader;
-import org.apache.qpidity.transport.Sender;
-
-import static org.apache.qpidity.transport.network.Frame.*;
-
-
-/**
- * OutputHandler
- *
- */
-
-public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
-{
-
- private Sender<ByteBuffer> sender;
- private Object lock = new Object();
- private int bytes = 0;
- private List<Frame> frames = new ArrayList<Frame>();
-
- public OutputHandler(Sender<ByteBuffer> sender)
- {
- this.sender = sender;
- }
-
- public void send(NetworkEvent event)
- {
- event.delegate(this);
- }
-
- public void close()
- {
- synchronized (lock)
- {
- sender.close();
- }
- }
-
- public void init(ProtocolHeader header)
- {
- synchronized (lock)
- {
- sender.send(header.toByteBuffer());
- }
- }
-
- public void frame(Frame frame)
- {
- synchronized (lock)
- {
- frames.add(frame);
- bytes += HEADER_SIZE + frame.getSize();
-
- if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024)
- {
- ByteBuffer buf = ByteBuffer.allocate(bytes);
- for (Frame f : frames)
- {
- buf.put(f.getFlags());
- buf.put((byte) f.getType().getValue());
- buf.putShort((short) (f.getSize() + HEADER_SIZE));
- // RESERVED
- buf.put(RESERVED);
- buf.put(f.getTrack());
- buf.putShort((short) f.getChannel());
- // RESERVED
- buf.putInt(0);
- for(ByteBuffer frg : f)
- {
- buf.put(frg);
- }
- }
- buf.flip();
-
- frames.clear();
- bytes = 0;
-
- sender.send(buf);
- }
- }
- }
-
- public void error(ProtocolError error)
- {
- throw new IllegalStateException("XXX");
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
deleted file mode 100644
index 568526d9db..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpidity.transport.network.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.qpidity.transport.Connection;
-import org.apache.qpidity.transport.ConnectionDelegate;
-import org.apache.qpidity.transport.Receiver;
-import org.apache.qpidity.transport.network.Assembler;
-import org.apache.qpidity.transport.network.Disassembler;
-import org.apache.qpidity.transport.network.InputHandler;
-import org.apache.qpidity.transport.network.OutputHandler;
-import org.apache.qpidity.transport.util.Logger;
-
-/**
- * This class provides a synchronous IO implementation using
- * the java.io classes. The IoHandler runs in its own thread.
- * The following params are configurable via JVM arguments
- * TCP_NO_DELAY - amqj.tcpNoDelay
- * SO_RCVBUF - amqj.receiveBufferSize
- * SO_SNDBUF - amqj.sendBufferSize
- */
-public class IoHandler implements Runnable
-{
- private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
-
- private Receiver<ByteBuffer> _receiver;
- private Socket _socket;
- private byte[] _readBuf;
- private static AtomicInteger _count = new AtomicInteger();
- private int _readBufferSize;
- private int _writeBufferSize;
-
- private static final Logger log = Logger.get(IoHandler.class);
-
- private IoHandler()
- {
- _readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
- _writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
- }
-
- public static final Connection connect(String host, int port,
- ConnectionDelegate delegate)
- {
- IoHandler handler = new IoHandler();
- return handler.connectInternal(host,port,delegate);
- }
-
- private Connection connectInternal(String host, int port,
- ConnectionDelegate delegate)
- {
- try
- {
- InetAddress address = InetAddress.getByName(host);
- _socket = new Socket();
- _socket.setReuseAddress(true);
- _socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
-
- log.debug("default-SO_RCVBUF : " + _socket.getReceiveBufferSize());
- log.debug("default-SO_SNDBUF : " + _socket.getSendBufferSize());
-
- _socket.setSendBufferSize(_writeBufferSize);
- _socket.setReceiveBufferSize(_readBufferSize);
-
- log.debug("new-SO_RCVBUF : " + _socket.getReceiveBufferSize());
- log.debug("new-SO_SNDBUF : " + _socket.getSendBufferSize());
-
- if (address != null)
- {
- _socket.connect(new InetSocketAddress(address, port));
- }
- while (!_socket.isConnected())
- {
-
- }
-
- }
- catch (SocketException e)
- {
- throw new RuntimeException("Error connecting to broker",e);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Error connecting to broker",e);
- }
-
- IoSender sender = new IoSender(_socket);
- Connection con = new Connection
- (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
- delegate);
-
- con.setConnectionId(_count.incrementAndGet());
- _receiver = new InputHandler(new Assembler(con), InputHandler.State.PROTO_HDR);
-
- Thread t = new Thread(this);
- t.setName("IO Handler Thread-" + _count.get());
- t.start();
-
- return con;
- }
-
- public void run()
- {
- // I set the read_buffer size simillar to SO_RCVBUF
- // Haven't tested with a lower value to see its better or worse
- _readBuf = new byte[_readBufferSize];
- try
- {
- InputStream in = _socket.getInputStream();
- int read = 0;
- while(_socket.isConnected())
- {
- try
- {
- read = in.read(_readBuf);
- if (read > 0)
- {
- ByteBuffer b = ByteBuffer.allocate(read);
- b.put(_readBuf,0,read);
- b.flip();
- _receiver.received(b);
- }
- }
- catch(Exception e)
- {
- throw new RuntimeException("Error reading from socket input stream",e);
- }
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException("Error getting input stream from the socket",e);
- }
- finally
- {
- try
- {
- _socket.close();
- }
- catch(Exception e)
- {
- log.error(e,"Error closing socket");
- }
- }
- }
-
- /**
- * Will experiment in a future version with batching
- */
- public static void startBatchingFrames(int connectionId)
- {
-
- }
-
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
deleted file mode 100644
index 1adde531a6..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.transport.network.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.qpidity.transport.Sender;
-
-public class IoSender implements Sender<java.nio.ByteBuffer>
-{
- private final Object lock = new Object();
- private Socket _socket;
- private OutputStream _outStream;
-
- public IoSender(Socket socket)
- {
- this._socket = socket;
- try
- {
- _outStream = _socket.getOutputStream();
- }
- catch(IOException e)
- {
- throw new RuntimeException("Error getting output stream for socket",e);
- }
- }
-
- /*
- * Currently I don't implement any in memory buffering
- * and just write straight to the wire.
- * I want to experiment with buffering and see if I can
- * get more performance, all though latency will suffer
- * a bit.
- */
- public void send(java.nio.ByteBuffer buf)
- {
- write(buf);
- }
-
- /* The extra copying sucks.
- * If I know for sure that the buf is backed
- * by an array then I could do buf.array()
- */
- private void write(java.nio.ByteBuffer buf)
- {
- byte[] array = new byte[buf.remaining()];
- buf.get(array);
- if( _socket.isConnected())
- {
- synchronized (lock)
- {
- try
- {
- _outStream.write(array);
- }
- catch(Exception e)
- {
- e.fillInStackTrace();
- throw new RuntimeException("Error trying to write to the socket",e);
- }
- }
- }
- else
- {
- throw new RuntimeException("Trying to write on a closed socket");
- }
- }
-
- /*
- * Haven't used this, but the intention is
- * to experiment with it in the future.
- * Also need to make sure the buffer size
- * is configurable
- */
- public void setStartBatching()
- {
- }
-
- public void close()
- {
- synchronized (lock)
- {
- try
- {
- _socket.close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index 1a83786e12..b9ca210483 100644
--- a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -18,14 +18,16 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import org.apache.mina.util.AvailablePortFinder;
import org.apache.qpid.util.concurrent.Condition;
-import org.apache.qpidity.transport.network.mina.MinaHandler;
-import org.apache.qpidity.transport.util.Logger;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
+import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.util.Logger;
import junit.framework.TestCase;
@@ -62,12 +64,14 @@ public class ConnectionTest extends TestCase
public void closed() {}
};
- MinaHandler.accept("localhost", port, server);
+ IoAcceptor ioa = new IoAcceptor
+ ("localhost", port, new ConnectionBinding(server));
+ ioa.start();
}
private Connection connect(final Condition closed)
{
- Connection conn = MinaHandler.connect("localhost", port, new ConnectionDelegate()
+ Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate()
{
public SessionDelegate getSessionDelegate()
{
@@ -86,7 +90,7 @@ public class ConnectionTest extends TestCase
}
});
- conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+ conn.send(new ProtocolHeader(1, 0, 10));
return conn;
}
diff --git a/java/common/src/test/java/org/apache/qpidity/transport/RangeSetTest.java b/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java
index 474d47d8d7..ad45d00e46 100644
--- a/java/common/src/test/java/org/apache/qpidity/transport/RangeSetTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
import java.util.ArrayList;
import java.util.Collections;