diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-01-18 17:56:44 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-01-18 17:56:44 +0000 |
| commit | 5b6ca65abd333e0ea15790ec351bfb67a7013a5e (patch) | |
| tree | 86332583cc0be84981bde900cdac825865599392 /java | |
| parent | 7a26e1d6aa0018e44120bef39903c4ce55676141 (diff) | |
| download | qpid-python-5b6ca65abd333e0ea15790ec351bfb67a7013a5e.tar.gz | |
made message-transfer return a result, switched over message delivery to use message-transfer, added a generated .copy() to method bodies, and made hello-world acknowledge the message it sends to itself
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497515 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java | 2 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java | 56 |
2 files changed, 29 insertions, 29 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 58160611d1..353af5bd04 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.framing.MessageOkBody; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; @@ -89,6 +90,7 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT // it is routed to the exchange. AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); channel.addMessageTransfer(body, protocolSession); + protocolSession.writeResponse(evt, MessageOkBody.createMethodBody((byte)0, (byte)9)); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 1be4dbd95a..70074e2c65 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -23,17 +23,21 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.common.ClientProperties; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.Content; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import java.util.Queue; @@ -262,19 +266,33 @@ public class SubscriptionImpl implements Subscription } synchronized(channel) { - long deliveryTag = channel.getNextDeliveryTag(); + final long deliveryTag = channel.getNextDeliveryTag(); if (_acks) { channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = null; - if (true) throw new Error("XXX"); - //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - - protocolSession.writeFrame(frame); + // XXX: references + MessageTransferBody mtb = msg.getTransferBody().copy(); + mtb.destination = consumerTag; + try { + protocolSession.writeRequest + (channel.getChannelId(), + mtb, new AMQMethodListener() { + public boolean methodReceived(AMQMethodEvent evt) throws AMQException { + if (_logger.isDebugEnabled()) { + _logger.debug("Ack received on channel " + evt.getChannelId()); + } + // XXX: multiple + channel.acknowledgeMessage(deliveryTag, false); + return true; + } + public void error(Exception e) {} + }); + } catch (AMQException e) { + throw new RuntimeException(e); + } } } finally @@ -398,24 +416,4 @@ public class SubscriptionImpl implements Subscription { return _isBrowser; } - - - /* private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) - { - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame deliverFrame = MessageTransferBody.createAMQFrame(channel.getChannelId(), - (byte)0, (byte)9, // AMQP version (major, minor) - consumerTag, // consumerTag - deliveryTag, // deliveryTag - exchange, // exchange - false, // redelivered - routingKey // routingKey - ); - ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? - deliverFrame.writePayload(buf); - buf.flip(); - return buf; - }*/ } |
