From 66f97f32c78e0cf5914a441ae8277ee3aa659ce9 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 24 Nov 2007 21:14:14 +0000 Subject: QPID-567 : Add mutliversion support to Qpid/Java. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@597918 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/AMQChannelException.java | 9 +- .../org/apache/qpid/AMQConnectionException.java | 13 +- .../main/java/org/apache/qpid/framing/AMQBody.java | 12 +- .../qpid/framing/AMQFrameDecodingException.java | 6 + .../org/apache/qpid/framing/AMQMethodBody.java | 117 ++++--------- .../apache/qpid/framing/AMQMethodBodyFactory.java | 3 +- .../org/apache/qpid/framing/AMQMethodBodyImpl.java | 89 ++++++++++ .../qpid/framing/AMQMethodBodyInstanceFactory.java | 3 +- .../java/org/apache/qpid/framing/ContentBody.java | 2 +- .../org/apache/qpid/framing/ContentHeaderBody.java | 2 +- .../framing/ContentHeaderPropertiesFactory.java | 4 +- .../org/apache/qpid/framing/EncodingUtils.java | 19 ++- .../org/apache/qpid/framing/HeartbeatBody.java | 6 +- .../apache/qpid/framing/MethodConverter_8_0.java | 125 -------------- .../apache/qpid/framing/ProtocolInitiation.java | 3 +- .../qpid/framing/VersionSpecificRegistry.java | 2 +- .../qpid/framing/amqp_0_9/AMQMethodBody_0_9.java | 188 +++++++++++++++++++++ .../qpid/framing/amqp_0_9/MethodConverter_0_9.java | 126 ++++++++++++++ .../qpid/framing/amqp_8_0/AMQMethodBody_8_0.java | 188 +++++++++++++++++++++ .../qpid/framing/amqp_8_0/MethodConverter_8_0.java | 146 ++++++++++++++++ .../protocol/AMQVersionAwareProtocolSession.java | 5 +- .../apache/qpid/protocol/ProtocolVersionAware.java | 6 + 22 files changed, 831 insertions(+), 243 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java delete mode 100644 java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 9efd271e4d..35f4b1f074 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -20,9 +20,7 @@ */ package org.apache.qpid; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; /** @@ -63,6 +61,9 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); + + MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); + return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId)); + } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index 7edfa648ed..fabb7f1f1f 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -21,9 +21,7 @@ package org.apache.qpid; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; /** @@ -66,8 +64,13 @@ public class AMQConnectionException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), - new AMQShortString(getMessage())); + MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); + return new AMQFrame(channel, + reg.createConnectionCloseBody(getErrorCode().getCode(), + new AMQShortString(getMessage()), + _classId, + _methodId)); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index ebeea8d2b4..3abd97ddb7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -22,18 +22,18 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public abstract class AMQBody +public interface AMQBody { - public abstract byte getFrameType(); + public byte getFrameType(); /** * Get the size of the body * @return unsigned short */ - protected abstract int getSize(); + public abstract int getSize(); - protected abstract void writePayload(ByteBuffer buffer); + public void writePayload(ByteBuffer buffer); - protected abstract void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException; + //public void populateFromBuffer(ByteBuffer buffer, long size) + // throws AMQFrameDecodingException, AMQProtocolVersionException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java index cd5ccf8e04..e6320a07fc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -38,4 +38,10 @@ public class AMQFrameDecodingException extends AMQException { super(errorCode, message, t); } + + public AMQFrameDecodingException(AMQConstant errorCode, String message) + { + super(errorCode, message); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index f2e91083ca..4763b22290 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -23,79 +23,42 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -public abstract class AMQMethodBody extends AMQBody +public interface AMQMethodBody extends AMQBody { public static final byte TYPE = 1; /** AMQP version */ - protected byte major; - protected byte minor; + public byte getMajor(); - public byte getMajor() - { - return major; - } + public byte getMinor(); - public byte getMinor() - { - return minor; - } - public AMQMethodBody(byte major, byte minor) - { - this.major = major; - this.minor = minor; - } - - /** unsigned short */ - protected abstract int getBodySize(); /** @return unsigned short */ - protected abstract int getClazz(); + public int getClazz(); /** @return unsigned short */ - protected abstract int getMethod(); - - protected abstract void writeMethodPayload(ByteBuffer buffer); - - public byte getFrameType() - { - return TYPE; - } - - protected int getSize() - { - return 2 + 2 + getBodySize(); - } - - protected void writePayload(ByteBuffer buffer) - { - EncodingUtils.writeUnsignedShort(buffer, getClazz()); - EncodingUtils.writeUnsignedShort(buffer, getMethod()); - writeMethodPayload(buffer); - } - - protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; - - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException - { - populateMethodBodyFromBuffer(buffer); - } - - public String toString() - { - StringBuffer buf = new StringBuffer(getClass().getName()); - buf.append("[ Class: ").append(getClazz()); - buf.append(" Method: ").append(getMethod()).append(']'); - return buf.toString(); - } + public int getMethod(); + + public void writeMethodPayload(ByteBuffer buffer); + + + public int getSize(); + + public void writePayload(ByteBuffer buffer); + + //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; + + //public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; + + public AMQFrame generateFrame(int channelId); + + public String toString(); + - /** - * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and - * method ids of the body it resulted from). - */ /** * Convenience Method to create a channel not found exception @@ -104,29 +67,17 @@ public abstract class AMQMethodBody extends AMQBody * * @return new AMQChannelException */ - public AMQChannelException getChannelNotFoundException(int channelId) - { - return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); - } - - public AMQChannelException getChannelException(AMQConstant code, String message) - { - return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor); - } - - public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) - { - return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); - } - - public AMQConnectionException getConnectionException(AMQConstant code, String message) - { - return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); - } - - public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) - { - return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); - } + public AMQChannelException getChannelNotFoundException(int channelId); + + public AMQChannelException getChannelException(AMQConstant code, String message); + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause); + + public AMQConnectionException getConnectionException(AMQConstant code, String message); + + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause); + + public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index cf85bdab31..1a7022c11b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -40,7 +40,6 @@ public class AMQMethodBodyFactory implements BodyFactory public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - return _protocolSession.getRegistry().get((short) in.getUnsignedShort(), (short) in.getUnsignedShort(), in, - bodySize); + return _protocolSession.getMethodRegistry().convertToBody(in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java new file mode 100644 index 0000000000..5215bcbd66 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -0,0 +1,89 @@ +package org.apache.qpid.framing; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; + +public abstract class AMQMethodBodyImpl implements AMQMethodBody +{ + public static final byte TYPE = 1; + + public AMQMethodBodyImpl() + { + } + + public byte getFrameType() + { + return TYPE; + } + + + /** unsigned short */ + abstract protected int getBodySize(); + + + public AMQFrame generateFrame(int channelId) + { + return new AMQFrame(channelId, this); + } + + /** + * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and + * method ids of the body it resulted from). + */ + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException + */ + public AMQChannelException getChannelNotFoundException(int channelId) + { + return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); + } + + public AMQChannelException getChannelException(AMQConstant code, String message) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null); + } + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index 359efe7eb7..0030742e94 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -26,6 +26,5 @@ import org.apache.mina.common.ByteBuffer; public abstract interface AMQMethodBodyInstanceFactory { - public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException; - public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException; + public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index be38695384..cbee1680f7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class ContentBody extends AMQBody +public class ContentBody implements AMQBody { public static final byte TYPE = 3; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 02631a5f88..80a61544b3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class ContentHeaderBody extends AMQBody +public class ContentHeaderBody implements AMQBody { public static final byte TYPE = 2; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 712eb437db..46189b63d7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -22,6 +22,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; + public class ContentHeaderPropertiesFactory { private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); @@ -43,7 +45,7 @@ public class ContentHeaderPropertiesFactory // AMQP version change: "Hardwired" version to major=8, minor=0 // TODO: Change so that the actual version is obtained from // the ProtocolInitiation object for this session. - if (classId == BasicConsumeBody.getClazz((byte)8, (byte)0)) + if (classId == BasicConsumeBodyImpl.CLASS_ID) { properties = new BasicContentHeaderProperties(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index ccba8bd41e..6425f8c591 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -229,7 +229,11 @@ public class EncodingUtils encodedString[i] = (byte) cha[i]; } - writeBytes(buffer, encodedString); + // TODO: check length fits in an unsigned byte + writeUnsignedByte(buffer, (short)encodedString.length); + buffer.put(encodedString); + + } else { @@ -928,15 +932,15 @@ public class EncodingUtils public static byte[] readBytes(ByteBuffer buffer) { - short length = buffer.getUnsigned(); + long length = buffer.getUnsignedInt(); if (length == 0) { return null; } else { - byte[] dataBytes = new byte[length]; - buffer.get(dataBytes, 0, length); + byte[] dataBytes = new byte[(int)length]; + buffer.get(dataBytes, 0, (int)length); return dataBytes; } @@ -947,13 +951,14 @@ public class EncodingUtils if (data != null) { // TODO: check length fits in an unsigned byte - writeUnsignedByte(buffer, (short) data.length); + writeUnsignedInteger(buffer, (long)data.length); buffer.put(data); } else - { + { // really writing out unsigned byte - buffer.put((byte) 0); + //buffer.put((byte) 0); + writeUnsignedInteger(buffer, 0L); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 7246c4a1cf..ef7163bd40 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class HeartbeatBody extends AMQBody +public class HeartbeatBody implements AMQBody { public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); @@ -46,12 +46,12 @@ public class HeartbeatBody extends AMQBody return TYPE; } - protected int getSize() + public int getSize() { return 0;//heartbeats we generate have no payload } - protected void writePayload(ByteBuffer buffer) + public void writePayload(ByteBuffer buffer) { } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java deleted file mode 100644 index 9a113f452b..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.framing; - -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.AbstractMethodConverter; - -import org.apache.mina.common.ByteBuffer; - -public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter -{ - private int _basicPublishClassId; - private int _basicPublishMethodId; - - public MethodConverter_8_0() - { - super((byte)8,(byte)0); - - - } - - public AMQBody convertToBody(ContentChunk contentChunk) - { - return new ContentBody(contentChunk.getData()); - } - - public ContentChunk convertToContentChunk(AMQBody body) - { - final ContentBody contentBodyChunk = (ContentBody) body; - - return new ContentChunk() - { - - public int getSize() - { - return contentBodyChunk.getSize(); - } - - public ByteBuffer getData() - { - return contentBodyChunk.payload; - } - - public void reduceToFit() - { - contentBodyChunk.reduceBufferToFit(); - } - }; - - } - - public void configure() - { - - _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion()); - _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion()); - - } - - public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) - { - final BasicPublishBody body = (BasicPublishBody) methodBody; - - return new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return body.getExchange(); - } - - public boolean isImmediate() - { - return body.getImmediate(); - } - - public boolean isMandatory() - { - return body.getMandatory(); - } - - public AMQShortString getRoutingKey() - { - return body.getRoutingKey(); - } - }; - - } - - public AMQMethodBody convertToBody(MessagePublishInfo info) - { - - return new BasicPublishBody(getProtocolMajorVersion(), - getProtocolMinorVersion(), - _basicPublishClassId, - _basicPublishMethodId, - info.getExchange(), - info.isImmediate(), - info.isMandatory(), - info.getRoutingKey(), - 0) ; // ticket - - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 8b40fe72eb..aaea771a07 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -139,7 +139,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData } } - public void checkVersion() throws AMQException + public ProtocolVersion checkVersion() throws AMQException { if(_protocolHeader.length != 4) @@ -180,6 +180,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData throw new AMQProtocolVersionException("Protocol version " + _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker."); } + return pv; } public String toString() diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index 6006e9793c..516d0c569c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -182,7 +182,7 @@ public class VersionSpecificRegistry + " method " + methodID + ".", null); } - return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size); + return bodyFactory.newInstance( in, size); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java new file mode 100644 index 0000000000..169160a9e4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java @@ -0,0 +1,188 @@ +package org.apache.qpid.framing.amqp_0_9; + +import org.apache.qpid.framing.EncodingUtils; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.Content; + +import org.apache.mina.common.ByteBuffer; + +public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMethodBodyImpl +{ + + public byte getMajor() + { + return 0; + } + + public byte getMinor() + { + return 9; + } + + public int getSize() + { + return 2 + 2 + getBodySize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + + protected byte readByte(ByteBuffer buffer) + { + return buffer.get(); + } + + protected AMQShortString readAMQShortString(ByteBuffer buffer) + { + return EncodingUtils.readAMQShortString(buffer); + } + + protected int getSizeOf(AMQShortString string) + { + return EncodingUtils.encodedShortStringLength(string); + } + + protected void writeByte(ByteBuffer buffer, byte b) + { + buffer.put(b); + } + + protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) + { + EncodingUtils.writeShortStringBytes(buffer, string); + } + + protected int readInt(ByteBuffer buffer) + { + return buffer.getInt(); + } + + protected void writeInt(ByteBuffer buffer, int i) + { + buffer.putInt(i); + } + + protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + return EncodingUtils.readFieldTable(buffer); + } + + protected int getSizeOf(FieldTable table) + { + return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeFieldTable(ByteBuffer buffer, FieldTable table) + { + EncodingUtils.writeFieldTableBytes(buffer, table); + } + + protected long readLong(ByteBuffer buffer) + { + return buffer.getLong(); + } + + protected void writeLong(ByteBuffer buffer, long l) + { + buffer.putLong(l); + } + + protected int getSizeOf(byte[] response) + { + return (response == null) ? 4 :response.length + 4; + } + + protected void writeBytes(ByteBuffer buffer, byte[] data) + { + EncodingUtils.writeBytes(buffer,data); + } + + protected byte[] readBytes(ByteBuffer buffer) + { + return EncodingUtils.readBytes(buffer); + } + + protected short readShort(ByteBuffer buffer) + { + return EncodingUtils.readShort(buffer); + } + + protected void writeShort(ByteBuffer buffer, short s) + { + EncodingUtils.writeShort(buffer, s); + } + + protected Content readContent(ByteBuffer buffer) + { + return null; //To change body of created methods use File | Settings | File Templates. + } + + protected int getSizeOf(Content body) + { + return 0; //To change body of created methods use File | Settings | File Templates. + } + + protected void writeContent(ByteBuffer buffer, Content body) + { + //To change body of created methods use File | Settings | File Templates. + } + + protected byte readBitfield(ByteBuffer buffer) + { + return readByte(buffer); //To change body of created methods use File | Settings | File Templates. + } + + protected int readUnsignedShort(ByteBuffer buffer) + { + return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeBitfield(ByteBuffer buffer, byte bitfield0) + { + buffer.put(bitfield0); + } + + protected void writeUnsignedShort(ByteBuffer buffer, int s) + { + EncodingUtils.writeUnsignedShort(buffer, s); + } + + protected long readUnsignedInteger(ByteBuffer buffer) + { + return buffer.getUnsignedInt(); + } + protected void writeUnsignedInteger(ByteBuffer buffer, long i) + { + EncodingUtils.writeUnsignedInteger(buffer, i); + } + + + protected short readUnsignedByte(ByteBuffer buffer) + { + return buffer.getUnsigned(); + } + + protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) + { + EncodingUtils.writeUnsignedByte(buffer, unsignedByte); + } + + protected long readTimestamp(ByteBuffer buffer) + { + return EncodingUtils.readTimestamp(buffer); + } + + protected void writeTimestamp(ByteBuffer buffer, long t) + { + EncodingUtils.writeTimestamp(buffer, t); + } + + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java new file mode 100644 index 0000000000..de0007c132 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -0,0 +1,126 @@ +package org.apache.qpid.framing.amqp_0_9; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_9.*; +import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl; + +public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private int _basicPublishClassId; + private int _basicPublishMethodId; + + public MethodConverter_0_9() + { + super((byte)0,(byte)9); + + + } + + public AMQBody convertToBody(ContentChunk contentChunk) + { + return new ContentBody(contentChunk.getData()); + } + + public ContentChunk convertToContentChunk(AMQBody body) + { + final ContentBody contentBodyChunk = (ContentBody) body; + + return new ContentChunk() + { + + public int getSize() + { + return contentBodyChunk.getSize(); + } + + public ByteBuffer getData() + { + return contentBodyChunk.payload; + } + + public void reduceToFit() + { + contentBodyChunk.reduceBufferToFit(); + } + }; + + } + + public void configure() + { + + _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID; + _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; + + } + + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) + { + final BasicPublishBody publishBody = ((BasicPublishBody) methodBody); + + final AMQShortString exchange = publishBody.getExchange(); + final AMQShortString routingKey = publishBody.getRoutingKey(); + + return new MethodConverter_0_9.MessagePublishInfoImpl(exchange == null ? null : exchange.intern(), + publishBody.getImmediate(), + publishBody.getMandatory(), + routingKey == null ? null : routingKey.intern()); + + } + + public AMQMethodBody convertToBody(MessagePublishInfo info) + { + + return new BasicPublishBodyImpl(0, + info.getExchange(), + info.getRoutingKey(), + info.isMandatory(), + info.isImmediate()) ; + + } + + private static class MessagePublishInfoImpl implements MessagePublishInfo + { + private final AMQShortString _exchange; + private final boolean _immediate; + private final boolean _mandatory; + private final AMQShortString _routingKey; + + public MessagePublishInfoImpl(final AMQShortString exchange, + final boolean immediate, + final boolean mandatory, + final AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public boolean isImmediate() + { + return _immediate; + } + + public boolean isMandatory() + { + return _mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java new file mode 100644 index 0000000000..fd6353fa54 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java @@ -0,0 +1,188 @@ +package org.apache.qpid.framing.amqp_8_0; + +import org.apache.qpid.framing.EncodingUtils; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.Content; + +import org.apache.mina.common.ByteBuffer; + +public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl +{ + + public byte getMajor() + { + return 8; + } + + public byte getMinor() + { + return 0; + } + + public int getSize() + { + return 2 + 2 + getBodySize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + + protected byte readByte(ByteBuffer buffer) + { + return buffer.get(); + } + + protected AMQShortString readAMQShortString(ByteBuffer buffer) + { + return EncodingUtils.readAMQShortString(buffer); + } + + protected int getSizeOf(AMQShortString string) + { + return EncodingUtils.encodedShortStringLength(string); + } + + protected void writeByte(ByteBuffer buffer, byte b) + { + buffer.put(b); + } + + protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) + { + EncodingUtils.writeShortStringBytes(buffer, string); + } + + protected int readInt(ByteBuffer buffer) + { + return buffer.getInt(); + } + + protected void writeInt(ByteBuffer buffer, int i) + { + buffer.putInt(i); + } + + protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + return EncodingUtils.readFieldTable(buffer); + } + + protected int getSizeOf(FieldTable table) + { + return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeFieldTable(ByteBuffer buffer, FieldTable table) + { + EncodingUtils.writeFieldTableBytes(buffer, table); + } + + protected long readLong(ByteBuffer buffer) + { + return buffer.getLong(); + } + + protected void writeLong(ByteBuffer buffer, long l) + { + buffer.putLong(l); + } + + protected int getSizeOf(byte[] response) + { + return (response == null) ? 4 : response.length + 4; + } + + protected void writeBytes(ByteBuffer buffer, byte[] data) + { + EncodingUtils.writeBytes(buffer,data); + } + + protected byte[] readBytes(ByteBuffer buffer) + { + return EncodingUtils.readBytes(buffer); + } + + protected short readShort(ByteBuffer buffer) + { + return EncodingUtils.readShort(buffer); + } + + protected void writeShort(ByteBuffer buffer, short s) + { + EncodingUtils.writeShort(buffer, s); + } + + protected Content readContent(ByteBuffer buffer) + { + return null; //To change body of created methods use File | Settings | File Templates. + } + + protected int getSizeOf(Content body) + { + return 0; //To change body of created methods use File | Settings | File Templates. + } + + protected void writeContent(ByteBuffer buffer, Content body) + { + //To change body of created methods use File | Settings | File Templates. + } + + protected byte readBitfield(ByteBuffer buffer) + { + return readByte(buffer); //To change body of created methods use File | Settings | File Templates. + } + + protected int readUnsignedShort(ByteBuffer buffer) + { + return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeBitfield(ByteBuffer buffer, byte bitfield0) + { + buffer.put(bitfield0); + } + + protected void writeUnsignedShort(ByteBuffer buffer, int s) + { + EncodingUtils.writeUnsignedShort(buffer, s); + } + + protected long readUnsignedInteger(ByteBuffer buffer) + { + return buffer.getUnsignedInt(); + } + protected void writeUnsignedInteger(ByteBuffer buffer, long i) + { + EncodingUtils.writeUnsignedInteger(buffer, i); + } + + + protected short readUnsignedByte(ByteBuffer buffer) + { + return buffer.getUnsigned(); + } + + protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) + { + EncodingUtils.writeUnsignedByte(buffer, unsignedByte); + } + + protected long readTimestamp(ByteBuffer buffer) + { + return EncodingUtils.readTimestamp(buffer); + } + + protected void writeTimestamp(ByteBuffer buffer, long t) + { + EncodingUtils.writeTimestamp(buffer, t); + } + + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java new file mode 100644 index 0000000000..7a13af8a43 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_8_0; + +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; +import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; +import org.apache.qpid.framing.*; + +import org.apache.mina.common.ByteBuffer; + +public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private int _basicPublishClassId; + private int _basicPublishMethodId; + + public MethodConverter_8_0() + { + super((byte)8,(byte)0); + + + } + + public AMQBody convertToBody(ContentChunk contentChunk) + { + return new ContentBody(contentChunk.getData()); + } + + public ContentChunk convertToContentChunk(AMQBody body) + { + final ContentBody contentBodyChunk = (ContentBody) body; + + return new ContentChunk() + { + + public int getSize() + { + return contentBodyChunk.getSize(); + } + + public ByteBuffer getData() + { + return contentBodyChunk.payload; + } + + public void reduceToFit() + { + contentBodyChunk.reduceBufferToFit(); + } + }; + + } + + public void configure() + { + + _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; + _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; + + } + + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) + { + final BasicPublishBody publishBody = ((BasicPublishBody) methodBody); + + final AMQShortString exchange = publishBody.getExchange(); + final AMQShortString routingKey = publishBody.getRoutingKey(); + + return new MessagePublishInfoImpl(exchange == null ? null : exchange.intern(), + publishBody.getImmediate(), + publishBody.getMandatory(), + routingKey == null ? null : routingKey.intern()); + + } + + public AMQMethodBody convertToBody(MessagePublishInfo info) + { + + return new BasicPublishBodyImpl(0, + info.getExchange(), + info.getRoutingKey(), + info.isMandatory(), + info.isImmediate()) ; + + } + + private static class MessagePublishInfoImpl implements MessagePublishInfo + { + private final AMQShortString _exchange; + private final boolean _immediate; + private final boolean _mandatory; + private final AMQShortString _routingKey; + + public MessagePublishInfoImpl(final AMQShortString exchange, + final boolean immediate, + final boolean mandatory, + final AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public boolean isImmediate() + { + return _immediate; + } + + public boolean isMandatory() + { + return _mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 7c1d6fdaa0..035645aad2 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.MethodRegistry; /** * AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to @@ -42,5 +43,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto * * @return The method registry for a specific version of the AMQP. */ - public VersionSpecificRegistry getRegistry(); +// public VersionSpecificRegistry getRegistry(); + + MethodRegistry getMethodRegistry(); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java index 60a7f30185..dea80cdcf4 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.framing.ProtocolVersion; + /** * ProtocolVersionAware is implemented by all AMQP handling classes, that need to provide an awareness to callers of * the version of the AMQP protocol that they are able to handle. @@ -32,6 +34,7 @@ package org.apache.qpid.protocol; public interface ProtocolVersionAware { /** + * @deprecated * Reports the AMQP minor version, that the implementer can handle. * * @return The AMQP minor version. @@ -39,9 +42,12 @@ public interface ProtocolVersionAware public byte getProtocolMinorVersion(); /** + * @deprecated * Reports the AMQP major version, that the implementer can handle. * * @return The AMQP major version. */ public byte getProtocolMajorVersion(); + + public ProtocolVersion getProtocolVersion(); } -- cgit v1.2.1