diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-21 14:03:35 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-21 14:03:35 +0000 |
| commit | 9930676624d4c27f866c9d40227fd7c282f4dac2 (patch) | |
| tree | ef3639dcdbd8e93160a8286b7f13fecb4e55abcb /java/common/src | |
| parent | 1d69ea16b30dd67ac32683e6dc512f4c58ef93f1 (diff) | |
| download | qpid-python-9930676624d4c27f866c9d40227fd7c282f4dac2.tar.gz | |
Initial checkpoint of queue refactoring work
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@650148 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
7 files changed, 194 insertions, 78 deletions
diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java index bed80d5954..0c311b6645 100644 --- a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java +++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java @@ -62,7 +62,6 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator private static final class FixedSizeByteBuffer extends ByteBuffer { private java.nio.ByteBuffer buf; - private int refCount = 1; private int mark = -1; @@ -70,36 +69,14 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator { this.buf = buf; buf.order( ByteOrder.BIG_ENDIAN ); - refCount = 1; } public synchronized void acquire() { - if( refCount <= 0 ) - { - throw new IllegalStateException( "Already released buffer." ); - } - - refCount ++; } public void release() { - synchronized( this ) - { - if( refCount <= 0 ) - { - refCount = 0; - throw new IllegalStateException( - "Already released buffer. You released the buffer too many times." ); - } - - refCount --; - if( refCount > 0) - { - return; - } - } } public java.nio.ByteBuffer buf() @@ -157,50 +134,12 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator { if( newCapacity > capacity() ) { - // Allocate a new buffer and transfer all settings to it. - int pos = position(); - int limit = limit(); - ByteOrder bo = order(); - - capacity0( newCapacity ); - buf.limit( limit ); - if( mark >= 0 ) - { - buf.position( mark ); - buf.mark(); - } - buf.position( pos ); - buf.order( bo ); + throw new IllegalArgumentException(); } return this; } - protected void capacity0( int requestedCapacity ) - { - int newCapacity = MINIMUM_CAPACITY; - while( newCapacity < requestedCapacity ) - { - newCapacity <<= 1; - } - - java.nio.ByteBuffer oldBuf = this.buf; - java.nio.ByteBuffer newBuf; - if( isDirect() ) - { - newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity ); - } - else - { - newBuf = java.nio.ByteBuffer.allocate( newCapacity ); - } - - newBuf.clear(); - oldBuf.clear(); - newBuf.put( oldBuf ); - this.buf = newBuf; - } - public boolean isAutoExpand() diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java index 810d12f472..2ab3ddb50e 100644 --- a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java +++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -91,7 +91,7 @@ import org.apache.mina.common.IoSession; * </pre> * * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $ + * @version $Rev$, $Date$ */ public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java index cb24102edd..30c64f44cd 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -376,7 +376,8 @@ public class MultiThreadSocketConnector extends SocketConnector // Set the ConnectFuture of the specified session, which will be // removed and notified by AbstractIoFilterChain eventually. - session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); + + session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); // Forward the remaining process to the SocketIoProcessor. session.getIoProcessor().addNew(session); diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java index 67f16e6a87..7371c12519 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.common; +import org.apache.qpid.framing.AMQShortString; + /** * Specifies the available client property types that different clients can use to identify themselves with. * @@ -30,8 +32,21 @@ package org.apache.qpid.common; */ public enum ClientProperties { - instance, - product, - version, - platform + instance("instance"), + product("product"), + version("version"), + platform("platform"); + + private final AMQShortString _amqShortString; + + private ClientProperties(String name) + { + _amqShortString = new AMQShortString(name); + } + + + public AMQShortString toAMQShortString() + { + return _amqShortString; + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 665cbf7a84..ad2ab2ac0b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -90,7 +90,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private AMQShortString substring(final int from, final int to) { - return new AMQShortString(_data, from, to); + return new AMQShortString(_data, from+_offset, to+_offset); } @@ -184,11 +184,22 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private AMQShortString(ByteBuffer data, final int length) { - byte[] dataBytes = new byte[length]; - data.get(dataBytes); - _data = dataBytes; + if(data.isDirect() || data.isReadOnly()) + { + byte[] dataBytes = new byte[length]; + data.get(dataBytes); + _data = dataBytes; + _offset = 0; + } + else + { + + _data = data.array(); + _offset = data.arrayOffset() + data.position(); + data.skip(length); + + } _length = length; - _offset = 0; } @@ -199,6 +210,20 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt _data = data; } + public AMQShortString shrink() + { + if(_data.length != _length) + { + byte[] dataBytes = new byte[_length]; + System.arraycopy(_data,_offset,dataBytes,0,_length); + return new AMQShortString(dataBytes,0,_length); + } + else + { + return this; + } + } + /** * Get the length of the short string @@ -572,7 +597,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt ref = _globalInternMap.get(this); if((ref == null) || ((internString = ref.get()) == null)) { - internString = new AMQShortString(getBytes()); + internString = shrink(); ref = new WeakReference(internString); _globalInternMap.put(internString, ref); } @@ -651,7 +676,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public int toIntValue() { - int pos = 0; + int pos = _offset; int val = 0; @@ -660,7 +685,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt { pos++; } - while(pos < _length) + while(pos < _length + _offset) { int digit = (int) (_data[pos++] - ZERO); if((digit < 0) || (digit > 9)) @@ -679,7 +704,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public boolean contains(final byte b) { - for(int i = 0; i < _length; i++) + for(int i = _offset; i < _length + _offset; i++) { if(_data[i] == b) { @@ -689,4 +714,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return false; //To change body of created methods use File | Settings | File Templates. } + + public static void main(String args[]) + { + AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k"); + AMQShortString s2 = s.substring(2, 7); + + AMQShortStringTokenizer t = s2.tokenize((byte) '.'); + while(t.hasMoreTokens()) + { + System.err.println(t.nextToken()); + } + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index e5b1fad9a8..4a1a2e709d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -93,4 +93,24 @@ public class AMQTypedValue {
return "[" + getType() + ": " + getValue() + "]";
}
+
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof AMQTypedValue)
+ {
+ AMQTypedValue other = (AMQTypedValue) o;
+ return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index ee6762181d..d87da9cdad 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -74,7 +74,7 @@ public class FieldTable buffer.skip((int) length); } - private AMQTypedValue getProperty(AMQShortString string) + public AMQTypedValue getProperty(AMQShortString string) { checkPropertyName(string); @@ -891,6 +891,20 @@ public class FieldTable return keys; } + public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator() + { + if(_encodedForm != null) + { + return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize); + } + else + { + initMapIfNecessary(); + return _properties.entrySet().iterator(); + } + } + + public Object get(AMQShortString key) { @@ -1045,6 +1059,95 @@ public class FieldTable } } + private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue> + { + private final AMQTypedValue _value; + private final AMQShortString _key; + + public FieldTableEntry(final AMQShortString key, final AMQTypedValue value) + { + _key = key; + _value = value; + } + + public AMQShortString getKey() + { + return _key; + } + + public AMQTypedValue getValue() + { + return _value; + } + + public AMQTypedValue setValue(final AMQTypedValue value) + { + throw new UnsupportedOperationException(); + } + + public boolean equals(Object o) + { + if(o instanceof FieldTableEntry) + { + FieldTableEntry other = (FieldTableEntry) o; + return (_key == null ? other._key == null : _key.equals(other._key)) + && (_value == null ? other._value == null : _value.equals(other._value)); + } + else + { + return false; + } + } + + public int hashCode() + { + return (getKey()==null ? 0 : getKey().hashCode()) + ^ (getValue()==null ? 0 : getValue().hashCode()); + } + + } + + + private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>> + { + + private final ByteBuffer _buffer; + private int _expectedRemaining; + + public FieldTableIterator(ByteBuffer buffer, int length) + { + _buffer = buffer; + _expectedRemaining = buffer.remaining() - length; + } + + public boolean hasNext() + { + return (_buffer.remaining() > _expectedRemaining); + } + + public Map.Entry<AMQShortString, AMQTypedValue> next() + { + if(hasNext()) + { + final AMQShortString key = EncodingUtils.readAMQShortString(_buffer); + AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer); + return new FieldTableEntry(key, value); + } + else + { + return null; + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + + + public int hashCode() { initMapIfNecessary(); |
