summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp33
1 files changed, 16 insertions, 17 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 797e3fbbf9..0853aebcb1 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -25,16 +25,15 @@
#include "BrokerMessageMessage.h"
#include "MessageAppendBody.h"
#include "MessageTransferBody.h"
+#include "BrokerAdapter.h"
namespace qpid {
namespace broker {
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b), references(ch),
- client(connection.client->getMessage())
-{}
+MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
+ : HandlerImplType(parent), references(channel) {}
//
// Message class method handlers
@@ -47,7 +46,7 @@ MessageHandlerImpl::append(const MethodContext& context,
references.get(reference).append(
boost::shared_polymorphic_downcast<MessageAppendBody>(
context.methodBody));
- client.ok(context);
+ client.ok(context.getRequestId());
}
@@ -56,7 +55,7 @@ MessageHandlerImpl::cancel(const MethodContext& context,
const string& destination )
{
channel.cancel(destination);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -73,7 +72,7 @@ MessageHandlerImpl::close(const MethodContext& context,
const string& reference)
{
references.get(reference).close();
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -84,7 +83,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool noLocal,
bool noAck,
bool exclusive,
- const qpid::framing::FieldTable& filter )
+ const framing::FieldTable& filter )
{
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!destination.empty() && channel.exists(destination))
@@ -93,7 +92,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
channel.consume(
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok(context);
+ client.ok(context.getRequestId());
// Dispatch messages as there is now a consumer.
queue->dispatch();
}
@@ -117,9 +116,9 @@ MessageHandlerImpl::get( const MethodContext& context,
connection.getQueue(queueName, context.channel->getId());
if(channel.get(queue, destination, !noAck))
- client.ok(context);
+ client.ok(context.getRequestId());
else
- client.empty(context);
+ client.empty(context.getRequestId());
}
void
@@ -141,7 +140,7 @@ MessageHandlerImpl::open(const MethodContext& context,
const string& reference)
{
references.open(reference);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -153,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -161,7 +160,7 @@ MessageHandlerImpl::recover(const MethodContext& context,
bool requeue)
{
channel.recover(requeue);
- client.ok(context);
+ client.ok(context.getRequestId());
}
void
@@ -204,8 +203,8 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*appId*/,
const string& /*transactionId*/,
const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content body,
+ const framing::FieldTable& /*applicationHeaders*/,
+ const framing::Content& body,
bool /*mandatory*/)
{
MessageTransferBody::shared_ptr transfer(
@@ -218,7 +217,7 @@ MessageHandlerImpl::transfer(const MethodContext& context,
channel.handleInlineTransfer(message);
else
references.get(body.getValue()).addMessage(message);
- client.ok(context);
+ client.ok(context.getRequestId());
}