summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-01-16 04:44:48 +0000
committerRafael H. Schloming <rhs@apache.org>2007-01-16 04:44:48 +0000
commitd24ea0b5a3f4ca7c1a7f30f9af99b4e7338a1d85 (patch)
tree6b6d68a21d07fdc5c0a2e4b41018212681ec7fc0 /java
parentef1469a7ea1f54f266aee8f2899b7cd0c7e07d08 (diff)
downloadqpid-python-d24ea0b5a3f4ca7c1a7f30f9af99b4e7338a1d85.tar.gz
fixed broker compile errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496586 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/etc/log4j.xml4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java98
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java81
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java195
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java11
36 files changed, 407 insertions, 397 deletions
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml
index 9be428fbbd..98da18b8bb 100644
--- a/java/broker/etc/log4j.xml
+++ b/java/broker/etc/log4j.xml
@@ -37,9 +37,9 @@
</layout>
</appender>
- <!--<category name="org.apache.qpid.server.store">
+ <category name="org.apache.qpid.framing">
<priority value="debug"/>
- </category>-->
+ </category>
<root>
<priority value="info"/>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 0f2f5ac94e..59ebc08428 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -127,7 +127,7 @@ public class AMQChannel
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
- _requestManager = new RequestManager(channelId, protocolWriter);
+ _requestManager = new RequestManager(channelId, protocolWriter);
_responseManager = new ResponseManager(channelId, methodListener, protocolWriter);
_txnBuffer = new TxnBuffer(_messageStore);
}
@@ -827,7 +827,8 @@ public class AMQChannel
catch (NoConsumersException e)
{
//TODO: store this for delivery after the commit-ok
- _returns.add(e.getReturnMessage(_channelId));
+ throw new Error("XXX");
+ //_returns.add(e.getReturnMessage(_channelId));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 4942f17d3c..a1a6a77a93 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -20,11 +20,8 @@
*/
package org.apache.qpid.server;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
@@ -39,57 +36,26 @@ import java.util.List;
public abstract class RequiredDeliveryException extends AMQException
{
private final String _message;
- private final BasicPublishBody _publishBody;
- private final ContentHeaderBody _contentHeaderBody;
- private final List<ContentBody> _contentBodies;
+ private final AMQMessage _payload;
public RequiredDeliveryException(String message, AMQMessage payload)
{
super(message);
_message = message;
- _publishBody = payload.getPublishBody();
- _contentHeaderBody = payload.getContentHeaderBody();
- _contentBodies = payload.getContentBodies();
+ _payload = payload;
}
- public RequiredDeliveryException(String message,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
- {
- super(message);
- _message = message;
- _publishBody = publishBody;
- _contentHeaderBody = contentHeaderBody;
- _contentBodies = contentBodies;
- }
-
- public BasicPublishBody getPublishBody()
- {
- return _publishBody;
- }
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return _contentHeaderBody;
- }
-
- public List<ContentBody> getContentBodies()
- {
- return _contentBodies;
- }
-
- public CompositeAMQDataBlock getReturnMessage(int channel)
+ /* public CompositeAMQDataBlock getReturnMessage(int channel)
{
// AMQP version change: All generated *Body classes are now version-aware.
// Shortcut: hardwire version to 0-9 (major=0, minor=9) for now.
// TODO: Connect the version to that returned by the ProtocolInitiation
// for this session.
BasicReturnBody returnBody = new BasicReturnBody((byte)0, (byte)9);
- returnBody.exchange = _publishBody.exchange;
+ returnBody.exchange = _transferBody.exchange;
returnBody.replyCode = getReplyCode();
returnBody.replyText = _message;
- returnBody.routingKey = _publishBody.routingKey;
+ returnBody.routingKey = _transferBody.routingKey;
AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
@@ -105,7 +71,7 @@ public abstract class RequiredDeliveryException extends AMQException
}
return new CompositeAMQDataBlock(allFrames);
- }
+ }*/
public int getErrorCode()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index eb9d1acb59..5c484edfef 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -81,7 +81,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
*/
public void routeContent(AMQMessage payload) throws AMQException
{
- final String exchange = payload.getPublishBody().exchange;
+ final String exchange = payload.getTransferBody().exchange;
final Exchange exch = _exchangeMap.get(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
// the JmsPublish being received (where the exchange is validated) and the final
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index d4069fa315..b777ae7d82 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
@@ -168,18 +168,19 @@ public class DestNameExchange extends AbstractExchange
public void route(AMQMessage payload) throws AMQException
{
- BasicPublishBody publishBody = payload.getPublishBody();
+ MessageTransferBody transferBody = payload.getTransferBody();
- final String routingKey = publishBody.routingKey;
+ final String routingKey = transferBody.routingKey;
final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
if (queues == null || queues.isEmpty())
{
String msg = "Routing key " + routingKey + " is not known to " + this;
- if (publishBody.mandatory)
+ // XXX
+ /*if (transferBody.mandatory)
{
throw new NoRouteException(msg, payload);
}
- else
+ else*/
{
_logger.warn(msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 139307488e..932632cde3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
@@ -152,9 +152,9 @@ public class DestWildExchange extends AbstractExchange
public void route(AMQMessage payload) throws AMQException
{
- BasicPublishBody publishBody = payload.getPublishBody();
+ MessageTransferBody transferBody = payload.getTransferBody();
- final String routingKey = publishBody.routingKey;
+ final String routingKey = transferBody.routingKey;
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 229502d2a6..42f75ac302 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -187,7 +187,7 @@ public class HeadersExchange extends AbstractExchange
public void route(AMQMessage payload) throws AMQException
{
- FieldTable headers = getHeaders(payload.getContentHeaderBody());
+ FieldTable headers = payload.getHeadersTable();
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
@@ -211,11 +211,12 @@ public class HeadersExchange extends AbstractExchange
String msg = "Exchange " + getName() + ": message not routable.";
- if (payload.getPublishBody().mandatory)
+ // XXX
+ /* if (payload.getTransferBody().mandatory)
{
throw new NoRouteException(msg, payload);
}
- else
+ else*/
{
_logger.warn(msg);
}
@@ -250,13 +251,6 @@ public class HeadersExchange extends AbstractExchange
return !_bindings.isEmpty();
}
- protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
- {
- //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
- //but these are not yet implemented.
- return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
- }
-
protected ExchangeMBean createMBean() throws AMQException
{
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
index f3e9965c2e..02b07db93d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -31,10 +31,10 @@ import javax.jms.JMSException;
//import org.apache.activemq.command.Message;
//import org.apache.activemq.command.TransactionId;
//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.log4j.Logger;
/**
@@ -243,12 +243,12 @@ public class PropertyExpression implements Expression
else
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ FieldTable headers = message.getApplicationHeaders();
_logger.info("Looking up property:" + name);
- _logger.info("Properties are:" + _properties.getHeaders().keySet());
+ _logger.info("Properties are:" + headers.keySet());
- return _properties.getHeaders().get(name);
+ return headers.get(name);
}
// catch (IOException ioe)
// {
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 7461f93539..bfab8ac353 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -58,7 +58,6 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- protocolSession.writeFrame(response);
+ protocolSession.writeResponse(evt, ChannelCloseOkBody.createMethodBody((byte)0, (byte)9));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index 07ab0537d5..af5ccbfd78 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -28,7 +28,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ChannelFlowOkBody;
import org.apache.qpid.AMQException;
@@ -61,9 +61,9 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
- body.active); // active
- protocolSession.writeFrame(response);
+ AMQMethodBody response = ChannelFlowOkBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ body.active); // active
+ protocolSession.writeResponse(evt, response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 459ccf40a8..950c4b53e3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -50,15 +50,12 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
- {
- IApplicationRegistry registry = ApplicationRegistry.getInstance();
- final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
- exchangeRegistry);
- protocolSession.addChannel(channel);
+ {
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- protocolSession.writeFrame(response);
+ // XXX: Client id
+ AMQMethodBody response = ChannelOpenOkBody.createMethodBody((byte)0, (byte)9, "XXX".getBytes());
+ protocolSession.writeResponse(evt, response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 6e22d67b72..52760b38bf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -65,7 +65,6 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- protocolSession.writeFrame(response);
+ protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index c3b6560ee4..ce107aedfb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -67,10 +67,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
- (byte)0, (byte)9, // AMQP version (major, minor)
- contextKey); // knownHosts
+ AMQMethodBody response = ConnectionOpenOkBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ contextKey); // knownHosts
stateManager.changeState(AMQState.CONNECTION_OPEN);
- protocolSession.writeFrame(response);
+ protocolSession.writeResponse(evt, response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index 9aea4a7b26..d2885045ef 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -78,13 +78,13 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
- (byte)0, (byte)9, // AMQP version (major, minor)
- ConnectionCloseBody.getClazz((byte)0, (byte)9), // classId
- ConnectionCloseBody.getMethod((byte)0, (byte)9), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName()); // replyText
- protocolSession.writeFrame(close);
+ AMQMethodBody close = ConnectionCloseBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ ConnectionCloseBody.getClazz((byte)0, (byte)9), // classId
+ ConnectionCloseBody.getMethod((byte)0, (byte)9), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName()); // replyText
+ protocolSession.writeResponse(evt, close);
disposeSaslServer(protocolSession);
break;
case SUCCESS:
@@ -96,12 +96,12 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
- (byte)0, (byte)9, // AMQP version (major, minor)
- Integer.MAX_VALUE, // channelMax
- ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
- HeartbeatConfig.getInstance().getDelay()); // heartbeat
- protocolSession.writeFrame(tune);
+ AMQMethodBody tune = ConnectionTuneBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
+ protocolSession.writeResponse(evt, tune);
disposeSaslServer(protocolSession);
break;
case CONTINUE:
@@ -109,10 +109,10 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
- (byte)0, (byte)9, // AMQP version (major, minor)
- authResult.challenge); // challenge
- protocolSession.writeFrame(challenge);
+ AMQMethodBody challenge = ConnectionSecureBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ authResult.challenge); // challenge
+ protocolSession.writeResponse(evt, challenge);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 77fddf1ff5..1745fd03f6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
@@ -95,22 +95,22 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
- (byte)0, (byte)9, // AMQP version (major, minor)
- Integer.MAX_VALUE, // channelMax
- getConfiguredFrameSize(), // frameMax
- HeartbeatConfig.getInstance().getDelay()); // heartbeat
- protocolSession.writeFrame(tune);
+ AMQMethodBody tune = ConnectionTuneBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
+ protocolSession.writeResponse(evt, tune);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
- (byte)0, (byte)9, // AMQP version (major, minor)
- authResult.challenge); // challenge
- protocolSession.writeFrame(challenge);
+ AMQMethodBody challenge = ConnectionSecureBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ authResult.challenge); // challenge
+ protocolSession.writeResponse(evt, challenge);
}
}
catch (SaslException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 67f77c72ef..8e4fe6d1af 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -18,7 +18,7 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ExchangeBoundBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -79,14 +79,14 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
throw new AMQException("Exchange exchange must not be null");
}
Exchange exchange = exchangeRegistry.getExchange(exchangeName);
- AMQFrame response;
+ AMQMethodBody response;
if (exchange == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- EXCHANGE_NOT_FOUND, // replyCode
- "Exchange " + exchangeName + " not found"); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ EXCHANGE_NOT_FOUND, // replyCode
+ "Exchange " + exchangeName + " not found"); // replyText
}
else if (routingKey == null)
{
@@ -95,18 +95,18 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (exchange.hasBindings())
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
- null); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- NO_BINDINGS, // replyCode
- null); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ NO_BINDINGS, // replyCode
+ null); // replyText
}
}
else
@@ -115,28 +115,28 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_FOUND, // replyCode
- "Queue " + queueName + " not found"); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_FOUND, // replyCode
+ "Queue " + queueName + " not found"); // replyText
}
else
{
if (exchange.isBound(queue))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
- null); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_BOUND, // replyCode
- "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_BOUND, // replyCode
+ "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText
}
}
}
@@ -147,29 +147,29 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_FOUND, // replyCode
- "Queue " + queueName + " not found"); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_FOUND, // replyCode
+ "Queue " + queueName + " not found"); // replyText
}
else
{
if (exchange.isBound(body.routingKey, queue))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
- null); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
- "Queue " + queueName + " not bound with routing key " +
- body.routingKey + " to exchange " + exchangeName); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ "Queue " + queueName + " not bound with routing key " +
+ body.routingKey + " to exchange " + exchangeName); // replyText
}
}
}
@@ -178,21 +178,21 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (exchange.isBound(body.routingKey))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
- null); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- NO_QUEUE_BOUND_WITH_RK, // replyCode
- "No queue bound with routing key " + body.routingKey +
- " to exchange " + exchangeName); // replyText
+ response = ExchangeBoundOkBody.createMethodBody
+ (major, minor, // AMQP version (major, minor)
+ NO_QUEUE_BOUND_WITH_RK, // replyCode
+ "No queue bound with routing key " + body.routingKey +
+ " to exchange " + exchangeName); // replyText
}
}
- protocolSession.writeFrame(response);
+ protocolSession.writeResponse(evt, response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index cdb3a503ae..ce4e778c5b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -78,8 +78,8 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- protocolSession.writeFrame(response);
+ AMQMethodBody response = ExchangeDeclareOkBody.createMethodBody((byte)0, (byte)9);
+ protocolSession.writeResponse(evt, response);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 79b4e07c90..3f94e359cb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -56,8 +56,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- protocolSession.writeFrame(response);
+ protocolSession.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody((byte)0, (byte)9));
}
catch (ExchangeInUseException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 915bfa67a6..567b391868 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -93,8 +93,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- protocolSession.writeFrame(response);
+ final AMQMethodBody response = QueueBindOkBody.createMethodBody((byte)0, (byte)9);
+ protocolSession.writeResponse(evt, response);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 1a7b82829a..fef7fb4197 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -105,13 +105,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
- 0L, // consumerCount
- 0L, // messageCount
- body.queue); // queue
+ AMQMethodBody response = QueueDeclareOkBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ 0L, // consumerCount
+ 0L, // messageCount
+ body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
- protocolSession.writeFrame(response);
+ protocolSession.writeResponse(evt, response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index b867d80fdb..5263462c89 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -84,9 +84,9 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
- purged)); // messageCount
+ session.writeResponse(evt, QueueDeleteOkBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ purged)); // messageCount
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index 983e6f7e56..ee8b4e05d0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -55,8 +55,8 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9));
- channel.processReturns(protocolSession);
+ protocolSession.writeResponse(evt, TxCommitOkBody.createMethodBody((byte)0, (byte)9));
+ channel.processReturns(protocolSession);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 891dd69d4d..36fa1884ff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -54,7 +54,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9));
+ protocolSession.writeResponse(evt, TxRollbackOkBody.createMethodBody((byte)0, (byte)9));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
channel.resend(protocolSession);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index 0c2a6ca210..0bfbed32f1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -51,6 +51,6 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9));
+ protocolSession.writeResponse(evt, TxSelectOkBody.createMethodBody((byte)0, (byte)9));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
index 72e241ea0a..7160a292d6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
@@ -22,8 +22,6 @@ package org.apache.qpid.server.message.jms;
import org.apache.qpid.server.message.MessageDecorator;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import javax.jms.Message;
import javax.jms.JMSException;
@@ -35,13 +33,10 @@ public class JMSMessage implements MessageDecorator
{
private AMQMessage _message;
- private BasicContentHeaderProperties _properties;
public JMSMessage(AMQMessage message)
{
_message = message;
- ContentHeaderBody contentHeader = message.getContentHeaderBody();
- _properties = (BasicContentHeaderProperties) contentHeader.properties;
}
protected void checkWriteable() throws MessageNotWriteableException
@@ -56,29 +51,29 @@ public class JMSMessage implements MessageDecorator
public String getJMSMessageID()
{
- return _properties.getMessageId();
+ return _message.getXXXMessageId();
}
public void setJMSMessageID(String string) throws MessageNotWriteableException
{
checkWriteable();
- _properties.setMessageId(string);
+ _message.setXXXMessageId(string);
}
public long getJMSTimestamp()
{
- return _properties.getTimestamp();
+ return _message.getTimestamp();
}
public void setJMSTimestamp(long l) throws MessageNotWriteableException
{
checkWriteable();
- _properties.setTimestamp(l);
+ _message.setTimestamp(l);
}
public byte[] getJMSCorrelationIDAsBytes()
{
- return _properties.getCorrelationId().getBytes();
+ return _message.getCorrelationId().getBytes();
}
// public void setJMSCorrelationIDAsBytes(byte[] bytes)
@@ -88,23 +83,23 @@ public class JMSMessage implements MessageDecorator
public void setJMSCorrelationID(String string) throws MessageNotWriteableException
{
checkWriteable();
- _properties.setCorrelationId(string);
+ _message.setCorrelationId(string);
}
public String getJMSCorrelationID()
{
- return _properties.getCorrelationId();
+ return _message.getCorrelationId();
}
public String getJMSReplyTo()
{
- return _properties.getReplyTo();
+ return _message.getReplyTo();
}
public void setJMSReplyTo(Destination destination) throws MessageNotWriteableException
{
checkWriteable();
- _properties.setReplyTo(destination.toString());
+ _message.setReplyTo(destination.toString());
}
public String getJMSDestination()
@@ -121,13 +116,13 @@ public class JMSMessage implements MessageDecorator
public int getJMSDeliveryMode()
{
- return _properties.getDeliveryMode();
+ return _message.getDeliveryMode();
}
public void setJMSDeliveryMode(byte i) throws MessageNotWriteableException
{
checkWriteable();
- _properties.setDeliveryMode(i);
+ _message.setDeliveryMode(i);
}
public boolean getJMSRedelivered()
@@ -143,150 +138,150 @@ public class JMSMessage implements MessageDecorator
public String getJMSType()
{
- return _properties.getType();
+ return _message.getType();
}
public void setJMSType(String string) throws MessageNotWriteableException
{
checkWriteable();
- _properties.setType(string);
+ _message.setType(string);
}
public long getJMSExpiration()
{
- return _properties.getExpiration();
+ return _message.getExpiration();
}
public void setJMSExpiration(long l) throws MessageNotWriteableException
{
checkWriteable();
- _properties.setExpiration(l);
+ _message.setExpiration(l);
}
public int getJMSPriority()
{
- return _properties.getPriority();
+ return _message.getPriority();
}
public void setJMSPriority(byte i) throws MessageNotWriteableException
{
checkWriteable();
- _properties.setPriority(i);
+ _message.setPriority(i);
}
public void clearProperties() throws MessageNotWriteableException
{
checkWriteable();
- _properties.getJMSHeaders().clear();
+ _message.getApplicationHeaders().clear();
}
public boolean propertyExists(String string)
{
- return _properties.getJMSHeaders().propertyExists(string);
+ return _message.getApplicationHeaders().propertyExists(string);
}
public boolean getBooleanProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getBoolean(string);
+ return _message.getApplicationHeaders().getBoolean(string);
}
public byte getByteProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getByte(string);
+ return _message.getApplicationHeaders().getByte(string);
}
public short getShortProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getShort(string);
+ return _message.getApplicationHeaders().getShort(string);
}
public int getIntProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getInteger(string);
+ return _message.getApplicationHeaders().getInteger(string);
}
public long getLongProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getLong(string);
+ return _message.getApplicationHeaders().getLong(string);
}
public float getFloatProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getFloat(string);
+ return _message.getApplicationHeaders().getFloat(string);
}
public double getDoubleProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getDouble(string);
+ return _message.getApplicationHeaders().getDouble(string);
}
public String getStringProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getString(string);
+ return _message.getApplicationHeaders().getString(string);
}
public Object getObjectProperty(String string) throws JMSException
{
- return _properties.getJMSHeaders().getObject(string);
+ return _message.getApplicationHeaders().getObject(string);
}
public Enumeration getPropertyNames()
{
- return _properties.getJMSHeaders().getPropertyNames();
+ return _message.getApplicationHeaders().getPropertyNames();
}
public void setBooleanProperty(String string, boolean b) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setBoolean(string, b);
+ _message.getApplicationHeaders().setBoolean(string, b);
}
public void setByteProperty(String string, byte b) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setByte(string, b);
+ _message.getApplicationHeaders().setByte(string, b);
}
public void setShortProperty(String string, short i) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setShort(string, i);
+ _message.getApplicationHeaders().setShort(string, i);
}
public void setIntProperty(String string, int i) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setInteger(string, i);
+ _message.getApplicationHeaders().setInteger(string, i);
}
public void setLongProperty(String string, long l) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setLong(string, l);
+ _message.getApplicationHeaders().setLong(string, l);
}
public void setFloatProperty(String string, float v) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setFloat(string, v);
+ _message.getApplicationHeaders().setFloat(string, v);
}
public void setDoubleProperty(String string, double v) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setDouble(string, v);
+ _message.getApplicationHeaders().setDouble(string, v);
}
public void setStringProperty(String string, String string1) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setString(string, string1);
+ _message.getApplicationHeaders().setString(string, string1);
}
public void setObjectProperty(String string, Object object) throws JMSException
{
checkWriteable();
- _properties.getJMSHeaders().setObject(string, object);
+ _message.getApplicationHeaders().setObject(string, object);
}
public void acknowledge() throws MessageNotWriteableException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index ee035287b7..a2268c792e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -52,6 +52,7 @@ import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import javax.management.JMException;
@@ -162,10 +163,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
+ private AMQChannel createChannel(int id) throws AMQException {
+ IApplicationRegistry registry = ApplicationRegistry.getInstance();
+ AMQChannel channel = new AMQChannel(id, registry.getMessageStore(),
+ _exchangeRegistry, this, _stateManager);
+ addChannel(channel);
+ return channel;
+ }
+
public void dataBlockReceived(AMQDataBlock message)
throws Exception
{
_lastReceived = message;
+
if (message instanceof ProtocolInitiation)
{
ProtocolInitiation pi = (ProtocolInitiation) message;
@@ -180,13 +190,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQMethodBody connectionStartBody = ConnectionStartBody.createMethodBody(
- (byte)_major, (byte)_minor, // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short)_major, // versionMajor
- (short)_minor); // versionMinor
+ createChannel(0);
+ AMQMethodBody connectionStartBody = ConnectionStartBody.createMethodBody
+ ((byte)_major, (byte)_minor, // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short)_major, // versionMajor
+ (short)_minor); // versionMinor
writeRequest(0, connectionStartBody, _stateManager);
}
catch (AMQException e)
@@ -209,6 +220,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
AMQFrame frame = (AMQFrame) message;
+ AMQChannel channel = getChannel(frame.channel);
+ if (channel == null) {
+ channel = createChannel(frame.channel);
+ }
+
if (frame.bodyFrame instanceof AMQRequestBody)
{
requestFrameReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
@@ -224,7 +240,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws AMQException
+ private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws Exception
{
if (_logger.isDebugEnabled())
{
@@ -235,7 +251,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
responseManager.requestReceived(requestBody);
}
- private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws AMQException
+ private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws Exception
{
if (_logger.isDebugEnabled())
{
@@ -247,7 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener)
- throws RequestResponseMappingException
+ throws AMQException
{
AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
@@ -255,13 +271,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
- throws RequestResponseMappingException
+ throws AMQException
{
AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
responseManager.sendResponse(requestId, methodBody);
}
+ public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
+ throws AMQException
+ {
+ writeResponse(evt.getChannelId(), evt.getRequestId(), response);
+ }
+
/**
* Convenience method that writes a frame to the protocol session. Equivalent
* to calling getProtocolSession().write().
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 0e5c1ec8b2..badb523786 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -20,7 +20,7 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -196,17 +196,17 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
- (byte)0, (byte)9, // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- "Broker Management Console has closing the connection." // replyText
- );
- _session.writeFrame(response);
-
+ final AMQMethodBody request = ConnectionCloseBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "Broker Management Console has closing the connection." // replyText
+ );
try
{
+ if (true) throw new Error("XXX");
+ _session.writeRequest(0, request, null /*XXX*/);
_session.closeSession();
}
catch (AMQException ex)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 5aa991b580..3f0cf63b70 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -49,11 +49,9 @@ public class AMQMessage
private AMQProtocolSession _publisher;
- private final BasicPublishBody _publishBody;
+ private final MessageTransferBody _transferBody;
- private ContentHeaderBody _contentHeaderBody;
-
- private List<ContentBody> _contentBodies;
+ private List<MessageAppendBody> _contentBodies;
private boolean _redelivered;
@@ -93,29 +91,28 @@ public class AMQMessage
private AtomicBoolean _taken;
- public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
+ public AMQMessage(MessageStore messageStore, MessageTransferBody transferBody)
{
- this(messageStore, publishBody, true);
+ this(messageStore, transferBody, true);
}
- public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody, boolean storeWhenComplete)
+ public AMQMessage(MessageStore messageStore, MessageTransferBody transferBody, boolean storeWhenComplete)
{
_messageId = messageStore.getNewMessageId();
- _publishBody = publishBody;
+ _transferBody = transferBody;
_store = messageStore;
- _contentBodies = new LinkedList<ContentBody>();
+ _contentBodies = new LinkedList<MessageAppendBody>();
_decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_storeWhenComplete = storeWhenComplete;
_taken = new AtomicBoolean(false);
}
- public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
+ public AMQMessage(MessageStore store, long messageId, MessageTransferBody transferBody,
+ List<MessageAppendBody> contentBodies)
throws AMQException
{
- _publishBody = publishBody;
- _contentHeaderBody = contentHeaderBody;
+ _transferBody = transferBody;
_contentBodies = contentBodies;
_decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_messageId = messageId;
@@ -123,16 +120,103 @@ public class AMQMessage
storeMessage();
}
- public AMQMessage(MessageStore store, BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
+ public AMQMessage(MessageStore store, MessageTransferBody transferBody, List<MessageAppendBody> contentBodies)
throws AMQException
{
- this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
+ this(store, store.getNewMessageId(), transferBody, contentBodies);
}
protected AMQMessage(AMQMessage msg) throws AMQException
{
- this(msg._store, msg._messageId, msg._publishBody, msg._contentHeaderBody, msg._contentBodies);
+ this(msg._store, msg._messageId, msg._transferBody, msg._contentBodies);
+ }
+
+ public long getSize() {
+ throw new Error("XXX");
+ }
+
+ public FieldTable getHeadersTable() {
+ throw new Error("XXX");
+ }
+
+ public FieldTable getApplicationHeaders() {
+ throw new Error("XXX");
+ }
+
+ public void setXXXMessageId(String messageId) {
+ throw new Error("XXX");
+ }
+
+ public String getXXXMessageId() {
+ throw new Error("XXX");
+ }
+
+ public void setType(String type) {
+ throw new Error("XXX");
+ }
+
+ public String getType() {
+ throw new Error("XXX");
+ }
+
+ public void setDeliveryMode(byte mode) {
+ throw new Error("XXX");
+ }
+
+ public byte getDeliveryMode() {
+ throw new Error("XXX");
+ }
+
+ public void setReplyTo(String replyTo) {
+ throw new Error("XXX");
+ }
+
+ public String getReplyTo() {
+ throw new Error("XXX");
+ }
+
+ public void setCorrelationId(String correlationId) {
+ throw new Error("XXX");
+ }
+
+ public String getCorrelationId() {
+ throw new Error("XXX");
+ }
+
+ public void setPriority(byte priority) {
+ throw new Error("XXX");
+ }
+
+ public byte getPriority() {
+ throw new Error("XXX");
+ }
+
+ public void setExpiration(long l) {
+ throw new Error("XXX");
+ }
+
+ public long getExpiration() {
+ throw new Error("XXX");
+ }
+
+ public void setTimestamp(long l) {
+ throw new Error("XXX");
+ }
+
+ public long getTimestamp() {
+ throw new Error("XXX");
+ }
+
+ public String getContentType() {
+ throw new Error("XXX");
+ }
+
+ public String getEncoding() {
+ throw new Error("XXX");
+ }
+
+ public byte[] getMessageBytes() {
+ throw new Error("XXX");
}
public void storeMessage() throws AMQException
@@ -147,11 +231,12 @@ public class AMQMessage
{
AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()];
- allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
+ if (true) throw new Error("XXX");
+ /*allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 1; i < allFrames.length; i++)
{
allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 1));
- }
+ }*/
return new CompositeAMQDataBlock(encodedDeliverBody, allFrames);
}
@@ -163,7 +248,9 @@ public class AMQMessage
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
+ if (true) throw new Error("XXX");
+ /*
+ allFrames[0] = MessageTransferBody.createAMQFrame(channel,
(byte)0, (byte)9, // AMQP version (major, minor)
consumerTag, // consumerTag
deliveryTag, // deliveryTag
@@ -171,65 +258,32 @@ public class AMQMessage
_redelivered, // redelivered
getRoutingKey() // routingKey
);
- allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
+ allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 2; i < allFrames.length; i++)
{
allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2));
- }
+ }*/
return new CompositeAMQDataBlock(allFrames);
}
public List<AMQBody> getPayload()
{
List<AMQBody> payload = new ArrayList<AMQBody>(2 + _contentBodies.size());
- payload.add(_publishBody);
- payload.add(_contentHeaderBody);
+ payload.add(_transferBody);
payload.addAll(_contentBodies);
return payload;
}
- public BasicPublishBody getPublishBody()
+ public MessageTransferBody getTransferBody()
{
- return _publishBody;
- }
-
- public ContentHeaderBody getContentHeaderBody()
- {
- return _contentHeaderBody;
- }
-
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
- {
- _contentHeaderBody = contentHeaderBody;
- if (_storeWhenComplete && isAllContentReceived())
- {
- storeMessage();
- }
- }
-
- public List<ContentBody> getContentBodies()
- {
- return _contentBodies;
- }
-
- public void setContentBodies(List<ContentBody> contentBodies)
- {
- _contentBodies = contentBodies;
- }
-
- public void addContentBodyFrame(ContentBody contentBody) throws AMQException
- {
- _contentBodies.add(contentBody);
- _bodyLengthReceived += contentBody.getSize();
- if (_storeWhenComplete && isAllContentReceived())
- {
- storeMessage();
- }
+ return _transferBody;
}
public boolean isAllContentReceived()
{
- return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ if (true) throw new Error("XXX");
+ /*XXX*/return false;
+ //return _bodyLengthReceived == _contentHeaderBody.bodySize;
}
@@ -240,22 +294,22 @@ public class AMQMessage
String getExchangeName()
{
- return _publishBody.exchange;
+ return _transferBody.exchange;
}
String getRoutingKey()
{
- return _publishBody.routingKey;
+ return _transferBody.routingKey;
}
boolean isImmediate()
{
- return _publishBody.immediate;
+ return _transferBody.immediate;
}
NoConsumersException getNoConsumersException(String queue)
{
- return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies);
+ return new NoConsumersException(queue, this);
}
public void setRedelivered(boolean redelivered)
@@ -347,14 +401,7 @@ public class AMQMessage
public boolean isPersistent() throws AMQException
{
- if (_contentHeaderBody == null)
- {
- throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
- }
-
- //todo remove literal values to a constant file such as AMQConstants in common
- return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
- && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ return getDeliveryMode() == 2;
}
public void setTxnBuffer(TxnBuffer buffer)
@@ -377,7 +424,7 @@ public class AMQMessage
{
if (isImmediate() && !_deliveredToConsumer)
{
- throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
+ throw new NoConsumersException(this);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 1bdf265a1b..aee6f74117 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -17,13 +17,11 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.mina.common.ByteBuffer;
import javax.management.openmbean.*;
@@ -34,6 +32,7 @@ import javax.management.MBeanNotificationInfo;
import javax.management.OperationsException;
import javax.management.monitor.MonitorNotification;
import java.util.List;
+import java.util.Set;
import java.util.ArrayList;
/**
@@ -191,7 +190,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
return 0l;
}
- return msg.getContentHeaderBody().bodySize;
+ return msg.getSize();
}
/**
@@ -273,32 +272,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
// get message content
- List<ContentBody> cBodies = msg.getContentBodies();
- List<Byte> msgContent = new ArrayList<Byte>();
- if (cBodies != null)
- {
- for (ContentBody body : cBodies)
- {
- if (body.getSize() != 0)
- {
- ByteBuffer slice = body.payload.slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
- }
- }
- }
+ byte[] msgContent = msg.getMessageBytes();
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
- {
- mimeType = headerProperties.getContentType();
- encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
- }
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+ String mimeType = msg.getContentType();
+ String encoding = msg.getEncoding();
+ Object[] itemValues = {msgId, mimeType, encoding, msgContent};
return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
@@ -321,11 +300,15 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
{
AMQMessage msg = list.get(i - 1);
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
- String[] headerAttributes = headerProperties.toString().split(",");
- Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
+ FieldTable headers = msg.getHeadersTable();
+ Set<String> names = headers.keys();
+ String[] values = new String[names.size()];
+ int index = 0;
+ for (String name : names) {
+ values[index++] = "" + headers.get(name);
+ }
+ Object[] itemValues = {msg.getMessageId(), values, msg.getSize(), msg.isRedelivered()};
CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
_messageList.put(messageData);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index 022d3b9635..4609cce054 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
import java.util.ArrayList;
@@ -143,8 +142,9 @@ public class ConcurrentDeliveryManager implements DeliveryManager
private boolean addMessageToQueue(AMQMessage msg)
{
- // Shrink the ContentBodies to their actual size to save memory.
- if (compressBufferOnQueue)
+ // Shrink the ContentBodies to their actual size to save memory.
+ if (true) throw new Error("XXX");
+ /*if (compressBufferOnQueue)
{
Iterator it = msg.getContentBodies().iterator();
while (it.hasNext())
@@ -152,7 +152,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager
ContentBody cb = (ContentBody) it.next();
cb.reduceBufferToFit();
}
- }
+ }*/
_messages.offer(msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index f09e8213b1..53ada898ab 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
import java.util.ArrayList;
@@ -97,7 +96,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private boolean addMessageToQueue(AMQMessage msg)
{
// Shrink the ContentBodies to their actual size to save memory.
- if (compressBufferOnQueue)
+ if (true) throw new Error("XXX");
+ /*if (compressBufferOnQueue)
{
Iterator it = msg.getContentBodies().iterator();
while (it.hasNext())
@@ -105,7 +105,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
ContentBody cb = (ContentBody) it.next();
cb.reduceBufferToFit();
}
- }
+ }*/
_messages.offer(msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
index 2d37b806f6..09f0b00e90 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
@@ -21,9 +21,6 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQConstant;
import java.util.List;
@@ -35,19 +32,14 @@ import java.util.List;
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(String queue,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public NoConsumersException(String queue, AMQMessage message)
{
- super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
+ super("Immediate delivery to " + queue + " is not possible.", message);
}
- public NoConsumersException(BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public NoConsumersException(AMQMessage message)
{
- super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+ super("Immediate delivery is not possible.", message);
}
public int getReplyCode()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 149c765df5..1be4dbd95a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -28,9 +28,8 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -235,7 +234,9 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ ByteBuffer deliver = null;
+ if (true) throw new Error("XXX");
+ //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
protocolSession.writeFrame(frame);
@@ -268,7 +269,9 @@ public class SubscriptionImpl implements Subscription
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ ByteBuffer deliver = null;
+ if (true) throw new Error("XXX");
+ //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
protocolSession.writeFrame(frame);
@@ -382,10 +385,11 @@ public class SubscriptionImpl implements Subscription
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
+ if (true) throw new Error("XXX");
+ /*protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
consumerTag // consumerTag
- ));
+ ));*/
_closed = true;
}
}
@@ -396,12 +400,12 @@ public class SubscriptionImpl implements Subscription
}
- private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+ /* private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+ AMQFrame deliverFrame = MessageTransferBody.createAMQFrame(channel.getChannelId(),
(byte)0, (byte)9, // AMQP version (major, minor)
consumerTag, // consumerTag
deliveryTag, // deliveryTag
@@ -413,5 +417,5 @@ public class SubscriptionImpl implements Subscription
deliverFrame.writePayload(buf);
buf.flip();
return buf;
- }
+ }*/
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
index 00a27a8869..623b2356ae 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
@@ -24,6 +24,8 @@ import org.apache.mina.common.ByteBuffer;
public class AMQRequestBody extends AMQBody
{
+ public static final byte TYPE = 9;
+
// Fields declared in specification
protected long requestId;
protected long responseMark;
@@ -54,14 +56,14 @@ public class AMQRequestBody extends AMQBody
protected int getSize()
{
- return 8 + 8 + 4 + methodPayload.getBodySize();
+ return 8 + 8 + 4 + methodPayload.getSize();
}
protected void writePayload(ByteBuffer buffer)
{
EncodingUtils.writeLong(buffer, requestId);
EncodingUtils.writeLong(buffer, responseMark);
- EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0
+ EncodingUtils.writeInteger(buffer, 0); // reserved, set to 0
methodPayload.writePayload(buffer);
}
@@ -70,7 +72,7 @@ public class AMQRequestBody extends AMQBody
{
requestId = EncodingUtils.readLong(buffer);
responseMark = EncodingUtils.readLong(buffer);
- int reserved = EncodingUtils.readShort(buffer); // reserved, throw away
+ int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away
methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
index 90038da2d4..2b0fc97b1b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
@@ -24,6 +24,8 @@ import org.apache.mina.common.ByteBuffer;
public class AMQResponseBody extends AMQBody
{
+ public static final byte TYPE = 10;
+
// Fields declared in specification
protected long responseId;
protected long requestId;
@@ -54,14 +56,15 @@ public class AMQResponseBody extends AMQBody
protected int getSize()
{
- return 8 + 8 + 4 + methodPayload.getBodySize();
+ return 8 + 8 + 4 + methodPayload.getSize();
}
protected void writePayload(ByteBuffer buffer)
{
EncodingUtils.writeLong(buffer, responseId);
EncodingUtils.writeLong(buffer, requestId);
- EncodingUtils.writeUnsignedShort(buffer, batchOffset);
+ // XXX
+ EncodingUtils.writeInteger(buffer, batchOffset);
methodPayload.writePayload(buffer);
}
@@ -70,7 +73,8 @@ public class AMQResponseBody extends AMQBody
{
responseId = EncodingUtils.readLong(buffer);
requestId = EncodingUtils.readLong(buffer);
- batchOffset = EncodingUtils.readShort(buffer);
+ // XXX
+ batchOffset = EncodingUtils.readInteger(buffer);
methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
index 5ec9b122af..e39d85277d 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.protocol;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
public interface AMQProtocolWriter
@@ -34,8 +35,12 @@ public interface AMQProtocolWriter
public void writeFrame(AMQDataBlock frame);
public long writeRequest(int channelNum, AMQMethodBody methodBody,
- AMQMethodListener methodListener) throws RequestResponseMappingException;
+ AMQMethodListener methodListener)
+ throws AMQException;
public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
- throws RequestResponseMappingException;
+ throws AMQException;
+
+ public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
+ throws AMQException;
}