summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-04-21 14:03:35 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-04-21 14:03:35 +0000
commit9930676624d4c27f866c9d40227fd7c282f4dac2 (patch)
treeef3639dcdbd8e93160a8286b7f13fecb4e55abcb /java/common/src
parent1d69ea16b30dd67ac32683e6dc512f4c58ef93f1 (diff)
downloadqpid-python-9930676624d4c27f866c9d40227fd7c282f4dac2.tar.gz
Initial checkpoint of queue refactoring work
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@650148 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java63
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java2
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/ClientProperties.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java56
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java105
7 files changed, 194 insertions, 78 deletions
diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
index bed80d5954..0c311b6645 100644
--- a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
+++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
@@ -62,7 +62,6 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
private static final class FixedSizeByteBuffer extends ByteBuffer
{
private java.nio.ByteBuffer buf;
- private int refCount = 1;
private int mark = -1;
@@ -70,36 +69,14 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
{
this.buf = buf;
buf.order( ByteOrder.BIG_ENDIAN );
- refCount = 1;
}
public synchronized void acquire()
{
- if( refCount <= 0 )
- {
- throw new IllegalStateException( "Already released buffer." );
- }
-
- refCount ++;
}
public void release()
{
- synchronized( this )
- {
- if( refCount <= 0 )
- {
- refCount = 0;
- throw new IllegalStateException(
- "Already released buffer. You released the buffer too many times." );
- }
-
- refCount --;
- if( refCount > 0)
- {
- return;
- }
- }
}
public java.nio.ByteBuffer buf()
@@ -157,50 +134,12 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
{
if( newCapacity > capacity() )
{
- // Allocate a new buffer and transfer all settings to it.
- int pos = position();
- int limit = limit();
- ByteOrder bo = order();
-
- capacity0( newCapacity );
- buf.limit( limit );
- if( mark >= 0 )
- {
- buf.position( mark );
- buf.mark();
- }
- buf.position( pos );
- buf.order( bo );
+ throw new IllegalArgumentException();
}
return this;
}
- protected void capacity0( int requestedCapacity )
- {
- int newCapacity = MINIMUM_CAPACITY;
- while( newCapacity < requestedCapacity )
- {
- newCapacity <<= 1;
- }
-
- java.nio.ByteBuffer oldBuf = this.buf;
- java.nio.ByteBuffer newBuf;
- if( isDirect() )
- {
- newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity );
- }
- else
- {
- newBuf = java.nio.ByteBuffer.allocate( newCapacity );
- }
-
- newBuf.clear();
- oldBuf.clear();
- newBuf.put( oldBuf );
- this.buf = newBuf;
- }
-
public boolean isAutoExpand()
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
index 810d12f472..2ab3ddb50e 100644
--- a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
+++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
@@ -91,7 +91,7 @@ import org.apache.mina.common.IoSession;
* </pre>
*
* @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $
+ * @version $Rev$, $Date$
*/
public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
index cb24102edd..30c64f44cd 100644
--- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
@@ -376,7 +376,8 @@ public class MultiThreadSocketConnector extends SocketConnector
// Set the ConnectFuture of the specified session, which will be
// removed and notified by AbstractIoFilterChain eventually.
- session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
+
+ session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
// Forward the remaining process to the SocketIoProcessor.
session.getIoProcessor().addNew(session);
diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
index 67f16e6a87..7371c12519 100644
--- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.common;
+import org.apache.qpid.framing.AMQShortString;
+
/**
* Specifies the available client property types that different clients can use to identify themselves with.
*
@@ -30,8 +32,21 @@ package org.apache.qpid.common;
*/
public enum ClientProperties
{
- instance,
- product,
- version,
- platform
+ instance("instance"),
+ product("product"),
+ version("version"),
+ platform("platform");
+
+ private final AMQShortString _amqShortString;
+
+ private ClientProperties(String name)
+ {
+ _amqShortString = new AMQShortString(name);
+ }
+
+
+ public AMQShortString toAMQShortString()
+ {
+ return _amqShortString;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index 665cbf7a84..ad2ab2ac0b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -90,7 +90,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private AMQShortString substring(final int from, final int to)
{
- return new AMQShortString(_data, from, to);
+ return new AMQShortString(_data, from+_offset, to+_offset);
}
@@ -184,11 +184,22 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private AMQShortString(ByteBuffer data, final int length)
{
- byte[] dataBytes = new byte[length];
- data.get(dataBytes);
- _data = dataBytes;
+ if(data.isDirect() || data.isReadOnly())
+ {
+ byte[] dataBytes = new byte[length];
+ data.get(dataBytes);
+ _data = dataBytes;
+ _offset = 0;
+ }
+ else
+ {
+
+ _data = data.array();
+ _offset = data.arrayOffset() + data.position();
+ data.skip(length);
+
+ }
_length = length;
- _offset = 0;
}
@@ -199,6 +210,20 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
_data = data;
}
+ public AMQShortString shrink()
+ {
+ if(_data.length != _length)
+ {
+ byte[] dataBytes = new byte[_length];
+ System.arraycopy(_data,_offset,dataBytes,0,_length);
+ return new AMQShortString(dataBytes,0,_length);
+ }
+ else
+ {
+ return this;
+ }
+ }
+
/**
* Get the length of the short string
@@ -572,7 +597,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
ref = _globalInternMap.get(this);
if((ref == null) || ((internString = ref.get()) == null))
{
- internString = new AMQShortString(getBytes());
+ internString = shrink();
ref = new WeakReference(internString);
_globalInternMap.put(internString, ref);
}
@@ -651,7 +676,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public int toIntValue()
{
- int pos = 0;
+ int pos = _offset;
int val = 0;
@@ -660,7 +685,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
{
pos++;
}
- while(pos < _length)
+ while(pos < _length + _offset)
{
int digit = (int) (_data[pos++] - ZERO);
if((digit < 0) || (digit > 9))
@@ -679,7 +704,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public boolean contains(final byte b)
{
- for(int i = 0; i < _length; i++)
+ for(int i = _offset; i < _length + _offset; i++)
{
if(_data[i] == b)
{
@@ -689,4 +714,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return false; //To change body of created methods use File | Settings | File Templates.
}
+
+ public static void main(String args[])
+ {
+ AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k");
+ AMQShortString s2 = s.substring(2, 7);
+
+ AMQShortStringTokenizer t = s2.tokenize((byte) '.');
+ while(t.hasMoreTokens())
+ {
+ System.err.println(t.nextToken());
+ }
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index e5b1fad9a8..4a1a2e709d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -93,4 +93,24 @@ public class AMQTypedValue
{
return "[" + getType() + ": " + getValue() + "]";
}
+
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof AMQTypedValue)
+ {
+ AMQTypedValue other = (AMQTypedValue) o;
+ return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index ee6762181d..d87da9cdad 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -74,7 +74,7 @@ public class FieldTable
buffer.skip((int) length);
}
- private AMQTypedValue getProperty(AMQShortString string)
+ public AMQTypedValue getProperty(AMQShortString string)
{
checkPropertyName(string);
@@ -891,6 +891,20 @@ public class FieldTable
return keys;
}
+ public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
+ {
+ if(_encodedForm != null)
+ {
+ return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
+ }
+ else
+ {
+ initMapIfNecessary();
+ return _properties.entrySet().iterator();
+ }
+ }
+
+
public Object get(AMQShortString key)
{
@@ -1045,6 +1059,95 @@ public class FieldTable
}
}
+ private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
+ {
+ private final AMQTypedValue _value;
+ private final AMQShortString _key;
+
+ public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
+ {
+ _key = key;
+ _value = value;
+ }
+
+ public AMQShortString getKey()
+ {
+ return _key;
+ }
+
+ public AMQTypedValue getValue()
+ {
+ return _value;
+ }
+
+ public AMQTypedValue setValue(final AMQTypedValue value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof FieldTableEntry)
+ {
+ FieldTableEntry other = (FieldTableEntry) o;
+ return (_key == null ? other._key == null : _key.equals(other._key))
+ && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return (getKey()==null ? 0 : getKey().hashCode())
+ ^ (getValue()==null ? 0 : getValue().hashCode());
+ }
+
+ }
+
+
+ private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
+ {
+
+ private final ByteBuffer _buffer;
+ private int _expectedRemaining;
+
+ public FieldTableIterator(ByteBuffer buffer, int length)
+ {
+ _buffer = buffer;
+ _expectedRemaining = buffer.remaining() - length;
+ }
+
+ public boolean hasNext()
+ {
+ return (_buffer.remaining() > _expectedRemaining);
+ }
+
+ public Map.Entry<AMQShortString, AMQTypedValue> next()
+ {
+ if(hasNext())
+ {
+ final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
+ return new FieldTableEntry(key, value);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+
+
public int hashCode()
{
initMapIfNecessary();