summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-01-18 17:56:44 +0000
committerRafael H. Schloming <rhs@apache.org>2007-01-18 17:56:44 +0000
commit5b6ca65abd333e0ea15790ec351bfb67a7013a5e (patch)
tree86332583cc0be84981bde900cdac825865599392 /java
parent7a26e1d6aa0018e44120bef39903c4ce55676141 (diff)
downloadqpid-python-5b6ca65abd333e0ea15790ec351bfb67a7013a5e.tar.gz
made message-transfer return a result, switched over message delivery to use message-transfer, added a generated .copy() to method bodies, and made hello-world acknowledge the message it sends to itself
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497515 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java56
2 files changed, 29 insertions, 29 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
index 58160611d1..353af5bd04 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -89,6 +90,7 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT
// it is routed to the exchange.
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.addMessageTransfer(body, protocolSession);
+ protocolSession.writeResponse(evt, MessageOkBody.createMethodBody((byte)0, (byte)9));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 1be4dbd95a..70074e2c65 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -23,17 +23,21 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import java.util.Queue;
@@ -262,19 +266,33 @@ public class SubscriptionImpl implements Subscription
}
synchronized(channel)
{
- long deliveryTag = channel.getNextDeliveryTag();
+ final long deliveryTag = channel.getNextDeliveryTag();
if (_acks)
{
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = null;
- if (true) throw new Error("XXX");
- //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ // XXX: references
+ MessageTransferBody mtb = msg.getTransferBody().copy();
+ mtb.destination = consumerTag;
+ try {
+ protocolSession.writeRequest
+ (channel.getChannelId(),
+ mtb, new AMQMethodListener() {
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("Ack received on channel " + evt.getChannelId());
+ }
+ // XXX: multiple
+ channel.acknowledgeMessage(deliveryTag, false);
+ return true;
+ }
+ public void error(Exception e) {}
+ });
+ } catch (AMQException e) {
+ throw new RuntimeException(e);
+ }
}
}
finally
@@ -398,24 +416,4 @@ public class SubscriptionImpl implements Subscription
{
return _isBrowser;
}
-
-
- /* private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
- {
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame deliverFrame = MessageTransferBody.createAMQFrame(channel.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- exchange, // exchange
- false, // redelivered
- routingKey // routingKey
- );
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }*/
}