summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java60
1 files changed, 33 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 752452ab38..9214263fe3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -409,37 +409,43 @@ public class AMQChannel
public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag)
{
final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes();
- AMQMethodBody openBody = MessageOpenBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- refId);
- _session.writeRequest(_channelId, openBody, new AMQMethodListener()
- {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
- {
- AMQMethodBody method = evt.getMethod();
- if (_log.isDebugEnabled())
- {
- _log.debug(method + " received on channel " + _channelId);
- }
- if (method instanceof MessageOkBody)
- {
- acknowledgeMessage(deliveryTag, false);
- deliverRef(refId, msg, destination, _session.getStateManager());
- return true;
- }
- else
- {
- // TODO: implement reject
- return false;
- }
- }
- public void error(Exception e) {}
- });
+ deliverRef(refId, msg, destination, _session.getStateManager());
+// AMQMethodBody openBody = MessageOpenBody.createMethodBody(
+// _session.getProtocolMajorVersion(), // AMQP major version
+// _session.getProtocolMinorVersion(), // AMQP minor version
+// refId);
+// _session.writeRequest(_channelId, openBody, new AMQMethodListener()
+// {
+// public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+// {
+// AMQMethodBody method = evt.getMethod();
+// if (_log.isDebugEnabled())
+// {
+// _log.debug(method + " received on channel " + _channelId);
+// }
+// if (method instanceof MessageOkBody)
+// {
+// acknowledgeMessage(deliveryTag, false);
+// deliverRef(refId, msg, destination, _session.getStateManager());
+// return true;
+// }
+// else
+// {
+// // TODO: implement reject
+// return false;
+// }
+// }
+// public void error(Exception e) {}
+// });
}
public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
{
+ AMQMethodBody openBody = MessageOpenBody.createMethodBody(
+ _session.getProtocolMajorVersion(), // AMQP major version
+ _session.getProtocolMinorVersion(), // AMQP minor version
+ refId);
+ _session.writeRequest(_channelId, openBody, listener);
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
mtb.redelivered = msg.isRedelivered();