diff options
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 33f7a63d45..7361d8827a 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -1,3 +1,4 @@ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -18,8 +19,11 @@ #include "MessageHandlerImpl.h" #include "BrokerChannel.h" +#include "FramingContent.h" #include "Connection.h" #include "Broker.h" +#include "BrokerMessageMessage.h" + namespace qpid { namespace broker { @@ -41,7 +45,7 @@ void MessageHandlerImpl::cancel( const MethodContext& context, const string& destination ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature channel.cancel(destination); @@ -73,10 +77,9 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const qpid::framing::FieldTable& filter ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - Channel& channel = connection.getChannel(channel.getId()); if(!destination.empty() && channel.exists(destination)){ throw ConnectionException(530, "Consumer tags must be unique"); } @@ -108,7 +111,7 @@ MessageHandlerImpl::get( const MethodContext& context, const string& /*destination*/, bool noAck ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = connection.getQueue(queueName, context.channelId); @@ -146,7 +149,7 @@ MessageHandlerImpl::qos(const MethodContext& context, u_int16_t prefetchCount, bool /*global*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature //TODO: handle global channel.setPrefetchSize(prefetchSize); @@ -159,7 +162,7 @@ void MessageHandlerImpl::recover(const MethodContext&, bool requeue ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature channel.recover(requeue); @@ -182,18 +185,18 @@ MessageHandlerImpl::resume(const MethodContext&, } void -MessageHandlerImpl::transfer(const MethodContext&, +MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool immediate, + bool /*immediate*/, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& routingKey, + const string& /*routingKey*/, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -204,15 +207,24 @@ MessageHandlerImpl::transfer(const MethodContext&, const string& /*transactionId*/, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) + qpid::framing::Content body, + bool /*mandatory*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); - channel.handlePublish(msg, exchange); + if (body.isInline()) { +// MessageMessage* msg = +// new MessageMessage(&connection, exchangeName, routingKey, immediate); +// channel.handlePublish(msg, exchange); + + connection.client->getMessageHandler()->ok(context); + } else { + // Don't handle reference content yet + assert(body.isInline()); + } }else{ throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); } |
