From c46d62e6834205408502d89d99f73e47f3ca2eb8 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 20 Feb 2007 16:20:41 +0000 Subject: QPID-325 : Persist durable exchange information in the store QPID-318 : Remove hardcoding of version numbers (as applies to store) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509628 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/framing/ContentBody.java | 2 + .../org/apache/qpid/framing/EncodingUtils.java | 7 +- .../apache/qpid/framing/MethodConverter_8_0.java | 104 +++++++++++++++++++++ .../qpid/framing/VersionSpecificRegistry.java | 55 +++++++++++ .../abstraction/AbstractMethodConverter.java | 26 ++++++ .../qpid/framing/abstraction/ContentChunk.java | 32 +++++++ .../framing/abstraction/MessagePublishInfo.java | 36 +++++++ .../abstraction/MessagePublishInfoConverter.java | 29 ++++++ .../ProtocolVersionMethodConverter.java | 29 ++++++ .../framing/BasicContentHeaderPropertiesTest.java | 2 +- 10 files changed, 317 insertions(+), 5 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (limited to 'java/common') diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index c35fc0a6c4..be38695384 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -96,6 +96,8 @@ public class ContentBody extends AMQBody } } + + public static AMQFrame createAMQFrame(int channelId, ContentBody body) { final AMQFrame frame = new AMQFrame(channelId, body); diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 4e3768e4d4..f94cd4934c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -641,8 +641,7 @@ public class EncodingUtils public static void writeTimestamp(ByteBuffer buffer, long timestamp) { - writeUnsignedInteger(buffer, 0/*timestamp msb*/); - writeUnsignedInteger(buffer, timestamp); + writeLong(buffer, timestamp); } public static boolean[] readBooleans(ByteBuffer buffer) @@ -765,8 +764,8 @@ public class EncodingUtils public static long readTimestamp(ByteBuffer buffer) { // Discard msb from AMQ timestamp - buffer.getUnsignedInt(); - return buffer.getUnsignedInt(); + //buffer.getUnsignedInt(); + return buffer.getLong(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java new file mode 100644 index 0000000000..dd93cc97fa --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java @@ -0,0 +1,104 @@ +package org.apache.qpid.framing; + +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; + +import org.apache.mina.common.ByteBuffer; + +public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private int _basicPublishClassId; + private int _basicPublishMethodId; + + public MethodConverter_8_0() + { + super((byte)8,(byte)0); + + + } + + public AMQBody convertToBody(ContentChunk contentChunk) + { + return new ContentBody(contentChunk.getData()); + } + + public ContentChunk convertToContentChunk(AMQBody body) + { + final ContentBody contentBodyChunk = (ContentBody) body; + + return new ContentChunk() + { + + public int getSize() + { + return contentBodyChunk.getSize(); + } + + public ByteBuffer getData() + { + return contentBodyChunk.payload; + } + + public void reduceToFit() + { + contentBodyChunk.reduceBufferToFit(); + } + }; + + } + + public void configure() + { + + _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion()); + _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion()); + + } + + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) + { + final BasicPublishBody body = (BasicPublishBody) methodBody; + + return new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return body.getExchange(); + } + + public boolean isImmediate() + { + return body.getImmediate(); + } + + public boolean isMandatory() + { + return body.getMandatory(); + } + + public AMQShortString getRoutingKey() + { + return body.getRoutingKey(); + } + }; + + } + + public AMQMethodBody convertToBody(MessagePublishInfo info) + { + + return new BasicPublishBody(getProtocolMajorVersion(), + getProtocolMinorVersion(), + _basicPublishClassId, + _basicPublishMethodId, + info.getExchange(), + info.isImmediate(), + info.isMandatory(), + info.getRoutingKey(), + 0) ; // ticket + + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index 1df62c7b1b..ec371453aa 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; + import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; @@ -36,10 +38,53 @@ public class VersionSpecificRegistry private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][]; + private ProtocolVersionMethodConverter _protocolVersionConverter; + public VersionSpecificRegistry(byte major, byte minor) { _protocolMajorVersion = major; _protocolMinorVersion = minor; + + _protocolVersionConverter = loadProtocolVersionConverters(major, minor); + } + + private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, byte protocolMinorVersion) + { + try + { + Class versionMethodConverterClass = + (Class) Class.forName("org.apache.qpid.framing.MethodConverter_"+protocolMajorVersion + "_" + protocolMinorVersion); + return versionMethodConverterClass.newInstance(); + + } + catch (ClassNotFoundException e) + { + _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion); + if(protocolMinorVersion != 0) + { + protocolMinorVersion--; + return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); + } + else if (protocolMajorVersion != 0) + { + protocolMajorVersion--; + return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); + } + else + { + return null; + } + + + } + catch (IllegalAccessException e) + { + throw new IllegalStateException("Unable to load protocol version converter: ", e); + } + catch (InstantiationException e) + { + throw new IllegalStateException("Unable to load protocol version converter: ", e); + } } public byte getProtocolMajorVersion() @@ -138,4 +183,14 @@ public class VersionSpecificRegistry } + + public ProtocolVersionMethodConverter getProtocolVersionMethodConverter() + { + return _protocolVersionConverter; + } + + public void configure() + { + _protocolVersionConverter.configure(); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java new file mode 100644 index 0000000000..5490d482a1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java @@ -0,0 +1,26 @@ +package org.apache.qpid.framing.abstraction; + +public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private final byte _protocolMajorVersion; + + + private final byte _protocolMinorVersion; + + public AbstractMethodConverter(byte major, byte minor) + { + _protocolMajorVersion = major; + _protocolMinorVersion = minor; + } + + + public final byte getProtocolMajorVersion() + { + return _protocolMajorVersion; + } + + public final byte getProtocolMinorVersion() + { + return _protocolMinorVersion; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java new file mode 100644 index 0000000000..6312e478a8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.mina.common.ByteBuffer; + +public interface ContentChunk +{ + int getSize(); + ByteBuffer getData(); + + void reduceToFit(); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java new file mode 100644 index 0000000000..706499c1b0 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQShortString; + +public interface MessagePublishInfo +{ + + public AMQShortString getExchange(); + + public boolean isImmediate(); + + public boolean isMandatory(); + + public AMQShortString getRoutingKey(); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java new file mode 100644 index 0000000000..c9e15f18e3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQMethodBody; + + +public interface MessagePublishInfoConverter +{ + public MessagePublishInfo convertToInfo(AMQMethodBody body); + public AMQMethodBody convertToBody(MessagePublishInfo info); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java new file mode 100644 index 0000000000..52e82cdf07 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQBody; + +public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter +{ + AMQBody convertToBody(ContentChunk contentBody); + ContentChunk convertToContentChunk(AMQBody body); + + void configure(); +} diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java index ffbdf730a9..0f706ac553 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -154,7 +154,7 @@ public class BasicContentHeaderPropertiesTest extends TestCase public void testSetGetTimestamp() { - long timestamp = 999999999; + long timestamp = System.currentTimeMillis(); _testProperties.setTimestamp(timestamp); assertEquals(timestamp, _testProperties.getTimestamp()); } -- cgit v1.2.1