From 9930676624d4c27f866c9d40227fd7c282f4dac2 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 21 Apr 2008 14:03:35 +0000 Subject: Initial checkpoint of queue refactoring work git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@650148 13f79535-47bb-0310-9956-ffa450edef68 --- .../mina/common/FixedSizeByteBufferAllocator.java | 63 +------------ .../filter/codec/OurCumulativeProtocolDecoder.java | 2 +- .../socket/nio/MultiThreadSocketConnector.java | 3 +- .../org/apache/qpid/common/ClientProperties.java | 23 ++++- .../org/apache/qpid/framing/AMQShortString.java | 56 +++++++++-- .../org/apache/qpid/framing/AMQTypedValue.java | 20 ++++ .../java/org/apache/qpid/framing/FieldTable.java | 105 ++++++++++++++++++++- 7 files changed, 194 insertions(+), 78 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java index bed80d5954..0c311b6645 100644 --- a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java +++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java @@ -62,7 +62,6 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator private static final class FixedSizeByteBuffer extends ByteBuffer { private java.nio.ByteBuffer buf; - private int refCount = 1; private int mark = -1; @@ -70,36 +69,14 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator { this.buf = buf; buf.order( ByteOrder.BIG_ENDIAN ); - refCount = 1; } public synchronized void acquire() { - if( refCount <= 0 ) - { - throw new IllegalStateException( "Already released buffer." ); - } - - refCount ++; } public void release() { - synchronized( this ) - { - if( refCount <= 0 ) - { - refCount = 0; - throw new IllegalStateException( - "Already released buffer. You released the buffer too many times." ); - } - - refCount --; - if( refCount > 0) - { - return; - } - } } public java.nio.ByteBuffer buf() @@ -157,50 +134,12 @@ public class FixedSizeByteBufferAllocator implements ByteBufferAllocator { if( newCapacity > capacity() ) { - // Allocate a new buffer and transfer all settings to it. - int pos = position(); - int limit = limit(); - ByteOrder bo = order(); - - capacity0( newCapacity ); - buf.limit( limit ); - if( mark >= 0 ) - { - buf.position( mark ); - buf.mark(); - } - buf.position( pos ); - buf.order( bo ); + throw new IllegalArgumentException(); } return this; } - protected void capacity0( int requestedCapacity ) - { - int newCapacity = MINIMUM_CAPACITY; - while( newCapacity < requestedCapacity ) - { - newCapacity <<= 1; - } - - java.nio.ByteBuffer oldBuf = this.buf; - java.nio.ByteBuffer newBuf; - if( isDirect() ) - { - newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity ); - } - else - { - newBuf = java.nio.ByteBuffer.allocate( newCapacity ); - } - - newBuf.clear(); - oldBuf.clear(); - newBuf.put( oldBuf ); - this.buf = newBuf; - } - public boolean isAutoExpand() diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java index 810d12f472..2ab3ddb50e 100644 --- a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java +++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -91,7 +91,7 @@ import org.apache.mina.common.IoSession; * * * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $ + * @version $Rev$, $Date$ */ public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java index cb24102edd..30c64f44cd 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -376,7 +376,8 @@ public class MultiThreadSocketConnector extends SocketConnector // Set the ConnectFuture of the specified session, which will be // removed and notified by AbstractIoFilterChain eventually. - session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); + + session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); // Forward the remaining process to the SocketIoProcessor. session.getIoProcessor().addNew(session); diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java index 67f16e6a87..7371c12519 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.common; +import org.apache.qpid.framing.AMQShortString; + /** * Specifies the available client property types that different clients can use to identify themselves with. * @@ -30,8 +32,21 @@ package org.apache.qpid.common; */ public enum ClientProperties { - instance, - product, - version, - platform + instance("instance"), + product("product"), + version("version"), + platform("platform"); + + private final AMQShortString _amqShortString; + + private ClientProperties(String name) + { + _amqShortString = new AMQShortString(name); + } + + + public AMQShortString toAMQShortString() + { + return _amqShortString; + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 665cbf7a84..ad2ab2ac0b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -90,7 +90,7 @@ public final class AMQShortString implements CharSequence, Comparable 9)) @@ -679,7 +704,7 @@ public final class AMQShortString implements CharSequence, Comparable> iterator() + { + if(_encodedForm != null) + { + return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize); + } + else + { + initMapIfNecessary(); + return _properties.entrySet().iterator(); + } + } + + public Object get(AMQShortString key) { @@ -1045,6 +1059,95 @@ public class FieldTable } } + private static final class FieldTableEntry implements Map.Entry + { + private final AMQTypedValue _value; + private final AMQShortString _key; + + public FieldTableEntry(final AMQShortString key, final AMQTypedValue value) + { + _key = key; + _value = value; + } + + public AMQShortString getKey() + { + return _key; + } + + public AMQTypedValue getValue() + { + return _value; + } + + public AMQTypedValue setValue(final AMQTypedValue value) + { + throw new UnsupportedOperationException(); + } + + public boolean equals(Object o) + { + if(o instanceof FieldTableEntry) + { + FieldTableEntry other = (FieldTableEntry) o; + return (_key == null ? other._key == null : _key.equals(other._key)) + && (_value == null ? other._value == null : _value.equals(other._value)); + } + else + { + return false; + } + } + + public int hashCode() + { + return (getKey()==null ? 0 : getKey().hashCode()) + ^ (getValue()==null ? 0 : getValue().hashCode()); + } + + } + + + private static final class FieldTableIterator implements Iterator> + { + + private final ByteBuffer _buffer; + private int _expectedRemaining; + + public FieldTableIterator(ByteBuffer buffer, int length) + { + _buffer = buffer; + _expectedRemaining = buffer.remaining() - length; + } + + public boolean hasNext() + { + return (_buffer.remaining() > _expectedRemaining); + } + + public Map.Entry next() + { + if(hasNext()) + { + final AMQShortString key = EncodingUtils.readAMQShortString(_buffer); + AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer); + return new FieldTableEntry(key, value); + } + else + { + return null; + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + + + public int hashCode() { initMapIfNecessary(); -- cgit v1.2.1