summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp38
1 files changed, 25 insertions, 13 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 33f7a63d45..7361d8827a 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -18,8 +19,11 @@
#include "MessageHandlerImpl.h"
#include "BrokerChannel.h"
+#include "FramingContent.h"
#include "Connection.h"
#include "Broker.h"
+#include "BrokerMessageMessage.h"
+
namespace qpid {
namespace broker {
@@ -41,7 +45,7 @@ void
MessageHandlerImpl::cancel( const MethodContext& context,
const string& destination )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
channel.cancel(destination);
@@ -73,10 +77,9 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool exclusive,
const qpid::framing::FieldTable& filter )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- Channel& channel = connection.getChannel(channel.getId());
if(!destination.empty() && channel.exists(destination)){
throw ConnectionException(530, "Consumer tags must be unique");
}
@@ -108,7 +111,7 @@ MessageHandlerImpl::get( const MethodContext& context,
const string& /*destination*/,
bool noAck )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Queue::shared_ptr queue =
connection.getQueue(queueName, context.channelId);
@@ -146,7 +149,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
u_int16_t prefetchCount,
bool /*global*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
@@ -159,7 +162,7 @@ void
MessageHandlerImpl::recover(const MethodContext&,
bool requeue )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
channel.recover(requeue);
@@ -182,18 +185,18 @@ MessageHandlerImpl::resume(const MethodContext&,
}
void
-MessageHandlerImpl::transfer(const MethodContext&,
+MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool immediate,
+ bool /*immediate*/,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& routingKey,
+ const string& /*routingKey*/,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -204,15 +207,24 @@ MessageHandlerImpl::transfer(const MethodContext&,
const string& /*transactionId*/,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
+ qpid::framing::Content body,
+ bool /*mandatory*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Exchange::shared_ptr exchange = exchangeName.empty() ?
broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate);
- channel.handlePublish(msg, exchange);
+ if (body.isInline()) {
+// MessageMessage* msg =
+// new MessageMessage(&connection, exchangeName, routingKey, immediate);
+// channel.handlePublish(msg, exchange);
+
+ connection.client->getMessageHandler()->ok(context);
+ } else {
+ // Don't handle reference content yet
+ assert(body.isInline());
+ }
}else{
throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
}