diff options
Diffstat (limited to 'java')
36 files changed, 407 insertions, 397 deletions
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml index 9be428fbbd..98da18b8bb 100644 --- a/java/broker/etc/log4j.xml +++ b/java/broker/etc/log4j.xml @@ -37,9 +37,9 @@ </layout> </appender> - <!--<category name="org.apache.qpid.server.store"> + <category name="org.apache.qpid.framing"> <priority value="debug"/> - </category>--> + </category> <root> <priority value="info"/> diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 0f2f5ac94e..59ebc08428 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -127,7 +127,7 @@ public class AMQChannel _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; - _requestManager = new RequestManager(channelId, protocolWriter); + _requestManager = new RequestManager(channelId, protocolWriter); _responseManager = new ResponseManager(channelId, methodListener, protocolWriter); _txnBuffer = new TxnBuffer(_messageStore); } @@ -827,7 +827,8 @@ public class AMQChannel catch (NoConsumersException e) { //TODO: store this for delivery after the commit-ok - _returns.add(e.getReturnMessage(_channelId)); + throw new Error("XXX"); + //_returns.add(e.getReturnMessage(_channelId)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 4942f17d3c..a1a6a77a93 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -20,11 +20,8 @@ */ package org.apache.qpid.server; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.framing.CompositeAMQDataBlock; -import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; @@ -39,57 +36,26 @@ import java.util.List; public abstract class RequiredDeliveryException extends AMQException { private final String _message; - private final BasicPublishBody _publishBody; - private final ContentHeaderBody _contentHeaderBody; - private final List<ContentBody> _contentBodies; + private final AMQMessage _payload; public RequiredDeliveryException(String message, AMQMessage payload) { super(message); _message = message; - _publishBody = payload.getPublishBody(); - _contentHeaderBody = payload.getContentHeaderBody(); - _contentBodies = payload.getContentBodies(); + _payload = payload; } - public RequiredDeliveryException(String message, - BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) - { - super(message); - _message = message; - _publishBody = publishBody; - _contentHeaderBody = contentHeaderBody; - _contentBodies = contentBodies; - } - - public BasicPublishBody getPublishBody() - { - return _publishBody; - } - - public ContentHeaderBody getContentHeaderBody() - { - return _contentHeaderBody; - } - - public List<ContentBody> getContentBodies() - { - return _contentBodies; - } - - public CompositeAMQDataBlock getReturnMessage(int channel) + /* public CompositeAMQDataBlock getReturnMessage(int channel) { // AMQP version change: All generated *Body classes are now version-aware. // Shortcut: hardwire version to 0-9 (major=0, minor=9) for now. // TODO: Connect the version to that returned by the ProtocolInitiation // for this session. BasicReturnBody returnBody = new BasicReturnBody((byte)0, (byte)9); - returnBody.exchange = _publishBody.exchange; + returnBody.exchange = _transferBody.exchange; returnBody.replyCode = getReplyCode(); returnBody.replyText = _message; - returnBody.routingKey = _publishBody.routingKey; + returnBody.routingKey = _transferBody.routingKey; AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; @@ -105,7 +71,7 @@ public abstract class RequiredDeliveryException extends AMQException } return new CompositeAMQDataBlock(allFrames); - } + }*/ public int getErrorCode() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index eb9d1acb59..5c484edfef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -81,7 +81,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry */ public void routeContent(AMQMessage payload) throws AMQException { - final String exchange = payload.getPublishBody().exchange; + final String exchange = payload.getTransferBody().exchange; final Exchange exch = _exchangeMap.get(exchange); // there is a small window of opportunity for the exchange to be deleted in between // the JmsPublish being received (where the exchange is validated) and the final diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index d4069fa315..b777ae7d82 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; @@ -168,18 +168,19 @@ public class DestNameExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException { - BasicPublishBody publishBody = payload.getPublishBody(); + MessageTransferBody transferBody = payload.getTransferBody(); - final String routingKey = publishBody.routingKey; + final String routingKey = transferBody.routingKey; final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); if (queues == null || queues.isEmpty()) { String msg = "Routing key " + routingKey + " is not known to " + this; - if (publishBody.mandatory) + // XXX + /*if (transferBody.mandatory) { throw new NoRouteException(msg, payload); } - else + else*/ { _logger.warn(msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 139307488e..932632cde3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; @@ -152,9 +152,9 @@ public class DestWildExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException { - BasicPublishBody publishBody = payload.getPublishBody(); + MessageTransferBody transferBody = payload.getTransferBody(); - final String routingKey = publishBody.routingKey; + final String routingKey = transferBody.routingKey; List<AMQQueue> queues = _routingKey2queues.get(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 229502d2a6..42f75ac302 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -187,7 +187,7 @@ public class HeadersExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException { - FieldTable headers = getHeaders(payload.getContentHeaderBody()); + FieldTable headers = payload.getHeadersTable(); if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); @@ -211,11 +211,12 @@ public class HeadersExchange extends AbstractExchange String msg = "Exchange " + getName() + ": message not routable."; - if (payload.getPublishBody().mandatory) + // XXX + /* if (payload.getTransferBody().mandatory) { throw new NoRouteException(msg, payload); } - else + else*/ { _logger.warn(msg); } @@ -250,13 +251,6 @@ public class HeadersExchange extends AbstractExchange return !_bindings.isEmpty(); } - protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame) - { - //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers, - //but these are not yet implemented. - return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders(); - } - protected ExchangeMBean createMBean() throws AMQException { try diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index f3e9965c2e..02b07db93d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -31,10 +31,10 @@ import javax.jms.JMSException; //import org.apache.activemq.command.Message; //import org.apache.activemq.command.TransactionId; //import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.message.jms.JMSMessage; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.log4j.Logger; /** @@ -243,12 +243,12 @@ public class PropertyExpression implements Expression else { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + FieldTable headers = message.getApplicationHeaders(); _logger.info("Looking up property:" + name); - _logger.info("Properties are:" + _properties.getHeaders().keySet()); + _logger.info("Properties are:" + headers.keySet()); - return _properties.getHeaders().get(name); + return headers.get(name); } // catch (IOException ioe) // { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 7461f93539..bfab8ac353 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -58,7 +58,6 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9); - protocolSession.writeFrame(response); + protocolSession.writeResponse(evt, ChannelCloseOkBody.createMethodBody((byte)0, (byte)9)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index 07ab0537d5..af5ccbfd78 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -28,7 +28,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ChannelFlowBody; import org.apache.qpid.framing.ChannelFlowOkBody; import org.apache.qpid.AMQException; @@ -61,9 +61,9 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) - body.active); // active - protocolSession.writeFrame(response); + AMQMethodBody response = ChannelFlowOkBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + body.active); // active + protocolSession.writeResponse(evt, response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 459ccf40a8..950c4b53e3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -50,15 +50,12 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException - { - IApplicationRegistry registry = ApplicationRegistry.getInstance(); - final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(), - exchangeRegistry); - protocolSession.addChannel(channel); + { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9); - protocolSession.writeFrame(response); + // XXX: Client id + AMQMethodBody response = ChannelOpenOkBody.createMethodBody((byte)0, (byte)9, "XXX".getBytes()); + protocolSession.writeResponse(evt, response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 6e22d67b72..52760b38bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -65,7 +65,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9); - protocolSession.writeFrame(response); + protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index c3b6560ee4..ce107aedfb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionOpenOkBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -67,10 +67,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, - (byte)0, (byte)9, // AMQP version (major, minor) - contextKey); // knownHosts + AMQMethodBody response = ConnectionOpenOkBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + contextKey); // knownHosts stateManager.changeState(AMQState.CONNECTION_OPEN); - protocolSession.writeFrame(response); + protocolSession.writeResponse(evt, response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 9aea4a7b26..d2885045ef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -78,13 +78,13 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame close = ConnectionCloseBody.createAMQFrame(0, - (byte)0, (byte)9, // AMQP version (major, minor) - ConnectionCloseBody.getClazz((byte)0, (byte)9), // classId - ConnectionCloseBody.getMethod((byte)0, (byte)9), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - AMQConstant.NOT_ALLOWED.getName()); // replyText - protocolSession.writeFrame(close); + AMQMethodBody close = ConnectionCloseBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + ConnectionCloseBody.getClazz((byte)0, (byte)9), // classId + ConnectionCloseBody.getMethod((byte)0, (byte)9), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + AMQConstant.NOT_ALLOWED.getName()); // replyText + protocolSession.writeResponse(evt, close); disposeSaslServer(protocolSession); break; case SUCCESS: @@ -96,12 +96,12 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, - (byte)0, (byte)9, // AMQP version (major, minor) - Integer.MAX_VALUE, // channelMax - ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax - HeartbeatConfig.getInstance().getDelay()); // heartbeat - protocolSession.writeFrame(tune); + AMQMethodBody tune = ConnectionTuneBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + Integer.MAX_VALUE, // channelMax + ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax + HeartbeatConfig.getInstance().getDelay()); // heartbeat + protocolSession.writeResponse(evt, tune); disposeSaslServer(protocolSession); break; case CONTINUE: @@ -109,10 +109,10 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, - (byte)0, (byte)9, // AMQP version (major, minor) - authResult.challenge); // challenge - protocolSession.writeFrame(challenge); + AMQMethodBody challenge = ConnectionSecureBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + authResult.challenge); // challenge + protocolSession.writeResponse(evt, challenge); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 77fddf1ff5..1745fd03f6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; @@ -95,22 +95,22 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, - (byte)0, (byte)9, // AMQP version (major, minor) - Integer.MAX_VALUE, // channelMax - getConfiguredFrameSize(), // frameMax - HeartbeatConfig.getInstance().getDelay()); // heartbeat - protocolSession.writeFrame(tune); + AMQMethodBody tune = ConnectionTuneBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + Integer.MAX_VALUE, // channelMax + getConfiguredFrameSize(), // frameMax + HeartbeatConfig.getInstance().getDelay()); // heartbeat + protocolSession.writeResponse(evt, tune); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, - (byte)0, (byte)9, // AMQP version (major, minor) - authResult.challenge); // challenge - protocolSession.writeFrame(challenge); + AMQMethodBody challenge = ConnectionSecureBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + authResult.challenge); // challenge + protocolSession.writeResponse(evt, challenge); } } catch (SaslException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 67f77c72ef..8e4fe6d1af 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -18,7 +18,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -79,14 +79,14 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo throw new AMQException("Exchange exchange must not be null"); } Exchange exchange = exchangeRegistry.getExchange(exchangeName); - AMQFrame response; + AMQMethodBody response; if (exchange == null) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - EXCHANGE_NOT_FOUND, // replyCode - "Exchange " + exchangeName + " not found"); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + EXCHANGE_NOT_FOUND, // replyCode + "Exchange " + exchangeName + " not found"); // replyText } else if (routingKey == null) { @@ -95,18 +95,18 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (exchange.hasBindings()) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - OK, // replyCode - null); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - NO_BINDINGS, // replyCode - null); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + NO_BINDINGS, // replyCode + null); // replyText } } else @@ -115,28 +115,28 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (queue == null) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - QUEUE_NOT_FOUND, // replyCode - "Queue " + queueName + " not found"); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + QUEUE_NOT_FOUND, // replyCode + "Queue " + queueName + " not found"); // replyText } else { if (exchange.isBound(queue)) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - OK, // replyCode - null); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - QUEUE_NOT_BOUND, // replyCode - "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + QUEUE_NOT_BOUND, // replyCode + "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText } } } @@ -147,29 +147,29 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (queue == null) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - QUEUE_NOT_FOUND, // replyCode - "Queue " + queueName + " not found"); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + QUEUE_NOT_FOUND, // replyCode + "Queue " + queueName + " not found"); // replyText } else { if (exchange.isBound(body.routingKey, queue)) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - OK, // replyCode - null); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - "Queue " + queueName + " not bound with routing key " + - body.routingKey + " to exchange " + exchangeName); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode + "Queue " + queueName + " not bound with routing key " + + body.routingKey + " to exchange " + exchangeName); // replyText } } } @@ -178,21 +178,21 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (exchange.isBound(body.routingKey)) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - OK, // replyCode - null); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) - NO_QUEUE_BOUND_WITH_RK, // replyCode - "No queue bound with routing key " + body.routingKey + - " to exchange " + exchangeName); // replyText + response = ExchangeBoundOkBody.createMethodBody + (major, minor, // AMQP version (major, minor) + NO_QUEUE_BOUND_WITH_RK, // replyCode + "No queue bound with routing key " + body.routingKey + + " to exchange " + exchangeName); // replyText } } - protocolSession.writeFrame(response); + protocolSession.writeResponse(evt, response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index cdb3a503ae..ce4e778c5b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -78,8 +78,8 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9); - protocolSession.writeFrame(response); + AMQMethodBody response = ExchangeDeclareOkBody.createMethodBody((byte)0, (byte)9); + protocolSession.writeResponse(evt, response); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 79b4e07c90..3f94e359cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -56,8 +56,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9); - protocolSession.writeFrame(response); + protocolSession.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody((byte)0, (byte)9)); } catch (ExchangeInUseException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 915bfa67a6..567b391868 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -93,8 +93,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9); - protocolSession.writeFrame(response); + final AMQMethodBody response = QueueBindOkBody.createMethodBody((byte)0, (byte)9); + protocolSession.writeResponse(evt, response); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 1a7b82829a..fef7fb4197 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQMethodEvent; @@ -105,13 +105,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) - 0L, // consumerCount - 0L, // messageCount - body.queue); // queue + AMQMethodBody response = QueueDeclareOkBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + 0L, // consumerCount + 0L, // messageCount + body.queue); // queue _log.info("Queue " + body.queue + " declared successfully"); - protocolSession.writeFrame(response); + protocolSession.writeResponse(evt, response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index b867d80fdb..5263462c89 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -84,9 +84,9 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) - purged)); // messageCount + session.writeResponse(evt, QueueDeleteOkBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + purged)); // messageCount } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index 983e6f7e56..ee8b4e05d0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -55,8 +55,8 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9)); - channel.processReturns(protocolSession); + protocolSession.writeResponse(evt, TxCommitOkBody.createMethodBody((byte)0, (byte)9)); + channel.processReturns(protocolSession); }catch(AMQException e){ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 891dd69d4d..36fa1884ff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -54,7 +54,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9)); + protocolSession.writeResponse(evt, TxRollbackOkBody.createMethodBody((byte)0, (byte)9)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). channel.resend(protocolSession); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index 0c2a6ca210..0bfbed32f1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -51,6 +51,6 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9)); + protocolSession.writeResponse(evt, TxSelectOkBody.createMethodBody((byte)0, (byte)9)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java index 72e241ea0a..7160a292d6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.message.jms; import org.apache.qpid.server.message.MessageDecorator; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; import javax.jms.Message; import javax.jms.JMSException; @@ -35,13 +33,10 @@ public class JMSMessage implements MessageDecorator { private AMQMessage _message; - private BasicContentHeaderProperties _properties; public JMSMessage(AMQMessage message) { _message = message; - ContentHeaderBody contentHeader = message.getContentHeaderBody(); - _properties = (BasicContentHeaderProperties) contentHeader.properties; } protected void checkWriteable() throws MessageNotWriteableException @@ -56,29 +51,29 @@ public class JMSMessage implements MessageDecorator public String getJMSMessageID() { - return _properties.getMessageId(); + return _message.getXXXMessageId(); } public void setJMSMessageID(String string) throws MessageNotWriteableException { checkWriteable(); - _properties.setMessageId(string); + _message.setXXXMessageId(string); } public long getJMSTimestamp() { - return _properties.getTimestamp(); + return _message.getTimestamp(); } public void setJMSTimestamp(long l) throws MessageNotWriteableException { checkWriteable(); - _properties.setTimestamp(l); + _message.setTimestamp(l); } public byte[] getJMSCorrelationIDAsBytes() { - return _properties.getCorrelationId().getBytes(); + return _message.getCorrelationId().getBytes(); } // public void setJMSCorrelationIDAsBytes(byte[] bytes) @@ -88,23 +83,23 @@ public class JMSMessage implements MessageDecorator public void setJMSCorrelationID(String string) throws MessageNotWriteableException { checkWriteable(); - _properties.setCorrelationId(string); + _message.setCorrelationId(string); } public String getJMSCorrelationID() { - return _properties.getCorrelationId(); + return _message.getCorrelationId(); } public String getJMSReplyTo() { - return _properties.getReplyTo(); + return _message.getReplyTo(); } public void setJMSReplyTo(Destination destination) throws MessageNotWriteableException { checkWriteable(); - _properties.setReplyTo(destination.toString()); + _message.setReplyTo(destination.toString()); } public String getJMSDestination() @@ -121,13 +116,13 @@ public class JMSMessage implements MessageDecorator public int getJMSDeliveryMode() { - return _properties.getDeliveryMode(); + return _message.getDeliveryMode(); } public void setJMSDeliveryMode(byte i) throws MessageNotWriteableException { checkWriteable(); - _properties.setDeliveryMode(i); + _message.setDeliveryMode(i); } public boolean getJMSRedelivered() @@ -143,150 +138,150 @@ public class JMSMessage implements MessageDecorator public String getJMSType() { - return _properties.getType(); + return _message.getType(); } public void setJMSType(String string) throws MessageNotWriteableException { checkWriteable(); - _properties.setType(string); + _message.setType(string); } public long getJMSExpiration() { - return _properties.getExpiration(); + return _message.getExpiration(); } public void setJMSExpiration(long l) throws MessageNotWriteableException { checkWriteable(); - _properties.setExpiration(l); + _message.setExpiration(l); } public int getJMSPriority() { - return _properties.getPriority(); + return _message.getPriority(); } public void setJMSPriority(byte i) throws MessageNotWriteableException { checkWriteable(); - _properties.setPriority(i); + _message.setPriority(i); } public void clearProperties() throws MessageNotWriteableException { checkWriteable(); - _properties.getJMSHeaders().clear(); + _message.getApplicationHeaders().clear(); } public boolean propertyExists(String string) { - return _properties.getJMSHeaders().propertyExists(string); + return _message.getApplicationHeaders().propertyExists(string); } public boolean getBooleanProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getBoolean(string); + return _message.getApplicationHeaders().getBoolean(string); } public byte getByteProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getByte(string); + return _message.getApplicationHeaders().getByte(string); } public short getShortProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getShort(string); + return _message.getApplicationHeaders().getShort(string); } public int getIntProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getInteger(string); + return _message.getApplicationHeaders().getInteger(string); } public long getLongProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getLong(string); + return _message.getApplicationHeaders().getLong(string); } public float getFloatProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getFloat(string); + return _message.getApplicationHeaders().getFloat(string); } public double getDoubleProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getDouble(string); + return _message.getApplicationHeaders().getDouble(string); } public String getStringProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getString(string); + return _message.getApplicationHeaders().getString(string); } public Object getObjectProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getObject(string); + return _message.getApplicationHeaders().getObject(string); } public Enumeration getPropertyNames() { - return _properties.getJMSHeaders().getPropertyNames(); + return _message.getApplicationHeaders().getPropertyNames(); } public void setBooleanProperty(String string, boolean b) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setBoolean(string, b); + _message.getApplicationHeaders().setBoolean(string, b); } public void setByteProperty(String string, byte b) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setByte(string, b); + _message.getApplicationHeaders().setByte(string, b); } public void setShortProperty(String string, short i) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setShort(string, i); + _message.getApplicationHeaders().setShort(string, i); } public void setIntProperty(String string, int i) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setInteger(string, i); + _message.getApplicationHeaders().setInteger(string, i); } public void setLongProperty(String string, long l) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setLong(string, l); + _message.getApplicationHeaders().setLong(string, l); } public void setFloatProperty(String string, float v) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setFloat(string, v); + _message.getApplicationHeaders().setFloat(string, v); } public void setDoubleProperty(String string, double v) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setDouble(string, v); + _message.getApplicationHeaders().setDouble(string, v); } public void setStringProperty(String string, String string1) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setString(string, string1); + _message.getApplicationHeaders().setString(string, string1); } public void setObjectProperty(String string, Object object) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setObject(string, object); + _message.getApplicationHeaders().setObject(string, object); } public void acknowledge() throws MessageNotWriteableException diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index ee035287b7..a2268c792e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -52,6 +52,7 @@ import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; import javax.management.JMException; @@ -162,10 +163,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return (AMQProtocolSession) minaProtocolSession.getAttachment(); } + private AMQChannel createChannel(int id) throws AMQException { + IApplicationRegistry registry = ApplicationRegistry.getInstance(); + AMQChannel channel = new AMQChannel(id, registry.getMessageStore(), + _exchangeRegistry, this, _stateManager); + addChannel(channel); + return channel; + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; + if (message instanceof ProtocolInitiation) { ProtocolInitiation pi = (ProtocolInitiation) message; @@ -180,13 +190,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); String locales = "en_US"; // Interfacing with generated code - be aware of possible changes to parameter order as versions change. - AMQMethodBody connectionStartBody = ConnectionStartBody.createMethodBody( - (byte)_major, (byte)_minor, // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short)_major, // versionMajor - (short)_minor); // versionMinor + createChannel(0); + AMQMethodBody connectionStartBody = ConnectionStartBody.createMethodBody + ((byte)_major, (byte)_minor, // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short)_major, // versionMajor + (short)_minor); // versionMinor writeRequest(0, connectionStartBody, _stateManager); } catch (AMQException e) @@ -209,6 +220,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { AMQFrame frame = (AMQFrame) message; + AMQChannel channel = getChannel(frame.channel); + if (channel == null) { + channel = createChannel(frame.channel); + } + if (frame.bodyFrame instanceof AMQRequestBody) { requestFrameReceived(frame.channel, (AMQRequestBody)frame.bodyFrame); @@ -224,7 +240,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws AMQException + private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws Exception { if (_logger.isDebugEnabled()) { @@ -235,7 +251,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, responseManager.requestReceived(requestBody); } - private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws AMQException + private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws Exception { if (_logger.isDebugEnabled()) { @@ -247,7 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener) - throws RequestResponseMappingException + throws AMQException { AMQChannel channel = getChannel(channelNum); RequestManager requestManager = channel.getRequestManager(); @@ -255,13 +271,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) - throws RequestResponseMappingException + throws AMQException { AMQChannel channel = getChannel(channelNum); ResponseManager responseManager = channel.getResponseManager(); responseManager.sendResponse(requestId, methodBody); } + public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) + throws AMQException + { + writeResponse(evt.getChannelId(), evt.getRequestId(), response); + } + /** * Convenience method that writes a frame to the protocol session. Equivalent * to calling getProtocolSession().write(). diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 0e5c1ec8b2..badb523786 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -20,7 +20,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -196,17 +196,17 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, - (byte)0, (byte)9, // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - "Broker Management Console has closing the connection." // replyText - ); - _session.writeFrame(response); - + final AMQMethodBody request = ConnectionCloseBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "Broker Management Console has closing the connection." // replyText + ); try { + if (true) throw new Error("XXX"); + _session.writeRequest(0, request, null /*XXX*/); _session.closeSession(); } catch (AMQException ex) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 5aa991b580..3f0cf63b70 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -49,11 +49,9 @@ public class AMQMessage private AMQProtocolSession _publisher; - private final BasicPublishBody _publishBody; + private final MessageTransferBody _transferBody; - private ContentHeaderBody _contentHeaderBody; - - private List<ContentBody> _contentBodies; + private List<MessageAppendBody> _contentBodies; private boolean _redelivered; @@ -93,29 +91,28 @@ public class AMQMessage private AtomicBoolean _taken; - public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody) + public AMQMessage(MessageStore messageStore, MessageTransferBody transferBody) { - this(messageStore, publishBody, true); + this(messageStore, transferBody, true); } - public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete) + public AMQMessage(MessageStore messageStore, MessageTransferBody transferBody, boolean storeWhenComplete) { _messageId = messageStore.getNewMessageId(); - _publishBody = publishBody; + _transferBody = transferBody; _store = messageStore; - _contentBodies = new LinkedList<ContentBody>(); + _contentBodies = new LinkedList<MessageAppendBody>(); _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); _storeWhenComplete = storeWhenComplete; _taken = new AtomicBoolean(false); } - public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) + public AMQMessage(MessageStore store, long messageId, MessageTransferBody transferBody, + List<MessageAppendBody> contentBodies) throws AMQException { - _publishBody = publishBody; - _contentHeaderBody = contentHeaderBody; + _transferBody = transferBody; _contentBodies = contentBodies; _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); _messageId = messageId; @@ -123,16 +120,103 @@ public class AMQMessage storeMessage(); } - public AMQMessage(MessageStore store, BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) + public AMQMessage(MessageStore store, MessageTransferBody transferBody, List<MessageAppendBody> contentBodies) throws AMQException { - this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies); + this(store, store.getNewMessageId(), transferBody, contentBodies); } protected AMQMessage(AMQMessage msg) throws AMQException { - this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies); + this(msg._store, msg._messageId, msg._transferBody, msg._contentBodies); + } + + public long getSize() { + throw new Error("XXX"); + } + + public FieldTable getHeadersTable() { + throw new Error("XXX"); + } + + public FieldTable getApplicationHeaders() { + throw new Error("XXX"); + } + + public void setXXXMessageId(String messageId) { + throw new Error("XXX"); + } + + public String getXXXMessageId() { + throw new Error("XXX"); + } + + public void setType(String type) { + throw new Error("XXX"); + } + + public String getType() { + throw new Error("XXX"); + } + + public void setDeliveryMode(byte mode) { + throw new Error("XXX"); + } + + public byte getDeliveryMode() { + throw new Error("XXX"); + } + + public void setReplyTo(String replyTo) { + throw new Error("XXX"); + } + + public String getReplyTo() { + throw new Error("XXX"); + } + + public void setCorrelationId(String correlationId) { + throw new Error("XXX"); + } + + public String getCorrelationId() { + throw new Error("XXX"); + } + + public void setPriority(byte priority) { + throw new Error("XXX"); + } + + public byte getPriority() { + throw new Error("XXX"); + } + + public void setExpiration(long l) { + throw new Error("XXX"); + } + + public long getExpiration() { + throw new Error("XXX"); + } + + public void setTimestamp(long l) { + throw new Error("XXX"); + } + + public long getTimestamp() { + throw new Error("XXX"); + } + + public String getContentType() { + throw new Error("XXX"); + } + + public String getEncoding() { + throw new Error("XXX"); + } + + public byte[] getMessageBytes() { + throw new Error("XXX"); } public void storeMessage() throws AMQException @@ -147,11 +231,12 @@ public class AMQMessage { AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()]; - allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); + if (true) throw new Error("XXX"); + /*allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); for (int i = 1; i < allFrames.length; i++) { allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 1)); - } + }*/ return new CompositeAMQDataBlock(encodedDeliverBody, allFrames); } @@ -163,7 +248,9 @@ public class AMQMessage // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - allFrames[0] = BasicDeliverBody.createAMQFrame(channel, + if (true) throw new Error("XXX"); + /* + allFrames[0] = MessageTransferBody.createAMQFrame(channel, (byte)0, (byte)9, // AMQP version (major, minor) consumerTag, // consumerTag deliveryTag, // deliveryTag @@ -171,65 +258,32 @@ public class AMQMessage _redelivered, // redelivered getRoutingKey() // routingKey ); - allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); + allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); for (int i = 2; i < allFrames.length; i++) { allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2)); - } + }*/ return new CompositeAMQDataBlock(allFrames); } public List<AMQBody> getPayload() { List<AMQBody> payload = new ArrayList<AMQBody>(2 + _contentBodies.size()); - payload.add(_publishBody); - payload.add(_contentHeaderBody); + payload.add(_transferBody); payload.addAll(_contentBodies); return payload; } - public BasicPublishBody getPublishBody() + public MessageTransferBody getTransferBody() { - return _publishBody; - } - - public ContentHeaderBody getContentHeaderBody() - { - return _contentHeaderBody; - } - - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException - { - _contentHeaderBody = contentHeaderBody; - if (_storeWhenComplete && isAllContentReceived()) - { - storeMessage(); - } - } - - public List<ContentBody> getContentBodies() - { - return _contentBodies; - } - - public void setContentBodies(List<ContentBody> contentBodies) - { - _contentBodies = contentBodies; - } - - public void addContentBodyFrame(ContentBody contentBody) throws AMQException - { - _contentBodies.add(contentBody); - _bodyLengthReceived += contentBody.getSize(); - if (_storeWhenComplete && isAllContentReceived()) - { - storeMessage(); - } + return _transferBody; } public boolean isAllContentReceived() { - return _bodyLengthReceived == _contentHeaderBody.bodySize; + if (true) throw new Error("XXX"); + /*XXX*/return false; + //return _bodyLengthReceived == _contentHeaderBody.bodySize; } @@ -240,22 +294,22 @@ public class AMQMessage String getExchangeName() { - return _publishBody.exchange; + return _transferBody.exchange; } String getRoutingKey() { - return _publishBody.routingKey; + return _transferBody.routingKey; } boolean isImmediate() { - return _publishBody.immediate; + return _transferBody.immediate; } NoConsumersException getNoConsumersException(String queue) { - return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies); + return new NoConsumersException(queue, this); } public void setRedelivered(boolean redelivered) @@ -347,14 +401,7 @@ public class AMQMessage public boolean isPersistent() throws AMQException { - if (_contentHeaderBody == null) - { - throw new AMQException("Cannot determine delivery mode of message. Content header not found."); - } - - //todo remove literal values to a constant file such as AMQConstants in common - return _contentHeaderBody.properties instanceof BasicContentHeaderProperties - && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + return getDeliveryMode() == 2; } public void setTxnBuffer(TxnBuffer buffer) @@ -377,7 +424,7 @@ public class AMQMessage { if (isImmediate() && !_deliveredToConsumer) { - throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies); + throw new NoConsumersException(this); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 1bdf265a1b..aee6f74117 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -17,13 +17,11 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.mina.common.ByteBuffer; import javax.management.openmbean.*; @@ -34,6 +32,7 @@ import javax.management.MBeanNotificationInfo; import javax.management.OperationsException; import javax.management.monitor.MonitorNotification; import java.util.List; +import java.util.Set; import java.util.ArrayList; /** @@ -191,7 +190,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue return 0l; } - return msg.getContentHeaderBody().bodySize; + return msg.getSize(); } /** @@ -273,32 +272,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } // get message content - List<ContentBody> cBodies = msg.getContentBodies(); - List<Byte> msgContent = new ArrayList<Byte>(); - if (cBodies != null) - { - for (ContentBody body : cBodies) - { - if (body.getSize() != 0) - { - ByteBuffer slice = body.payload.slice(); - for (int j = 0; j < slice.limit(); j++) - { - msgContent.add(slice.get()); - } - } - } - } + byte[] msgContent = msg.getMessageBytes(); // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) - { - mimeType = headerProperties.getContentType(); - encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); - } - Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; + String mimeType = msg.getContentType(); + String encoding = msg.getEncoding(); + Object[] itemValues = {msgId, mimeType, encoding, msgContent}; return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); } @@ -321,11 +300,15 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) { AMQMessage msg = list.get(i - 1); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; - String[] headerAttributes = headerProperties.toString().split(","); - Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; + FieldTable headers = msg.getHeadersTable(); + Set<String> names = headers.keys(); + String[] values = new String[names.size()]; + int index = 0; + for (String name : names) { + values[index++] = "" + headers.get(name); + } + Object[] itemValues = {msg.getMessageId(), values, msg.getSize(), msg.isRedelivered()}; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); _messageList.put(messageData); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index 022d3b9635..4609cce054 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.configuration.Configurator; import java.util.ArrayList; @@ -143,8 +142,9 @@ public class ConcurrentDeliveryManager implements DeliveryManager private boolean addMessageToQueue(AMQMessage msg) { - // Shrink the ContentBodies to their actual size to save memory. - if (compressBufferOnQueue) + // Shrink the ContentBodies to their actual size to save memory. + if (true) throw new Error("XXX"); + /*if (compressBufferOnQueue) { Iterator it = msg.getContentBodies().iterator(); while (it.hasNext()) @@ -152,7 +152,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager ContentBody cb = (ContentBody) it.next(); cb.reduceBufferToFit(); } - } + }*/ _messages.offer(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index f09e8213b1..53ada898ab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.configuration.Configurator; import java.util.ArrayList; @@ -97,7 +96,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private boolean addMessageToQueue(AMQMessage msg) { // Shrink the ContentBodies to their actual size to save memory. - if (compressBufferOnQueue) + if (true) throw new Error("XXX"); + /*if (compressBufferOnQueue) { Iterator it = msg.getContentBodies().iterator(); while (it.hasNext()) @@ -105,7 +105,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager ContentBody cb = (ContentBody) it.next(); cb.reduceBufferToFit(); } - } + }*/ _messages.offer(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index 2d37b806f6..09f0b00e90 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -21,9 +21,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.protocol.AMQConstant; import java.util.List; @@ -35,19 +32,14 @@ import java.util.List; */ public class NoConsumersException extends RequiredDeliveryException { - public NoConsumersException(String queue, - BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) + public NoConsumersException(String queue, AMQMessage message) { - super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies); + super("Immediate delivery to " + queue + " is not possible.", message); } - public NoConsumersException(BasicPublishBody publishBody, - ContentHeaderBody contentHeaderBody, - List<ContentBody> contentBodies) + public NoConsumersException(AMQMessage message) { - super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies); + super("Immediate delivery is not possible.", message); } public int getReplyCode() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 149c765df5..1be4dbd95a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -28,9 +28,8 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -235,7 +234,9 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + ByteBuffer deliver = null; + if (true) throw new Error("XXX"); + //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); protocolSession.writeFrame(frame); @@ -268,7 +269,9 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + ByteBuffer deliver = null; + if (true) throw new Error("XXX"); + //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); protocolSession.writeFrame(frame); @@ -382,10 +385,11 @@ public class SubscriptionImpl implements Subscription // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) + if (true) throw new Error("XXX"); + /*protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) consumerTag // consumerTag - )); + ));*/ _closed = true; } } @@ -396,12 +400,12 @@ public class SubscriptionImpl implements Subscription } - private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) + /* private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), + AMQFrame deliverFrame = MessageTransferBody.createAMQFrame(channel.getChannelId(), (byte)0, (byte)9, // AMQP version (major, minor) consumerTag, // consumerTag deliveryTag, // deliveryTag @@ -413,5 +417,5 @@ public class SubscriptionImpl implements Subscription deliverFrame.writePayload(buf); buf.flip(); return buf; - } + }*/ } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java index 00a27a8869..623b2356ae 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java @@ -24,6 +24,8 @@ import org.apache.mina.common.ByteBuffer; public class AMQRequestBody extends AMQBody { + public static final byte TYPE = 9; + // Fields declared in specification protected long requestId; protected long responseMark; @@ -54,14 +56,14 @@ public class AMQRequestBody extends AMQBody protected int getSize() { - return 8 + 8 + 4 + methodPayload.getBodySize(); + return 8 + 8 + 4 + methodPayload.getSize(); } protected void writePayload(ByteBuffer buffer) { EncodingUtils.writeLong(buffer, requestId); EncodingUtils.writeLong(buffer, responseMark); - EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0 + EncodingUtils.writeInteger(buffer, 0); // reserved, set to 0 methodPayload.writePayload(buffer); } @@ -70,7 +72,7 @@ public class AMQRequestBody extends AMQBody { requestId = EncodingUtils.readLong(buffer); responseMark = EncodingUtils.readLong(buffer); - int reserved = EncodingUtils.readShort(buffer); // reserved, throw away + int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java index 90038da2d4..2b0fc97b1b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java @@ -24,6 +24,8 @@ import org.apache.mina.common.ByteBuffer; public class AMQResponseBody extends AMQBody { + public static final byte TYPE = 10; + // Fields declared in specification protected long responseId; protected long requestId; @@ -54,14 +56,15 @@ public class AMQResponseBody extends AMQBody protected int getSize() { - return 8 + 8 + 4 + methodPayload.getBodySize(); + return 8 + 8 + 4 + methodPayload.getSize(); } protected void writePayload(ByteBuffer buffer) { EncodingUtils.writeLong(buffer, responseId); EncodingUtils.writeLong(buffer, requestId); - EncodingUtils.writeUnsignedShort(buffer, batchOffset); + // XXX + EncodingUtils.writeInteger(buffer, batchOffset); methodPayload.writePayload(buffer); } @@ -70,7 +73,8 @@ public class AMQResponseBody extends AMQBody { responseId = EncodingUtils.readLong(buffer); requestId = EncodingUtils.readLong(buffer); - batchOffset = EncodingUtils.readShort(buffer); + // XXX + batchOffset = EncodingUtils.readInteger(buffer); methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java index 5ec9b122af..e39d85277d 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java @@ -20,9 +20,10 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; public interface AMQProtocolWriter @@ -34,8 +35,12 @@ public interface AMQProtocolWriter public void writeFrame(AMQDataBlock frame); public long writeRequest(int channelNum, AMQMethodBody methodBody, - AMQMethodListener methodListener) throws RequestResponseMappingException; + AMQMethodListener methodListener) + throws AMQException; public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) - throws RequestResponseMappingException; + throws AMQException; + + public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) + throws AMQException; } |
