diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
| commit | 7e34266b9a23f4536415bfbc3f161b84615b6550 (patch) | |
| tree | 484008cf2d413f58b5e4ab80b373303c66200888 /RC9/qpid/dotnet/Qpid.Common/Framing | |
| parent | 4612263ea692f00a4bd810438bdaf9bc88022091 (diff) | |
| download | qpid-python-M4.tar.gz | |
Tag M4 RC9M4
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@734202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC9/qpid/dotnet/Qpid.Common/Framing')
27 files changed, 3767 insertions, 0 deletions
diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQDataBlockDecoder.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQDataBlockDecoder.cs new file mode 100644 index 0000000000..7867650e50 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQDataBlockDecoder.cs @@ -0,0 +1,155 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Codec; +using Apache.Qpid.Codec.Demux; + +namespace Apache.Qpid.Framing +{ + public class AMQDataBlockDecoder : IMessageDecoder + { + private static ILog _logger = LogManager.GetLogger(typeof(AMQDataBlockDecoder)); + + private Hashtable _supportedBodies = new Hashtable(); + + private bool _disabled = false; + + public AMQDataBlockDecoder() + { + _supportedBodies[AMQMethodBody.TYPE] = AMQMethodBodyFactory.GetInstance(); + _supportedBodies[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.GetInstance(); + _supportedBodies[ContentBody.TYPE] = ContentBodyFactory.GetInstance(); + _supportedBodies[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + + public MessageDecoderResult Decodable(ByteBuffer input) + { + if (_disabled) + { + return MessageDecoderResult.NOT_OK; + } + // final +1 represents the command end which we know we must require even + // if there is an empty body + if (input.Remaining < 1) + { + return MessageDecoderResult.NEED_DATA; + } + byte type = input.GetByte(); + + // we have to check this isn't a protocol initiation frame here - we can't tell later on and we end up + // waiting for more data. This could be improved if MINA supported some kind of state awareness when decoding + if ((char)type == 'A') + { + _logger.Error("Received what appears to be a protocol initiation frame"); + return MessageDecoderResult.NOT_OK; + } + // zero, channel, body size and end byte + if (input.Remaining < (1 + 2 + 4 + 1)) + { + return MessageDecoderResult.NEED_DATA; + } + + int channel = input.GetUInt16(); + long bodySize = input.GetUInt32(); + + // bodySize can be zero + if (type <= 0 || channel < 0 || bodySize < 0) + { + _logger.Error(String.Format("Error decoding frame: Type={0}, Channel={1}, BodySize={2}", type, channel, bodySize)); + return MessageDecoderResult.NOT_OK; + } + + if (input.Remaining < (bodySize + 1)) + { + return MessageDecoderResult.NEED_DATA; + } + + if (IsSupportedFrameType(type)) + { + if (_logger.IsDebugEnabled) + { + // we have read 7 bytes so far, so output 7 + bodysize + 1 (for end byte) to get complete data block size + // this logging statement is useful when looking at exactly what size of data is coming in/out + // the broker + _logger.Debug("Able to decode data block of size " + (bodySize + 8)); + } + return MessageDecoderResult.OK; + } + else + { + return MessageDecoderResult.NOT_OK; + } + } + + private bool IsSupportedFrameType(byte frameType) + { + bool result = _supportedBodies.ContainsKey(frameType); + + if (!result) + { + _logger.Warn("AMQDataBlockDecoder does not handle frame type " + frameType); + } + + return result; + } + + protected Object CreateAndPopulateFrame(ByteBuffer input) + { + byte type = input.GetByte(); + ushort channel = input.GetUInt16(); + uint bodySize = input.GetUInt32(); + + IBodyFactory bodyFactory = (IBodyFactory)_supportedBodies[type]; + if (bodyFactory == null) + { + throw new AMQFrameDecodingException("Unsupported body type: " + type); + } + AMQFrame frame = new AMQFrame(); + + frame.PopulateFromBuffer(input, channel, bodySize, bodyFactory); + + byte marker = input.GetByte(); + if (marker != 0xCE) { + throw new FormatException("marker is not 0xCE"); + } + return frame; + } + + public MessageDecoderResult Decode(ByteBuffer input, IProtocolDecoderOutput output) + { + + output.Write(CreateAndPopulateFrame(input)); + + return MessageDecoderResult.OK; + } + + public bool Disabled + { + set + { + _disabled = value; + } + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQDataBlockEncoder.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQDataBlockEncoder.cs new file mode 100644 index 0000000000..e2645c630e --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQDataBlockEncoder.cs @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Codec; +using Apache.Qpid.Codec.Demux; + +namespace Apache.Qpid.Framing +{ + public class AMQDataBlockEncoder : IMessageEncoder + { + private static ILog _logger = LogManager.GetLogger(typeof(AMQDataBlockEncoder)); + + private Hashtable _messageTypes; + + public AMQDataBlockEncoder() + { + _messageTypes = new Hashtable(); + _messageTypes[typeof (IEncodableAMQDataBlock)] = 1; + } + + + public Hashtable MessageTypes + { + get + { + return _messageTypes; + } + } + + public void Encode(object message, IProtocolEncoderOutput output) + { + IDataBlock frame = (IDataBlock) message; + int frameSize = (int)frame.Size; // TODO: sort out signed/unsigned + ByteBuffer buffer = ByteBuffer.Allocate(frameSize); + frame.WritePayload(buffer); + + if (_logger.IsDebugEnabled) + { + _logger.Debug("Encoded frame byte-buffer is '" + ByteBufferHexDumper.GetHexDump(buffer) + "'"); + } + buffer.Flip(); + output.Write(buffer); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQFrame.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQFrame.cs new file mode 100644 index 0000000000..912be72d30 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQFrame.cs @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class AMQFrame : IDataBlock + { + private ushort _channel; + + private IBody _bodyFrame; + + public AMQFrame() + { + } + + public AMQFrame(ushort channel, IBody bodyFrame) + { + _channel = channel; + _bodyFrame = bodyFrame; + } + + public ushort Channel + { + get + { + return _channel; + } + set + { + _channel = value; + } + } + + public IBody BodyFrame + { + get + { + return _bodyFrame; + } + set + { + _bodyFrame = value; + } + } + + #region IDataBlock Members + + public uint Size + { + get + { + return (uint) (1 + 2 + 4 + _bodyFrame.Size + 1); + } + } + + public void WritePayload(ByteBuffer buffer) + { + buffer.Put(_bodyFrame.BodyType); + // TODO: how does channel get populated + buffer.Put(_channel); + buffer.Put(_bodyFrame.Size); + _bodyFrame.WritePayload(buffer); + buffer.Put((byte) 0xCE); + } + + #endregion + + /// <summary> + /// Populates the frame instance data from the supplied buffer. + /// </summary> + /// <param name="buffer">The buffer.</param> + /// <param name="channel">The channel.</param> + /// <param name="bodySize">Size of the body in bytes</param> + /// <param name="bodyFactory">The body factory.</param> + /// <exception cref="AMQFrameDecodingException">Thrown if the buffer cannot be decoded</exception> + public void PopulateFromBuffer(ByteBuffer buffer, ushort channel, uint bodySize, IBodyFactory bodyFactory) + { + _channel = channel; + _bodyFrame = bodyFactory.CreateBody(buffer); + _bodyFrame.PopulateFromBuffer(buffer, bodySize); + } + + public override string ToString() + { + return "Frame channelId: " + _channel + ", bodyFrame: " + _bodyFrame.ToString(); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQFrameDecodingException.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQFrameDecodingException.cs new file mode 100644 index 0000000000..cda8c84ecf --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQFrameDecodingException.cs @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; +using log4net; + +namespace Apache.Qpid.Framing +{ + /// <summary> + /// Thrown when a frame cannot be decoded. This generally indicates a mismatch between the broker and the + /// client. + /// </summary> + [Serializable] + public class AMQFrameDecodingException : AMQException + { + public AMQFrameDecodingException(string message) + : base(message) + { + } + + public AMQFrameDecodingException(string message, Exception innerException) + : base(message, innerException) + { + } + + public AMQFrameDecodingException(ILog logger, string message) + : base(logger, message) + { + } + + public AMQFrameDecodingException(ILog logger, string message, Exception innerException) + : base(logger, message, innerException) + { + } + + protected AMQFrameDecodingException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQMethodBody.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQMethodBody.cs new file mode 100644 index 0000000000..a3c4337147 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQMethodBody.cs @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public abstract class AMQMethodBody : IBody + { + public const byte TYPE = 1; + + protected abstract uint BodySize + { + get; + } + + protected abstract ushort Clazz + { + get; + } + + protected abstract ushort Method + { + get; + } + + protected abstract void WriteMethodPayload(ByteBuffer buffer); + + public byte BodyType + { + get + { + return TYPE; + } + } + + public uint Size + { + get + { + return (uint) (2 + 2 + BodySize); + } + } + + public void WritePayload(ByteBuffer buffer) + { + buffer.Put(Clazz); + buffer.Put(Method); + WriteMethodPayload(buffer); + } + + /// <summary> + /// Populates the method body by decoding the specified buffer + /// </summary> + /// <param name="buffer">The buffer to decode.</param> + /// <exception cref="AMQFrameDecodingException">If the buffer cannot be decoded</exception> + protected abstract void PopulateMethodBodyFromBuffer(ByteBuffer buffer); + + /// <summary> + /// Populates this instance from a buffer of data. + /// </summary> + /// <param name="buffer">The buffer.</param> + /// <param name="size">The size.</param> + /// <exception cref="AMQFrameDecodingException">If the buffer contains data that cannot be decoded</exception> + public void PopulateFromBuffer(ByteBuffer buffer, uint size) + { + PopulateMethodBodyFromBuffer(buffer); + } + + public override string ToString() + { + return String.Format("{0}{{ Class: {1} Method: {2} }}", GetType().Name, Clazz, Method); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQMethodBodyFactory.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQMethodBodyFactory.cs new file mode 100644 index 0000000000..c1fd3f887a --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQMethodBodyFactory.cs @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class AMQMethodBodyFactory : IBodyFactory + { + private static readonly AMQMethodBodyFactory _instance = new AMQMethodBodyFactory(); + + public static AMQMethodBodyFactory GetInstance() + { + return _instance; + } + + /// <summary> + /// Creates the body. + /// </summary> + /// <param name="inbuf">The ByteBuffer containing data from the network</param> + /// <returns></returns> + /// <exception>AMQFrameDecodingException</exception> + public IBody CreateBody(ByteBuffer inbuf) + { + return MethodBodyDecoderRegistry.Get(inbuf.GetUInt16(), inbuf.GetUInt16()); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQProtocolHeaderException.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQProtocolHeaderException.cs new file mode 100644 index 0000000000..379e5d00ba --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQProtocolHeaderException.cs @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Runtime.Serialization; + +namespace Apache.Qpid.Framing +{ + [Serializable] + public class AMQProtocolHeaderException : AMQException + { + public AMQProtocolHeaderException(string message) : base(message) + { + } + + protected AMQProtocolHeaderException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQType.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQType.cs new file mode 100644 index 0000000000..618ab31d32 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQType.cs @@ -0,0 +1,700 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Apache.Qpid.Buffer;
+
+namespace Apache.Qpid.Framing
+{
+ /// <summary>
+ /// Base class for the Field Table Type system.
+ /// Ported over from the Java AMQType enumeration
+ /// </summary>
+ public abstract class AMQType
+ {
+ private byte _identifier;
+
+ /// <summary>
+ /// Type code identifier for this type
+ /// </summary>
+ public byte Identifier
+ {
+ get { return _identifier; }
+ }
+
+ protected AMQType(char identifier)
+ {
+ _identifier = (byte)identifier;
+ }
+
+ /// <summary>
+ /// Create a new <see cref="AMQTypedValue"/> instance
+ /// </summary>
+ /// <param name="value">Value to initialize with</param>
+ /// <returns>A new typed value instance</returns>
+ public AMQTypedValue AsTypedValue(object value)
+ {
+ return new AMQTypedValue(this, ToNativeValue(value));
+ }
+
+ /// <summary>
+ /// Write the specified value to the buffer using the encoding
+ /// specified for this type
+ /// </summary>
+ /// <param name="value">Value to write</param>
+ /// <param name="buffer">Buffer to write to</param>
+ public void WriteToBuffer(object value, ByteBuffer buffer)
+ {
+ buffer.Put(Identifier);
+ WriteValueImpl(value, buffer);
+ }
+
+ public override string ToString()
+ {
+ return ((Char) Identifier).ToString();
+ }
+
+ /// <summary>
+ /// Get the encoding size for the specified value in this type format
+ /// </summary>
+ /// <param name="value">Value to find encoded size for</param>
+ /// <returns>The encoded size</returns>
+ public abstract uint GetEncodingSize(object value);
+ /// <summary>
+ /// Convert the specified value to this type
+ /// </summary>
+ /// <param name="value">Value to convert</param>
+ /// <returns>The converted value</returns>
+ public abstract object ToNativeValue(object value);
+
+ /// <summary>
+ /// Read a value from the specified buffer using the encoding for
+ /// this type
+ /// </summary>
+ /// <param name="buffer">Buffer to read from</param>
+ /// <returns>The value read</returns>
+ public abstract object ReadValueFromBuffer(ByteBuffer buffer);
+
+ protected abstract void WriteValueImpl(Object value, ByteBuffer buffer);
+
+
+ #region Known Types
+ //
+ // Known Types
+ //
+
+ // long string is not defined in the proposed specification,
+ // and the 'S' discriminator is left for unsigned short (16-bit) values
+ public static readonly AMQType LONG_STRING = new AMQLongStringType();
+ public static readonly AMQType UINT32 = new AMQUInt32Type();
+ public static readonly AMQType DECIMAL = new AMQDecimalType();
+ public static readonly AMQType TIMESTAMP = new AMQTimeStampType();
+ public static readonly AMQType FIELD_TABLE = new AMQFieldTableType();
+ public static readonly AMQType VOID = new AMQVoidType();
+ public static readonly AMQType BINARY = new AMQBinaryType();
+ public static readonly AMQType ASCII_STRING = new AMQAsciiStringType();
+ public static readonly AMQType WIDE_STRING = new AMQWideStringType();
+ public static readonly AMQType BOOLEAN = new AMQBooleanType();
+ public static readonly AMQType ASCII_CHARACTER = new AMQAsciiCharType();
+ public static readonly AMQType BYTE = new AMQByteType();
+ public static readonly AMQType SBYTE = new AMQSByteType();
+ public static readonly AMQType INT16 = new AMQInt16Type();
+ public static readonly AMQType UINT16 = new AMQUInt16Type();
+ public static readonly AMQType INT32 = new AMQInt32Type();
+ public static readonly AMQType INT64 = new AMQInt64Type();
+ public static readonly AMQType UINT64 = new AMQUInt64Type();
+ public static readonly AMQType FLOAT = new AMQFloatType();
+ public static readonly AMQType DOUBLE = new AMQDoubleType();
+
+ #endregion // Known Types
+
+ #region Type Implementation
+ //
+ // Type Implementation
+ //
+
+ sealed class AMQLongStringType : AMQType
+ {
+ public AMQLongStringType() : base('S')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedLongStringLength((string) value);
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ if ( value == null )
+ throw new ArgumentNullException("value");
+ return value.ToString();
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadLongString(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteLongStringBytes(buffer, (string) value);
+ }
+
+ }
+
+ sealed class AMQUInt32Type : AMQType
+ {
+ public AMQUInt32Type() : base('i')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.UnsignedIntegerLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToUInt32(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadUnsignedInteger(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteUnsignedInteger(buffer, (uint) value);
+ }
+
+ }
+
+ sealed class AMQDecimalType : AMQType
+ {
+ public AMQDecimalType() : base('D')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ throw new NotImplementedException();
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ sealed class AMQTimeStampType : AMQType
+ {
+ public AMQTimeStampType() : base('T')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ throw new NotImplementedException();
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ sealed class AMQFieldTableType : AMQType
+ {
+ public AMQFieldTableType() : base('F')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ throw new NotImplementedException();
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ sealed class AMQVoidType : AMQType
+ {
+ public AMQVoidType() : base('V')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return 0;
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ if ( value != null )
+ throw new FormatException(string.Format("Cannot convert {0} to VOID type", value));
+ return null;
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return null;
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ }
+ }
+
+ // Extended Types
+
+ sealed class AMQBinaryType : AMQType
+ {
+ public AMQBinaryType() : base('x')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedLongstrLength((byte[]) value);
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ if ( value is byte[] || value == null )
+ {
+ return value;
+ }
+ throw new ArgumentException("Value cannot be converted to byte[]");
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadLongstr(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteLongstr(buffer, (byte[])value);
+ }
+ }
+
+ sealed class AMQAsciiStringType : AMQType
+ {
+ public AMQAsciiStringType() : base('c')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedAsciiStringLength((string)value);
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ if ( value == null )
+ throw new ArgumentNullException("value");
+ return value.ToString();
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadAsciiString(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteAsciiString(buffer, (string)value);
+ }
+ }
+
+ sealed class AMQWideStringType : AMQType
+ {
+ // todo: Change encoding to UTF16 (java code still uses default
+ // ascii encoding for wide strings
+ private static readonly Encoding ENCODING = Encoding.ASCII;
+
+ public AMQWideStringType()
+ : base('C')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedLongStringLength((string)value, ENCODING);
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ if ( value == null )
+ throw new ArgumentNullException("value");
+ return value.ToString();
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadLongString(buffer, ENCODING);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteLongStringBytes(buffer, (string)value, ENCODING);
+ }
+ }
+
+ sealed class AMQBooleanType : AMQType
+ {
+ public AMQBooleanType() : base('t')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedBooleanLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToBoolean(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadBoolean(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteBoolean(buffer, (bool)value);
+ }
+ }
+
+ sealed class AMQAsciiCharType : AMQType
+ {
+ public AMQAsciiCharType() : base('k')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedCharLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToChar(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadChar(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteChar(buffer, (char)value);
+ }
+ }
+
+ sealed class AMQByteType : AMQType
+ {
+ public AMQByteType() : base('B')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedByteLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToByte(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadByte(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteByte(buffer, (byte)value);
+ }
+ }
+
+ sealed class AMQSByteType : AMQType
+ {
+ public AMQSByteType()
+ : base('b')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedSByteLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToSByte(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadSByte(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteSByte(buffer, (sbyte)value);
+ }
+ }
+
+ sealed class AMQInt16Type : AMQType
+ {
+ public AMQInt16Type() : base('s')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedShortLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToInt16(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadShort(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteShort(buffer, (short)value);
+ }
+ }
+
+ sealed class AMQUInt16Type : AMQType
+ {
+ public AMQUInt16Type()
+ : base('S')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedUnsignedShortLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToUInt16(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadUnsignedShort(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteUnsignedShort(buffer, (ushort)value);
+ }
+ }
+
+ sealed class AMQInt32Type : AMQType
+ {
+ public AMQInt32Type() : base('I')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedIntegerLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToInt32(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadInteger(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteInteger(buffer, (int)value);
+ }
+ }
+
+ sealed class AMQInt64Type : AMQType
+ {
+ public AMQInt64Type() : base('l')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedLongLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToInt64(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadLong(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteLong(buffer, (long)value);
+ }
+ }
+
+ sealed class AMQUInt64Type : AMQType
+ {
+ public AMQUInt64Type()
+ : base('L')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedUnsignedLongLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToUInt64(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadUnsignedLong(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteUnsignedLong(buffer, (ulong)value);
+ }
+ }
+
+ sealed class AMQFloatType : AMQType
+ {
+ public AMQFloatType() : base('f')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedFloatLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToSingle(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadFloat(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteFloat(buffer, (float)value);
+ }
+ }
+
+ sealed class AMQDoubleType : AMQType
+ {
+ public AMQDoubleType() : base('d')
+ {
+ }
+
+ public override uint GetEncodingSize(object value)
+ {
+ return EncodingUtils.EncodedDoubleLength();
+ }
+
+ public override object ToNativeValue(object value)
+ {
+ return Convert.ToDouble(value);
+ }
+
+ public override object ReadValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.ReadDouble(buffer);
+ }
+
+ protected override void WriteValueImpl(object value, ByteBuffer buffer)
+ {
+ EncodingUtils.WriteDouble(buffer, (double)value);
+ }
+ }
+
+ #endregion // Type Implementation
+
+ } // class AMQType
+}
diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQTypeMap.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQTypeMap.cs new file mode 100644 index 0000000000..ed38c203a9 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQTypeMap.cs @@ -0,0 +1,75 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+
+namespace Apache.Qpid.Framing
+{
+ public sealed class AMQTypeMap
+ {
+ private static Hashtable _reverseTypeMap;
+
+ private AMQTypeMap()
+ {
+ }
+
+ static AMQTypeMap()
+ {
+ _reverseTypeMap = Hashtable.Synchronized(new Hashtable());
+
+ Add(AMQType.LONG_STRING);
+ Add(AMQType.BOOLEAN);
+ Add(AMQType.BYTE);
+ Add(AMQType.SBYTE);
+ Add(AMQType.INT16);
+ // not supported for now as type code conflicts
+ // with LONG_STRING
+ //Add(AMQType.UINT16);
+ Add(AMQType.INT32);
+ Add(AMQType.UINT32);
+ Add(AMQType.INT64);
+ Add(AMQType.UINT64);
+ Add(AMQType.FLOAT);
+ Add(AMQType.DOUBLE);
+ Add(AMQType.DECIMAL);
+ Add(AMQType.BINARY);
+ Add(AMQType.ASCII_STRING);
+ Add(AMQType.WIDE_STRING);
+ Add(AMQType.ASCII_CHARACTER);
+ Add(AMQType.TIMESTAMP);
+ Add(AMQType.FIELD_TABLE);
+ Add(AMQType.VOID);
+ }
+
+ public static AMQType GetType(byte identifier)
+ {
+ AMQType type = (AMQType)_reverseTypeMap[identifier];
+ if ( type == null )
+ throw new ArgumentOutOfRangeException(string.Format("No such type code: {0:x}", identifier));
+ return type;
+ }
+
+ private static void Add(AMQType type)
+ {
+ _reverseTypeMap.Add(type.Identifier, type);
+ }
+ }
+}
diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/AMQTypedValue.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQTypedValue.cs new file mode 100644 index 0000000000..8d21a60831 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/AMQTypedValue.cs @@ -0,0 +1,76 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Apache.Qpid.Buffer;
+
+namespace Apache.Qpid.Framing
+{
+ public class AMQTypedValue
+ {
+ private readonly AMQType _type;
+ private readonly object _value;
+
+ public AMQType Type
+ {
+ get { return _type; }
+ }
+
+ public object Value
+ {
+ get { return _value; }
+ }
+
+ public uint EncodingLength
+ {
+ get { return _type.GetEncodingSize(_value); }
+ }
+
+ public AMQTypedValue(AMQType type, object value)
+ {
+ if ( type == null )
+ throw new ArgumentNullException("type");
+ _type = type;
+ _value = type.ToNativeValue(value);
+ }
+
+ public AMQTypedValue(AMQType type, ByteBuffer buffer)
+ {
+ _type = type;
+ _value = type.ReadValueFromBuffer(buffer);
+ }
+
+ public void WriteToBuffer(ByteBuffer buffer)
+ {
+ _type.WriteToBuffer(_value, buffer);
+ }
+
+ public static AMQTypedValue ReadFromBuffer(ByteBuffer buffer)
+ {
+ AMQType type = AMQTypeMap.GetType(buffer.GetByte());
+ return new AMQTypedValue(type, buffer);
+ }
+
+ public override string ToString()
+ {
+ return string.Format("{0}: {1}", Type, Value);
+ }
+ }
+}
diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs new file mode 100644 index 0000000000..47db7b0887 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs @@ -0,0 +1,290 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Framing +{ + public class BasicContentHeaderProperties : IContentHeaderProperties + { + private static readonly ILog _log = LogManager.GetLogger(typeof(BasicContentHeaderProperties)); + + private string _contentType; + private string _encoding; + private FieldTable _headers; + private byte _deliveryMode; + private byte _priority; + private string _correlationId; + private long _expiration; + private string _replyTo; + private string _messageId; + private ulong _timestamp; + private string _type; + private string _userId; + private string _appId; + private string _clusterId; + + + #region Properties + // + // Properties + // + + /// <summary> + /// The MIME Content Type + /// </summary> + public string ContentType + { + get { return _contentType; } + set { _contentType = value; } + } + + /// <summary> + /// The MIME Content Encoding + /// </summary> + public string Encoding + { + get { return _encoding; } + set { _encoding = value; } + } + + /// <summary> + /// Message headers + /// </summary> + public FieldTable Headers + { + get { return _headers; } + set { _headers = value; } + } + + /// <summary> + /// Non-persistent (1) or persistent (2) + /// </summary> + public byte DeliveryMode + { + get { return _deliveryMode; } + set { _deliveryMode = value; } + } + + /// <summary> + /// The message priority, 0 to 9 + /// </summary> + public byte Priority + { + get { return _priority; } + set { _priority = value; } + } + + /// <summary> + /// The application correlation identifier + /// </summary> + public string CorrelationId + { + get { return _correlationId; } + set { _correlationId = value; } + } + + /// <summary> + /// Message expiration specification + /// </summary> + // TODO: Should be string according to spec + public long Expiration + { + get { return _expiration; } + set { _expiration = value; } + } + + /// <summary> + /// The destination to reply to + /// </summary> + public string ReplyTo + { + get { return _replyTo; } + set { _replyTo = value; } + } + + /// <summary> + /// The application message identifier + /// </summary> + public string MessageId + { + get { return _messageId; } + set { _messageId = value; } + } + + /// <summary> + /// The message timestamp + /// </summary> + public ulong Timestamp + { + get { return _timestamp; } + set { _timestamp = value; } + } + + /// <summary> + /// The message type name + /// </summary> + public string Type + { + get { return _type; } + set { _type = value; } + } + + /// <summary> + /// The creating user id + /// </summary> + public string UserId + { + get { return _userId; } + set { _userId = value; } + } + + /// <summary> + /// The creating application id + /// </summary> + public string AppId + { + get { return _appId; } + set { _appId = value; } + } + + /// <summary> + /// Intra-cluster routing identifier + /// </summary> + public string ClusterId + { + get { return _clusterId; } + set { _clusterId = value; } + } + + #endregion // Properties + + + public BasicContentHeaderProperties() + { + } + + public uint PropertyListSize + { + get + { + return (uint)(EncodingUtils.EncodedShortStringLength(ContentType) + + EncodingUtils.EncodedShortStringLength(Encoding) + + EncodingUtils.EncodedFieldTableLength(Headers) + + 1 + 1 + + EncodingUtils.EncodedShortStringLength(CorrelationId) + + EncodingUtils.EncodedShortStringLength(ReplyTo) + + EncodingUtils.EncodedShortStringLength(String.Format("{0:D}", Expiration)) + + EncodingUtils.EncodedShortStringLength(MessageId) + + 8 + + EncodingUtils.EncodedShortStringLength(Type) + + EncodingUtils.EncodedShortStringLength(UserId) + + EncodingUtils.EncodedShortStringLength(AppId) + + EncodingUtils.EncodedShortStringLength(ClusterId)); + + } + } + + public ushort PropertyFlags + { + get + { + int value = 0; + + // for now we just blast in all properties + for ( int i = 0; i < 14; i++ ) + { + value += (1 << (15 - i)); + } + return (ushort)value; + } + } + + public void WritePropertyListPayload(ByteBuffer buffer) + { + EncodingUtils.WriteShortStringBytes(buffer, ContentType); + EncodingUtils.WriteShortStringBytes(buffer, Encoding); + EncodingUtils.WriteFieldTableBytes(buffer, Headers); + buffer.Put(DeliveryMode); + buffer.Put(Priority); + EncodingUtils.WriteShortStringBytes(buffer, CorrelationId); + EncodingUtils.WriteShortStringBytes(buffer, ReplyTo); + EncodingUtils.WriteShortStringBytes(buffer, String.Format("{0:D}", Expiration)); + EncodingUtils.WriteShortStringBytes(buffer, MessageId); + buffer.Put(Timestamp); + EncodingUtils.WriteShortStringBytes(buffer, Type); + EncodingUtils.WriteShortStringBytes(buffer, UserId); + EncodingUtils.WriteShortStringBytes(buffer, AppId); + EncodingUtils.WriteShortStringBytes(buffer, ClusterId); + } + + public void PopulatePropertiesFromBuffer(ByteBuffer buffer, ushort propertyFlags) + { + _log.Debug("Property flags: " + propertyFlags); + if ( (propertyFlags & (1 << 15)) > 0 ) + ContentType = EncodingUtils.ReadShortString(buffer); + if ( (propertyFlags & (1 << 14)) > 0 ) + Encoding = EncodingUtils.ReadShortString(buffer); + if ( (propertyFlags & (1 << 13)) > 0 ) + Headers = EncodingUtils.ReadFieldTable(buffer); + if ( (propertyFlags & (1 << 12)) > 0 ) + DeliveryMode = buffer.GetByte(); + if ( (propertyFlags & (1 << 11)) > 0 ) + Priority = buffer.GetByte(); + if ( (propertyFlags & (1 << 10)) > 0 ) + CorrelationId = EncodingUtils.ReadShortString(buffer); + if ( (propertyFlags & (1 << 9)) > 0 ) + ReplyTo = EncodingUtils.ReadShortString(buffer); + if ( (propertyFlags & (1 << 8)) > 0 ) + Expiration = EncodingUtils.ReadLongAsShortString(buffer); + if ( (propertyFlags & (1 << 7)) > 0 ) + MessageId = EncodingUtils.ReadShortString(buffer); + if ( (propertyFlags & (1 << 6)) > 0 ) + Timestamp = buffer.GetUInt64(); + if ( (propertyFlags & (1 << 5)) > 0 ) + Type = EncodingUtils.ReadShortString(buffer); + if ( (propertyFlags & (1 << 4)) > 0 ) + UserId = EncodingUtils.ReadShortString(buffer); + if ( (propertyFlags & (1 << 3)) > 0 ) + AppId = EncodingUtils.ReadShortString(buffer); + if ( (propertyFlags & (1 << 2)) > 0 ) + ClusterId = EncodingUtils.ReadShortString(buffer); + } + + public void SetDeliveryMode(DeliveryMode deliveryMode) + { + if ( deliveryMode == Messaging.DeliveryMode.NonPersistent ) + { + DeliveryMode = 1; + } else + { + DeliveryMode = 2; + } + } + + public override string ToString() + { + return "Properties: " + ContentType + " " + Encoding + " " + Timestamp + " " + Type; + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/CompositeAMQDataBlock.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/CompositeAMQDataBlock.cs new file mode 100644 index 0000000000..d2b7f606b2 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/CompositeAMQDataBlock.cs @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Text; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class CompositeAMQDataBlock : IDataBlock, IEncodableAMQDataBlock + { + private IDataBlock[] _blocks; + + public CompositeAMQDataBlock(IDataBlock[] blocks) + { + _blocks = blocks; + } + + public IDataBlock[] Blocks + { + get + { + return _blocks; + } + } + + public uint Size + { + get + { + uint frameSize = 0; + foreach (IDataBlock block in _blocks) + { + frameSize += block.Size; + } + return frameSize; + } + } + + public void WritePayload(ByteBuffer buffer) + { + foreach (IDataBlock block in _blocks) + { + block.WritePayload(buffer); + } + } + + public override string ToString() + { + if (_blocks == null) + { + return "No blocks contained in composite frame"; + } + else + { + StringBuilder buf = new StringBuilder(GetType().Name); + buf.Append("{"); + //buf.Append("encodedBlock=").Append(_encodedBlock); + for (int i = 0; i < _blocks.Length; i++) + { + buf.Append(" ").Append(i).Append("=[").Append(_blocks[i].ToString()).Append("]"); + } + buf.Append("}"); + return buf.ToString(); + } + } + + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs new file mode 100644 index 0000000000..7a2142985d --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class ContentBody : IBody + { + public const byte TYPE = 3; + + private ByteBuffer _payload; + + public ByteBuffer Payload + { + get { return _payload; } + } + + public ContentBody() + { + } + public ContentBody(ByteBuffer payload) + { + PopulateFromBuffer(payload, (uint)payload.Remaining); + } + public ContentBody(ByteBuffer payload, uint length) + { + PopulateFromBuffer(payload, length); + } + + #region IBody Members + + public byte BodyType + { + get + { + return TYPE; + } + } + + public uint Size + { + get + { + return (ushort)(Payload == null ? 0 : Payload.Remaining); + } + } + + public void WritePayload(ByteBuffer buffer) + { + if (Payload != null) + { + buffer.Put(Payload); + Payload.Rewind(); + } + } + + public void PopulateFromBuffer(ByteBuffer buffer, uint size) + { + if (size > 0) + { + _payload = buffer.Slice(); + _payload.Limit = (int)size; + buffer.Skip((int)size); + } + } + + #endregion + + public static AMQFrame CreateAMQFrame(ushort channelId, ContentBody body) + { + AMQFrame frame = new AMQFrame(); + frame.Channel = channelId; + frame.BodyFrame = body; + return frame; + } + + public override string ToString() + { + return string.Format("ContentBody [ Size: {0} ]", Size); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/ContentBodyFactory.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentBodyFactory.cs new file mode 100644 index 0000000000..400b2aec08 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentBodyFactory.cs @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class ContentBodyFactory : IBodyFactory + { + private static readonly ILog _log = LogManager.GetLogger(typeof(ContentBodyFactory)); + + private static readonly ContentBodyFactory _instance = new ContentBodyFactory(); + + public static ContentBodyFactory GetInstance() + { + return _instance; + } + + private ContentBodyFactory() + { + _log.Debug("Creating content body factory"); + } + + /// <summary> + /// Creates the body. + /// </summary> + /// <param name="inbuf">The ByteBuffer containing data from the network</param> + /// <returns></returns> + /// <exception>AMQFrameDecodingException</exception> + public IBody CreateBody(ByteBuffer inbuf) + { + return new ContentBody(); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderBody.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderBody.cs new file mode 100644 index 0000000000..82889c23c8 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderBody.cs @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class ContentHeaderBody : IBody + { + public static readonly byte TYPE = 2; + + public ushort ClassId; + + public ushort Weight; + + public ulong BodySize; + + /** must never be null */ + public IContentHeaderProperties Properties; + + public ContentHeaderBody() + { + } + + public ContentHeaderBody(IContentHeaderProperties props, ushort classId) + { + Properties = props; + ClassId = classId; + } + + public ContentHeaderBody(ushort classId, ushort weight, IContentHeaderProperties props, uint bodySize) + : this(props, classId) + { + Weight = weight; + BodySize = bodySize; + } + + #region IBody Members + + public byte BodyType + { + get + { + return TYPE; + } + } + + public uint Size + { + get + { + return (2 + 2 + 8 + 2 + Properties.PropertyListSize); + } + } + + public void WritePayload(ByteBuffer buffer) + { + buffer.Put(ClassId); + buffer.Put(Weight); + buffer.Put(BodySize); + buffer.Put(Properties.PropertyFlags); + Properties.WritePropertyListPayload(buffer); + } + + public void PopulateFromBuffer(ByteBuffer buffer, uint size) + { + ClassId = buffer.GetUInt16(); + Weight = buffer.GetUInt16(); + BodySize = buffer.GetUInt64(); + ushort propertyFlags = buffer.GetUInt16(); + ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.GetInstance(); + Properties = factory.CreateContentHeaderProperties(ClassId, propertyFlags, buffer); + } + + #endregion + + public static AMQFrame CreateAMQFrame(ushort channelId, ushort classId, ushort weight, BasicContentHeaderProperties properties, + uint bodySize) + { + AMQFrame frame = new AMQFrame(); + frame.Channel = channelId; + frame.BodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize); + return frame; + } + + public static AMQFrame CreateAMQFrame(ushort channelId, ContentHeaderBody body) + { + AMQFrame frame = new AMQFrame(); + frame.Channel = channelId; + frame.BodyFrame = body; + return frame; + } + + public override string ToString() + { + return String.Format("ContentHeaderBody: ClassId {0}, Weight {1}, BodySize {2}, Properties {3}", ClassId, Weight, + BodySize, Properties); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderBodyFactory.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderBodyFactory.cs new file mode 100644 index 0000000000..c95a10871d --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderBodyFactory.cs @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class ContentHeaderBodyFactory : IBodyFactory + { + private static readonly ILog _log = LogManager.GetLogger(typeof(ContentHeaderBodyFactory)); + + private static readonly ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory(); + + public static ContentHeaderBodyFactory GetInstance() + { + return _instance; + } + + private ContentHeaderBodyFactory() + { + _log.Debug("Creating content header body factory"); + } + + #region IBodyFactory Members + + public IBody CreateBody(ByteBuffer inbuf) + { + // all content headers are the same - it is only the properties that differ. + // the content header body further delegates construction of properties + return new ContentHeaderBody(); + } + + #endregion + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderPropertiesFactory.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderPropertiesFactory.cs new file mode 100644 index 0000000000..bac5d10fd4 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/ContentHeaderPropertiesFactory.cs @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class ContentHeaderPropertiesFactory + { + + private static readonly ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); + + public static ContentHeaderPropertiesFactory GetInstance() + { + return _instance; + } + + private ContentHeaderPropertiesFactory() + { + } + + /// <summary> + /// Creates the content header properties from a buffer. + /// </summary> + /// <param name="classId">The class id.</param> + /// <param name="propertyFlags">The property flags.</param> + /// <param name="buffer">The buffer.</param> + /// <returns>a populated properties structure</returns> + /// <exception cref="AMQFrameDecodingException">if the buffer cannot be decoded</exception> + public IContentHeaderProperties CreateContentHeaderProperties(ushort classId, ushort propertyFlags, + ByteBuffer buffer) + { + IContentHeaderProperties properties; + switch (classId) + { + case 60: + properties = new BasicContentHeaderProperties(); + break; + default: + throw new AMQFrameDecodingException("Unsupport content header class id: " + classId); + } + properties.PopulatePropertiesFromBuffer(buffer, propertyFlags); + return properties; + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/EncodingUtils.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/EncodingUtils.cs new file mode 100644 index 0000000000..4d424656f9 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/EncodingUtils.cs @@ -0,0 +1,460 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Globalization; +using System.Text; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class EncodingUtils + { + private static readonly Encoding DEFAULT_ENCODER = Encoding.ASCII; + + // SHORT STRING + public static ushort EncodedShortStringLength(string s) + { + if ( s == null ) + { + return 1; + } else + { + return (ushort)(1 + s.Length); + } + } + public static void WriteShortStringBytes(ByteBuffer buffer, string s) + { + if ( s != null ) + { + //try + //{ + //final byte[] encodedString = s.getBytes(STRING_ENCODING); + byte[] encodedString; + lock ( DEFAULT_ENCODER ) + { + encodedString = DEFAULT_ENCODER.GetBytes(s); + } + // TODO: check length fits in an unsigned byte + buffer.Put((byte)encodedString.Length); + buffer.Put(encodedString); + + } else + { + // really writing out unsigned byte + buffer.Put((byte)0); + } + } + + // ASCII STRINGS + public static uint EncodedAsciiStringLength(string s) + { + // TODO: move this to 2-byte length once the proposed encodings + // have been approved. Also, validate length! + if ( s == null ) + return 4; + else + return (uint) (4 + s.Length); + } + public static string ReadAsciiString(ByteBuffer buffer) + { + return ReadLongString(buffer, DEFAULT_ENCODER); + } + public static void WriteAsciiString(ByteBuffer buffer, string s) + { + WriteLongStringBytes(buffer, s, DEFAULT_ENCODER); + } + + // LONG STRING + public static uint EncodedLongStringLength(string s) + { + return EncodedLongStringLength(s, DEFAULT_ENCODER); + } + + public static uint EncodedLongStringLength(string s, Encoding encoding) + { + if ( s == null ) + { + return 4; + } else + { + return (uint)(4 + encoding.GetByteCount(s)); + } + } + public static string ReadLongString(ByteBuffer buffer) + { + return ReadLongString(buffer, DEFAULT_ENCODER); + } + public static string ReadLongString(ByteBuffer buffer, Encoding encoding) + { + uint length = buffer.GetUInt32(); + if ( length == 0 ) + { + return null; + } else + { + byte[] data = new byte[length]; + buffer.GetBytes(data); + lock ( encoding ) + { + return encoding.GetString(data); + } + } + } + public static void WriteLongStringBytes(ByteBuffer buffer, string s) + { + WriteLongStringBytes(buffer, s, DEFAULT_ENCODER); + } + + public static void WriteLongStringBytes(ByteBuffer buffer, string s, Encoding encoding) + { + if ( !(s == null || s.Length <= 0xFFFE) ) + { + throw new ArgumentException("String too long"); + } + if ( s != null ) + { + lock ( encoding ) + { + byte[] encodedString = encoding.GetBytes(s); + buffer.Put((uint)encodedString.Length); + buffer.Put(encodedString); + } + } else + { + buffer.Put((uint)0); + } + } + + // BINARY + public static uint EncodedLongstrLength(byte[] bytes) + { + if ( bytes == null ) + { + return 4; + } else + { + return (uint)(4 + bytes.Length); + } + } + public static byte[] ReadLongstr(ByteBuffer buffer) + { + uint length = buffer.GetUInt32(); + if ( length == 0 ) + { + return null; + } else + { + byte[] result = new byte[length]; + buffer.GetBytes(result); + return result; + } + } + public static void WriteLongstr(ByteBuffer buffer, byte[] data) + { + if ( data != null ) + { + buffer.Put((uint)data.Length); + buffer.Put(data); + } else + { + buffer.Put((uint)0); + } + } + + // BOOLEANS + public static bool[] ReadBooleans(ByteBuffer buffer) + { + byte packedValue = buffer.GetByte(); + bool[] result = new bool[8]; + + for ( int i = 0; i < 8; i++ ) + { + result[i] = ((packedValue & (1 << i)) != 0); + } + return result; + } + public static void WriteBooleans(ByteBuffer buffer, bool[] values) + { + byte packedValue = 0; + for ( int i = 0; i < values.Length; i++ ) + { + if ( values[i] ) + { + packedValue = (byte)(packedValue | (1 << i)); + } + } + + buffer.Put(packedValue); + } + + // FIELD TABLES + public static uint EncodedFieldTableLength(FieldTable table) + { + if ( table == null ) + { + // size is encoded as 4 octets + return 4; + } else + { + // size of the table plus 4 octets for the size + return table.EncodedSize + 4; + } + } + /// <summary> + /// Reads the field table using the data in the specified buffer + /// </summary> + /// <param name="buffer">The buffer to read from.</param> + /// <returns>a populated field table</returns> + /// <exception cref="AMQFrameDecodingException">if the buffer does not contain a decodable field table</exception> + public static FieldTable ReadFieldTable(ByteBuffer buffer) + { + uint length = buffer.GetUInt32(); + if ( length == 0 ) + { + return null; + } else + { + return new FieldTable(buffer, length); + } + } + public static void WriteFieldTableBytes(ByteBuffer buffer, FieldTable table) + { + if ( table != null ) + { + table.WriteToBuffer(buffer); + } else + { + buffer.Put((uint)0); + } + } + + + /// <summary> + /// Read a short string from the buffer + /// </summary> + /// <param name="buffer">The buffer to read from.</param> + /// <returns>a string</returns> + /// <exception cref="AMQFrameDecodingException">if the buffer does not contain a decodable short string</exception> + public static string ReadShortString(ByteBuffer buffer) + { + byte length = buffer.GetByte(); + if ( length == 0 ) + { + return null; + } else + { + byte[] data = new byte[length]; + buffer.GetBytes(data); + + lock ( DEFAULT_ENCODER ) + { + return DEFAULT_ENCODER.GetString(data); + } + } + } + + + + // BOOLEAN + public static uint EncodedBooleanLength() + { + return 1; + } + public static bool ReadBoolean(ByteBuffer buffer) + { + byte packedValue = buffer.GetByte(); + return (packedValue == 1); + } + public static void WriteBoolean(ByteBuffer buffer, bool value) + { + buffer.Put((byte)(value ? 1 : 0)); + } + + + // CHAR + public static uint EncodedCharLength() + { + return EncodedByteLength(); + } + public static char ReadChar(ByteBuffer buffer) + { + return (char)buffer.GetByte(); + } + public static void WriteChar(ByteBuffer buffer, char value) + { + buffer.Put((byte)value); + } + + // BYTE + public static uint EncodedByteLength() + { + return 1; + } + public static byte ReadByte(ByteBuffer buffer) + { + return buffer.GetByte(); + } + public static void WriteByte(ByteBuffer buffer, byte value) + { + buffer.Put(value); + } + + // SBYTE + public static uint EncodedSByteLength() + { + return 1; + } + public static sbyte ReadSByte(ByteBuffer buffer) + { + return buffer.GetSByte(); + } + public static void WriteSByte(ByteBuffer buffer, sbyte value) + { + buffer.Put(value); + } + + // INT16 + public static uint EncodedShortLength() + { + return 2; + } + + public static short ReadShort(ByteBuffer buffer) + { + return buffer.GetInt16(); + } + public static void WriteShort(ByteBuffer buffer, short value) + { + buffer.Put(value); + } + + // UINT16 + public static uint EncodedUnsignedShortLength() + { + return 2; + } + + public static ushort ReadUnsignedShort(ByteBuffer buffer) + { + return buffer.GetUInt16(); + } + public static void WriteUnsignedShort(ByteBuffer buffer, ushort value) + { + buffer.Put(value); + } + + + // INT32 + public static uint EncodedIntegerLength() + { + return 4; + } + public static int ReadInteger(ByteBuffer buffer) + { + return buffer.GetInt32(); + } + public static void WriteInteger(ByteBuffer buffer, int value) + { + buffer.Put(value); + } + + // UINT32 + public static uint UnsignedIntegerLength() + { + return 4; + } + public static void WriteUnsignedInteger(ByteBuffer buffer, uint value) + { + buffer.Put(value); + } + public static uint ReadUnsignedInteger(ByteBuffer buffer) + { + return buffer.GetUInt32(); + } + + // INT64 + public static uint EncodedUnsignedLongLength() + { + return 8; + } + public static ulong ReadUnsignedLong(ByteBuffer buffer) + { + return buffer.GetUInt64(); + } + public static void WriteUnsignedLong(ByteBuffer buffer, ulong value) + { + buffer.Put(value); + } + + // UINT64 + public static uint EncodedLongLength() + { + return 8; + } + public static long ReadLong(ByteBuffer buffer) + { + return buffer.GetInt64(); + } + public static void WriteLong(ByteBuffer buffer, long value) + { + buffer.Put(value); + } + + // FLOAT + public static uint EncodedFloatLength() + { + return 4; + } + public static void WriteFloat(ByteBuffer buffer, float value) + { + buffer.Put(value); + } + public static float ReadFloat(ByteBuffer buffer) + { + return buffer.GetFloat(); + } + + // DOUBLE + public static uint EncodedDoubleLength() + { + return 8; + } + public static void WriteDouble(ByteBuffer buffer, double value) + { + buffer.Put(value); + } + public static double ReadDouble(ByteBuffer buffer) + { + return buffer.GetDouble(); + } + + // OTHER + public static long ReadLongAsShortString(ByteBuffer buffer) + { + string value = ReadShortString(buffer); + if ( value == null || value.Length == 0 ) + return 0L; + return Convert.ToInt64(value, CultureInfo.InvariantCulture); + } + + } + +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs new file mode 100644 index 0000000000..6567bf58ab --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs @@ -0,0 +1,633 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Collections; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Framing +{ + public class FieldTable : IFieldTable, IEnumerable + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FieldTable)); + + IDictionary _properties; + private ByteBuffer _encodedForm; + private object _syncLock; + private uint _encodedSize; + + public FieldTable() + { + _syncLock = new object(); + } + + /// <summary> + /// Construct a new field table. + /// </summary> + /// <param name="buffer">the buffer from which to read data. The length byte must be read already</param> + /// <param name="length">the length of the field table. Must be > 0.</param> + public FieldTable(ByteBuffer buffer, uint length) : this() + { + _encodedForm = buffer.Slice(); + _encodedForm.Limit = (int)length; + _encodedSize = length; + buffer.Skip((int)length); + } + + /// <summary> + /// The set of all property names + /// </summary> + public ICollection Keys + { + get + { + InitMapIfNecessary(); + return _properties.Keys; + } + } + + /// <summary> + /// Calculated size of this field table once encoded + /// </summary> + public uint EncodedSize + { + get { return _encodedSize; } + } + + /// <summary> + /// Number of properties in the field table + /// </summary> + public int Count + { + get + { + InitMapIfNecessary(); + return _properties.Count; + } + } + + /// <summary> + /// Gets or sets the specified property. + /// </summary> + /// <param name="key">Property name</param> + /// <returns>The specified property value</returns> + public object this[string key] + { + get { return GetObject(key); } + set { SetObject(key, value); } + } + + #region Typed Setters and Getters + // + // Typed Setters and Getters + // + public bool GetBoolean(string key) + { + return (bool)this[key]; + } + public void SetBoolean(string key, bool value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.BOOLEAN.AsTypedValue(value)); + } + public byte GetByte(string key) + { + return (byte)this[key]; + } + public void SetByte(string key, byte value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.BYTE.AsTypedValue(value)); + } + public sbyte GetSByte(string key) + { + return (sbyte)this[key]; + } + public void SetSByte(string key, sbyte value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.SBYTE.AsTypedValue(value)); + } + public short GetInt16(string key) + { + return (short)this[key]; + } + public void SetInt16(string key, short value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.INT16.AsTypedValue(value)); + } + public int GetInt32(string key) + { + return (int)this[key]; + } + public void SetInt32(string key, int value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.INT32.AsTypedValue(value)); + } + public long GetInt64(string key) + { + return (long)this[key]; + } + public void SetInt64(string key, long value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.INT64.AsTypedValue(value)); + } + public char GetChar(string key) + { + return (char)this[key]; + } + public void SetChar(string key, char value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.ASCII_CHARACTER.AsTypedValue(value)); + } + public float GetFloat(string key) + { + return (float)this[key]; + } + public void SetFloat(string key, float value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.FLOAT.AsTypedValue(value)); + } + public double GetDouble(string key) + { + return (double)this[key]; + } + public void SetDouble(string key, double value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.DOUBLE.AsTypedValue(value)); + } + public decimal GetDecimal(string key) + { + return (decimal)this[key]; + } + public void SetDecimal(string key, decimal value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.DECIMAL.AsTypedValue(value)); + } + public string GetString(string key) + { + return (string)this[key]; + } + public void SetString(string key, string value) + { + CheckPropertyName(key); + if ( value == null ) + SetProperty(key, AMQType.VOID.AsTypedValue(null)); + else + SetProperty(key, AMQType.LONG_STRING.AsTypedValue(value)); + } + public byte[] GetBytes(string key) + { + return (byte[])this[key]; + } + public void SetBytes(string key, byte[] value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.BINARY.AsTypedValue(value)); + } + public ushort GetUInt16(string key) + { + return (ushort)this[key]; + } + public void SetUInt16(string key, ushort value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.UINT16.AsTypedValue(value)); + } + public uint GetUInt32(string key) + { + return (uint)this[key]; + } + public void SetUInt32(string key, uint value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.UINT32.AsTypedValue(value)); + } + public ulong GetUInt64(string key) + { + return (ulong)this[key]; + } + public void SetUInt64(string key, ulong value) + { + CheckPropertyName(key); + SetProperty(key, AMQType.UINT64.AsTypedValue(value)); + } + + #endregion // Typed Setters and Getters + + #region Public Methods + // + // Public Methods + // + + /// <summary> + /// Removes the property with the specified name + /// </summary> + /// <param name="key">The name of the property to remove</param> + /// <returns>The previous value of the property or null</returns> + public AMQTypedValue RemoveKey(string key) + { + InitMapIfNecessary(); + _encodedForm = null; + AMQTypedValue value = (AMQTypedValue)_properties[key]; + if ( value != null ) + { + _properties.Remove(key); + _encodedSize -= EncodingUtils.EncodedShortStringLength(key); + _encodedSize--; + _encodedSize -= value.EncodingLength; + + } + return value; + } + + + /// <summary> + /// Remove the property with the specified name + /// </summary> + /// <param name="key">The name of the property to remove</param> + public void Remove(string key) + { + RemoveKey(key); + } + + /// <summary> + /// Remove all properties from the table + /// </summary> + public void Clear() + { + InitMapIfNecessary(); + _encodedForm = null; + _properties.Clear(); + _encodedSize = 0; + } + + /// <summary> + /// Adds all the items from one field table in this one. Will overwrite any items in the current table + /// with the same key. + /// </summary> + /// <param name="ft">the source field table</param> + public void AddAll(IFieldTable ft) + { + foreach ( DictionaryEntry dictionaryEntry in ft ) + { + this[(string)dictionaryEntry.Key] = dictionaryEntry.Value; + } + } + + /// <summary> + /// Get a enumerator over the internal property set. + /// Notice the enumerator will DictionaryEntry objects with + /// a string as the Key and an <see cref="AMQTypedValue"/> instance as the value + /// </summary> + /// <returns>The enumerator object</returns> + public IEnumerator GetEnumerator() + { + InitMapIfNecessary(); + return _properties.GetEnumerator(); + } + + /// <summary> + /// Indicates if a property with the given name exists + /// </summary> + /// <param name="s">Property name to check</param> + /// <returns>True if the property exists</returns> + public bool Contains(string s) + { + InitMapIfNecessary(); + return _properties.Contains(s); + } + + /// <summary> + /// Returns a dictionary mapping Property Names to the corresponding + /// <see cref="AMQTypedValue"/> value + /// </summary> + /// <returns>The internal dictionary</returns> + public IDictionary AsDictionary() + { + InitMapIfNecessary(); + return _properties; + } + + /// <summary> + /// Returns a string representation of this field table + /// </summary> + /// <returns>A string</returns> + public override string ToString() + { + StringBuilder sb = new StringBuilder("FieldTable {"); + + bool first = true; + InitMapIfNecessary(); + foreach ( DictionaryEntry entry in _properties ) + { + if ( !first ) + { + sb.Append(", "); + } + first = false; + sb.Append(entry.Key).Append(" => ").Append(entry.Value); + } + + sb.Append("}"); + return sb.ToString(); + } + + /// <summary> + /// Serializes this instance to the specified <see cref="ByteBuffer"/>. + /// </summary> + /// <param name="buffer">The buffer to write to</param> + public void WriteToBuffer(ByteBuffer buffer) + { + if ( _log.IsDebugEnabled ) + { + _log.Debug("FieldTable::writeToBuffer: Writing encoded length of " + EncodedSize + "..."); + } + + EncodingUtils.WriteUnsignedInteger(buffer, EncodedSize); + WritePayload(buffer); + } + + /// <summary> + /// Returns a byte array with the serialized representation + /// of this field table + /// </summary> + /// <returns>An array of bytes</returns> + public byte[] GetDataAsBytes() + { + ByteBuffer buffer = ByteBuffer.Allocate((int)_encodedSize); + WritePayload(buffer); + byte[] result = new byte[_encodedSize]; + buffer.Flip(); + buffer.GetBytes(result); + //buffer.Release(); + return result; + } + + #endregion // Public Methods + + #region Private Methods + // + // Private Methods + // + + private static void CheckPropertyName(string propertyName) + { + if ( propertyName == null || propertyName.Length == 0 ) + throw new ArgumentNullException("propertyName"); + CheckIdentifierFormat(propertyName); + } + + private static void CheckIdentifierFormat(string propertyName) + { + // AMQP Spec: 4.2.5.5 Field Tables + // Guidelines for implementers: + // * Field names MUST start with a letter, '$' or '#' and may continue with + // letters, '$' or '#', digits, or underlines, to a maximum length of 128 + // characters. + // * The server SHOULD validate field names and upon receiving an invalid + // field name, it SHOULD signal a connection exception with reply code + // 503 (syntax error). Conformance test: amq_wlp_table_01. + // * A peer MUST handle duplicate fields by using only the first instance. + + + // AMQP length limit + if ( propertyName.Length > 128 ) + { + throw new ArgumentException("AMQP limits property names to 128 characters"); + } + + // AMQ start character + if ( !(Char.IsLetter(propertyName[0]) + || propertyName[0] == '$' + || propertyName[0] == '#' + || propertyName[0] == '_' ) )// Not official AMQP added for JMS. + { + throw new ArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character"); + } + } + + private object GetObject(string key) + { + AMQTypedValue value = GetProperty(key); + return value != null ? value.Value : null; + } + + private void SetObject(string key, object value) + { + if ( value is bool ) + { + SetBoolean(key, (bool)value); + } else if ( value is byte ) + { + SetByte(key, (byte)value); + } else if ( value is sbyte ) + { + SetSByte(key, (sbyte)value); + } else if ( value is short ) + { + SetInt16(key, (short)value); + } else if ( value is ushort ) + { + SetUInt16(key, (ushort)value); + } else if ( value is int ) + { + SetInt32(key, (int) value); + } else if ( value is uint ) + { + SetUInt32(key, (uint)value); + } else if ( value is long ) + { + SetInt64(key, (long) value); + } else if ( value is ulong ) + { + SetUInt64(key, (ulong)value); + } else if ( value is char ) + { + SetChar(key, (char) value); + } else if ( value is float ) + { + SetFloat(key, (float) value); + } else if ( value is double ) + { + SetDouble(key, (double) value); + } else if ( value is decimal ) + { + SetDecimal(key, (decimal) value); + } else if ( value is string ) + { + SetString(key, (string) value); + } else if ( value is byte[] ) + { + SetBytes(key, (byte[])value); + } else + { + throw new ArgumentException("Data type not supported yet"); + } + } + + private AMQTypedValue GetProperty(string name) + { + InitMapIfNecessary(); + return (AMQTypedValue) _properties[name]; + } + + private void PopulateFromBuffer() + { + try + { + ByteBuffer buffer = _encodedForm; + _encodedForm = null; + if ( buffer != null ) + SetFromBuffer(buffer, _encodedSize); + } catch ( AMQFrameDecodingException e ) + { + _log.Error("Error decoding FieldTable in deferred decoding mode ", e); + throw; + } + } + + private void SetFromBuffer(ByteBuffer buffer, uint length) + { + bool trace = _log.IsDebugEnabled; + if ( length > 0 ) + { + int expectedRemaining = buffer.Remaining - (int)length; + _properties = new LinkedHashtable(); + + do + { + string key = EncodingUtils.ReadShortString(buffer); + AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer); + if ( trace ) + { + _log.Debug(string.Format("FieldTable::PropFieldTable(buffer,{0}): Read type '{1}', key '{2}', value '{3}'", length, value.Type, key, value.Value)); + } + _properties.Add(key, value); + + } while ( buffer.Remaining > expectedRemaining ); + _encodedSize = length; + } + if ( trace ) + { + _log.Debug("FieldTable::FieldTable(buffer," + length + "): Done."); + } + } + + private void InitMapIfNecessary() + { + lock ( _syncLock ) + { + if ( _properties == null ) + { + if ( _encodedForm == null ) + { + _properties = new LinkedHashtable(); + } else + { + PopulateFromBuffer(); + } + } + } + } + + private AMQTypedValue SetProperty(string key, AMQTypedValue value) + { + InitMapIfNecessary(); + _encodedForm = null; + if ( value == null ) + { + RemoveKey(key); + } + AMQTypedValue oldVal = (AMQTypedValue)_properties[key]; + _properties.Add(key, value); + if ( oldVal != null ) + { + _encodedSize -= oldVal.EncodingLength; + } else + { + _encodedSize += EncodingUtils.EncodedShortStringLength(key) + (uint)1; + } + if ( value != null ) + { + _encodedSize += value.EncodingLength; + } + + return oldVal; + } + + public void WritePayload(ByteBuffer buffer) + { + if ( _encodedForm != null ) + { + lock ( _syncLock ) + { + buffer.Put(_encodedForm); + _encodedForm.Flip(); + } + } else if ( _properties != null ) + { + foreach ( DictionaryEntry de in _properties ) + { + string key = (string)de.Key; + AMQTypedValue value = (AMQTypedValue)de.Value; + try + { + if ( _log.IsDebugEnabled ) + { + _log.Debug("Writing Property:" + key + + " Type:" + value.Type + + " Value:" + value.Value); + _log.Debug("Buffer Position:" + buffer.Position + + " Remaining:" + buffer.Remaining); + } + //Write the actual parameter name + EncodingUtils.WriteShortStringBytes(buffer, key); + value.WriteToBuffer(buffer); + } catch ( Exception ex ) + { + if ( _log.IsDebugEnabled ) + { + _log.Debug("Exception thrown:" + ex); + _log.Debug("Writing Property:" + key + + " Type:" + value.Type + + " Value:" + value.Value); + _log.Debug("Buffer Position:" + buffer.Position + + " Remaining:" + buffer.Remaining); + } + throw; + } + } + } + } + #endregion // Private Methods + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/HeartbeatBody.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/HeartbeatBody.cs new file mode 100644 index 0000000000..a8906f5ba8 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/HeartbeatBody.cs @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class HeartbeatBody : IBody +{ + public const byte TYPE = 8; + public static AMQFrame FRAME = new HeartbeatBody().ToFrame(); + + public byte BodyType + { + get + { + return TYPE; + } + } + + public uint Size + { + get + { + return 0;//heartbeats we generate have no payload + } + } + + public void WritePayload(ByteBuffer buffer) + { + } + + public void PopulateFromBuffer(ByteBuffer buffer, uint size) + { + if (size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.Skip((int) size); + } + } + + public AMQFrame ToFrame() + { + return new AMQFrame(0, this); + } +} +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/HeartbeatBodyFactory.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/HeartbeatBodyFactory.cs new file mode 100644 index 0000000000..90e5c7768e --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/HeartbeatBodyFactory.cs @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + public class HeartbeatBodyFactory : IBodyFactory + { + public IBody CreateBody(ByteBuffer input) + { + return new HeartbeatBody(); + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/IBody.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/IBody.cs new file mode 100644 index 0000000000..97b4459e5c --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/IBody.cs @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + /// <summary> + /// An IBody is contained within a top level frame. As such, it is not en/decodable on its own but + /// is decoded as a step within a the overall en/decoding process. + /// </summary> + public interface IBody + { + /// <summary> + /// Gets the type. See RFC 006 for the meaning of "type" in this context. + /// </summary> + /// <value>The type.</value> + byte BodyType + { + get; + } + + /// <summary> + /// Get the size of the body + /// </summary> + /// <value>The size in bytes.</value> + uint Size + { + get; + } + + /// <summary> + /// Writes this instance to a buffer. + /// </summary> + /// <param name="buffer">The buffer.</param> + void WritePayload(ByteBuffer buffer); + + /// <summary> + /// Populates this instance from a buffer of data. + /// </summary> + /// <param name="buffer">The buffer.</param> + /// <param name="size">The size.</param> + /// <exception cref="AMQFrameDecodingException">If the buffer contains data that cannot be decoded</exception> + void PopulateFromBuffer(ByteBuffer buffer, uint size); + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/IBodyFactory.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/IBodyFactory.cs new file mode 100644 index 0000000000..dd7960ddbe --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/IBodyFactory.cs @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + /// <summary> + /// Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. + /// </summary> + public interface IBodyFactory + { + /// <summary> + /// Creates the body. + /// </summary> + /// <param name="inbuf">The ByteBuffer containing data from the network</param> + /// <returns></returns> + /// <exception>AMQFrameDecodingException</exception> + IBody CreateBody(ByteBuffer inbuf); + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/IContentHeaderProperties.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/IContentHeaderProperties.cs new file mode 100644 index 0000000000..676d0910d4 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/IContentHeaderProperties.cs @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + /// <summary> + /// There will be an implementation of this interface for each content type. All content types have associated + /// header properties and this provides a way to encode and decode them. + /// </summary> + public interface IContentHeaderProperties + { + /// <summary> + /// Writes the property list to the buffer, in a suitably encoded form. + /// </summary> + /// <param name="buffer">The buffer to write to</param> + void WritePropertyListPayload(ByteBuffer buffer); + + /// <summary> + /// Populates the properties from buffer. + /// </summary> + /// <param name="buffer">The buffer to read from.</param> + /// <param name="propertyFlags">The property flags.</param> + /// <exception cref="AMQFrameDecodingException">Thrown when the buffer does not contain valid data</exception> + void PopulatePropertiesFromBuffer(ByteBuffer buffer, ushort propertyFlags); + + /// <summary> + /// Gets the size of the encoded property list in bytes. + /// </summary> + /// <value>The size of the property list in bytes</value> + uint PropertyListSize + { + get; + } + + /// <summary> + /// Gets the property flags. Property flags indicate which properties are set in the list. The + /// position and meaning of each flag is defined in the protocol specification for the particular + /// content type with which these properties are associated. + /// </summary> + /// <value>the flags as an unsigned integer</value> + ushort PropertyFlags + { + get; + } + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/IDataBlock.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/IDataBlock.cs new file mode 100644 index 0000000000..c61ed90d10 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/IDataBlock.cs @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Framing +{ + /// <summary> + /// A data block represents something that has a size in bytes and the ability to write itself to a byte + /// buffer (similar to a byte array). It represents "top level" frames in the protocol specification. + /// </summary> + public interface IDataBlock : IEncodableAMQDataBlock + { + /// <summary> + /// Get the size of buffer needed to store the byte representation of this + /// frame. + /// </summary> + /// <returns>size in bytes</returns> + uint Size + { + get; + } + + /// <summary> + /// Writes the datablock to the specified buffer. + /// </summary> + /// <param name="buffer">The buffer to write to. Must be the correct size.</param> + void WritePayload(ByteBuffer buffer); + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/IEncodableAMQDataBlock.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/IEncodableAMQDataBlock.cs new file mode 100644 index 0000000000..da8bf9fef9 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/IEncodableAMQDataBlock.cs @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Framing +{ + + /// <summary> + /// Marker interface to indicate to MINA that a data block should be encoded with the + /// single encoder/decoder that we have defined. + /// </summary> + public interface IEncodableAMQDataBlock + { + } +} diff --git a/RC9/qpid/dotnet/Qpid.Common/Framing/ProtocolInitiation.cs b/RC9/qpid/dotnet/Qpid.Common/Framing/ProtocolInitiation.cs new file mode 100644 index 0000000000..5407bc08d5 --- /dev/null +++ b/RC9/qpid/dotnet/Qpid.Common/Framing/ProtocolInitiation.cs @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Configuration; +using System.Reflection; +using System.Xml; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Codec; +using Apache.Qpid.Codec.Demux; +using Apache.Qpid.Common; + +namespace Apache.Qpid.Framing +{ + public class ProtocolInitiation : IDataBlock, IEncodableAMQDataBlock + { + private static readonly ILog _log = LogManager.GetLogger(typeof(ProtocolInitiation)); + + public char[] Header = new char[]{'A','M','Q','P'}; + + private const byte CURRENT_PROTOCOL_CLASS = 1; + private const int CURRENT_PROTOCOL_INSTANCE = 1; + // FIXME: Needs to be tweakable from GRM.dll.config file. i.e. Major version 7 or 8 + + // FIXME: a configuration item for avoiding Basic.Qos (for OpenAMQ compatibility) + public static int CURRENT_PROTOCOL_VERSION_MAJOR = 8; // FIXME: put back to 7 for OpenAMQ! + private const int CURRENT_PROTOCOL_VERSION_MINOR = 0; + + public byte ProtocolClass = CURRENT_PROTOCOL_CLASS; + public byte ProtocolInstance = CURRENT_PROTOCOL_INSTANCE; + public byte ProtocolMajor = (byte)CURRENT_PROTOCOL_VERSION_MAJOR; + public byte ProtocolMinor = CURRENT_PROTOCOL_VERSION_MINOR; + + static ProtocolInitiation() + { + AssemblySettings settings = new AssemblySettings(); + + /* + string openAMQ = settings["OpenAMQ1d4Compatibility"]; + if (openAMQ.Equals("true")) + { + _log.Warn("Starting in OpenAMQ-1.0d4 compatibility mode. ProtocolMajorVersion is 7 and Basic.Qos will not be sent."); + CURRENT_PROTOCOL_VERSION_MAJOR = 7; + } + */ + } + + public uint Size + { + get + { + return 4 + 1 + 1 + 1 + 1; + } + } + + public void WritePayload(ByteBuffer buffer) + { + foreach (char c in Header) + { + buffer.Put((byte) c); + } + buffer.Put(ProtocolClass); + buffer.Put(ProtocolInstance); + buffer.Put(ProtocolMajor); + buffer.Put(ProtocolMinor); + } + + /// <summary> + /// Populates from buffer. + /// </summary> + /// <param name="buffer">The buffer.</param> + public void PopulateFromBuffer(ByteBuffer buffer) + { + throw new AMQException("Method not implemented"); + } + + public class Decoder : IMessageDecoder + { + private bool _disabled = false; + + public MessageDecoderResult Decodable(ByteBuffer inbuf) + { + if (_disabled) + { + return MessageDecoderResult.NOT_OK; + } + if (inbuf.Remaining < 8) + { + return MessageDecoderResult.NEED_DATA; + } + else + { + char[] expected = new char[]{'A', 'M', 'Q', 'P'}; + for (int i = 0; i < 4; i++) + { + if (((char) inbuf.GetByte()) != expected[i]) + { + return MessageDecoderResult.NOT_OK; + } + } + return MessageDecoderResult.OK; + } + } + + /// <summary> + /// Decodes the specified session. + /// </summary> + /// <param name="inbuf">The inbuf.</param> + /// <param name="output">The protocol output.</param> + /// <returns></returns> + public MessageDecoderResult Decode(ByteBuffer inbuf, IProtocolDecoderOutput output) + { + byte[] header = new byte[4]; + inbuf.GetBytes(header); + ProtocolInitiation pi = new ProtocolInitiation(); + pi.Header = new char[]{'A','M','Q','P'}; + pi.ProtocolClass = inbuf.GetByte(); + pi.ProtocolInstance = inbuf.GetByte(); + pi.ProtocolMajor = inbuf.GetByte(); + pi.ProtocolMinor = inbuf.GetByte(); + output.Write(pi); + return MessageDecoderResult.OK; + } + + public bool Disabled + { + set + { + _disabled = value; + } + } + } + + public override string ToString() + { + return String.Format("{0}{{Class={1} Instance={2} Major={3} Minor={4}}}", + GetType().Name, ProtocolClass, ProtocolInstance, ProtocolMajor, ProtocolMinor); + } + } +} |
