diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-08-15 03:40:49 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-08-15 03:40:49 +0000 |
| commit | b6a376a4797e4988cdae48e0e5395a9b1f4e9f85 (patch) | |
| tree | dc41b190202b592d35579af35cc8b18bb1f1b702 /java/common | |
| parent | c521097d6d6f44e437e2ce67f5a8ae66706e4476 (diff) | |
| download | qpid-python-b6a376a4797e4988cdae48e0e5395a9b1f4e9f85.tar.gz | |
updated qpid.0-10/java to match trunk/qpid/java@686097
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@686136 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
128 files changed, 3863 insertions, 2973 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl index 46a45b0b91..283fa24641 100644 --- a/java/common/Composite.tpl +++ b/java/common/Composite.tpl @@ -1,28 +1,30 @@ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.transport.codec.Decoder; -import org.apache.qpidity.transport.codec.Encodable; -import org.apache.qpidity.transport.codec.Encoder; -import org.apache.qpidity.transport.codec.Validator; +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encodable; +import org.apache.qpid.transport.codec.Encoder; -import org.apache.qpidity.transport.network.Frame; +import org.apache.qpid.transport.network.Frame; ${ from genutil import * cls = klass(type)["@name"] +segments = type["segments"] + if type.name in ("control", "command"): base = "Method" size = 0 pack = 2 - if type["segments"]: + if segments: payload = "true" else: payload = "false" @@ -80,12 +82,16 @@ if pack > 0: out(" private $(PACK_TYPES[pack]) packing_flags = 0;\n"); fields = get_fields(type) -params = get_parameters(fields) +params = get_parameters(type, fields) options = get_options(fields) for f in fields: if not f.empty: out(" private $(f.type) $(f.name);\n") + +if segments: + out(" private Header header;\n") + out(" private ByteBuffer body;\n") } ${ @@ -99,7 +105,11 @@ for f in fields: if f.option: continue out(" $(f.set)($(f.name));\n") -if options: +if segments: + out(" setHeader(header);\n") + out(" setBody(body);\n") + +if options or base == "Method": out(""" for (int i=0; i < _options.length; i++) { switch (_options[i]) { @@ -108,7 +118,11 @@ if options: for f in options: out(" case $(f.option): packing_flags |= $(f.flag_mask(pack)); break;\n") - out(""" case NO_OPTION: break; + if base == "Method": + out(""" case SYNC: this.setSync(true); break; + case BATCH: this.setBatch(true); break; +""") + out(""" case NONE: break; default: throw new IllegalArgumentException("invalid option: " + _options[i]); } } @@ -150,7 +164,6 @@ else: } public final $name $(f.set)($(f.type) value) { - $(f.check) ${ if not f.empty: out(" this.$(f.name) = value;") @@ -169,6 +182,44 @@ if pack > 0: """) } +${ +if segments: + out(""" public final Header getHeader() { + return this.header; + } + + public final void setHeader(Header header) { + this.header = header; + } + + public final $name header(Header header) { + setHeader(header); + return this; + } + + public final ByteBuffer getBody() { + if (this.body == null) + { + return null; + } + else + { + return this.body.slice(); + } + } + + public final void setBody(ByteBuffer body) { + this.body = body; + } + + public final $name body(ByteBuffer body) + { + setBody(body); + return this; + } +""") +} + public void write(Encoder enc) { ${ diff --git a/java/common/Constant.tpl b/java/common/Constant.tpl index 695812ea75..7194a61dfc 100644 --- a/java/common/Constant.tpl +++ b/java/common/Constant.tpl @@ -1,4 +1,4 @@ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; ${from genutil import *} diff --git a/java/common/Enum.tpl b/java/common/Enum.tpl index 337feb7065..2ec1d22522 100644 --- a/java/common/Enum.tpl +++ b/java/common/Enum.tpl @@ -1,4 +1,4 @@ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; public enum $name { ${ diff --git a/java/common/Invoker.tpl b/java/common/Invoker.tpl index d9905c71a0..9158922df7 100644 --- a/java/common/Invoker.tpl +++ b/java/common/Invoker.tpl @@ -1,5 +1,6 @@ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; @@ -15,8 +16,8 @@ from genutil import * for c in composites: name = cname(c) fields = get_fields(c) - params = get_parameters(fields) - args = get_arguments(fields) + params = get_parameters(c, fields) + args = get_arguments(c, fields) result = c["result"] if result: if not result["@type"]: @@ -32,9 +33,9 @@ for c in composites: jclass = "" out(""" - public $jresult $(dromedary(name))($(", ".join(params))) { - $(jreturn)invoke(new $name($(", ".join(args)))$jclass); - } + public final $jresult $(dromedary(name))($(", ".join(params))) { + $(jreturn)invoke(new $name($(", ".join(args)))$jclass); + } """) } diff --git a/java/common/MethodDelegate.tpl b/java/common/MethodDelegate.tpl index e5ab1ae1e7..84fa0e43da 100644 --- a/java/common/MethodDelegate.tpl +++ b/java/common/MethodDelegate.tpl @@ -1,4 +1,4 @@ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; public abstract class MethodDelegate<C> { diff --git a/java/common/Option.tpl b/java/common/Option.tpl index 5fa2b95b9f..d45c004f6f 100644 --- a/java/common/Option.tpl +++ b/java/common/Option.tpl @@ -1,4 +1,4 @@ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; public enum Option { @@ -15,5 +15,6 @@ for c in composites: if not options.has_key(option): options[option] = None out(" $option,\n")} - NO_OPTION + BATCH, + NONE } diff --git a/java/common/StructFactory.tpl b/java/common/StructFactory.tpl index b27621b1d2..f3dcbbd68a 100644 --- a/java/common/StructFactory.tpl +++ b/java/common/StructFactory.tpl @@ -1,4 +1,4 @@ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; class StructFactory { diff --git a/java/common/Type.tpl b/java/common/Type.tpl index c869934538..c58e08a342 100644 --- a/java/common/Type.tpl +++ b/java/common/Type.tpl @@ -1,4 +1,4 @@ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; ${from genutil import *} diff --git a/java/common/bin/qpid-run b/java/common/bin/qpid-run index c9e37b21a1..1de0048f48 100755 --- a/java/common/bin/qpid-run +++ b/java/common/bin/qpid-run @@ -37,19 +37,39 @@ die() { exit 1 } +OFF=0 +WARN=1 +INFO=2 + +if [ -z "$QPID_RUN_LOG" ]; then + QPID_RUN_LOG=$OFF +fi + +log() { + if [ "$1" -le "$QPID_RUN_LOG" ]; then + shift + echo "$@" + fi +} + if [ -z $AMQJ_LOGGING_LEVEL ]; then export AMQJ_LOGGING_LEVEL=info fi if [ -z "$QPID_HOME" ]; then - die "QPID_HOME must be set" + export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) + export PATH=${PATH}:${QPID_HOME}/bin fi if [ -z "$QPID_WORK" ]; then - echo Setting QPID_WORK to $HOME as default + log $INFO Setting QPID_WORK to $HOME as default QPID_WORK=$HOME fi +if [ -z "$JAVA" ]; then + JAVA=java +fi + if $cygwin; then QPID_HOME=$(cygpath -w $QPID_HOME) QPID_WORK=$(cygpath -w $QPID_WORK) @@ -64,10 +84,10 @@ SYSTEM_PROPS="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -D #Using X character to avoid probs with empty strings if [ -n "$QPID_LOG_PREFIX" ]; then if [ "X$QPID_LOG_PREFIX" = "XPID" ]; then - echo Using pid in qpid log name prefix + log $INFO Using pid in qpid log name prefix LOG_PREFIX=" -Dlogprefix=$$" else - echo Using qpid logprefix property + log $INFO Using qpid logprefix property LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX" fi SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_PREFIX}" @@ -75,16 +95,16 @@ fi if [ -n "$QPID_LOG_SUFFIX" ]; then if [ "X$QPID_LOG_SUFFIX" = "XPID" ]; then - echo Using pid in qpid log name suffix + log $INFO Using pid in qpid log name suffix LOG_SUFFIX=" -Dlogsuffix=$$" else - echo Using qpig logsuffix property + log $INFO Using qpig logsuffix property LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX" fi SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_SUFFIX}" fi -echo System Properties set to $SYSTEM_PROPS +log $INFO System Properties set to $SYSTEM_PROPS program=$(basename $0) sourced=${BASH_SOURCE[0]} @@ -109,26 +129,26 @@ unset CLASSPATH #Use QPID_CLASSPATH if set if [ -n "$QPID_CLASSPATH" ]; then export CLASSPATH=$QPID_CLASSPATH - echo "Using QPID_CLASSPATH" $QPID_CLASSPATH + log $INFO "Using QPID_CLASSPATH" $QPID_CLASSPATH else - echo "Warning: Qpid classpath not set. CLASSPATH must include qpid jars." + log $WARN "Warning: Qpid classpath not set. CLASSPATH must include qpid jars." fi #Use QPID_JAVA_GC if set if [ -n "$QPID_JAVA_GC" ]; then export JAVA_GC=$QPID_JAVA_GC - echo "Using QPID_JAVA_GC setting" $QPID_JAVA_GC + log $INFO "Using QPID_JAVA_GC setting" $QPID_JAVA_GC else - echo "Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC" $JAVA_GC + log $INFO "Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC" $JAVA_GC fi #Use QPID_JAVA_MEM if set if [ -n "$QPID_JAVA_MEM" ]; then export JAVA_MEM=$QPID_JAVA_MEM - echo "Using QPID_JAVA_MEM setting" $QPID_JAVA_MEM + log $INFO "Using QPID_JAVA_MEM setting" $QPID_JAVA_MEM else - echo "Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM" $JAVA_MEM + log $INFO "Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM" $JAVA_MEM fi declare -a RUN_ARGS JAVA_ARGS @@ -172,7 +192,7 @@ for arg in "${RUN_ARGS[@]}"; do ;; -run:jpda) #USAGE: adds debugging options to the java command, use -#USAGE: JDPA_TRANSPORT and JPDA_ADDRESS to customize the debugging +#USAGE: JPDA_TRANSPORT and JPDA_ADDRESS to customize the debugging #USAGE: behavior and use JPDA_OPTS to override it entirely if [ -z "$JPDA_OPTS" ]; then JPDA_OPTS="-Xdebug -Xrunjdwp:transport=${JPDA_TRANSPORT:-dt_socket},address=${JPDA_ADDRESS:-8000},server=y,suspend=n" diff --git a/java/common/codegen b/java/common/codegen index ab1ab1c542..6cd51565ea 100755 --- a/java/common/codegen +++ b/java/common/codegen @@ -7,7 +7,7 @@ from genutil import * out_dir = sys.argv[1] spec_file = sys.argv[2] tpl_dir = sys.argv[3] -pkg_dir = os.path.join(out_dir, "org/apache/qpidity/transport") +pkg_dir = os.path.join(out_dir, "org/apache/qpid/transport") if not os.path.exists(pkg_dir): os.makedirs(pkg_dir) diff --git a/java/common/genutil.py b/java/common/genutil.py index 9636a91cc3..f8f234548c 100644 --- a/java/common/genutil.py +++ b/java/common/genutil.py @@ -170,18 +170,15 @@ class Field: if self.type_node.name == "struct": self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname) self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name) - self.check = "" self.coder = "Struct" elif self.type_node.name == "domain": self.coder = camel(0, self.prim_type["@name"]) self.read = "%s.get(dec.read%s())" % (tname, self.coder) self.write = "enc.write%s(check(struct).%s.getValue())" % (self.coder, self.name) - self.check = "" else: self.coder = camel(0, self.type_node["@name"]) self.read = "dec.read%s()" % self.coder self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name) - self.check = "Validator.check%s(value);" % self.coder self.type = jtype(self.type_node) self.default = DEFAULTS.get(self.type, "null") self.has = camel(1, "has", self.name) @@ -206,7 +203,7 @@ def get_fields(nd): index += 1 return fields -def get_parameters(fields): +def get_parameters(type, fields): params = [] options = False for f in fields: @@ -214,11 +211,14 @@ def get_parameters(fields): options = True else: params.append("%s %s" % (f.type, f.name)) - if options: + if type["segments"]: + params.append("Header header") + params.append("ByteBuffer body") + if options or type.name in ("control", "command"): params.append("Option ... _options") return params -def get_arguments(fields): +def get_arguments(type, fields): args = [] options = False for f in fields: @@ -226,7 +226,10 @@ def get_arguments(fields): options = True else: args.append(f.name) - if options: + if type["segments"]: + args.append("header") + args.append("body") + if options or type.name in ("control", "command"): args.append("_options") return args diff --git a/java/common/pom.xml b/java/common/pom.xml index 714087d843..894ca26710 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -61,7 +61,7 @@ <!-- <exec executable="python"> <arg line="generate"/> <arg line="${generated.path}"/> - <arg line="org.apache.qpidity"/> + <arg line="org.apache.qpid"/> <arg line="${specs.dir}/amqp-transitional.0-10.xml"/> </exec> --> </tasks> diff --git a/java/common/src/main/java/log4j.properties b/java/common/src/main/java/log4j.properties index 6d596d1d19..44f89dc805 100644 --- a/java/common/src/main/java/log4j.properties +++ b/java/common/src/main/java/log4j.properties @@ -19,9 +19,12 @@ log4j.rootLogger=${root.logging.level} +log4j.logger.qpid.protocol=${amqj.protocol.logging.level}, console +log4j.additivity.qpid.protocol=false log4j.logger.org.apache.qpid=${amqj.logging.level}, console log4j.additivity.org.apache.qpid=false + log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.Threshold=all log4j.appender.console.layout=org.apache.log4j.PatternLayout diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java index bed80d5954..0c311b6645 100644 --- a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java +++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java @@ -62,7 +62,6 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator private static final class FixedSizeByteBuffer extends ByteBuffer { private java.nio.ByteBuffer buf; - private int refCount = 1; private int mark = -1; @@ -70,36 +69,14 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator { this.buf = buf; buf.order( ByteOrder.BIG_ENDIAN ); - refCount = 1; } public synchronized void acquire() { - if( refCount <= 0 ) - { - throw new IllegalStateException( "Already released buffer." ); - } - - refCount ++; } public void release() { - synchronized( this ) - { - if( refCount <= 0 ) - { - refCount = 0; - throw new IllegalStateException( - "Already released buffer. You released the buffer too many times." ); - } - - refCount --; - if( refCount > 0) - { - return; - } - } } public java.nio.ByteBuffer buf() @@ -157,50 +134,12 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator { if( newCapacity > capacity() ) { - // Allocate a new buffer and transfer all settings to it. - int pos = position(); - int limit = limit(); - ByteOrder bo = order(); - - capacity0( newCapacity ); - buf.limit( limit ); - if( mark >= 0 ) - { - buf.position( mark ); - buf.mark(); - } - buf.position( pos ); - buf.order( bo ); + throw new IllegalArgumentException(); } return this; } - protected void capacity0( int requestedCapacity ) - { - int newCapacity = MINIMUM_CAPACITY; - while( newCapacity < requestedCapacity ) - { - newCapacity <<= 1; - } - - java.nio.ByteBuffer oldBuf = this.buf; - java.nio.ByteBuffer newBuf; - if( isDirect() ) - { - newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity ); - } - else - { - newBuf = java.nio.ByteBuffer.allocate( newCapacity ); - } - - newBuf.clear(); - oldBuf.clear(); - newBuf.put( oldBuf ); - this.buf = newBuf; - } - public boolean isAutoExpand() diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java index c515263317..4fd28c4eb5 100644 --- a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java +++ b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java @@ -31,7 +31,6 @@ import java.util.Iterator; * A default implementation of {@link org.apache.mina.common.IoFuture}. * * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 440259 $, $Date: 2006-09-05 14:01:47 +0900 (í™”, 05 9ì›” 2006) $ */ public class DefaultIoFuture implements IoFuture { diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index f6f596da95..ef9420ba87 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -54,6 +54,6 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); - return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId)); + return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId)); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java index 6cdd57d6f2..fa69f7f91b 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -21,6 +21,10 @@ package org.apache.qpid; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; + import org.apache.qpid.protocol.AMQConstant; /** @@ -35,6 +39,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQConnectionFailureException extends AMQException { + Collection<Exception> _exceptions; + public AMQConnectionFailureException(String message, Throwable cause) { super(null, message, cause); @@ -44,4 +50,16 @@ public class AMQConnectionFailureException extends AMQException { super(errorCode, message, cause); } + + public AMQConnectionFailureException(String message, Collection<Exception> exceptions) + { + // Blah, I hate ? but java won't let super() be anything other than the first thing, sorry... + super (null, message, exceptions.isEmpty() ? null : exceptions.iterator().next()); + this._exceptions = exceptions; + } + + public Collection<Exception> getLinkedExceptions() + { + return _exceptions; + } } diff --git a/java/common/src/main/java/org/apache/qpidity/BrokerDetails.java b/java/common/src/main/java/org/apache/qpid/BrokerDetails.java index 29a4f5a9c0..63f67a7857 100644 --- a/java/common/src/main/java/org/apache/qpidity/BrokerDetails.java +++ b/java/common/src/main/java/org/apache/qpid/BrokerDetails.java @@ -15,7 +15,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.qpidity; +package org.apache.qpid; import java.util.Map; diff --git a/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java b/java/common/src/main/java/org/apache/qpid/BrokerDetailsImpl.java index 6de6055f1c..201d43e21f 100644 --- a/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java +++ b/java/common/src/main/java/org/apache/qpid/BrokerDetailsImpl.java @@ -15,7 +15,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.qpidity; +package org.apache.qpid; import java.util.HashMap; import java.util.Map; diff --git a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java index 4e05aa574c..f17782ebf4 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java +++ b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java @@ -18,13 +18,13 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpid; import java.nio.ByteBuffer; -import org.apache.qpidity.transport.Sender; +import org.apache.qpid.transport.Sender; -import static org.apache.qpidity.transport.util.Functions.*; +import static org.apache.qpid.transport.util.Functions.*; /** @@ -41,6 +41,11 @@ public class ConsoleOutput implements Sender<ByteBuffer> System.out.println(str(buf)); } + public void flush() + { + // pass + } + public void close() { System.out.println("CLOSED"); diff --git a/java/common/src/main/java/org/apache/qpidity/ErrorCode.java b/java/common/src/main/java/org/apache/qpid/ErrorCode.java index 4b18c46d16..be1ad16dd5 100644 --- a/java/common/src/main/java/org/apache/qpidity/ErrorCode.java +++ b/java/common/src/main/java/org/apache/qpid/ErrorCode.java @@ -1,4 +1,4 @@ -package org.apache.qpidity; +package org.apache.qpid; public enum ErrorCode { diff --git a/java/common/src/main/java/org/apache/qpidity/QpidConfig.java b/java/common/src/main/java/org/apache/qpid/QpidConfig.java index b5aad12f10..e8d42fdf83 100644 --- a/java/common/src/main/java/org/apache/qpidity/QpidConfig.java +++ b/java/common/src/main/java/org/apache/qpid/QpidConfig.java @@ -1,4 +1,4 @@ -package org.apache.qpidity; +package org.apache.qpid; /** * API to configure the Security parameters of the client. @@ -11,11 +11,11 @@ public class QpidConfig private static QpidConfig _instance = new QpidConfig(); private SecurityMechanism[] securityMechanisms = - new SecurityMechanism[]{new SecurityMechanism("PLAIN","org.apache.qpidity.security.UsernamePasswordCallbackHandler"), - new SecurityMechanism("CRAM_MD5","org.apache.qpidity.security.UsernamePasswordCallbackHandler")}; + new SecurityMechanism[]{new SecurityMechanism("PLAIN","org.apache.qpid.security.UsernamePasswordCallbackHandler"), + new SecurityMechanism("CRAM_MD5","org.apache.qpid.security.UsernamePasswordCallbackHandler")}; private SaslClientFactory[] saslClientFactories = - new SaslClientFactory[]{new SaslClientFactory("AMQPLAIN","org.apache.qpidity.security.amqplain.AmqPlainSaslClientFactory")}; + new SaslClientFactory[]{new SaslClientFactory("AMQPLAIN","org.apache.qpid.security.amqplain.AmqPlainSaslClientFactory")}; private QpidConfig(){} diff --git a/java/common/src/main/java/org/apache/qpidity/QpidException.java b/java/common/src/main/java/org/apache/qpid/QpidException.java index 81e9145282..8503adaef8 100644 --- a/java/common/src/main/java/org/apache/qpidity/QpidException.java +++ b/java/common/src/main/java/org/apache/qpid/QpidException.java @@ -15,7 +15,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.qpidity; +package org.apache.qpid; public class QpidException extends Exception { diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpid/SecurityHelper.java index a72997813a..dda5a6506d 100644 --- a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java +++ b/java/common/src/main/java/org/apache/qpid/SecurityHelper.java @@ -18,15 +18,15 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpid; import java.io.UnsupportedEncodingException; import java.util.HashSet; import java.util.List; import java.util.StringTokenizer; -import org.apache.qpidity.security.AMQPCallbackHandler; -import org.apache.qpidity.security.CallbackHandlerRegistry; +import org.apache.qpid.security.AMQPCallbackHandler; +import org.apache.qpid.security.CallbackHandlerRegistry; public class SecurityHelper { diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpid/ToyBroker.java index 5f9917e30a..83d434b20a 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpid/ToyBroker.java @@ -18,12 +18,12 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpid; -import org.apache.qpidity.transport.*; -import org.apache.qpidity.transport.network.mina.MinaHandler; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.mina.MinaHandler; -import static org.apache.qpidity.transport.util.Functions.str; +import static org.apache.qpid.transport.util.Functions.str; import java.io.IOException; import java.nio.ByteBuffer; @@ -45,10 +45,6 @@ class ToyBroker extends SessionDelegate { private ToyExchange exchange; - private MessageTransfer xfr = null; - private DeliveryProperties props = null; - private Header header = null; - private List<Data> body = null; private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); public ToyBroker(ToyExchange exchange) @@ -103,22 +99,10 @@ class ToyBroker extends SessionDelegate @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { - this.xfr = xfr; - body = new ArrayList<Data>(); - System.out.println("received transfer " + xfr.getDestination()); - } - - @Override public void header(Session ssn, Header header) - { - if (xfr == null || body == null) - { - ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, - "no method segment"); - ssn.close(); - return; - } - - props = header.get(DeliveryProperties.class); + String dest = xfr.getDestination(); + System.out.println("received transfer " + dest); + Header header = xfr.getHeader(); + DeliveryProperties props = header.get(DeliveryProperties.class); if (props != null) { System.out.println("received headers routing_key " + props.getRoutingKey()); @@ -130,70 +114,31 @@ class ToyBroker extends SessionDelegate System.out.println(mp.getApplicationHeaders()); } - this.header = header; - } - - @Override public void data(Session ssn, Data data) - { - if (xfr == null || body == null) + if (exchange.route(dest,props.getRoutingKey(),xfr)) { - ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment"); - ssn.close(); - return; + System.out.println("queued " + xfr); + dispatchMessages(ssn); } - - body.add(data); - - if (data.isLast()) + else { - String dest = xfr.getDestination(); - Message m = new Message(header, body); - if (exchange.route(dest,props.getRoutingKey(),m)) - { - System.out.println("queued " + m); - dispatchMessages(ssn); - } - else + if (props == null || !props.getDiscardUnroutable()) { - - reject(ssn); + RangeSet ranges = new RangeSet(); + ranges.add(xfr.getId()); + ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, + "no such destination"); } - ssn.processed(xfr); - xfr = null; - body = null; - } - } - - private void reject(Session ssn) - { - if (props != null && props.getDiscardUnroutable()) - { - return; - } - else - { - RangeSet ranges = new RangeSet(); - ranges.add(xfr.getId()); - ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, - "no such destination"); } + ssn.processed(xfr); } - private void transferMessageToPeer(Session ssn,String dest, Message m) + private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.header(m.header); - for (Data d : m.body) - { - for (ByteBuffer b : d.getFragments()) - { - ssn.data(b); - } - } - ssn.endData(); + ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + m.getHeader(), m.getBody()); } private void dispatchMessages(Session ssn) @@ -207,8 +152,8 @@ class ToyBroker extends SessionDelegate private void checkAndSendMessagesToConsumer(Session ssn,String dest) { Consumer c = consumers.get(dest); - LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName); - Message m = queue.poll(); + LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName); + MessageTransfer m = queue.poll(); while (m != null && c._credit>0) { transferMessageToPeer(ssn,dest,m); @@ -217,46 +162,6 @@ class ToyBroker extends SessionDelegate } } - class Message - { - private final Header header; - private final List<Data> body; - - public Message(Header header, List<Data> body) - { - this.header = header; - this.body = body; - } - - public String toString() - { - StringBuilder sb = new StringBuilder(); - - if (header != null) - { - boolean first = true; - for (Struct st : header.getStructs()) - { - if (first) { first = false; } - else { sb.append(" "); } - sb.append(st); - } - } - - for (Data d : body) - { - for (ByteBuffer b : d.getFragments()) - { - sb.append(" | "); - sb.append(str(b)); - } - } - - return sb.toString(); - } - - } - // ugly, but who cares :) // assumes unit is always no of messages, not bytes // assumes it's credit mode and not window diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java index a3233afcbe..cb10859c9f 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -18,12 +18,13 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpid; +import java.nio.*; import java.util.*; -import org.apache.qpidity.transport.*; -import org.apache.qpidity.transport.network.mina.MinaHandler; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.mina.MinaHandler; /** @@ -47,17 +48,9 @@ class ToyClient extends SessionDelegate } } - @Override public void header(Session ssn, Header header) + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { - for (Struct st : header.getStructs()) - { - System.out.println("header: " + st); - } - } - - @Override public void data(Session ssn, Data data) - { - System.out.println("got data: " + data); + System.out.println("msg: " + xfr); } public static final void main(String[] args) @@ -75,9 +68,8 @@ class ToyClient extends SessionDelegate } public void closed() {} }); - conn.send(new ConnectionEvent(0, new ProtocolHeader(1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); + conn.send(new ProtocolHeader + (1, 0, 10)); Channel ch = conn.getChannel(0); Session ssn = new Session("my-session".getBytes()); @@ -112,16 +104,16 @@ class ToyClient extends SessionDelegate map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.header(new DeliveryProperties(), - new MessageProperties().setApplicationHeaders(map)); - ssn.data("this is the data"); - ssn.endData(); + MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties(), + new MessageProperties() + .setApplicationHeaders(map)), + "this is the data"); ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED); - ssn.data("this should be rejected"); - ssn.endData(); + MessageAcquireMode.PRE_ACQUIRED, + null, + "this should be rejected"); ssn.sync(); Future<QueueQueryResult> future = ssn.queueQuery("asdf"); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java b/java/common/src/main/java/org/apache/qpid/ToyExchange.java index eab5f6c078..c638679596 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyExchange.java +++ b/java/common/src/main/java/org/apache/qpid/ToyExchange.java @@ -1,4 +1,4 @@ -package org.apache.qpidity; +package org.apache.qpid; import java.util.ArrayList; import java.util.HashMap; @@ -9,42 +9,43 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.qpidity.ToyBroker.Message; +import org.apache.qpid.transport.MessageTransfer; + public class ToyExchange { final static String DIRECT = "amq.direct"; final static String TOPIC = "amq.topic"; - private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); - private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); - private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>(); + private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); + private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); + private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>(); public void createQueue(String name) { - queues.put(name, new LinkedBlockingQueue<Message>()); + queues.put(name, new LinkedBlockingQueue<MessageTransfer>()); } - public LinkedBlockingQueue<Message> getQueue(String name) + public LinkedBlockingQueue<MessageTransfer> getQueue(String name) { return queues.get(name); } public void bindQueue(String type,String binding,String queueName) { - LinkedBlockingQueue<Message> queue = queues.get(queueName); + LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName); binding = normalizeKey(binding); if(DIRECT.equals(type)) { if (directEx.containsKey(binding)) { - List<LinkedBlockingQueue<Message>> list = directEx.get(binding); + List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding); list.add(queue); } else { - List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>(); + List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); list.add(queue); directEx.put(binding,list); } @@ -53,21 +54,21 @@ public class ToyExchange { if (topicEx.containsKey(binding)) { - List<LinkedBlockingQueue<Message>> list = topicEx.get(binding); + List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding); list.add(queue); } else { - List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>(); + List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); list.add(queue); topicEx.put(binding,list); } } } - public boolean route(String dest,String routingKey,Message msg) + public boolean route(String dest, String routingKey, MessageTransfer msg) { - List<LinkedBlockingQueue<Message>> queues; + List<LinkedBlockingQueue<MessageTransfer>> queues; if(DIRECT.equals(dest)) { queues = directEx.get(routingKey); @@ -101,9 +102,9 @@ public class ToyExchange } } - private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey) + private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey) { - List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>(); + List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>(); for(String key: topicEx.keySet()) { @@ -111,7 +112,7 @@ public class ToyExchange Matcher m = p.matcher(routingKey); if (m.find()) { - for(LinkedBlockingQueue<Message> queue : topicEx.get(key)) + for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key)) { selected.add(queue); } @@ -121,9 +122,9 @@ public class ToyExchange return selected; } - private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected) + private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected) { - for(LinkedBlockingQueue<Message> queue : selected) + for(LinkedBlockingQueue<MessageTransfer> queue : selected) { queue.offer(msg); } diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpid/api/Message.java index f5488fde52..df6f279026 100644 --- a/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/java/common/src/main/java/org/apache/qpid/api/Message.java @@ -1,11 +1,11 @@ -package org.apache.qpidity.api; +package org.apache.qpid.api; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpidity.transport.MessageProperties; -import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.Header; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; /* * Licensed to the Apache Software Foundation (ASF) under one diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java index 67f16e6a87..7371c12519 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.common; +import org.apache.qpid.framing.AMQShortString; + /** * Specifies the available client property types that different clients can use to identify themselves with. * @@ -30,8 +32,21 @@ package org.apache.qpid.common; */ public enum ClientProperties { - instance, - product, - version, - platform + instance("instance"), + product("product"), + version("version"), + platform("platform"); + + private final AMQShortString _amqShortString; + + private ClientProperties(String name) + { + _amqShortString = new AMQShortString(name); + } + + + public AMQShortString toAMQShortString() + { + return _amqShortString; + } } diff --git a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java b/java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java index 89d7e7917f..49effc2dae 100644 --- a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java +++ b/java/common/src/main/java/org/apache/qpid/dtx/XidImpl.java @@ -15,11 +15,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.qpidity.dtx; +package org.apache.qpid.dtx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpidity.QpidException; +import org.apache.qpid.QpidException; import javax.transaction.xa.Xid; @@ -241,9 +241,9 @@ public class XidImpl implements Xid * @return The String representation of this Xid * @throws QpidException In case of problem when converting this Xid into a string. */ - public static org.apache.qpidity.transport.Xid convert(Xid xid) throws QpidException + public static org.apache.qpid.transport.Xid convert(Xid xid) throws QpidException { - return new org.apache.qpidity.transport.Xid(xid.getFormatId(), + return new org.apache.qpid.transport.Xid(xid.getFormatId(), xid.getGlobalTransactionId(), xid.getBranchQualifier()); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index 903b5bfa7a..a2fc3a03ef 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -50,4 +50,14 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock return buffer; } + public java.nio.ByteBuffer toNioByteBuffer() + { + final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize()); + + ByteBuffer buf = ByteBuffer.wrap(buffer); + writePayload(buf); + buffer.flip(); + return buffer; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index a747aaeda7..a8e7f47db0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -111,6 +111,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private final byte[] _data; private final int _offset; private int _hashCode; + private String _asString = null; + private final int _length; private static final char[] EMPTY_CHAR_ARRAY = new char[0]; @@ -137,7 +139,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public AMQShortString(String data) { this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray()); - + _asString = data; } public AMQShortString(char[] data) @@ -224,7 +226,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt } } - /** * Get the length of the short string * @return length of the underlying byte array @@ -419,9 +420,14 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return chars; } + public String asString() { - return new String(asChars()); + if (_asString == null) + { + _asString = new String(asChars()); + } + return _asString; } public boolean equals(Object o) @@ -464,13 +470,49 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return false; } - if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode)) + final int hashCode = _hashCode; + + final int otherHashCode = otherString._hashCode; + + if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode)) + { + return false; + } + + final int length = _length; + + if(length != otherString._length) { return false; } - return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data)) - || Arrays.equals(getBytes(),otherString.getBytes()); + + final byte[] data = _data; + + final byte[] otherData = otherString._data; + + final int offset = _offset; + + final int otherOffset = otherString._offset; + + if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length) + { + return Arrays.equals(data, otherData); + } + else + { + int thisIdx = offset; + int otherIdx = otherOffset; + for(int i = length; i-- != 0; ) + { + if(!(data[thisIdx++] == otherData[otherIdx++])) + { + return false; + } + } + } + + return true; } @@ -718,4 +760,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return false; //To change body of created methods use File | Settings | File Templates. } + + public static void main(String args[]) + { + AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k"); + AMQShortString s2 = s.substring(2, 7); + + AMQShortStringTokenizer t = s2.tokenize((byte) '.'); + while(t.hasMoreTokens()) + { + System.err.println(t.nextToken()); + } + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index d6359baa0f..1ff39ca790 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -93,4 +93,24 @@ public class AMQTypedValue { return "[" + getType() + ": " + getValue() + "]"; } + + + public boolean equals(Object o) + { + if(o instanceof AMQTypedValue) + { + AMQTypedValue other = (AMQTypedValue) o; + return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value)); + } + else + { + return false; + } + } + + public int hashCode() + { + return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode()); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 9ba9b53b13..ed01c91804 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -74,7 +74,7 @@ public class FieldTable buffer.skip((int) length); } - private AMQTypedValue getProperty(AMQShortString string) + public AMQTypedValue getProperty(AMQShortString string) { checkPropertyName(string); @@ -891,6 +891,20 @@ public class FieldTable return keys; } + public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator() + { + if(_encodedForm != null) + { + return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize); + } + else + { + initMapIfNecessary(); + return _properties.entrySet().iterator(); + } + } + + public Object get(AMQShortString key) { @@ -1050,6 +1064,95 @@ public class FieldTable } } + private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue> + { + private final AMQTypedValue _value; + private final AMQShortString _key; + + public FieldTableEntry(final AMQShortString key, final AMQTypedValue value) + { + _key = key; + _value = value; + } + + public AMQShortString getKey() + { + return _key; + } + + public AMQTypedValue getValue() + { + return _value; + } + + public AMQTypedValue setValue(final AMQTypedValue value) + { + throw new UnsupportedOperationException(); + } + + public boolean equals(Object o) + { + if(o instanceof FieldTableEntry) + { + FieldTableEntry other = (FieldTableEntry) o; + return (_key == null ? other._key == null : _key.equals(other._key)) + && (_value == null ? other._value == null : _value.equals(other._value)); + } + else + { + return false; + } + } + + public int hashCode() + { + return (getKey()==null ? 0 : getKey().hashCode()) + ^ (getValue()==null ? 0 : getValue().hashCode()); + } + + } + + + private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>> + { + + private final ByteBuffer _buffer; + private int _expectedRemaining; + + public FieldTableIterator(ByteBuffer buffer, int length) + { + _buffer = buffer; + _expectedRemaining = buffer.remaining() - length; + } + + public boolean hasNext() + { + return (_buffer.remaining() > _expectedRemaining); + } + + public Map.Entry<AMQShortString, AMQTypedValue> next() + { + if(hasNext()) + { + final AMQShortString key = EncodingUtils.readAMQShortString(_buffer); + AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer); + return new FieldTableEntry(key, value); + } + else + { + return null; + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + + + public int hashCode() { initMapIfNecessary(); diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index b2a09ac592..00da005515 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -50,7 +50,7 @@ import org.apache.mina.common.IoSession; * * @todo For better re-usability could make the completion handler optional. Only run it when one is set. */ -public class Job implements Runnable +public class Job implements ReadWriteRunnable { /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; @@ -67,18 +67,22 @@ public class Job implements Runnable /** Holds the completion continuation, called upon completion of a run of the job. */ private final JobCompletionHandler _completionHandler; + private final boolean _readJob; + /** * Creates a new job that aggregates many continuations together. * * @param session The Mina session. * @param completionHandler The per job run, terminal continuation. * @param maxEvents The maximum number of aggregated continuations to process per run of the job. + * @param readJob */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) + Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) { _session = session; _completionHandler = completionHandler; _maxEvents = maxEvents; + _readJob = readJob; } /** @@ -157,6 +161,22 @@ public class Job implements Runnable } } + public boolean isReadJob() + { + return _readJob; + } + + public boolean isRead() + { + return _readJob; + } + + public boolean isWrite() + { + return !_readJob; + } + + /** * Another interface for a continuation. * diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 2912e54662..a080cc7e04 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -60,24 +60,6 @@ import java.util.concurrent.ExecutorService; * <td> {@link Job}, {@link Job.JobCompletionHandler} * </table> * - * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events. - * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread - * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads - * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case? - * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these - * stages. - * - * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in - * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run - * and trips off another batch of 10 until they are all done. Why not just have a straight forward - * consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10 - * in them, just have one queue of events and worker threads taking the next event. There will be coordination - * between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same - * amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker - * pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be - * better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily - * be substituted in. - * * @todo The static helper methods are pointless. Could just call new. */ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler @@ -96,17 +78,20 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final int _maxEvents; + private final boolean _readFilter; + /** * Creates a named pooling filter, on the specified shared thread pool. * * @param refCountingPool The thread pool reference. * @param name The identifying name of the filter type. */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter) { _poolReference = refCountingPool; _name = name; _maxEvents = maxEvents; + _readFilter = readFilter; } /** @@ -167,7 +152,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo void fireAsynchEvent(Job job, Event event) { - // job.acquire(); //prevents this job being removed from _jobs job.add(event); final ExecutorService pool = _poolReference.getPool(); @@ -201,7 +185,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void createNewJobForSession(IoSession session) { - Job job = new Job(session, this, MAX_JOB_EVENTS); + Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } @@ -433,7 +417,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS)); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true); } /** @@ -476,7 +460,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS)); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false); } /** diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java new file mode 100644 index 0000000000..8de0f93ce9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java @@ -0,0 +1,432 @@ +package org.apache.qpid.pool; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.atomic.AtomicInteger; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> +{ + + private final AtomicInteger _count = new AtomicInteger(0); + + private final ReentrantLock _takeLock = new ReentrantLock(); + + private final Condition _notEmpty = _takeLock.newCondition(); + + private final ReentrantLock _putLock = new ReentrantLock(); + + private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); + + private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); + + + private class ReadWriteJobIterator implements Iterator<Runnable> + { + + private boolean _onReads; + private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator(); + + public boolean hasNext() + { + if(!_iter.hasNext()) + { + if(_onReads) + { + _iter = _readJobQueue.iterator(); + _onReads = true; + return _iter.hasNext(); + } + else + { + return false; + } + } + else + { + return true; + } + } + + public Runnable next() + { + if(_iter.hasNext()) + { + return _iter.next(); + } + else + { + return null; + } + } + + public void remove() + { + _takeLock.lock(); + try + { + _iter.remove(); + _count.decrementAndGet(); + } + finally + { + _takeLock.unlock(); + } + } + } + + public Iterator<Runnable> iterator() + { + return new ReadWriteJobIterator(); + } + + public int size() + { + return _count.get(); + } + + public boolean offer(final Runnable runnable) + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + return true; + } + finally + { + putLock.unlock(); + } + } + + public void put(final Runnable runnable) throws InterruptedException + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + } + finally + { + putLock.unlock(); + } + } + + + + public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException + { + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isRead()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + return true; + } + finally + { + putLock.unlock(); + } + + } + + public Runnable take() throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + takeLock.lockInterruptibly(); + try + { + try + { + while (_count.get() == 0) + { + _notEmpty.await(); + } + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + + ReadWriteRunnable job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = _count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + return job; + } + finally + { + takeLock.unlock(); + } + + + } + + public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + final AtomicInteger count = _count; + long nanos = unit.toNanos(timeout); + takeLock.lockInterruptibly(); + ReadWriteRunnable job = null; + try + { + + for (;;) + { + if (count.get() > 0) + { + job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + break; + } + if (nanos <= 0) + { + return null; + } + try + { + nanos = _notEmpty.awaitNanos(nanos); + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + } + } + finally + { + takeLock.unlock(); + } + + return job; + } + + public int remainingCapacity() + { + return Integer.MAX_VALUE; + } + + public int drainTo(final Collection<? super Runnable> c) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + ReadWriteRunnable job; + while((job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while((job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + } + + public int drainTo(final Collection<? super Runnable> c, final int maxElements) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + ReadWriteRunnable job; + while(total<=maxElements && (job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while(total<=maxElements && (job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + + } + + public Runnable poll() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + if(_count.get() > 0) + { + ReadWriteRunnable job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + _count.decrementAndGet(); + return job; + } + else + { + return null; + } + } + finally + { + takeLock.unlock(); + } + + } + + public Runnable peek() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + ReadWriteRunnable job = _writeJobQueue.peek(); + if(job == null) + { + job = _readJobQueue.peek(); + } + return job; + } + finally + { + takeLock.unlock(); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java new file mode 100644 index 0000000000..ad04a923e1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java @@ -0,0 +1,27 @@ +package org.apache.qpid.pool; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public interface ReadWriteRunnable extends Runnable +{ + boolean isRead(); + boolean isWrite(); +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 84c9e1f465..ce9c6ae4cb 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,6 +22,9 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts @@ -84,6 +87,8 @@ public class ReferenceCountingExecutorService /** Holds the number of executor threads to create. */ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); + /** * Retrieves the singleton instance of this reference counter. * @@ -105,15 +110,28 @@ public class ReferenceCountingExecutorService * * @return An executor service. */ - ExecutorService acquireExecutorService() + public ExecutorService acquireExecutorService() { synchronized (_lock) { if (_refCount++ == 0) { - _pool = Executors.newFixedThreadPool(_poolSize); +// _pool = Executors.newFixedThreadPool(_poolSize); + + // Use a job queue that biases to writes + if(_useBiasedPool) + { + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new ReadWriteJobQueue()); + } + else + { + _pool = Executors.newFixedThreadPool(_poolSize); + } } + return _pool; } } @@ -122,7 +140,7 @@ public class ReferenceCountingExecutorService * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls * to zero, the executor service is shut down. */ - void releaseExecutorService() + public void releaseExecutorService() { synchronized (_lock) { diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 59003225b7..b58e7d01dc 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -21,8 +21,12 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.*; +import org.apache.qpid.transport.Sender; import org.apache.qpid.AMQException; +import java.nio.ByteBuffer; + + /** * AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to * callers of the version of the AMQP protocol that they are able to work with. @@ -54,4 +58,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException; + public void setSender(Sender<ByteBuffer> sender); + public void init(); + } diff --git a/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java b/java/common/src/main/java/org/apache/qpid/security/AMQPCallbackHandler.java index 2e7afa1b87..a3dad9acdc 100644 --- a/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java +++ b/java/common/src/main/java/org/apache/qpid/security/AMQPCallbackHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.security; +package org.apache.qpid.security; import javax.security.auth.callback.CallbackHandler; diff --git a/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java b/java/common/src/main/java/org/apache/qpid/security/CallbackHandlerRegistry.java index b23fea7e4a..8c80a1b5b7 100644 --- a/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/security/CallbackHandlerRegistry.java @@ -18,12 +18,12 @@ * under the License. * */ -package org.apache.qpidity.security; +package org.apache.qpid.security; import java.util.HashMap; import java.util.Map; -import org.apache.qpidity.QpidConfig; +import org.apache.qpid.QpidConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java b/java/common/src/main/java/org/apache/qpid/security/DynamicSaslRegistrar.java index 1798f0c210..9f48ac96a3 100644 --- a/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java +++ b/java/common/src/main/java/org/apache/qpid/security/DynamicSaslRegistrar.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.security; +package org.apache.qpid.security; import java.security.Security; import java.util.Map; @@ -26,7 +26,7 @@ import java.util.TreeMap; import javax.security.sasl.SaslClientFactory; -import org.apache.qpidity.QpidConfig; +import org.apache.qpid.QpidConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java b/java/common/src/main/java/org/apache/qpid/security/JCAProvider.java index c775171a5f..033deb550c 100644 --- a/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java +++ b/java/common/src/main/java/org/apache/qpid/security/JCAProvider.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.security; +package org.apache.qpid.security; import java.security.Provider; import java.security.Security; diff --git a/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java b/java/common/src/main/java/org/apache/qpid/security/UsernamePasswordCallbackHandler.java index 0fd647e015..89a63abeab 100644 --- a/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java +++ b/java/common/src/main/java/org/apache/qpid/security/UsernamePasswordCallbackHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.security; +package org.apache.qpid.security; import java.io.IOException; diff --git a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java b/java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClient.java index 6e4a0218d2..81acc66de4 100644 --- a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java +++ b/java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClient.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.security.amqplain; +package org.apache.qpid.security.amqplain; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; diff --git a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java b/java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClientFactory.java index abc881f433..6c66c87f4c 100644 --- a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java +++ b/java/common/src/main/java/org/apache/qpid/security/amqplain/AmqPlainSaslClientFactory.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.security.amqplain; +package org.apache.qpid.security.amqplain; import javax.security.sasl.SaslClientFactory; import javax.security.sasl.SaslClient; diff --git a/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/java/common/src/main/java/org/apache/qpid/transport/Binary.java new file mode 100644 index 0000000000..e6dedc536f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/Binary.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * Binary + * + */ + +public final class Binary +{ + + private byte[] bytes; + private int offset; + private int size; + private int hash = 0; + + public Binary(byte[] bytes, int offset, int size) + { + if (offset + size > bytes.length) + { + throw new ArrayIndexOutOfBoundsException(); + } + + this.bytes = bytes; + this.offset = offset; + this.size = size; + } + + public Binary(byte[] bytes) + { + this(bytes, 0, bytes.length); + } + + public final byte[] array() + { + return bytes; + } + + public final int offset() + { + return offset; + } + + public final int size() + { + return size; + } + + public final Binary slice(int low, int high) + { + int sz; + + if (high < 0) + { + sz = size + high; + } + else + { + sz = high - low; + } + + if (sz < 0) + { + sz = 0; + } + + return new Binary(bytes, offset + low, sz); + } + + public final int hashCode() + { + if (hash == 0) + { + int hc = 0; + for (int i = 0; i < size; i++) + { + hc = 31*hc + (0xFF & bytes[offset + i]); + } + hash = hc; + } + + return hash; + } + + public final boolean equals(Object o) + { + if (!(o instanceof Binary)) + { + return false; + } + + Binary buf = (Binary) o; + if (this.size != buf.size) + { + return false; + } + + for (int i = 0; i < size; i++) + { + if (bytes[offset + i] != buf.bytes[buf.offset + i]) + { + return false; + } + } + + return true; + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Binding.java b/java/common/src/main/java/org/apache/qpid/transport/Binding.java index 18ed97098d..8418c42189 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Binding.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Binding.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpid/transport/Channel.java index fb8918eb7b..d6b015930b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Channel.java @@ -18,10 +18,10 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.network.Frame; -import org.apache.qpidity.transport.util.Logger; +import org.apache.qpid.transport.network.Frame; +import org.apache.qpid.transport.util.Logger; import java.nio.ByteBuffer; @@ -30,8 +30,8 @@ import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.qpidity.transport.network.Frame.*; -import static org.apache.qpidity.transport.util.Functions.*; +import static org.apache.qpid.transport.network.Frame.*; +import static org.apache.qpid.transport.util.Functions.*; /** @@ -53,10 +53,6 @@ public class Channel extends Invoker // session may be null private Session session; - private Lock commandLock = new ReentrantLock(); - private boolean first = true; - private ByteBuffer data = null; - public Channel(Connection connection, int channel, SessionDelegate delegate) { this.connection = connection; @@ -104,16 +100,6 @@ public class Channel extends Invoker method.delegate(session, sessionDelegate); } - public void header(Void v, Header header) - { - header.delegate(session, sessionDelegate); - } - - public void data(Void v, Data data) - { - data.delegate(session, sessionDelegate); - } - public void error(Void v, ProtocolError error) { throw new RuntimeException(error.getMessage()); @@ -148,58 +134,28 @@ public class Channel extends Invoker this.session = session; } - private void emit(ProtocolEvent event) - { - connection.send(new ConnectionEvent(channel, event)); - } - - public void method(Method m) + void closeCode(ConnectionClose close) { - if (m.getEncodedTrack() == Frame.L4) - { - commandLock.lock(); - } - - emit(m); - - if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload()) + if (session != null) { - commandLock.unlock(); + session.closeCode(close); } } - public void header(Header header) + private void emit(ProtocolEvent event) { - emit(header); + event.setChannel(channel); + connection.send(event); } - public void data(ByteBuffer buf) + public void method(Method m) { - if (data != null) + emit(m); + + if (!m.isBatch()) { - emit(new Data(data, first, false)); - first = false; + connection.flush(); } - - data = buf; - } - - public void data(String str) - { - data(str.getBytes()); - } - - public void data(byte[] bytes) - { - data(ByteBuffer.wrap(bytes)); - } - - public void end() - { - emit(new Data(data, first, true)); - first = true; - data = null; - commandLock.unlock(); } protected void invoke(Method m) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java index 96578ffeb8..8475fbf174 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import java.util.UUID; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 699854fb3b..bbdadfadb9 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** @@ -31,10 +31,9 @@ public abstract class ClientDelegate extends ConnectionDelegate public void init(Channel ch, ProtocolHeader hdr) { - if (hdr.getMajor() != TransportConstants.getVersionMajor() && - hdr.getMinor() != TransportConstants.getVersionMinor()) + if (hdr.getMajor() != 0 && hdr.getMinor() != 10) { - throw new RuntimeException("version missmatch: " + hdr); + throw new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()); } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 9829343491..68b9b209bb 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -18,9 +18,9 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.util.Logger; +import org.apache.qpid.transport.util.Logger; import java.util.ArrayList; import java.util.HashMap; @@ -40,14 +40,13 @@ import java.nio.ByteBuffer; * short instead of Short */ -// RA making this public until we sort out the package issues public class Connection - implements Receiver<ConnectionEvent>, Sender<ConnectionEvent> + implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { private static final Logger log = Logger.get(Connection.class); - final private Sender<ConnectionEvent> sender; + final private Sender<ProtocolEvent> sender; final private ConnectionDelegate delegate; private int channelMax = 1; // want to make this final @@ -55,7 +54,7 @@ public class Connection final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); - public Connection(Sender<ConnectionEvent> sender, + public Connection(Sender<ProtocolEvent> sender, ConnectionDelegate delegate) { this.sender = sender; @@ -77,19 +76,25 @@ public class Connection return delegate; } - public void received(ConnectionEvent event) + public void received(ProtocolEvent event) { log.debug("RECV: [%s] %s", this, event); Channel channel = getChannel(event.getChannel()); - channel.received(event.getProtocolEvent()); + channel.received(event); } - public void send(ConnectionEvent event) + public void send(ProtocolEvent event) { log.debug("SEND: [%s] %s", this, event); sender.send(event); } + public void flush() + { + log.debug("FLUSH: [%s]", this); + sender.flush(); + } + public int getChannelMax() { return channelMax; @@ -143,6 +148,17 @@ public class Connection delegate.exception(t); } + void closeCode(ConnectionClose close) + { + synchronized (channels) + { + for (Channel ch : channels.values()) + { + ch.closeCode(close); + } + } + } + public void closed() { log.debug("connection closed: %s", this); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 14344991c6..2aa1db7b28 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -18,12 +18,12 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.util.Logger; +import org.apache.qpid.transport.util.Logger; -import org.apache.qpidity.SecurityHelper; -import org.apache.qpidity.QpidException; +import org.apache.qpid.SecurityHelper; +import org.apache.qpid.QpidException; import java.io.UnsupportedEncodingException; @@ -82,28 +82,12 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> public void init(Channel ch, ProtocolHeader hdr) { - ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader - (1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); - if (hdr.getMajor() != TransportConstants.getVersionMajor() && - hdr.getMinor() != TransportConstants.getVersionMinor()) - { - // XXX - ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader - (1, - TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); - ch.getConnection().close(); - } - else - { - List<Object> plain = new ArrayList<Object>(); - plain.add("PLAIN"); - List<Object> utf8 = new ArrayList<Object>(); - utf8.add("utf8"); - ch.connectionStart(null, plain, utf8); - } + ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor())); + List<Object> plain = new ArrayList<Object>(); + plain.add("PLAIN"); + List<Object> utf8 = new ArrayList<Object>(); + utf8.add("utf8"); + ch.connectionStart(null, plain, utf8); } // ---------------------------------------------- @@ -264,6 +248,12 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> context.connectionOpenOk(hosts); } + @Override public void connectionClose(Channel ch, ConnectionClose close) + { + ch.getConnection().closeCode(close); + ch.connectionCloseOk(); + } + public String getPassword() { return _password; @@ -294,8 +284,4 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> _virtualHost = host; } - public String getUnsupportedProtocol() - { - return null; - } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java new file mode 100644 index 0000000000..c3239ef684 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * ConnectionException + * + */ + +public class ConnectionException extends RuntimeException +{ + + private ConnectionClose close; + + public ConnectionException(ConnectionClose close) + { + super(close.getReplyText()); + this.close = close; + } + + public ConnectionClose getClose() + { + return close; + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java index 03d0d3e161..b2be32331a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Echo.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -18,12 +18,13 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpidity.transport.network.mina.MinaHandler; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; /** @@ -34,31 +35,9 @@ import org.apache.qpidity.transport.network.mina.MinaHandler; public class Echo extends SessionDelegate { - private MessageTransfer xfr = null; - public void messageTransfer(Session ssn, MessageTransfer xfr) { - this.xfr = xfr; ssn.invoke(xfr); - } - - public void header(Session ssn, Header hdr) - { - ssn.header(hdr); - } - - public void data(Session ssn, Data data) - { - for (ByteBuffer buf : data.getFragments()) - { - ssn.data(buf); - } - if (data.isLast()) - { - ssn.endData(); - } - - // XXX: should be able to get command-id from any segment ssn.processed(xfr); } @@ -81,7 +60,9 @@ public class Echo extends SessionDelegate delegate.setUsername("guest"); delegate.setPassword("guest"); - MinaHandler.accept("0.0.0.0", 5672, delegate); + IoAcceptor ioa = new IoAcceptor + ("0.0.0.0", 5672, new ConnectionBinding(delegate)); + ioa.start(); } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Field.java b/java/common/src/main/java/org/apache/qpid/transport/Field.java index ebbd59288b..bc6bf10041 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Field.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Field.java @@ -18,10 +18,10 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.codec.Decoder; -import org.apache.qpidity.transport.codec.Encoder; +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encoder; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Future.java b/java/common/src/main/java/org/apache/qpid/transport/Future.java index 8936f06831..d8cde61af5 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Future.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Future.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Header.java b/java/common/src/main/java/org/apache/qpid/transport/Header.java index ae11bb0c69..9439e5e0de 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Header.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Header.java @@ -18,11 +18,14 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.network.Frame; +import org.apache.qpid.transport.network.Frame; +import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.LinkedHashMap; import java.nio.ByteBuffer; @@ -32,65 +35,43 @@ import java.nio.ByteBuffer; * @author Rafael H. Schloming */ -public class Header implements ProtocolEvent { +public class Header { - private final List<Struct> structs; - private ByteBuffer _buf; - private boolean _noPayload; + private final Struct[] structs; - public Header(List<Struct> structs, boolean lastframe) + public Header(List<Struct> structs) { - this.structs = structs; - _noPayload= lastframe; + this(structs.toArray(new Struct[structs.size()])); } - public List<Struct> getStructs() + public Header(Struct ... structs) { - return structs; + this.structs = structs; } - public void setBuf(ByteBuffer buf) + public Struct[] getStructs() { - _buf = buf; + return structs; } - public ByteBuffer getBuf() - { - return _buf; - } + public <T> T get(Class<T> klass) { for (Struct st : structs) { if (klass.isInstance(st)) { - return klass.cast(st); + return (T) st; } } return null; } - public byte getEncodedTrack() - { - return Frame.L4; - } - - public <C> void delegate(C context, ProtocolDelegate<C> delegate) - { - delegate.header(context, this); - } - - public boolean hasNoPayload() - { - return _noPayload; - } - - public String toString() { StringBuffer str = new StringBuffer(); - str.append("Header("); + str.append(" Header("); boolean first = true; for (Struct s : structs) { diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpid/transport/Method.java index f72ebd570c..6b99f6d5d3 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -18,9 +18,13 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.network.Frame; +import org.apache.qpid.transport.network.Frame; + +import java.nio.ByteBuffer; + +import static org.apache.qpid.transport.util.Functions.*; /** * Method @@ -40,8 +44,10 @@ public abstract class Method extends Struct implements ProtocolEvent // XXX: command subclass? private int id; + private int channel; private boolean idSet = false; private boolean sync = false; + private boolean batch = false; public final int getId() { @@ -54,18 +60,58 @@ public abstract class Method extends Struct implements ProtocolEvent this.idSet = true; } + public final int getChannel() + { + return channel; + } + + public final void setChannel(int channel) + { + this.channel = channel; + } + public final boolean isSync() { return sync; } - void setSync(boolean value) + final void setSync(boolean value) { this.sync = value; } + public final boolean isBatch() + { + return batch; + } + + final void setBatch(boolean value) + { + this.batch = value; + } + public abstract boolean hasPayload(); + public Header getHeader() + { + return null; + } + + public void setHeader(Header header) + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer getBody() + { + return null; + } + + public void setBody(ByteBuffer body) + { + throw new UnsupportedOperationException(); + } + public abstract byte getEncodedTrack(); public abstract <C> void dispatch(C context, MethodDelegate<C> delegate); @@ -84,33 +130,50 @@ public abstract class Method extends Struct implements ProtocolEvent public String toString() { - if (getEncodedTrack() != Frame.L4) - { - return super.toString(); - } - StringBuilder str = new StringBuilder(); - if (idSet) + str.append("ch="); + str.append(channel); + + if (getEncodedTrack() == Frame.L4 && idSet) { - str.append("id="); + str.append(" id="); str.append(id); } - if (sync) + if (sync || batch) { - if (str.length() > 0) + str.append(" "); + str.append("["); + if (sync) { - str.append(" "); + str.append("S"); } - str.append(" [sync]"); + if (batch) + { + str.append("B"); + } + str.append("]"); } - if (str.length() > 0) + str.append(" "); + str.append(super.toString()); + Header hdr = getHeader(); + if (hdr != null) { - str.append(" "); + for (Struct st : hdr.getStructs()) + { + str.append("\n "); + str.append(st); + } } - str.append(super.toString()); + ByteBuffer body = getBody(); + if (body != null) + { + str.append("\n body="); + str.append(str(body, 64)); + } + return str.toString(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java index 028d570416..a90948fc1d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** @@ -35,10 +35,6 @@ public interface ProtocolDelegate<C> void command(C context, Method command); - void header(C context, Header header); - - void data(C context, Data data); - void error(C context, ProtocolError error); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolError.java index 59fc3d2552..bd6ab81997 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolError.java @@ -18,10 +18,10 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.network.NetworkDelegate; -import org.apache.qpidity.transport.network.NetworkEvent; +import org.apache.qpid.transport.network.NetworkDelegate; +import org.apache.qpid.transport.network.NetworkEvent; /** @@ -30,9 +30,10 @@ import org.apache.qpidity.transport.network.NetworkEvent; * @author Rafael H. Schloming */ -public class ProtocolError implements NetworkEvent, ProtocolEvent +public final class ProtocolError implements NetworkEvent, ProtocolEvent { + private int channel; private final byte track; private final String format; private final Object[] args; @@ -44,6 +45,16 @@ public class ProtocolError implements NetworkEvent, ProtocolEvent this.args = args; } + public int getChannel() + { + return channel; + } + + public void setChannel(int channel) + { + this.channel = channel; + } + public byte getEncodedTrack() { return track; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolEvent.java index 0b38dc6f28..60234c1537 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolEvent.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** @@ -29,6 +29,10 @@ package org.apache.qpidity.transport; public interface ProtocolEvent { + int getChannel(); + + void setChannel(int channel); + byte getEncodedTrack(); <C> void delegate(C context, ProtocolDelegate<C> delegate); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java index f9cd6f3947..fa0c1e9c63 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java @@ -18,13 +18,13 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import java.nio.ByteBuffer; -import org.apache.qpidity.transport.network.NetworkDelegate; -import org.apache.qpidity.transport.network.NetworkEvent; -import org.apache.qpidity.transport.network.Frame; +import org.apache.qpid.transport.network.NetworkDelegate; +import org.apache.qpid.transport.network.NetworkEvent; +import org.apache.qpid.transport.network.Frame; /** @@ -33,9 +33,7 @@ import org.apache.qpidity.transport.network.Frame; * @author Rafael H. Schloming */ -//RA making this public until we sort out the package issues - -public class ProtocolHeader implements NetworkEvent, ProtocolEvent +public final class ProtocolHeader implements NetworkEvent, ProtocolEvent { private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; @@ -44,6 +42,7 @@ public class ProtocolHeader implements NetworkEvent, ProtocolEvent final private byte instance; final private byte major; final private byte minor; + private int channel; public ProtocolHeader(byte instance, byte major, byte minor) { @@ -72,6 +71,16 @@ public class ProtocolHeader implements NetworkEvent, ProtocolEvent return minor; } + public int getChannel() + { + return channel; + } + + public void setChannel(int channel) + { + this.channel = channel; + } + public byte getEncodedTrack() { return Frame.L1; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java index 62d8f4d99d..2de0c169a5 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java @@ -18,39 +18,35 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** - * ConnectionEvent + * ProtocolVersionException * */ -public class ConnectionEvent +public final class ProtocolVersionException extends TransportException { - private final int channel; - private final ProtocolEvent event; + private final byte major; + private final byte minor; - public ConnectionEvent(int channel, ProtocolEvent event) + public ProtocolVersionException(byte major, byte minor) { - this.channel = channel; - this.event = event; + super(String.format("version missmatch: %s-%s", major, minor)); + this.major = major; + this.minor = minor; } - public int getChannel() + public byte getMajor() { - return channel; + return this.major; } - public ProtocolEvent getProtocolEvent() + public byte getMinor() { - return event; - } - - public String toString() - { - return String.format("[%d] %s", channel, event); + return this.minor; } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Range.java b/java/common/src/main/java/org/apache/qpid/transport/Range.java index 14522cd45f..f4335dc8a6 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Range.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Range.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import java.util.ArrayList; import java.util.List; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java index e05e399262..9b2744ee8b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java +++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import java.util.Iterator; import java.util.ListIterator; @@ -47,6 +47,11 @@ public final class RangeSet implements Iterable<Range> return ranges.iterator(); } + public Range getFirst() + { + return ranges.getFirst(); + } + public boolean includes(Range range) { for (Range r : this) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java b/java/common/src/main/java/org/apache/qpid/transport/Receiver.java index 8952ebf2a5..2a994580dc 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Receiver.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Result.java b/java/common/src/main/java/org/apache/qpid/transport/Result.java index 2126a76a53..1116492a8d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Result.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Result.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Sender.java b/java/common/src/main/java/org/apache/qpid/transport/Sender.java index 6da8358bd6..9a6f675d7f 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Sender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Sender.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** @@ -31,6 +31,8 @@ public interface Sender<T> void send(T msg); + void flush(); + void close(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 988ac4788f..10ca6cfb0a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -18,12 +18,12 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.network.Frame; +import org.apache.qpid.transport.network.Frame; -import org.apache.qpidity.transport.util.Logger; +import org.apache.qpid.transport.util.Logger; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -33,9 +33,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.qpidity.transport.Option.*; -import static org.apache.qpidity.transport.util.Functions.*; +import static org.apache.qpid.transport.Option.*; +import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.util.Serial.*; +import static org.apache.qpid.util.Strings.*; /** * Session @@ -75,7 +76,8 @@ public class Session extends Invoker // completed incoming commands private final Object processedLock = new Object(); private RangeSet processed = new RangeSet(); - private Range syncPoint = null; + private int maxProcessed = commandsIn - 1; + private int syncPoint = maxProcessed; // outgoing command count private int commandsOut = 0; @@ -127,10 +129,16 @@ public class Session extends Invoker { int id = nextCommandId(); cmd.setId(id); - log.debug("ID: [%s] %s", this.channel, id); - if ((id % 65536) == 0) + + if(log.isDebugEnabled()) + { + log.debug("ID: [%s] %s", this.channel, id); + } + + //if ((id % 65536) == 0) + if ((id & 0xff) == 0) { - flushProcessed(true); + flushProcessed(TIMELY_REPLY); } } @@ -158,7 +166,16 @@ public class Session extends Invoker synchronized (processedLock) { processed.add(range); - flush = syncPoint != null && processed.includes(syncPoint); + Range first = processed.getFirst(); + int lower = first.getLower(); + int upper = first.getUpper(); + int old = maxProcessed; + if (le(lower, maxProcessed + 1)) + { + maxProcessed = max(maxProcessed, upper); + } + flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint); + syncPoint = maxProcessed; } if (flush) { @@ -166,19 +183,14 @@ public class Session extends Invoker } } - public void flushProcessed() - { - flushProcessed(false); - } - - private void flushProcessed(boolean timely_reply) + public void flushProcessed(Option ... options) { RangeSet copy; synchronized (processedLock) { copy = processed.copy(); } - sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION); + sessionCompleted(copy, options); } void knownComplete(RangeSet kc) @@ -204,15 +216,11 @@ public class Session extends Invoker { int id = getCommandsIn() - 1; log.debug("%s synced to %d", this, id); - Range range = new Range(0, id - 1); boolean flush; synchronized (processedLock) { - flush = processed.includes(range); - if (!flush) - { - syncPoint = range; - } + syncPoint = id; + flush = ge(maxProcessed, syncPoint); } if (flush) { @@ -236,7 +244,11 @@ public class Session extends Invoker boolean complete(int lower, int upper) { - log.debug("%s complete(%d, %d)", this, lower, upper); + //avoid autoboxing + if(log.isDebugEnabled()) + { + log.debug("%s complete(%d, %d)", this, lower, upper); + } synchronized (commands) { int old = maxComplete; @@ -254,8 +266,25 @@ public class Session extends Invoker } } - protected void invoke(Method m) + public void invoke(Method m) { + if (closed.get()) + { + List<ExecutionException> exc = getExceptions(); + if (!exc.isEmpty()) + { + throw new SessionException(exc); + } + else if (close != null) + { + throw new ConnectionException(close); + } + else + { + throw new SessionClosedException(); + } + } + if (m.getEncodedTrack() == Frame.L4) { synchronized (commands) @@ -276,7 +305,7 @@ public class Session extends Invoker } needSync = !m.isSync(); channel.method(m); - if (autoSync && !m.hasPayload()) + if (autoSync) { sync(); } @@ -295,50 +324,6 @@ public class Session extends Invoker } } - public void header(Header header) - { - channel.header(header); - } - - public Header header(List<Struct> structs) - { - Header res = new Header(structs, false); - header(res); - return res; - } - - public Header header(Struct ... structs) - { - return header(Arrays.asList(structs)); - } - - public void data(ByteBuffer buf) - { - channel.data(buf); - } - - public void data(String str) - { - channel.data(str); - } - - public void data(byte[] bytes) - { - channel.data(bytes); - } - - public void endData() - { - channel.end(); - synchronized (commands) - { - if (autoSync) - { - sync(); - } - } - } - public void sync() { sync(timeout); @@ -353,9 +338,7 @@ public class Session extends Invoker if (needSync && lt(maxComplete, point)) { - ExecutionSync sync = new ExecutionSync(); - sync.setSync(true); - invoke(sync); + executionSync(SYNC); } long start = System.currentTimeMillis(); @@ -413,6 +396,13 @@ public class Session extends Invoker } } + private ConnectionClose close = null; + + void closeCode(ConnectionClose close) + { + this.close = close; + } + List<ExecutionException> getExceptions() { synchronized (exceptions) @@ -508,6 +498,26 @@ public class Session extends Invoker } + public final void messageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + byte[] body, + Option ... _options) { + messageTransfer(destination, acceptMode, acquireMode, header, + ByteBuffer.wrap(body), _options); + } + + public final void messageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + String body, + Option ... _options) { + messageTransfer(destination, acceptMode, acquireMode, header, + toUTF8(body), _options); + } + public void close() { sessionRequestTimeout(0); diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java new file mode 100644 index 0000000000..d2c54cf339 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.util.Collections; + + +/** + * SessionClosedException + * + */ + +public class SessionClosedException extends SessionException +{ + + public SessionClosedException() + { + super(Collections.EMPTY_LIST); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 0e289c54e9..b91763509c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -18,9 +18,9 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; -import org.apache.qpidity.transport.network.Frame; +import org.apache.qpid.transport.network.Frame; /** @@ -48,10 +48,6 @@ public abstract class SessionDelegate } } - public void header(Session ssn, Header header) { } - - public void data(Session ssn, Data data) { } - public void error(Session ssn, ProtocolError error) { } @Override public void executionResult(Session ssn, ExecutionResult result) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java index fc866f3694..dc294b2206 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/SessionException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import java.util.List; diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/java/common/src/main/java/org/apache/qpid/transport/Sink.java new file mode 100644 index 0000000000..8653acedbe --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/Sink.java @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; + +/** + * Sink + * + */ + +public class Sink extends SessionDelegate +{ + + private static final String FORMAT_HDR = "%-12s %-18s %-18s %-18s"; + private static final String FORMAT_ROW = "SSN#%-8X %-18s %-18s %-18s"; + + private long interval = 100000; + private long start = System.currentTimeMillis(); + private long count = 0; + private long bytes = 0; + private long interval_start = start; + private long bytes_start = bytes; + private long time = start; + private int id = System.identityHashCode(this); + + public Sink() + { + } + + private double msg_rate() + { + return 1000 * (double) count / (double) (time - start); + } + + private double byte_rate() + { + return (1000 * (double) bytes / (double) (time - start)) / (1024*1024); + } + + private double msg_interval_rate() + { + return 1000 * (double) interval / (double) (time - interval_start); + } + + private double byte_interval_rate() + { + return (1000 * (double) (bytes - bytes_start) / (double) (time - interval_start)) / (1024*1024); + } + + private String rates() + { + return String.format("%.2f/%.2f", msg_rate(), byte_rate()); + } + + private String interval_rates() + { + return String.format("%.2f/%.2f", msg_interval_rate(), byte_interval_rate()); + } + + private String counts() + { + return String.format("%d/%.2f", count, ((double) bytes)/(1024*1024)); + } + + public void messageTransfer(Session ssn, MessageTransfer xfr) + { + count++; + bytes += xfr.getBody().remaining(); + if ((count % interval) == 0) + { + time = System.currentTimeMillis(); + System.out.println + (String.format + (FORMAT_ROW, id, counts(), rates(), interval_rates())); + interval_start = time; + bytes_start = bytes; + } + ssn.processed(xfr); + } + + public static final void main(String[] args) throws IOException + { + ConnectionDelegate delegate = new ConnectionDelegate() + { + + public SessionDelegate getSessionDelegate() + { + return new Sink(); + } + + public void exception(Throwable t) + { + t.printStackTrace(); + } + + public void closed() {} + }; + + //hack + delegate.setUsername("guest"); + delegate.setPassword("guest"); + + IoAcceptor ioa = new IoAcceptor + ("0.0.0.0", 5672, new ConnectionBinding(delegate)); + System.out.println + (String.format + (FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate")); + System.out.println + (String.format + (FORMAT_HDR, "-------", "------------", "---------------", "-------------")); + ioa.start(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/java/common/src/main/java/org/apache/qpid/transport/Struct.java index a15d0a1fb8..22bd9f34ad 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Struct.java @@ -18,14 +18,14 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import java.util.List; import java.util.Map; -import org.apache.qpidity.transport.codec.Decoder; -import org.apache.qpidity.transport.codec.Encodable; -import org.apache.qpidity.transport.codec.Encoder; +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encodable; +import org.apache.qpid.transport.codec.Encoder; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportException.java b/java/common/src/main/java/org/apache/qpid/transport/TransportException.java index 593209df82..5ef15154fc 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/TransportException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/TransportException.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java index ebfc6b120f..a8a4997ae7 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport.codec; +package org.apache.qpid.transport.codec; import java.io.UnsupportedEncodingException; @@ -29,11 +29,12 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Struct; -import org.apache.qpidity.transport.Type; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.Type; -import static org.apache.qpidity.transport.util.Functions.*; +import static org.apache.qpid.transport.util.Functions.*; /** @@ -45,6 +46,14 @@ import static org.apache.qpidity.transport.util.Functions.*; abstract class AbstractDecoder implements Decoder { + private final Map<Binary,String> str8cache = new LinkedHashMap<Binary,String>() + { + @Override protected boolean removeEldestEntry(Map.Entry<Binary,String> me) + { + return size() > 4*1024; + } + }; + protected abstract byte doGet(); protected abstract void doGet(byte[] bytes); @@ -59,6 +68,13 @@ abstract class AbstractDecoder implements Decoder doGet(bytes); } + protected Binary get(int size) + { + byte[] bytes = new byte[size]; + get(bytes); + return new Binary(bytes); + } + protected short uget() { return (short) (0xFF & get()); @@ -105,11 +121,11 @@ abstract class AbstractDecoder implements Decoder return readUint64(); } - private static final String decode(byte[] bytes, String charset) + private static final String decode(byte[] bytes, int offset, int length, String charset) { try { - return new String(bytes, charset); + return new String(bytes, offset, length, charset); } catch (UnsupportedEncodingException e) { @@ -117,13 +133,22 @@ abstract class AbstractDecoder implements Decoder } } + private static final String decode(byte[] bytes, String charset) + { + return decode(bytes, 0, bytes.length, charset); + } public String readStr8() { short size = readUint8(); - byte[] bytes = new byte[size]; - get(bytes); - return decode(bytes, "UTF-8"); + Binary bin = get(size); + String str = str8cache.get(bin); + if (str == null) + { + str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8"); + str8cache.put(bin, str); + } + return str; } public String readStr16() @@ -233,7 +258,19 @@ abstract class AbstractDecoder implements Decoder public Map<String,Object> readMap() { long size = readUint32(); + + if (size == 0) + { + return null; + } + long count = readUint32(); + + if (count == 0) + { + return Collections.EMPTY_MAP; + } + Map<String,Object> result = new LinkedHashMap(); for (int i = 0; i < count; i++) { @@ -243,13 +280,26 @@ abstract class AbstractDecoder implements Decoder Object value = read(t); result.put(key, value); } + return result; } public List<Object> readList() { long size = readUint32(); + + if (size == 0) + { + return null; + } + long count = readUint32(); + + if (count == 0) + { + return Collections.EMPTY_LIST; + } + List<Object> result = new ArrayList(); for (int i = 0; i < count; i++) { @@ -264,15 +314,21 @@ abstract class AbstractDecoder implements Decoder public List<Object> readArray() { long size = readUint32(); + if (size == 0) { - return Collections.EMPTY_LIST; + return null; } byte code = get(); Type t = getType(code); long count = readUint32(); + if (count == 0) + { + return Collections.EMPTY_LIST; + } + List<Object> result = new ArrayList<Object>(); for (int i = 0; i < count; i++) { diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index aa90627943..908d14a307 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport.codec; +package org.apache.qpid.transport.codec; import java.io.UnsupportedEncodingException; @@ -26,16 +26,17 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.transport.Range; -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Struct; -import org.apache.qpidity.transport.Type; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.Type; -import static org.apache.qpidity.transport.util.Functions.*; +import static org.apache.qpid.transport.util.Functions.*; /** @@ -64,10 +65,13 @@ abstract class AbstractEncoder implements Encoder ENCODINGS.put(byte[].class, Type.VBIN32); } - protected Sizer sizer() + private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>() { - return new SizeEncoder(); - } + @Override protected boolean removeEldestEntry(Map.Entry<String,byte[]> me) + { + return size() > 4*1024; + } + }; protected abstract void doPut(byte b); @@ -88,6 +92,15 @@ abstract class AbstractEncoder implements Encoder put(ByteBuffer.wrap(bytes)); } + protected abstract int beginSize8(); + protected abstract void endSize8(int pos); + + protected abstract int beginSize16(); + protected abstract void endSize16(int pos); + + protected abstract int beginSize32(); + protected abstract void endSize32(int pos); + public void writeUint8(short b) { assert b < 0x100; @@ -132,23 +145,6 @@ abstract class AbstractEncoder implements Encoder writeUint64(l); } - private static final String checkLength(String s, int n) - { - if (s == null) - { - return ""; - } - - if (s.length() > n) - { - throw new IllegalArgumentException("string too long: " + s); - } - else - { - return s; - } - } - private static final byte[] encode(String s, String charset) { try @@ -163,16 +159,31 @@ abstract class AbstractEncoder implements Encoder public void writeStr8(String s) { - s = checkLength(s, 255); - writeUint8((short) s.length()); - put(ByteBuffer.wrap(encode(s, "UTF-8"))); + if (s == null) + { + s = ""; + } + + byte[] bytes = str8cache.get(s); + if (bytes == null) + { + bytes = encode(s, "UTF-8"); + str8cache.put(s, bytes); + } + writeUint8((short) bytes.length); + put(bytes); } public void writeStr16(String s) { - s = checkLength(s, 65535); - writeUint16(s.length()); - put(ByteBuffer.wrap(encode(s, "UTF-8"))); + if (s == null) + { + s = ""; + } + + byte[] bytes = encode(s, "UTF-8"); + writeUint16(bytes.length); + put(bytes); } public void writeVbin8(byte[] bytes) @@ -245,18 +256,10 @@ abstract class AbstractEncoder implements Encoder } int width = s.getSizeWidth(); + int pos = -1; if (width > 0) { - if (empty) - { - writeSize(width, 0); - } - else - { - Sizer sizer = sizer(); - s.write(sizer); - writeSize(width, sizer.size()); - } + pos = beginSize(width); } if (type > 0) @@ -265,6 +268,11 @@ abstract class AbstractEncoder implements Encoder } s.write(this); + + if (width > 0) + { + endSize(width, pos); + } } public void writeStruct32(Struct s) @@ -275,12 +283,10 @@ abstract class AbstractEncoder implements Encoder } else { - Sizer sizer = sizer(); - sizer.writeUint16(s.getEncodedType()); - s.write(sizer); - writeUint32(sizer.size()); + int pos = beginSize32(); writeUint16(s.getEncodedType()); s.write(this); + endSize32(pos); } } @@ -338,18 +344,13 @@ abstract class AbstractEncoder implements Encoder public void writeMap(Map<String,Object> map) { - if (map == null) + int pos = beginSize32(); + if (map != null) { - writeUint32(0); - return; + writeUint32(map.size()); + writeMapEntries(map); } - - Sizer sizer = sizer(); - sizer.writeMap(map); - // XXX: - 4 - writeUint32(sizer.size() - 4); - writeUint32(map.size()); - writeMapEntries(map); + endSize32(pos); } protected void writeMapEntries(Map<String,Object> map) @@ -367,12 +368,13 @@ abstract class AbstractEncoder implements Encoder public void writeList(List<Object> list) { - Sizer sizer = sizer(); - sizer.writeList(list); - // XXX: - 4 - writeUint32(sizer.size() - 4); - writeUint32(list.size()); - writeListEntries(list); + int pos = beginSize32(); + if (list != null) + { + writeUint32(list.size()); + writeListEntries(list); + } + endSize32(pos); } protected void writeListEntries(List<Object> list) @@ -387,16 +389,12 @@ abstract class AbstractEncoder implements Encoder public void writeArray(List<Object> array) { - if (array == null) + int pos = beginSize32(); + if (array != null) { - array = Collections.EMPTY_LIST; + writeArrayEntries(array); } - - Sizer sizer = sizer(); - sizer.writeArray(array); - // XXX: -4 - writeUint32(sizer.size() - 4); - writeArrayEntries(array); + endSize32(pos); } protected void writeArrayEntries(List<Object> array) @@ -458,6 +456,39 @@ abstract class AbstractEncoder implements Encoder } } + private int beginSize(int width) + { + switch (width) + { + case 1: + return beginSize8(); + case 2: + return beginSize16(); + case 4: + return beginSize32(); + default: + throw new IllegalStateException("illegal width: " + width); + } + } + + private void endSize(int width, int pos) + { + switch (width) + { + case 1: + endSize8(pos); + break; + case 2: + endSize16(pos); + break; + case 4: + endSize32(pos); + break; + default: + throw new IllegalStateException("illegal width: " + width); + } + } + private void writeBytes(Type t, byte[] bytes) { writeSize(t, bytes.length); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java index cf40cef8bf..dd634eb94a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java @@ -18,11 +18,13 @@ * under the License. * */ -package org.apache.qpidity.transport.codec; +package org.apache.qpid.transport.codec; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.Binary; + /** * BBDecoder @@ -33,9 +35,9 @@ import java.nio.ByteOrder; public final class BBDecoder extends AbstractDecoder { - private final ByteBuffer in; + private ByteBuffer in; - public BBDecoder(ByteBuffer in) + public void init(ByteBuffer in) { this.in = in; this.in.order(ByteOrder.BIG_ENDIAN); @@ -51,6 +53,21 @@ public final class BBDecoder extends AbstractDecoder in.get(bytes); } + protected Binary get(int size) + { + if (in.hasArray()) + { + byte[] bytes = in.array(); + Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size); + in.position(in.position() + size); + return bin; + } + else + { + return super.get(size); + } + } + public boolean hasRemaining() { return in.hasRemaining(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java new file mode 100644 index 0000000000..390de881ab --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java @@ -0,0 +1,232 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.codec; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + + +/** + * BBEncoder + * + * @author Rafael H. Schloming + */ + +public final class BBEncoder extends AbstractEncoder +{ + + private ByteBuffer out; + private int segment; + + public BBEncoder(int capacity) { + out = ByteBuffer.allocate(capacity); + out.order(ByteOrder.BIG_ENDIAN); + segment = 0; + } + + public void init() + { + out.clear(); + segment = 0; + } + + public ByteBuffer segment() + { + int pos = out.position(); + out.position(segment); + ByteBuffer slice = out.slice(); + slice.limit(pos - segment); + out.position(pos); + segment = pos; + return slice; + } + + private void grow(int size) + { + ByteBuffer old = out; + int capacity = old.capacity(); + out = ByteBuffer.allocate(Math.max(capacity + size, 2*capacity)); + out.order(ByteOrder.BIG_ENDIAN); + out.put(old); + } + + protected void doPut(byte b) + { + try + { + out.put(b); + } + catch (BufferOverflowException e) + { + grow(1); + out.put(b); + } + } + + protected void doPut(ByteBuffer src) + { + try + { + out.put(src); + } + catch (BufferOverflowException e) + { + grow(src.remaining()); + out.put(src); + } + } + + protected void put(byte[] bytes) + { + try + { + out.put(bytes); + } + catch (BufferOverflowException e) + { + grow(bytes.length); + out.put(bytes); + } + } + + public void writeUint8(short b) + { + assert b < 0x100; + + try + { + out.put((byte) b); + } + catch (BufferOverflowException e) + { + grow(1); + out.put((byte) b); + } + } + + public void writeUint16(int s) + { + assert s < 0x10000; + + try + { + out.putShort((short) s); + } + catch (BufferOverflowException e) + { + grow(2); + out.putShort((short) s); + } + } + + public void writeUint32(long i) + { + assert i < 0x100000000L; + + try + { + out.putInt((int) i); + } + catch (BufferOverflowException e) + { + grow(4); + out.putInt((int) i); + } + } + + public void writeUint64(long l) + { + try + { + out.putLong(l); + } + catch (BufferOverflowException e) + { + grow(8); + out.putLong(l); + } + } + + public int beginSize8() + { + int pos = out.position(); + try + { + out.put((byte) 0); + } + catch (BufferOverflowException e) + { + grow(1); + out.put((byte) 0); + } + return pos; + } + + public void endSize8(int pos) + { + int cur = out.position(); + out.put(pos, (byte) (cur - pos - 1)); + } + + public int beginSize16() + { + int pos = out.position(); + try + { + out.putShort((short) 0); + } + catch (BufferOverflowException e) + { + grow(2); + out.putShort((short) 0); + } + return pos; + } + + public void endSize16(int pos) + { + int cur = out.position(); + out.putShort(pos, (short) (cur - pos - 2)); + } + + public int beginSize32() + { + int pos = out.position(); + try + { + out.putInt(0); + } + catch (BufferOverflowException e) + { + grow(4); + out.putInt(0); + } + return pos; + } + + public void endSize32(int pos) + { + int cur = out.position(); + out.putInt(pos, (cur - pos - 4)); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java index dec901748d..50e787ccb2 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java @@ -18,14 +18,14 @@ * under the License. * */ -package org.apache.qpidity.transport.codec; +package org.apache.qpid.transport.codec; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Struct; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encodable.java b/java/common/src/main/java/org/apache/qpid/transport/codec/Encodable.java index 60c2ea97b8..2aefafd19c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encodable.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/Encodable.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport.codec; +package org.apache.qpid.transport.codec; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java index 9d7a1a695d..2d8d13e80a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java @@ -18,14 +18,14 @@ * under the License. * */ -package org.apache.qpidity.transport.codec; +package org.apache.qpid.transport.codec; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Struct; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; /** diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java new file mode 100644 index 0000000000..33d552b91e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -0,0 +1,226 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.Decoder; + +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.SegmentType; +import org.apache.qpid.transport.Struct; + + +/** + * Assembler + * + */ + +public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate +{ + + private final Receiver<ProtocolEvent> receiver; + private final Map<Integer,List<Frame>> segments; + private final Method[] incomplete; + private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() + { + public BBDecoder initialValue() + { + return new BBDecoder(); + } + }; + + public Assembler(Receiver<ProtocolEvent> receiver) + { + this.receiver = receiver; + segments = new HashMap<Integer,List<Frame>>(); + incomplete = new Method[64*1024]; + } + + private int segmentKey(Frame frame) + { + return (frame.getTrack() + 1) * frame.getChannel(); + } + + private List<Frame> getSegment(Frame frame) + { + return segments.get(segmentKey(frame)); + } + + private void setSegment(Frame frame, List<Frame> segment) + { + int key = segmentKey(frame); + if (segments.containsKey(key)) + { + error(new ProtocolError(Frame.L2, "segment in progress: %s", + frame)); + } + segments.put(segmentKey(frame), segment); + } + + private void clearSegment(Frame frame) + { + segments.remove(segmentKey(frame)); + } + + private void emit(int channel, ProtocolEvent event) + { + event.setChannel(channel); + receiver.received(event); + } + + public void received(NetworkEvent event) + { + event.delegate(this); + } + + public void exception(Throwable t) + { + this.receiver.exception(t); + } + + public void closed() + { + this.receiver.closed(); + } + + public void init(ProtocolHeader header) + { + emit(0, header); + } + + public void error(ProtocolError error) + { + emit(0, error); + } + + public void frame(Frame frame) + { + ByteBuffer segment; + if (frame.isFirstFrame() && frame.isLastFrame()) + { + segment = frame.getBody(); + assemble(frame, segment); + } + else + { + List<Frame> frames; + if (frame.isFirstFrame()) + { + frames = new ArrayList<Frame>(); + setSegment(frame, frames); + } + else + { + frames = getSegment(frame); + } + + frames.add(frame); + + if (frame.isLastFrame()) + { + clearSegment(frame); + + int size = 0; + for (Frame f : frames) + { + size += f.getSize(); + } + segment = ByteBuffer.allocate(size); + for (Frame f : frames) + { + segment.put(f.getBody()); + } + segment.flip(); + assemble(frame, segment); + } + } + + } + + private void assemble(Frame frame, ByteBuffer segment) + { + BBDecoder dec = decoder.get(); + dec.init(segment); + + int channel = frame.getChannel(); + Method command; + + switch (frame.getType()) + { + case CONTROL: + int controlType = dec.readUint16(); + Method control = Method.create(controlType); + control.read(dec); + emit(channel, control); + break; + case COMMAND: + int commandType = dec.readUint16(); + // read in the session header, right now we don't use it + dec.readUint16(); + command = Method.create(commandType); + command.read(dec); + if (command.hasPayload()) + { + incomplete[channel] = command; + } + else + { + emit(channel, command); + } + break; + case HEADER: + command = incomplete[channel]; + List<Struct> structs = new ArrayList(2); + while (dec.hasRemaining()) + { + structs.add(dec.readStruct32()); + } + command.setHeader(new Header(structs)); + if (frame.isLastSegment()) + { + incomplete[channel] = null; + emit(channel, command); + } + break; + case BODY: + command = incomplete[channel]; + command.setBody(segment); + incomplete[channel] = null; + emit(channel, command); + break; + default: + throw new IllegalStateException("unknown frame type: " + frame.getType()); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 2e7b41bf42..6886cb3a5a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -18,62 +18,43 @@ * under the License. * */ -package org.apache.qpidity.transport.codec; +package org.apache.qpid.transport.network; import java.nio.ByteBuffer; -import java.nio.ByteOrder; +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; /** - * BBEncoder + * ConnectionBinding * - * @author Rafael H. Schloming */ -public final class BBEncoder extends AbstractEncoder +public class ConnectionBinding implements Binding<Connection,ByteBuffer> { - private final ByteBuffer out; + private static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - public BBEncoder(ByteBuffer out) { - this.out = out; - this.out.order(ByteOrder.BIG_ENDIAN); - } - - protected void doPut(byte b) - { - out.put(b); - } + private final ConnectionDelegate delegate; - protected void doPut(ByteBuffer src) + public ConnectionBinding(ConnectionDelegate delegate) { - out.put(src); + this.delegate = delegate; } - public void writeUint8(short b) + public Connection endpoint(Sender<ByteBuffer> sender) { - assert b < 0x100; - - out.put((byte) b); - } - - public void writeUint16(int s) - { - assert s < 0x10000; - - out.putShort((short) s); - } - - public void writeUint32(long i) - { - assert i < 0x100000000L; - - out.putInt((int) i); + // XXX: hardcoded max-frame + return new Connection + (new Disassembler(sender, MAX_FRAME_SIZE), delegate); } - public void writeUint64(long l) + public Receiver<ByteBuffer> receiver(Connection conn) { - out.putLong(l); + return new InputHandler(new Assembler(conn)); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java new file mode 100644 index 0000000000..bb7d2506e3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -0,0 +1,236 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import org.apache.qpid.transport.codec.BBEncoder; + +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ProtocolDelegate; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.SegmentType; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.Struct; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.qpid.transport.network.Frame.*; + +import static java.lang.Math.*; + + +/** + * Disassembler + * + */ + +public final class Disassembler implements Sender<ProtocolEvent>, + ProtocolDelegate<Void> +{ + + private final Sender<ByteBuffer> sender; + private final int maxPayload; + private final ByteBuffer header; + private final Object sendlock = new Object(); + private final ThreadLocal<BBEncoder> encoder = new ThreadLocal() + { + public BBEncoder initialValue() + { + return new BBEncoder(4*1024); + } + }; + + public Disassembler(Sender<ByteBuffer> sender, int maxFrame) + { + if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) + { + throw new IllegalArgumentException + ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + this.sender = sender; + this.maxPayload = maxFrame - HEADER_SIZE; + this.header = ByteBuffer.allocate(HEADER_SIZE); + this.header.order(ByteOrder.BIG_ENDIAN); + + } + + public void send(ProtocolEvent event) + { + event.delegate(null, this); + } + + public void flush() + { + synchronized (sendlock) + { + sender.flush(); + } + } + + public void close() + { + synchronized (sendlock) + { + sender.close(); + } + } + + private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) + { + synchronized (sendlock) + { + header.put(0, flags); + header.put(1, type); + header.putShort(2, (short) (size + HEADER_SIZE)); + header.put(5, track); + header.putShort(6, (short) channel); + + header.rewind(); + + sender.send(header); + + int limit = buf.limit(); + buf.limit(buf.position() + size); + sender.send(buf); + buf.limit(limit); + } + } + + private void fragment(byte flags, SegmentType type, ProtocolEvent event, + ByteBuffer buf) + { + byte typeb = (byte) type.getValue(); + byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; + + int remaining = buf.remaining(); + boolean first = true; + while (true) + { + int size = min(maxPayload, remaining); + remaining -= size; + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (remaining == 0) + { + newflags |= LAST_FRAME; + } + + frame(newflags, typeb, track, event.getChannel(), size, buf); + + if (remaining == 0) + { + break; + } + } + } + + public void init(Void v, ProtocolHeader header) + { + synchronized (sendlock) + { + sender.send(header.toByteBuffer()); + sender.flush(); + } + } + + public void control(Void v, Method method) + { + method(method, SegmentType.CONTROL); + } + + public void command(Void v, Method method) + { + method(method, SegmentType.COMMAND); + } + + private ByteBuffer copy(ByteBuffer src) + { + ByteBuffer buf = ByteBuffer.allocate(src.remaining()); + buf.put(src); + buf.flip(); + return buf; + } + + private void method(Method method, SegmentType type) + { + BBEncoder enc = encoder.get(); + enc.init(); + enc.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.isSync()) + { + enc.writeUint16(0x0101); + } + else + { + enc.writeUint16(0x0100); + } + } + method.write(enc); + ByteBuffer methodSeg = enc.segment(); + + byte flags = FIRST_SEG; + + boolean payload = method.hasPayload(); + if (!payload) + { + flags |= LAST_SEG; + } + + ByteBuffer headerSeg = null; + if (payload) + { + final Header hdr = method.getHeader(); + final Struct[] structs = hdr.getStructs(); + + for (Struct st : structs) + { + enc.writeStruct32(st); + } + headerSeg = enc.segment(); + } + + synchronized (sendlock) + { + fragment(flags, type, method, methodSeg); + if (payload) + { + fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg); + fragment(LAST_SEG, SegmentType.BODY, method, method.getBody()); + } + } + } + + public void error(Void v, ProtocolError error) + { + throw new IllegalArgumentException("" + error); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java index 2abac382e6..849355276e 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java @@ -18,10 +18,10 @@ * under the License. * */ -package org.apache.qpidity.transport.network; +package org.apache.qpid.transport.network; -import org.apache.qpidity.transport.SegmentType; -import org.apache.qpidity.transport.util.SliceIterator; +import org.apache.qpid.transport.SegmentType; +import org.apache.qpid.transport.util.SliceIterator; import java.nio.ByteBuffer; @@ -29,7 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Iterator; -import static org.apache.qpidity.transport.util.Functions.*; +import static org.apache.qpid.transport.util.Functions.*; /** @@ -38,7 +38,7 @@ import static org.apache.qpidity.transport.util.Functions.*; * @author Rafael H. Schloming */ -public final class Frame implements NetworkEvent, Iterable<ByteBuffer> +public final class Frame implements NetworkEvent { public static final int HEADER_SIZE = 12; @@ -61,23 +61,21 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer> final private SegmentType type; final private byte track; final private int channel; - final private List<ByteBuffer> fragments; - private int size; + final private ByteBuffer body; - public Frame(byte flags, SegmentType type, byte track, int channel) + public Frame(byte flags, SegmentType type, byte track, int channel, + ByteBuffer body) { this.flags = flags; this.type = type; this.track = track; this.channel = channel; - this.size = 0; - this.fragments = new ArrayList<ByteBuffer>(); + this.body = body; } - public void addFragment(ByteBuffer fragment) + public ByteBuffer getBody() { - fragments.add(fragment); - size += fragment.remaining(); + return body.slice(); } public byte getFlags() @@ -92,7 +90,7 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer> public int getSize() { - return size; + return body.remaining(); } public SegmentType getType() @@ -130,16 +128,6 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer> return flag(LAST_FRAME); } - public Iterator<ByteBuffer> getFragments() - { - return new SliceIterator(fragments.iterator()); - } - - public Iterator<ByteBuffer> iterator() - { - return getFragments(); - } - public void delegate(NetworkDelegate delegate) { delegate.frame(this); @@ -148,26 +136,14 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer> public String toString() { StringBuilder str = new StringBuilder(); + str.append(String.format ("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(), getTrack(), getType(), isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); - boolean first = true; - for (ByteBuffer buf : this) - { - if (first) - { - first = false; - } - else - { - str.append(" | "); - } - - str.append(str(buf)); - } + str.append(str(body)); return str.toString(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java new file mode 100644 index 0000000000..408c95e075 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.SegmentType; + +import static org.apache.qpid.transport.util.Functions.*; + +import static org.apache.qpid.transport.network.InputHandler.State.*; + + +/** + * InputHandler + * + * @author Rafael H. Schloming + */ + +public final class InputHandler implements Receiver<ByteBuffer> +{ + + public enum State + { + PROTO_HDR, + FRAME_HDR, + FRAME_BODY, + ERROR; + } + + private final Receiver<NetworkEvent> receiver; + private State state; + private ByteBuffer input = null; + private int needed; + + private byte flags; + private SegmentType type; + private byte track; + private int channel; + + public InputHandler(Receiver<NetworkEvent> receiver, State state) + { + this.receiver = receiver; + this.state = state; + + switch (state) + { + case PROTO_HDR: + needed = 8; + break; + case FRAME_HDR: + needed = Frame.HEADER_SIZE; + break; + } + } + + public InputHandler(Receiver<NetworkEvent> receiver) + { + this(receiver, PROTO_HDR); + } + + private void error(String fmt, Object ... args) + { + receiver.received(new ProtocolError(Frame.L1, fmt, args)); + } + + public void received(ByteBuffer buf) + { + int limit = buf.limit(); + int remaining = buf.remaining(); + while (remaining > 0) + { + if (remaining >= needed) + { + int consumed = needed; + int pos = buf.position(); + if (input == null) + { + buf.limit(pos + needed); + input = buf; + state = next(pos); + buf.limit(limit); + buf.position(pos + consumed); + } + else + { + buf.limit(pos + needed); + input.put(buf); + buf.limit(limit); + input.flip(); + state = next(0); + } + + remaining -= consumed; + input = null; + } + else + { + if (input == null) + { + input = ByteBuffer.allocate(needed); + } + input.put(buf); + needed -= remaining; + remaining = 0; + } + } + } + + private State next(int pos) + { + input.order(ByteOrder.BIG_ENDIAN); + + switch (state) { + case PROTO_HDR: + if (input.get(pos) != 'A' && + input.get(pos + 1) != 'M' && + input.get(pos + 2) != 'Q' && + input.get(pos + 3) != 'P') + { + error("bad protocol header: %s", str(input)); + return ERROR; + } + + byte instance = input.get(pos + 5); + byte major = input.get(pos + 6); + byte minor = input.get(pos + 7); + receiver.received(new ProtocolHeader(instance, major, minor)); + needed = Frame.HEADER_SIZE; + return FRAME_HDR; + case FRAME_HDR: + flags = input.get(pos); + type = SegmentType.get(input.get(pos + 1)); + int size = (0xFFFF & input.getShort(pos + 2)); + size -= Frame.HEADER_SIZE; + if (size < 0 || size > (64*1024 - 12)) + { + error("bad frame size: %d", size); + return ERROR; + } + byte b = input.get(pos + 5); + if ((b & 0xF0) != 0) { + error("non-zero reserved bits in upper nibble of " + + "frame header byte 5: '%x'", b); + return ERROR; + } else { + track = (byte) (b & 0xF); + } + channel = (0xFFFF & input.getShort(pos + 6)); + if (size == 0) + { + Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0)); + receiver.received(frame); + needed = Frame.HEADER_SIZE; + return FRAME_HDR; + } + else + { + needed = size; + return FRAME_BODY; + } + case FRAME_BODY: + Frame frame = new Frame(flags, type, track, channel, input.slice()); + receiver.received(frame); + needed = Frame.HEADER_SIZE; + return FRAME_HDR; + default: + throw new IllegalStateException(); + } + } + + public void exception(Throwable t) + { + receiver.exception(t); + } + + public void closed() + { + receiver.closed(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkDelegate.java index 48655edd0c..fbdfe6e84c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkDelegate.java @@ -18,10 +18,10 @@ * under the License. * */ -package org.apache.qpidity.transport.network; +package org.apache.qpid.transport.network; -import org.apache.qpidity.transport.ProtocolError; -import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolHeader; /** diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkEvent.java index 080efee704..91314cd4ad 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkEvent.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport.network; +package org.apache.qpid.transport.network; /** diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java new file mode 100644 index 0000000000..b63020913b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java @@ -0,0 +1,109 @@ +package org.apache.qpid.transport.network.io; + +import java.nio.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.BodyFactory; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentBodyFactory; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderBodyFactory; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.HeartbeatBodyFactory; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Receiver; + +public class InputHandler_0_9 implements Receiver<ByteBuffer> +{ + + private AMQVersionAwareProtocolSession _session; + private MethodRegistry _registry; + private BodyFactory bodyFactory; + private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + + static + { + _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); + _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + + public InputHandler_0_9(AMQVersionAwareProtocolSession session) + { + _session = session; + _registry = _session.getMethodRegistry(); + } + + public void closed() + { + // AS FIXME: implement + } + + public void exception(Throwable t) + { + // TODO: propogate exception to things + t.printStackTrace(); + } + + public void received(ByteBuffer buf) + { + org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf); + try + { + final byte type = in.get(); + if (type == AMQMethodBody.TYPE) + { + bodyFactory = new AMQMethodBodyFactory(_session); + } + else + { + bodyFactory = _bodiesSupported[type]; + } + + if (bodyFactory == null) + { + throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); + } + + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) + { + throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize, null); + } + + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type, null); + } + + try + { + frame.getBodyFrame().handle(frame.getChannel(), _session); + } + catch (AMQException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + catch (AMQFrameDecodingException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java new file mode 100644 index 0000000000..c4559ae6b4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.TransportException; + +import java.io.IOException; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; + +import java.nio.ByteBuffer; + + +/** + * IoAcceptor + * + */ + +public class IoAcceptor<E> extends Thread +{ + + + private ServerSocket socket; + private Binding<E,ByteBuffer> binding; + + public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding) + throws IOException + { + socket = new ServerSocket(); + socket.setReuseAddress(true); + socket.bind(address); + this.binding = binding; + + setName(String.format("IoAcceptor - %s", socket.getInetAddress())); + } + + public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding) + throws IOException + { + this(new InetSocketAddress(host, port), binding); + } + + public void run() + { + while (true) + { + try + { + Socket sock = socket.accept(); + IoTransport<E> transport = new IoTransport<E>(sock, binding); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java new file mode 100644 index 0000000000..d6d1df573c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.util.Logger; + +import java.io.InputStream; +import java.io.IOException; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * IoReceiver + * + */ + +final class IoReceiver extends Thread +{ + + private static final Logger log = Logger.get(IoReceiver.class); + + private final IoTransport transport; + private final Receiver<ByteBuffer> receiver; + private final int bufferSize; + private final Socket socket; + private final long timeout; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, + int bufferSize, long timeout) + { + this.transport = transport; + this.receiver = receiver; + this.bufferSize = bufferSize; + this.socket = transport.getSocket(); + this.timeout = timeout; + + setDaemon(true); + setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + start(); + } + + void close() + { + if (!closed.getAndSet(true)) + { + try + { + socket.shutdownInput(); + if (Thread.currentThread() != this) + { + join(timeout); + if (isAlive()) + { + throw new TransportException("join timed out"); + } + } + } + catch (InterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + + public void run() + { + final int threshold = bufferSize / 2; + + // I set the read buffer size simillar to SO_RCVBUF + // Haven't tested with a lower value to see if it's better or worse + byte[] buffer = new byte[bufferSize]; + try + { + InputStream in = socket.getInputStream(); + int read = 0; + int offset = 0; + while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + { + if (read > 0) + { + ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); + receiver.received(b); + offset+=read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[bufferSize]; + } + } + } + } + catch (Throwable t) + { + receiver.exception(t); + } + finally + { + receiver.closed(); + transport.getSender().close(); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java new file mode 100644 index 0000000000..ef892744ab --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.util.Logger; + +import static org.apache.qpid.transport.util.Functions.*; + + +public final class IoSender extends Thread implements Sender<ByteBuffer> +{ + + private static final Logger log = Logger.get(IoSender.class); + + // by starting here, we ensure that we always test the wraparound + // case, we should probably make this configurable somehow so that + // we can test other cases as well + private final static int START = Integer.MAX_VALUE - 10; + + private final IoTransport transport; + private final long timeout; + private final Socket socket; + private final OutputStream out; + + private final byte[] buffer; + private volatile int head = START; + private volatile int tail = START; + private volatile boolean idle = true; + private final Object notFull = new Object(); + private final Object notEmpty = new Object(); + private final AtomicBoolean closed = new AtomicBoolean(false); + + private volatile Throwable exception = null; + + + public IoSender(IoTransport transport, int bufferSize, long timeout) + { + this.transport = transport; + this.socket = transport.getSocket(); + this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 + this.timeout = timeout; + + try + { + out = socket.getOutputStream(); + } + catch (IOException e) + { + throw new TransportException("Error getting output stream for socket", e); + } + + setDaemon(true); + setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + start(); + } + + private static final int pof2(int n) + { + int result = 1; + while (result < n) + { + result *= 2; + } + return result; + } + + private static final int mod(int n, int m) + { + int r = n % m; + return r < 0 ? m + r : r; + } + + public void send(ByteBuffer buf) + { + if (closed.get()) + { + throw new TransportException("sender is closed", exception); + } + + final int size = buffer.length; + int remaining = buf.remaining(); + + while (remaining > 0) + { + final int hd = head; + final int tl = tail; + + if (hd - tl >= size) + { + flush(); + synchronized (notFull) + { + long start = System.currentTimeMillis(); + long elapsed = 0; + while (!closed.get() && head - tail >= size && elapsed < timeout) + { + try + { + notFull.wait(timeout - elapsed); + } + catch (InterruptedException e) + { + // pass + } + elapsed += System.currentTimeMillis() - start; + } + + if (closed.get()) + { + throw new TransportException("sender is closed", exception); + } + + if (head - tail >= size) + { + throw new TransportException(String.format("write timed out: %s, %s", head, tail)); + } + } + continue; + } + + final int hd_idx = mod(hd, size); + final int tl_idx = mod(tl, size); + final int length; + + if (tl_idx > hd_idx) + { + length = Math.min(tl_idx - hd_idx, remaining); + } + else + { + length = Math.min(size - hd_idx, remaining); + } + + buf.get(buffer, hd_idx, length); + head += length; + remaining -= length; + } + } + + public void flush() + { + if (idle) + { + synchronized (notEmpty) + { + notEmpty.notify(); + } + } + } + + public void close() + { + close(true); + } + + void close(boolean reportException) + { + if (!closed.getAndSet(true)) + { + synchronized (notEmpty) + { + notEmpty.notify(); + } + + try + { + if (Thread.currentThread() != this) + { + join(timeout); + if (isAlive()) + { + throw new TransportException("join timed out"); + } + } + transport.getReceiver().close(); + socket.close(); + } + catch (InterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + + if (reportException && exception != null) + { + throw new TransportException(exception); + } + } + } + + public void run() + { + final int size = buffer.length; + + while (true) + { + final int hd = head; + final int tl = tail; + + if (hd == tl) + { + if (closed.get()) + { + break; + } + + idle = true; + + synchronized (notEmpty) + { + while (head == tail && !closed.get()) + { + try + { + notEmpty.wait(); + } + catch (InterruptedException e) + { + // pass + } + } + } + + idle = false; + + continue; + } + + final int hd_idx = mod(hd, size); + final int tl_idx = mod(tl, size); + + final int length; + if (tl_idx < hd_idx) + { + length = hd_idx - tl_idx; + } + else + { + length = size - tl_idx; + } + + try + { + out.write(buffer, tl_idx, length); + } + catch (IOException e) + { + log.error(e, "error in write thread"); + exception = e; + close(false); + break; + } + tail += length; + if (head - tl >= size) + { + synchronized (notFull) + { + notFull.notify(); + } + } + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java new file mode 100644 index 0000000000..70fd8a3c06 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; + +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.util.Logger; + +/** + * This class provides a socket based transport using the java.io + * classes. + * + * The following params are configurable via JVM arguments + * TCP_NO_DELAY - amqj.tcpNoDelay + * SO_RCVBUF - amqj.receiveBufferSize + * SO_SNDBUF - amqj.sendBufferSize + */ +public final class IoTransport<E> +{ + + static + { + org.apache.mina.common.ByteBuffer.setAllocator + (new org.apache.mina.common.SimpleByteBufferAllocator()); + org.apache.mina.common.ByteBuffer.setUseDirectBuffers + (Boolean.getBoolean("amqj.enableDirectBuffers")); + } + + private static final Logger log = Logger.get(IoTransport.class); + + private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; + private static int readBufferSize = Integer.getInteger + ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); + private static int writeBufferSize = Integer.getInteger + ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); + + private Socket socket; + private IoSender sender; + private E endpoint; + private IoReceiver receiver; + private long timeout = 60000; + + IoTransport(Socket socket, Binding<E,ByteBuffer> binding) + { + this.socket = socket; + this.sender = new IoSender(this, 2*writeBufferSize, timeout); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(this, binding.receiver(endpoint), + 2*readBufferSize, timeout); + } + + IoSender getSender() + { + return sender; + } + + IoReceiver getReceiver() + { + return receiver; + } + + Socket getSocket() + { + return socket; + } + + public static final <E> E connect(String host, int port, + Binding<E,ByteBuffer> binding) + { + Socket socket = createSocket(host, port); + IoTransport<E> transport = new IoTransport<E>(socket, binding); + return transport.endpoint; + } + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) + { + return connect(host, port, new ConnectionBinding(delegate)); + } + + public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port) + { + connect(host, port, new Binding_0_9(session)); + } + + private static class Binding_0_9 + implements Binding<AMQVersionAwareProtocolSession,ByteBuffer> + { + + private AMQVersionAwareProtocolSession session; + + Binding_0_9(AMQVersionAwareProtocolSession session) + { + this.session = session; + } + + public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender) + { + session.setSender(sender); + return session; + } + + public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn) + { + return new InputHandler_0_9(ssn); + } + + } + + private static Socket createSocket(String host, int port) + { + try + { + InetAddress address = InetAddress.getByName(host); + Socket socket = new Socket(); + socket.setReuseAddress(true); + socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + + log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.setSendBufferSize(writeBufferSize); + socket.setReceiveBufferSize(readBufferSize); + + log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.connect(new InetSocketAddress(address, port)); + return socket; + } + catch (SocketException e) + { + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java index c6855e3d48..f8dbec3c3d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport.network.mina; +package org.apache.qpid.transport.network.mina; import java.io.IOException; import java.net.InetSocketAddress; @@ -33,20 +33,20 @@ import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.qpidity.transport.Binding; -import org.apache.qpidity.transport.Connection; -import org.apache.qpidity.transport.ConnectionDelegate; -import org.apache.qpidity.transport.Receiver; -import org.apache.qpidity.transport.Sender; +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpidity.transport.util.Logger; +import org.apache.qpid.transport.util.Logger; -import org.apache.qpidity.transport.network.Assembler; -import org.apache.qpidity.transport.network.Disassembler; -import org.apache.qpidity.transport.network.InputHandler; -import org.apache.qpidity.transport.network.OutputHandler; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; -import static org.apache.qpidity.transport.util.Functions.*; +import static org.apache.qpid.transport.util.Functions.*; /** * MinaHandler @@ -56,7 +56,6 @@ import static org.apache.qpidity.transport.util.Functions.*; //RA making this public until we sort out the package issues public class MinaHandler<E> implements IoHandler { - private static final int MAX_FRAME_SIZE = 64 * 1024 - 1; /** Default buffer size for pending messages reads */ private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; /** Default buffer size for pending messages writes */ @@ -202,7 +201,7 @@ public class MinaHandler<E> implements IoHandler IoAcceptor acceptor = new SocketAcceptor(); acceptor.bind(address, new MinaHandler<E>(binding)); } - + public static final <E> E connect(String host, int port, Binding<E,java.nio.ByteBuffer> binding) { @@ -263,43 +262,13 @@ public class MinaHandler<E> implements IoHandler ConnectionDelegate delegate) throws IOException { - accept(host, port, new ConnectionBinding - (delegate, InputHandler.State.PROTO_HDR)); + accept(host, port, new ConnectionBinding(delegate)); } public static final Connection connect(String host, int port, ConnectionDelegate delegate) { - return connect(host, port, new ConnectionBinding - (delegate, InputHandler.State.PROTO_HDR)); - } - - private static class ConnectionBinding - implements Binding<Connection,java.nio.ByteBuffer> - { - - private final ConnectionDelegate delegate; - private final InputHandler.State state; - - ConnectionBinding(ConnectionDelegate delegate, - InputHandler.State state) - { - this.delegate = delegate; - this.state = state; - } - - public Connection endpoint(Sender<java.nio.ByteBuffer> sender) - { - // XXX: hardcoded max-frame - return new Connection - (new Disassembler(new OutputHandler(sender), MAX_FRAME_SIZE), delegate); - } - - public Receiver<java.nio.ByteBuffer> receiver(Connection conn) - { - return new InputHandler(new Assembler(conn), state); - } - + return connect(host, port, new ConnectionBinding(delegate)); } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index f0f5731037..69d4061e0c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -18,15 +18,15 @@ * under the License. * */ -package org.apache.qpidity.transport.network.mina; +package org.apache.qpid.transport.network.mina; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; -import org.apache.qpidity.transport.Sender; -import org.apache.qpidity.transport.TransportException; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; /** @@ -58,6 +58,11 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> } } + public void flush() + { + // pass + } + public synchronized void close() { // MINA will sometimes throw away in-progress writes when you diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java index a7339427c7..51e41b26f7 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java @@ -1,4 +1,4 @@ -package org.apache.qpidity.transport.network.nio; +package org.apache.qpid.transport.network.nio; import java.io.EOFException; import java.io.IOException; @@ -11,13 +11,12 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.qpidity.transport.Connection; -import org.apache.qpidity.transport.ConnectionDelegate; -import org.apache.qpidity.transport.Receiver; -import org.apache.qpidity.transport.network.Assembler; -import org.apache.qpidity.transport.network.Disassembler; -import org.apache.qpidity.transport.network.InputHandler; -import org.apache.qpidity.transport.network.OutputHandler; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; public class NioHandler implements Runnable { @@ -68,8 +67,7 @@ public class NioHandler implements Runnable NioSender sender = new NioSender(_ch); Connection con = new Connection - (new Disassembler(new OutputHandler(sender), 64*1024 - 1), - delegate); + (new Disassembler(sender, 64*1024 - 1), delegate); con.setConnectionId(_count.incrementAndGet()); _handlers.put(con.getConnectionId(),sender); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java index 2cfe6c2089..33e888cc56 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java @@ -1,9 +1,9 @@ -package org.apache.qpidity.transport.network.nio; +package org.apache.qpid.transport.network.nio; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import org.apache.qpidity.transport.Sender; +import org.apache.qpid.transport.Sender; public class NioSender implements Sender<java.nio.ByteBuffer> { @@ -47,6 +47,11 @@ public class NioSender implements Sender<java.nio.ByteBuffer> } } + public void flush() + { + // pass + } + private void write(java.nio.ByteBuffer buf) { synchronized (lock) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java index 0038cdf118..2c6984e302 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport.util; +package org.apache.qpid.transport.util; import java.nio.ByteBuffer; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Logger.java b/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java index 0159036a34..8c4818df92 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/util/Logger.java +++ b/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport.util; +package org.apache.qpid.transport.util; import org.slf4j.LoggerFactory; @@ -42,6 +42,11 @@ public final class Logger this.log = log; } + public boolean isDebugEnabled() + { + return log.isDebugEnabled(); + } + public void debug(String message, Object ... args) { if (log.isDebugEnabled()) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java b/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java index 32392a3561..3db29847b2 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java +++ b/java/common/src/main/java/org/apache/qpid/transport/util/SliceIterator.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport.util; +package org.apache.qpid.transport.util; import java.nio.ByteBuffer; diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java b/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java index 2cf035f601..f12fb2cff2 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java @@ -17,7 +17,7 @@ */ package org.apache.qpid.url; -import org.apache.qpidity.exchange.ExchangeDefaults; +import org.apache.qpid.exchange.ExchangeDefaults; import org.slf4j.LoggerFactory; import org.slf4j.Logger; diff --git a/java/common/src/main/java/org/apache/qpid/url/QpidURL.java b/java/common/src/main/java/org/apache/qpid/url/QpidURL.java index 1d94b31de2..5ab4425323 100644 --- a/java/common/src/main/java/org/apache/qpid/url/QpidURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/QpidURL.java @@ -17,7 +17,7 @@ */ package org.apache.qpid.url; -import org.apache.qpidity.BrokerDetails; +import org.apache.qpid.BrokerDetails; import java.util.List; diff --git a/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java b/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java index b4a55e2bf4..f92934db7f 100644 --- a/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java +++ b/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java @@ -17,8 +17,8 @@ */ package org.apache.qpid.url; -import org.apache.qpidity.BrokerDetails; -import org.apache.qpidity.BrokerDetailsImpl; +import org.apache.qpid.BrokerDetails; +import org.apache.qpid.BrokerDetailsImpl; import java.net.MalformedURLException; import java.util.ArrayList; diff --git a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java b/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java index c2862a755b..e764c8536b 100644 --- a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java +++ b/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java @@ -20,25 +20,40 @@ */ package org.apache.qpid.util; +import java.nio.ByteBuffer; +import java.util.UUID; + + /** - * Wraps a checked exception that occurs when {@link ReflectionUtils} encounters checked exceptions using standard - * Java reflection methods. + * NameUUIDGen * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Wrap a checked reflection exception. - * </table> */ -public class ReflectionUtilsException extends RuntimeException + +public final class NameUUIDGen implements UUIDGen { - /** - * Creates a runtime reflection exception, from a checked one. - * - * @param message The message. - * @param cause The causing exception. - */ - public ReflectionUtilsException(String message, Throwable cause) + + private static final int WIDTH = 8; + + final private byte[] seed; + final private ByteBuffer seedBuf; + private long counter; + + public NameUUIDGen() { - super(message, cause); + String namespace = UUID.randomUUID().toString(); + this.seed = new byte[namespace.length() + WIDTH]; + for (int i = WIDTH; i < seed.length; i++) + { + seed[i] = (byte) namespace.charAt(i - WIDTH); + } + this.seedBuf = ByteBuffer.wrap(seed); + this.counter = 0; } + + public UUID generate() + { + seedBuf.putLong(0, counter++); + return UUID.nameUUIDFromBytes(seed); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java b/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java new file mode 100644 index 0000000000..60b402a105 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.UUID; + + +/** + * RandomUUIDGen + * + */ + +public final class RandomUUIDGen implements UUIDGen +{ + + public UUID generate() + { + return UUID.randomUUID(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java b/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java deleted file mode 100644 index 28fb2e0c8a..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -/** - * Provides helper methods for operating on classes and methods using reflection. Reflection methods tend to return - * a lot of checked exception so writing code to use them can be tedious and harder to read, especially when such errors - * are not expected to occur. This class always works with {@link ReflectionUtilsException}, which is a runtime exception, - * to wrap the checked exceptions raised by the standard Java reflection methods. Code using it does not normally - * expect these errors to occur, usually does not have a recovery mechanism for them when they do, but is cleaner, - * quicker to write and easier to read in the majority of cases. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Look up Classes by name. - * <tr><td> Instantiate Classes by no-arg constructor. - * </table> - */ -public class ReflectionUtils -{ - /** - * Gets the Class object for a named class. - * - * @param className The class to get the Class object for. - * - * @return The Class object for the named class. - */ - public static Class<?> forName(String className) - { - try - { - return Class.forName(className); - } - catch (ClassNotFoundException e) - { - throw new ReflectionUtilsException("ClassNotFoundException whilst finding class.", e); - } - } - - /** - * Creates an instance of a Class, instantiated through its no-args constructor. - * - * @param cls The Class to instantiate. - * @param <T> The Class type. - * - * @return An instance of the class. - */ - public static <T> T newInstance(Class<? extends T> cls) - { - try - { - return cls.newInstance(); - } - catch (InstantiationException e) - { - throw new ReflectionUtilsException("InstantiationException whilst instantiating class.", e); - } - catch (IllegalAccessException e) - { - throw new ReflectionUtilsException("IllegalAccessException whilst instantiating class.", e); - } - } - - /** - * Calls a named method on an object with a specified set of parameters, any Java access modifier are overridden. - * - * @param o The object to call. - * @param method The method name to call. - * @param params The parameters to pass. - * @param paramClasses The argument types. - * - * @return The return value from the method call. - */ - public static Object callMethodOverridingIllegalAccess(Object o, String method, Object[] params, Class[] paramClasses) - { - // Get the objects class. - Class cls = o.getClass(); - - // Get the classes of the parameters. - /*Class[] paramClasses = new Class[params.length]; - - for (int i = 0; i < params.length; i++) - { - paramClasses[i] = params[i].getClass(); - }*/ - - try - { - // Try to find the matching method on the class. - Method m = cls.getDeclaredMethod(method, paramClasses); - - // Make it accessible. - m.setAccessible(true); - - // Invoke it with the parameters. - return m.invoke(o, params); - } - catch (NoSuchMethodException e) - { - throw new ReflectionUtilsException("NoSuchMethodException.", e); - } - catch (IllegalAccessException e) - { - throw new ReflectionUtilsException("IllegalAccessException.", e); - } - catch (InvocationTargetException e) - { - throw new ReflectionUtilsException("InvocationTargetException", e); - } - } - - /** - * Calls a named method on an object with a specified set of parameters. - * - * @param o The object to call. - * @param method The method name to call. - * @param params The parameters to pass. - * - * @return The return value from the method call. - */ - public static Object callMethod(Object o, String method, Object[] params) - { - // Get the objects class. - Class cls = o.getClass(); - - // Get the classes of the parameters. - Class[] paramClasses = new Class[params.length]; - - for (int i = 0; i < params.length; i++) - { - paramClasses[i] = params[i].getClass(); - } - - try - { - // Try to find the matching method on the class. - Method m = cls.getMethod(method, paramClasses); - - // Invoke it with the parameters. - return m.invoke(o, params); - } - catch (NoSuchMethodException e) - { - throw new ReflectionUtilsException("NoSuchMethodException.", e); - } - catch (IllegalAccessException e) - { - throw new ReflectionUtilsException("IllegalAccessException", e); - } - catch (InvocationTargetException e) - { - throw new ReflectionUtilsException("InvocationTargetException", e); - } - } - - /** - * Calls a constuctor witht the specified arguments. - * - * @param constructor The constructor. - * @param args The arguments. - * @param <T> The Class type. - * - * @return An instance of the class that the constructor is for. - */ - public static <T> T newInstance(Constructor<T> constructor, Object[] args) - { - try - { - return constructor.newInstance(args); - } - catch (InstantiationException e) - { - throw new ReflectionUtilsException("InstantiationException", e); - } - catch (IllegalAccessException e) - { - throw new ReflectionUtilsException("IllegalAccessException", e); - } - catch (InvocationTargetException e) - { - throw new ReflectionUtilsException("InvocationTargetException", e); - } - } - - /** - * Gets the constructor of a class that takes the specified set of arguments if any matches. If no matching - * constructor is found then a runtime exception is raised. - * - * @param cls The class to get a constructor from. - * @param args The arguments to match. - * @param <T> The class type. - * - * @return The constructor. - */ - public static <T> Constructor<T> getConstructor(Class<T> cls, Class[] args) - { - try - { - return cls.getConstructor(args); - } - catch (NoSuchMethodException e) - { - throw new ReflectionUtilsException("NoSuchMethodException", e); - } - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/Strings.java b/java/common/src/main/java/org/apache/qpid/util/Strings.java new file mode 100644 index 0000000000..4b199bafe6 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.UnsupportedEncodingException; + + +/** + * Strings + * + */ + +public final class Strings +{ + + private static final byte[] EMPTY = new byte[0]; + + private static final ThreadLocal<char[]> charbuf = new ThreadLocal() + { + public char[] initialValue() + { + return new char[4096]; + } + }; + + public static final byte[] toUTF8(String str) + { + if (str == null) + { + return EMPTY; + } + else + { + final int size = str.length(); + char[] chars = charbuf.get(); + if (size > chars.length) + { + chars = new char[Math.max(size, 2*chars.length)]; + charbuf.set(chars); + } + + str.getChars(0, size, chars, 0); + final byte[] bytes = new byte[size]; + for (int i = 0; i < size; i++) + { + if (chars[i] > 127) + { + try + { + return str.getBytes("UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + bytes[i] = (byte) chars[i]; + } + return bytes; + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java b/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java new file mode 100644 index 0000000000..3cfe5afdac --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + + +import java.util.UUID; + +/** + * UUIDGen + * + */ + +public interface UUIDGen +{ + + public UUID generate(); + +} diff --git a/java/common/src/main/java/org/apache/qpid/util/UUIDs.java b/java/common/src/main/java/org/apache/qpid/util/UUIDs.java new file mode 100644 index 0000000000..4bf6b7f0a2 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/UUIDs.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + + +/** + * UUIDs + * + */ + +public final class UUIDs +{ + + public static final UUIDGen newGenerator() + { + return newGenerator(System.getProperty("qpid.uuid.generator", + NameUUIDGen.class.getName())); + } + + public static UUIDGen newGenerator(String name) + { + try + { + Class cls = Class.forName(name); + return (UUIDGen) cls.newInstance(); + } + catch (InstantiationException e) + { + throw new RuntimeException(e); + } + catch (IllegalAccessException e) + { + throw new RuntimeException(e); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolException.java b/java/common/src/main/java/org/apache/qpidity/ProtocolException.java deleted file mode 100644 index 596143a1b9..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpidity; - -public class ProtocolException extends QpidException -{ - /** - * Constructor for a Ptotocol Exception. - * <p> This is the only provided constructor and the parameters have to be set to null when - * they are unknown. - * @param message A description of the reason of this exception. - * @param errorCode A string specifyin the error code of this exception. - * @param cause The linked Execption. - * - */ - public ProtocolException(String message, ErrorCode errorCode, Throwable cause) - { - super(message, errorCode, cause); - } -} diff --git a/java/common/src/main/java/org/apache/qpidity/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpidity/exchange/ExchangeDefaults.java deleted file mode 100644 index a99ea56d69..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/exchange/ExchangeDefaults.java +++ /dev/null @@ -1,51 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.exchange; - -/** - * Default exchange names - */ -public class ExchangeDefaults -{ - /** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */ - public static final String DEFAULT_EXCHANGE_NAME = "<<default>>"; - - /** The pre-defined topic exchange, the broker SHOULD provide this. */ - public static final String TOPIC_EXCHANGE_NAME = "amq.topic"; - - /** Defines the identifying type name of topic exchanges. */ - public static final String TOPIC_EXCHANGE_CLASS = "topic"; - - /** The pre-defined direct exchange, the broker MUST provide this. */ - public static final String DIRECT_EXCHANGE_NAME = "amq.direct"; - - /** Defines the identifying type name of direct exchanges. */ - public static final String DIRECT_EXCHANGE_CLASS = "direct"; - - /** The pre-defined headers exchange, the specification does not say this needs to be provided. */ - public static final String HEADERS_EXCHANGE_NAME = "amq.match"; - - /** Defines the identifying type name of headers exchanges. */ - public static final String HEADERS_EXCHANGE_CLASS = "headers"; - - /** The pre-defined fanout exchange, the boker MUST provide this. */ - public static final String FANOUT_EXCHANGE_NAME = "amq.fanout"; - - /** Defines the identifying type name of fanout exchanges. */ - public static final String FANOUT_EXCHANGE_CLASS = "fanout"; -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java deleted file mode 100644 index 4f61380809..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport; - -import org.apache.qpidity.transport.network.Frame; - -import java.nio.ByteBuffer; - -import java.util.Collections; - -import static org.apache.qpidity.transport.util.Functions.*; - - -/** - * Data - * - */ - -public class Data implements ProtocolEvent -{ - - private final Iterable<ByteBuffer> fragments; - private final boolean first; - private final boolean last; - - public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last) - { - this.fragments = fragments; - this.first = first; - this.last = last; - } - - public Data(ByteBuffer buf, boolean first, boolean last) - { - this(Collections.singletonList(buf), first, last); - } - - public Iterable<ByteBuffer> getFragments() - { - return fragments; - } - - public boolean isFirst() - { - return first; - } - - public boolean isLast() - { - return last; - } - - public byte getEncodedTrack() - { - return Frame.L4; - } - - public <C> void delegate(C context, ProtocolDelegate<C> delegate) - { - delegate.data(context, this); - } - - public String toString() - { - StringBuffer str = new StringBuffer(); - str.append("Data("); - boolean first = true; - int left = 64; - for (ByteBuffer buf : getFragments()) - { - if (first) - { - first = false; - } - else - { - str.append(" | "); - } - str.append(str(buf, left)); - left -= buf.remaining(); - if (left < 0) - { - break; - } - } - str.append(")"); - return str.toString(); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java deleted file mode 100644 index e9a0705de0..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.qpidity.transport; - -public class TransportConstants -{ - - private static byte _protocol_version_minor = 10; - private static byte _protocol_version_major = 0; - - public static void setVersionMajor(byte value) - { - _protocol_version_major = value; - } - - public static void setVersionMinor(byte value) - { - _protocol_version_minor = value; - } - - public static byte getVersionMajor() - { - return _protocol_version_major; - } - - public static byte getVersionMinor() - { - return _protocol_version_minor; - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java deleted file mode 100644 index 474211ced2..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.codec; - -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; - -import java.util.Iterator; - -import static java.lang.Math.*; - - -/** - * FragmentDecoder - * - * @author Rafael H. Schloming - */ - -public class FragmentDecoder extends AbstractDecoder -{ - - private final Iterator<ByteBuffer> fragments; - private ByteBuffer current; - - public FragmentDecoder(Iterator<ByteBuffer> fragments) - { - this.fragments = fragments; - this.current = null; - } - - public boolean hasRemaining() - { - advance(); - return current != null || fragments.hasNext(); - } - - private void advance() - { - while (current == null && fragments.hasNext()) - { - current = fragments.next(); - if (current.hasRemaining()) - { - break; - } - else - { - current = null; - } - } - } - - private void preRead() - { - advance(); - - if (current == null) - { - throw new BufferUnderflowException(); - } - } - - private void postRead() - { - if (current.remaining() == 0) - { - current = null; - } - } - - protected byte doGet() - { - preRead(); - byte b = current.get(); - postRead(); - return b; - } - - protected void doGet(byte[] bytes) - { - int remaining = bytes.length; - int offset = 0; - while (remaining > 0) - { - preRead(); - int size = min(remaining, current.remaining()); - current.get(bytes, offset, size); - offset += size; - remaining -= size; - postRead(); - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java deleted file mode 100644 index 2e7e883a0b..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.codec; - -import java.nio.ByteBuffer; - -import java.util.Map; -import java.util.UUID; - -import org.apache.qpidity.transport.RangeSet; - - -/** - * SizeEncoder - * - * @author Rafael H. Schloming - */ - -public class SizeEncoder extends AbstractEncoder implements Sizer -{ - - private int size; - - public SizeEncoder() { - this(0); - } - - public SizeEncoder(int size) { - this.size = size; - } - - protected Sizer sizer() - { - return Sizer.NULL; - } - - public int getSize() { - return size; - } - - public void setSize(int size) { - this.size = size; - } - - public int size() - { - return getSize(); - } - - protected void doPut(byte b) - { - size += 1; - } - - protected void doPut(ByteBuffer src) - { - size += src.remaining(); - } - - public void writeUint8(short b) - { - size += 1; - } - - public void writeUint16(int s) - { - size += 2; - } - - public void writeUint32(long i) - { - size += 4; - } - - public void writeUint64(long l) - { - size += 8; - } - - public void writeDatetime(long l) - { - size += 8; - } - - public void writeUuid(UUID uuid) - { - size += 16; - } - - public void writeSequenceNo(int s) - { - size += 4; - } - - public void writeSequenceSet(RangeSet ranges) - { - size += 2 + 8*ranges.size(); - } - - //void writeByteRanges(RangeSet ranges); // XXX - - //void writeStr8(String s); - //void writeStr16(String s); - - //void writeVbin8(byte[] bytes); - //void writeVbin16(byte[] bytes); - //void writeVbin32(byte[] bytes); - - //void writeStruct32(Struct s); - //void writeMap(Map<String,Object> map); - //void writeList(List<Object> list); - //void writeArray(List<Object> array); - - //void writeStruct(int type, Struct s); - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java deleted file mode 100644 index d386987d64..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.codec; - -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Struct; - - -/** - * Sizer - * - */ - -public interface Sizer extends Encoder -{ - - public static final Sizer NULL = new Sizer() - { - public void writeUint8(short b) {} - public void writeUint16(int s) {} - public void writeUint32(long i) {} - public void writeUint64(long l) {} - - public void writeDatetime(long l) {} - public void writeUuid(UUID uuid) {} - - public void writeSequenceNo(int s) {} - public void writeSequenceSet(RangeSet ranges) {} // XXX - public void writeByteRanges(RangeSet ranges) {} // XXX - - public void writeStr8(String s) {} - public void writeStr16(String s) {} - - public void writeVbin8(byte[] bytes) {} - public void writeVbin16(byte[] bytes) {} - public void writeVbin32(byte[] bytes) {} - - public void writeStruct32(Struct s) {} - public void writeMap(Map<String,Object> map) {} - public void writeList(List<Object> list) {} - public void writeArray(List<Object> array) {} - - public void writeStruct(int type, Struct s) {} - - public int getSize() { return 0; } - - public int size() { return 0; } - }; - - int getSize(); - - int size(); - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Validator.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Validator.java deleted file mode 100644 index 743e9f3cae..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Validator.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.codec; - -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Struct; - - -/** - * Validator - * - */ - -public class Validator -{ - - public static final void checkBit(boolean b) - { - // no illegal values - } - - public static final void checkUint8(short s) - { - if (s > 0xFF || s < 0) - { - throw new IllegalArgumentException("" + s); - } - } - - public static final void checkUint16(int i) - { - if (i > 0xFFFF || i < 0) - { - throw new IllegalArgumentException("" + i); - } - } - - public static final void checkUint32(long l) - { - // XXX: we can't currently validate this because we do thinks - // like pass in -1 for 0xFFFFFFFF - // if (l > 0xFFFFFFFFL || l < 0) - // { - // throw new IllegalArgumentException("" + l); - // } - } - - public static final void checkSequenceNo(int s) - { - // no illegal values - } - - public static final void checkUint64(long l) - { - // no illegal values - } - - public static final void checkDatetime(long l) - { - // no illegal values - } - - public static final void checkUuid(UUID u) - { - // no illegal values - } - - public static final void checkStr8(String value) - { - if (value != null && value.length() > 255) - { - throw new IllegalArgumentException("" + value); - } - } - - public static final void checkStr16(String value) - { - if (value != null && value.length() > 0xFFFF) - { - throw new IllegalArgumentException("" + value); - } - } - - public static final void checkVbin8(byte[] value) - { - if (value != null && value.length > 255) - { - throw new IllegalArgumentException("" + value); - } - } - - public static final void checkVbin16(byte[] value) - { - if (value != null && value.length > 0xFFFF) - { - throw new IllegalArgumentException("" + value); - } - } - - public static final void checkByteRanges(RangeSet r) - { - // no illegal values - } - - public static final void checkSequenceSet(RangeSet r) - { - // no illegal values - } - - public static final void checkVbin32(byte[] value) - { - // no illegal values - } - - public static final void checkStruct32(Struct s) - { - // no illegal values - } - - public static final void checkArray(List<Object> array) - { - if (array == null) - { - return; - } - - for (Object o : array) - { - checkObject(o); - } - } - - public static final void checkMap(Map<String,Object> map) - { - if (map == null) - { - return; - } - - for (Map.Entry<String,Object> entry : map.entrySet()) - { - checkStr8(entry.getKey()); - checkObject(entry.getValue()); - } - } - - public static final void checkObject(Object o) - { - if (o != null && AbstractEncoder.resolve(o.getClass()) == null) - { - throw new IllegalArgumentException("cannot encode " + o.getClass()); - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java deleted file mode 100644 index 3a7a550573..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.network; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.transport.codec.BBDecoder; -import org.apache.qpidity.transport.codec.Decoder; -import org.apache.qpidity.transport.codec.FragmentDecoder; - -import org.apache.qpidity.transport.ConnectionEvent; -import org.apache.qpidity.transport.Data; -import org.apache.qpidity.transport.Header; -import org.apache.qpidity.transport.Method; -import org.apache.qpidity.transport.ProtocolError; -import org.apache.qpidity.transport.ProtocolEvent; -import org.apache.qpidity.transport.ProtocolHeader; -import org.apache.qpidity.transport.Receiver; -import org.apache.qpidity.transport.SegmentType; -import org.apache.qpidity.transport.Struct; - - -/** - * Assembler - * - */ - -public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate -{ - - private final Receiver<ConnectionEvent> receiver; - private final Map<Integer,List<ByteBuffer>> segments; - - public Assembler(Receiver<ConnectionEvent> receiver) - { - this.receiver = receiver; - segments = new HashMap<Integer,List<ByteBuffer>>(); - } - - private int segmentKey(Frame frame) - { - // XXX: can this overflow? - return (frame.getTrack() + 1) * frame.getChannel(); - } - - private List<ByteBuffer> getSegment(Frame frame) - { - return segments.get(segmentKey(frame)); - } - - private void setSegment(Frame frame, List<ByteBuffer> segment) - { - int key = segmentKey(frame); - if (segments.containsKey(key)) - { - error(new ProtocolError(Frame.L2, "segment in progress: %s", - frame)); - } - segments.put(segmentKey(frame), segment); - } - - private void clearSegment(Frame frame) - { - segments.remove(segmentKey(frame)); - } - - private void emit(int channel, ProtocolEvent event) - { - receiver.received(new ConnectionEvent(channel, event)); - } - - private void emit(Frame frame, ProtocolEvent event) - { - emit(frame.getChannel(), event); - } - - public void received(NetworkEvent event) - { - event.delegate(this); - } - - public void exception(Throwable t) - { - this.receiver.exception(t); - } - - public void closed() - { - this.receiver.closed(); - } - - public void init(ProtocolHeader header) - { - emit(0, header); - } - - public void frame(Frame frame) - { - switch (frame.getType()) - { - case BODY: - emit(frame, new Data(frame, frame.isFirstFrame(), - frame.isLastFrame())); - break; - default: - assemble(frame); - break; - } - } - - public void error(ProtocolError error) - { - emit(0, error); - } - - private void assemble(Frame frame) - { - List<ByteBuffer> segment; - if (frame.isFirstFrame()) - { - segment = new ArrayList<ByteBuffer>(); - setSegment(frame, segment); - } - else - { - segment = getSegment(frame); - } - - for (ByteBuffer buf : frame) - { - segment.add(buf); - } - - if (frame.isLastFrame()) - { - clearSegment(frame); - emit(frame, decode(frame, frame.getType(), segment)); - } - } - - private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment) - { - Decoder dec; - if (segment.size() == 1) - { - dec = new BBDecoder(segment.get(0)); - } - else - { - dec = new FragmentDecoder(segment.iterator()); - } - - switch (type) - { - case CONTROL: - int controlType = dec.readUint16(); - Method control = Method.create(controlType); - control.read(dec); - return control; - case COMMAND: - int commandType = dec.readUint16(); - // read in the session header, right now we don't use it - dec.readUint16(); - Method command = Method.create(commandType); - command.read(dec); - return command; - case HEADER: - List<Struct> structs = new ArrayList(); - while (dec.hasRemaining()) - { - structs.add(dec.readStruct32()); - } - return new Header(structs,frame.isLastFrame() && frame.isLastSegment()); - default: - throw new IllegalStateException("unknown frame type: " + type); - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java deleted file mode 100644 index da9ba84ab0..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.network; - -import org.apache.qpidity.transport.codec.BBEncoder; -import org.apache.qpidity.transport.codec.SizeEncoder; - -import org.apache.qpidity.transport.ConnectionEvent; -import org.apache.qpidity.transport.Data; -import org.apache.qpidity.transport.Header; -import org.apache.qpidity.transport.Method; -import org.apache.qpidity.transport.ProtocolDelegate; -import org.apache.qpidity.transport.ProtocolError; -import org.apache.qpidity.transport.ProtocolEvent; -import org.apache.qpidity.transport.ProtocolHeader; -import org.apache.qpidity.transport.SegmentType; -import org.apache.qpidity.transport.Sender; -import org.apache.qpidity.transport.Struct; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -import static org.apache.qpidity.transport.network.Frame.*; - -import static java.lang.Math.*; - - -/** - * Disassembler - * - */ - -public class Disassembler implements Sender<ConnectionEvent>, - ProtocolDelegate<ConnectionEvent> -{ - - private final Sender<NetworkEvent> sender; - private final int maxPayload; - - public Disassembler(Sender<NetworkEvent> sender, int maxFrame) - { - if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) - { - throw new IllegalArgumentException - ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); - } - this.sender = sender; - this.maxPayload = maxFrame - HEADER_SIZE; - - } - - public void send(ConnectionEvent event) - { - event.getProtocolEvent().delegate(event, this); - } - - public void close() - { - sender.close(); - } - - private void fragment(byte flags, SegmentType type, ConnectionEvent event, - ByteBuffer buf, boolean first, boolean last) - { - byte track = event.getProtocolEvent().getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; - - if(!buf.hasRemaining()) - { - //empty data - byte nflags = flags; - if (first) - { - nflags |= FIRST_FRAME; - first = false; - } - nflags |= LAST_FRAME; - Frame frame = new Frame(nflags, type, track, event.getChannel()); - // frame.addFragment(buf); - sender.send(frame); - } - else - { - while (buf.hasRemaining()) - { - ByteBuffer slice = buf.slice(); - slice.limit(min(maxPayload, slice.remaining())); - buf.position(buf.position() + slice.remaining()); - - byte newflags = flags; - if (first) - { - newflags |= FIRST_FRAME; - first = false; - } - if (last && !buf.hasRemaining()) - { - newflags |= LAST_FRAME; - } - - Frame frame = new Frame(newflags, type, track, event.getChannel()); - frame.addFragment(slice); - sender.send(frame); - } - } - } - - public void init(ConnectionEvent event, ProtocolHeader header) - { - sender.send(header); - } - - public void control(ConnectionEvent event, Method method) - { - method(event, method, SegmentType.CONTROL); - } - - public void command(ConnectionEvent event, Method method) - { - method(event, method, SegmentType.COMMAND); - } - - private void method(ConnectionEvent event, Method method, SegmentType type) - { - SizeEncoder sizer = new SizeEncoder(); - sizer.writeUint16(method.getEncodedType()); - if (type == SegmentType.COMMAND) - { - sizer.writeUint16(0); - } - method.write(sizer); - - ByteBuffer buf = ByteBuffer.allocate(sizer.size()); - BBEncoder enc = new BBEncoder(buf); - enc.writeUint16(method.getEncodedType()); - if (type == SegmentType.COMMAND) - { - if (method.isSync()) - { - enc.writeUint16(0x0101); - } - else - { - enc.writeUint16(0x0100); - } - } - method.write(enc); - buf.flip(); - - byte flags = FIRST_SEG; - - if (!method.hasPayload()) - { - flags |= LAST_SEG; - } - - fragment(flags, type, event, buf, true, true); - } - - public void header(ConnectionEvent event, Header header) - { - ByteBuffer buf; - if( header.getBuf() == null) - { - SizeEncoder sizer = new SizeEncoder(); - for (Struct st : header.getStructs()) - { - sizer.writeStruct32(st); - } - - buf = ByteBuffer.allocate(sizer.size()); - BBEncoder enc = new BBEncoder(buf); - for (Struct st : header.getStructs()) - { - enc.writeStruct32(st); - } - header.setBuf(buf); - } - else - { - buf = header.getBuf(); - } - buf.flip(); - fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true); - } - - public void data(ConnectionEvent event, Data data) - { - boolean first = data.isFirst(); - for (Iterator<ByteBuffer> it = data.getFragments().iterator(); - it.hasNext(); ) - { - ByteBuffer buf = it.next(); - boolean last = data.isLast() && !it.hasNext(); - fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last); - first = false; - } - } - - public void error(ConnectionEvent event, ProtocolError error) - { - sender.send(error); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java deleted file mode 100644 index d1c03348b4..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.network; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.transport.ProtocolError; -import org.apache.qpidity.transport.ProtocolHeader; -import org.apache.qpidity.transport.Receiver; -import org.apache.qpidity.transport.SegmentType; - -import static org.apache.qpidity.transport.util.Functions.*; - -import static org.apache.qpidity.transport.network.InputHandler.State.*; - - -/** - * InputHandler - * - * @author Rafael H. Schloming - */ - -public class InputHandler implements Receiver<ByteBuffer> -{ - - public enum State - { - PROTO_HDR, - PROTO_HDR_M, - PROTO_HDR_Q, - PROTO_HDR_P, - PROTO_HDR_CLASS, - PROTO_HDR_INSTANCE, - PROTO_HDR_MAJOR, - PROTO_HDR_MINOR, - FRAME_HDR, - FRAME_HDR_TYPE, - FRAME_HDR_SIZE1, - FRAME_HDR_SIZE2, - FRAME_HDR_RSVD1, - FRAME_HDR_TRACK, - FRAME_HDR_CH1, - FRAME_HDR_CH2, - FRAME_HDR_RSVD2, - FRAME_HDR_RSVD3, - FRAME_HDR_RSVD4, - FRAME_HDR_RSVD5, - FRAME_FRAGMENT, - ERROR; - } - - private final Receiver<NetworkEvent> receiver; - private State state; - - private byte instance; - private byte major; - private byte minor; - - private byte flags; - private SegmentType type; - private byte track; - private int channel; - private int size; - private Frame frame; - - public InputHandler(Receiver<NetworkEvent> receiver, State state) - { - this.receiver = receiver; - this.state = state; - } - - public InputHandler(Receiver<NetworkEvent> receiver) - { - this(receiver, PROTO_HDR); - } - - private void init() - { - receiver.received(new ProtocolHeader(instance, major, minor)); - } - - private void frame() - { - assert size == frame.getSize(); - receiver.received(frame); - frame = null; - } - - private void error(String fmt, Object ... args) - { - receiver.received(new ProtocolError(Frame.L1, fmt, args)); - } - - public void received(ByteBuffer buf) - { - while (buf.hasRemaining()) - { - state = next(buf); - } - } - - private State next(ByteBuffer buf) - { - switch (state) { - case PROTO_HDR: - return expect(buf, 'A', PROTO_HDR_M); - case PROTO_HDR_M: - return expect(buf, 'M', PROTO_HDR_Q); - case PROTO_HDR_Q: - return expect(buf, 'Q', PROTO_HDR_P); - case PROTO_HDR_P: - return expect(buf, 'P', PROTO_HDR_CLASS); - case PROTO_HDR_CLASS: - return expect(buf, 1, PROTO_HDR_INSTANCE); - case PROTO_HDR_INSTANCE: - instance = buf.get(); - return PROTO_HDR_MAJOR; - case PROTO_HDR_MAJOR: - major = buf.get(); - return PROTO_HDR_MINOR; - case PROTO_HDR_MINOR: - minor = buf.get(); - init(); - return FRAME_HDR; - case FRAME_HDR: - flags = buf.get(); - return FRAME_HDR_TYPE; - case FRAME_HDR_TYPE: - type = SegmentType.get(buf.get()); - return FRAME_HDR_SIZE1; - case FRAME_HDR_SIZE1: - size = (0xFF & buf.get()) << 8; - return FRAME_HDR_SIZE2; - case FRAME_HDR_SIZE2: - size += 0xFF & buf.get(); - size -= 12; - if (size < 0 || size > (64*1024 - 12)) - { - error("bad frame size: %d", size); - return ERROR; - } - else - { - return FRAME_HDR_RSVD1; - } - case FRAME_HDR_RSVD1: - return expect(buf, 0, FRAME_HDR_TRACK); - case FRAME_HDR_TRACK: - byte b = buf.get(); - if ((b & 0xF0) != 0) { - error("non-zero reserved bits in upper nibble of " + - "frame header byte 5: '%x'", b); - return ERROR; - } else { - track = (byte) (b & 0xF); - return FRAME_HDR_CH1; - } - case FRAME_HDR_CH1: - channel = (0xFF & buf.get()) << 8; - return FRAME_HDR_CH2; - case FRAME_HDR_CH2: - channel += 0xFF & buf.get(); - return FRAME_HDR_RSVD2; - case FRAME_HDR_RSVD2: - return expect(buf, 0, FRAME_HDR_RSVD3); - case FRAME_HDR_RSVD3: - return expect(buf, 0, FRAME_HDR_RSVD4); - case FRAME_HDR_RSVD4: - return expect(buf, 0, FRAME_HDR_RSVD5); - case FRAME_HDR_RSVD5: - if (!expect(buf, 0)) - { - return ERROR; - } - - frame = new Frame(flags, type, track, channel); - if (size > buf.remaining()) { - frame.addFragment(buf.slice()); - buf.position(buf.limit()); - return FRAME_FRAGMENT; - } else { - ByteBuffer payload = buf.slice(); - payload.limit(size); - buf.position(buf.position() + size); - frame.addFragment(payload); - frame(); - return FRAME_HDR; - } - case FRAME_FRAGMENT: - int delta = size - frame.getSize(); - if (delta > buf.remaining()) { - frame.addFragment(buf.slice()); - buf.position(buf.limit()); - return FRAME_FRAGMENT; - } else { - ByteBuffer fragment = buf.slice(); - fragment.limit(delta); - buf.position(buf.position() + delta); - frame.addFragment(fragment); - frame(); - return FRAME_HDR; - } - default: - throw new IllegalStateException(); - } - } - - private State expect(ByteBuffer buf, int expected, State next) - { - return expect(buf, (byte) expected, next); - } - - private State expect(ByteBuffer buf, char expected, State next) - { - return expect(buf, (byte) expected, next); - } - - private State expect(ByteBuffer buf, byte expected, State next) - { - if (expect(buf, expected)) - { - return next; - } - else - { - return ERROR; - } - } - - private boolean expect(ByteBuffer buf, int expected) - { - return expect(buf, (byte) expected); - } - - private boolean expect(ByteBuffer buf, byte expected) - { - byte b = buf.get(); - if (b == expected) - { - return true; - } - else - { - error("expecting '%x', got '%x'", expected, b); - return false; - } - } - - public void exception(Throwable t) - { - receiver.exception(t); - } - - public void closed() - { - receiver.closed(); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java deleted file mode 100644 index b749332fa3..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.network; - -import java.nio.ByteBuffer; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.qpidity.transport.Constant; -import org.apache.qpidity.transport.ProtocolError; -import org.apache.qpidity.transport.ProtocolHeader; -import org.apache.qpidity.transport.Sender; - -import static org.apache.qpidity.transport.network.Frame.*; - - -/** - * OutputHandler - * - */ - -public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate -{ - - private Sender<ByteBuffer> sender; - private Object lock = new Object(); - private int bytes = 0; - private List<Frame> frames = new ArrayList<Frame>(); - - public OutputHandler(Sender<ByteBuffer> sender) - { - this.sender = sender; - } - - public void send(NetworkEvent event) - { - event.delegate(this); - } - - public void close() - { - synchronized (lock) - { - sender.close(); - } - } - - public void init(ProtocolHeader header) - { - synchronized (lock) - { - sender.send(header.toByteBuffer()); - } - } - - public void frame(Frame frame) - { - synchronized (lock) - { - frames.add(frame); - bytes += HEADER_SIZE + frame.getSize(); - - if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024) - { - ByteBuffer buf = ByteBuffer.allocate(bytes); - for (Frame f : frames) - { - buf.put(f.getFlags()); - buf.put((byte) f.getType().getValue()); - buf.putShort((short) (f.getSize() + HEADER_SIZE)); - // RESERVED - buf.put(RESERVED); - buf.put(f.getTrack()); - buf.putShort((short) f.getChannel()); - // RESERVED - buf.putInt(0); - for(ByteBuffer frg : f) - { - buf.put(frg); - } - } - buf.flip(); - - frames.clear(); - bytes = 0; - - sender.send(buf); - } - } - } - - public void error(ProtocolError error) - { - throw new IllegalStateException("XXX"); - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java deleted file mode 100644 index 568526d9db..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpidity.transport.network.io; - -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.qpidity.transport.Connection; -import org.apache.qpidity.transport.ConnectionDelegate; -import org.apache.qpidity.transport.Receiver; -import org.apache.qpidity.transport.network.Assembler; -import org.apache.qpidity.transport.network.Disassembler; -import org.apache.qpidity.transport.network.InputHandler; -import org.apache.qpidity.transport.network.OutputHandler; -import org.apache.qpidity.transport.util.Logger; - -/** - * This class provides a synchronous IO implementation using - * the java.io classes. The IoHandler runs in its own thread. - * The following params are configurable via JVM arguments - * TCP_NO_DELAY - amqj.tcpNoDelay - * SO_RCVBUF - amqj.receiveBufferSize - * SO_SNDBUF - amqj.sendBufferSize - */ -public class IoHandler implements Runnable -{ - private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; - - private Receiver<ByteBuffer> _receiver; - private Socket _socket; - private byte[] _readBuf; - private static AtomicInteger _count = new AtomicInteger(); - private int _readBufferSize; - private int _writeBufferSize; - - private static final Logger log = Logger.get(IoHandler.class); - - private IoHandler() - { - _readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); - _writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); - } - - public static final Connection connect(String host, int port, - ConnectionDelegate delegate) - { - IoHandler handler = new IoHandler(); - return handler.connectInternal(host,port,delegate); - } - - private Connection connectInternal(String host, int port, - ConnectionDelegate delegate) - { - try - { - InetAddress address = InetAddress.getByName(host); - _socket = new Socket(); - _socket.setReuseAddress(true); - _socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); - - log.debug("default-SO_RCVBUF : " + _socket.getReceiveBufferSize()); - log.debug("default-SO_SNDBUF : " + _socket.getSendBufferSize()); - - _socket.setSendBufferSize(_writeBufferSize); - _socket.setReceiveBufferSize(_readBufferSize); - - log.debug("new-SO_RCVBUF : " + _socket.getReceiveBufferSize()); - log.debug("new-SO_SNDBUF : " + _socket.getSendBufferSize()); - - if (address != null) - { - _socket.connect(new InetSocketAddress(address, port)); - } - while (!_socket.isConnected()) - { - - } - - } - catch (SocketException e) - { - throw new RuntimeException("Error connecting to broker",e); - } - catch (IOException e) - { - throw new RuntimeException("Error connecting to broker",e); - } - - IoSender sender = new IoSender(_socket); - Connection con = new Connection - (new Disassembler(new OutputHandler(sender), 64*1024 - 1), - delegate); - - con.setConnectionId(_count.incrementAndGet()); - _receiver = new InputHandler(new Assembler(con), InputHandler.State.PROTO_HDR); - - Thread t = new Thread(this); - t.setName("IO Handler Thread-" + _count.get()); - t.start(); - - return con; - } - - public void run() - { - // I set the read_buffer size simillar to SO_RCVBUF - // Haven't tested with a lower value to see its better or worse - _readBuf = new byte[_readBufferSize]; - try - { - InputStream in = _socket.getInputStream(); - int read = 0; - while(_socket.isConnected()) - { - try - { - read = in.read(_readBuf); - if (read > 0) - { - ByteBuffer b = ByteBuffer.allocate(read); - b.put(_readBuf,0,read); - b.flip(); - _receiver.received(b); - } - } - catch(Exception e) - { - throw new RuntimeException("Error reading from socket input stream",e); - } - } - } - catch (IOException e) - { - throw new RuntimeException("Error getting input stream from the socket",e); - } - finally - { - try - { - _socket.close(); - } - catch(Exception e) - { - log.error(e,"Error closing socket"); - } - } - } - - /** - * Will experiment in a future version with batching - */ - public static void startBatchingFrames(int connectionId) - { - - } - - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java deleted file mode 100644 index 1adde531a6..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.transport.network.io; - -import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; - -import org.apache.qpidity.transport.Sender; - -public class IoSender implements Sender<java.nio.ByteBuffer> -{ - private final Object lock = new Object(); - private Socket _socket; - private OutputStream _outStream; - - public IoSender(Socket socket) - { - this._socket = socket; - try - { - _outStream = _socket.getOutputStream(); - } - catch(IOException e) - { - throw new RuntimeException("Error getting output stream for socket",e); - } - } - - /* - * Currently I don't implement any in memory buffering - * and just write straight to the wire. - * I want to experiment with buffering and see if I can - * get more performance, all though latency will suffer - * a bit. - */ - public void send(java.nio.ByteBuffer buf) - { - write(buf); - } - - /* The extra copying sucks. - * If I know for sure that the buf is backed - * by an array then I could do buf.array() - */ - private void write(java.nio.ByteBuffer buf) - { - byte[] array = new byte[buf.remaining()]; - buf.get(array); - if( _socket.isConnected()) - { - synchronized (lock) - { - try - { - _outStream.write(array); - } - catch(Exception e) - { - e.fillInStackTrace(); - throw new RuntimeException("Error trying to write to the socket",e); - } - } - } - else - { - throw new RuntimeException("Trying to write on a closed socket"); - } - } - - /* - * Haven't used this, but the intention is - * to experiment with it in the future. - * Also need to make sure the buffer size - * is configurable - */ - public void setStartBatching() - { - } - - public void close() - { - synchronized (lock) - { - try - { - _socket.close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - } -} diff --git a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 1a83786e12..b9ca210483 100644 --- a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -18,14 +18,16 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import org.apache.mina.util.AvailablePortFinder; import org.apache.qpid.util.concurrent.Condition; -import org.apache.qpidity.transport.network.mina.MinaHandler; -import org.apache.qpidity.transport.util.Logger; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; +import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.transport.util.Logger; import junit.framework.TestCase; @@ -62,12 +64,14 @@ public class ConnectionTest extends TestCase public void closed() {} }; - MinaHandler.accept("localhost", port, server); + IoAcceptor ioa = new IoAcceptor + ("localhost", port, new ConnectionBinding(server)); + ioa.start(); } private Connection connect(final Condition closed) { - Connection conn = MinaHandler.connect("localhost", port, new ConnectionDelegate() + Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate() { public SessionDelegate getSessionDelegate() { @@ -86,7 +90,7 @@ public class ConnectionTest extends TestCase } }); - conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); + conn.send(new ProtocolHeader(1, 0, 10)); return conn; } diff --git a/java/common/src/test/java/org/apache/qpidity/transport/RangeSetTest.java b/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java index 474d47d8d7..ad45d00e46 100644 --- a/java/common/src/test/java/org/apache/qpidity/transport/RangeSetTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpidity.transport; +package org.apache.qpid.transport; import java.util.ArrayList; import java.util.Collections; |
