diff options
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 33 |
1 files changed, 16 insertions, 17 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 797e3fbbf9..0853aebcb1 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -25,16 +25,15 @@ #include "BrokerMessageMessage.h" #include "MessageAppendBody.h" #include "MessageTransferBody.h" +#include "BrokerAdapter.h" namespace qpid { namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b), references(ch), - client(connection.client->getMessage()) -{} +MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) + : HandlerImplType(parent), references(channel) {} // // Message class method handlers @@ -47,7 +46,7 @@ MessageHandlerImpl::append(const MethodContext& context, references.get(reference).append( boost::shared_polymorphic_downcast<MessageAppendBody>( context.methodBody)); - client.ok(context); + client.ok(context.getRequestId()); } @@ -56,7 +55,7 @@ MessageHandlerImpl::cancel(const MethodContext& context, const string& destination ) { channel.cancel(destination); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -73,7 +72,7 @@ MessageHandlerImpl::close(const MethodContext& context, const string& reference) { references.get(reference).close(); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -84,7 +83,7 @@ MessageHandlerImpl::consume(const MethodContext& context, bool noLocal, bool noAck, bool exclusive, - const qpid::framing::FieldTable& filter ) + const framing::FieldTable& filter ) { Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!destination.empty() && channel.exists(destination)) @@ -93,7 +92,7 @@ MessageHandlerImpl::consume(const MethodContext& context, channel.consume( tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok(context); + client.ok(context.getRequestId()); // Dispatch messages as there is now a consumer. queue->dispatch(); } @@ -117,9 +116,9 @@ MessageHandlerImpl::get( const MethodContext& context, connection.getQueue(queueName, context.channel->getId()); if(channel.get(queue, destination, !noAck)) - client.ok(context); + client.ok(context.getRequestId()); else - client.empty(context); + client.empty(context.getRequestId()); } void @@ -141,7 +140,7 @@ MessageHandlerImpl::open(const MethodContext& context, const string& reference) { references.open(reference); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -153,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext& context, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -161,7 +160,7 @@ MessageHandlerImpl::recover(const MethodContext& context, bool requeue) { channel.recover(requeue); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -204,8 +203,8 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*appId*/, const string& /*transactionId*/, const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content body, + const framing::FieldTable& /*applicationHeaders*/, + const framing::Content& body, bool /*mandatory*/) { MessageTransferBody::shared_ptr transfer( @@ -218,7 +217,7 @@ MessageHandlerImpl::transfer(const MethodContext& context, channel.handleInlineTransfer(message); else references.get(body.getValue()).addMessage(message); - client.ok(context); + client.ok(context.getRequestId()); } |
