summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-18 17:45:23 +0000
committerAlan Conway <aconway@apache.org>2007-01-18 17:45:23 +0000
commit7a26e1d6aa0018e44120bef39903c4ce55676141 (patch)
treeeb676a407cdffd905cfd6d3e5ce3443b3faa0e14 /cpp
parent05306462d72913a31775420b024252dc0f71e30a (diff)
downloadqpid-python-7a26e1d6aa0018e44120bef39903c4ce55676141.tar.gz
* MethodContext: added invocation context for methods. Can carry RequestId
as well as ChannelId. * gentools: AMQP_Clinet|ServerOperations and *Body::invoke(): pass MethodContex instead of channel ID. * All *HandlerImpl classe: use MethodContext instead of channel ID. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp139
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp36
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h30
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.cpp3
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.h4
-rw-r--r--cpp/lib/common/framing/MethodContext.h55
6 files changed, 161 insertions, 106 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 16519ec646..58062d4471 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -25,9 +25,6 @@
namespace qpid {
namespace broker {
-// FIXME aconway 2007-01-18: Remove channel argument from signatures,
-// adapter is already associated with a cahnnel.
-
using namespace qpid;
using namespace qpid::framing;
@@ -78,47 +75,47 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
public:
ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void startOk(u_int16_t /*channel*/,
+ void startOk(const MethodContext& context,
const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
const std::string& locale);
- void secureOk(u_int16_t /*channel*/, const std::string& response);
- void tuneOk(u_int16_t /*channel*/, u_int16_t channelMax,
+ void secureOk(const MethodContext& context, const std::string& response);
+ void tuneOk(const MethodContext& context, u_int16_t channelMax,
u_int32_t frameMax, u_int16_t heartbeat);
- void open(u_int16_t /*channel*/, const std::string& virtualHost,
+ void open(const MethodContext& context, const std::string& virtualHost,
const std::string& capabilities, bool insist);
- void close(u_int16_t /*channel*/, u_int16_t replyCode,
+ void close(const MethodContext& context, u_int16_t replyCode,
const std::string& replyText,
u_int16_t classId, u_int16_t methodId);
- void closeOk(u_int16_t channel);
+ void closeOk(const MethodContext& context);
};
class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
public:
ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void open(u_int16_t /*channel*/, const std::string& outOfBand);
- void flow(u_int16_t /*channel*/, bool active);
- void flowOk(u_int16_t /*channel*/, bool active);
- void ok( u_int16_t channel );
- void ping( u_int16_t channel );
- void pong( u_int16_t channel );
- void resume( u_int16_t /*channel*/, const std::string& channelId );
- void close(u_int16_t /*channel*/, u_int16_t replyCode, const
+ void open(const MethodContext& context, const std::string& outOfBand);
+ void flow(const MethodContext& context, bool active);
+ void flowOk(const MethodContext& context, bool active);
+ void ok( const MethodContext& context );
+ void ping( const MethodContext& context );
+ void pong( const MethodContext& context );
+ void resume( const MethodContext& context, const std::string& channelId );
+ void close(const MethodContext& context, u_int16_t replyCode, const
std::string& replyText, u_int16_t classId, u_int16_t methodId);
- void closeOk(u_int16_t channel);
+ void closeOk(const MethodContext& context);
};
class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
public:
ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(u_int16_t /*channel*/, u_int16_t ticket,
+ void declare(const MethodContext& context, u_int16_t ticket,
const std::string& exchange, const std::string& type,
bool passive, bool durable, bool autoDelete,
bool internal, bool nowait,
const qpid::framing::FieldTable& arguments);
- void delete_(u_int16_t /*channel*/, u_int16_t ticket,
+ void delete_(const MethodContext& context, u_int16_t ticket,
const std::string& exchange, bool ifUnused, bool nowait);
- void unbind(u_int16_t /*channel*/,
+ void unbind(const MethodContext& context,
u_int16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
@@ -127,22 +124,22 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class QueueHandlerImpl : private CoreRefs, public QueueHandler{
public:
QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
+ void declare(const MethodContext& context, u_int16_t ticket, const std::string& queue,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait,
const qpid::framing::FieldTable& arguments);
- void bind(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
+ void bind(const MethodContext& context, u_int16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(u_int16_t /*channel*/,
+ void unbind(const MethodContext& context,
u_int16_t ticket,
const std::string& queue,
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- void purge(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
+ void purge(const MethodContext& context, u_int16_t ticket, const std::string& queue,
bool nowait);
- void delete_(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
+ void delete_(const MethodContext& context, u_int16_t ticket, const std::string& queue,
bool ifUnused, bool ifEmpty,
bool nowait);
};
@@ -150,31 +147,31 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class BasicHandlerImpl : private CoreRefs, public BasicHandler{
public:
BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void qos(u_int16_t /*channel*/, u_int32_t prefetchSize,
+ void qos(const MethodContext& context, u_int32_t prefetchSize,
u_int16_t prefetchCount, bool global);
void consume(
- u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
+ const MethodContext& context, u_int16_t ticket, const std::string& queue,
const std::string& consumerTag, bool noLocal, bool noAck,
bool exclusive, bool nowait,
const qpid::framing::FieldTable& fields);
- void cancel(u_int16_t /*channel*/, const std::string& consumerTag,
+ void cancel(const MethodContext& context, const std::string& consumerTag,
bool nowait);
- void publish(u_int16_t /*channel*/, u_int16_t ticket,
+ void publish(const MethodContext& context, u_int16_t ticket,
const std::string& exchange, const std::string& routingKey,
bool mandatory, bool immediate);
- void get(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
+ void get(const MethodContext& context, u_int16_t ticket, const std::string& queue,
bool noAck);
- void ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple);
- void reject(u_int16_t /*channel*/, u_int64_t deliveryTag, bool requeue);
- void recover(u_int16_t /*channel*/, bool requeue);
+ void ack(const MethodContext& context, u_int64_t deliveryTag, bool multiple);
+ void reject(const MethodContext& context, u_int64_t deliveryTag, bool requeue);
+ void recover(const MethodContext& context, bool requeue);
};
class TxHandlerImpl : private CoreRefs, public TxHandler{
public:
TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void select(u_int16_t channel);
- void commit(u_int16_t channel);
- void rollback(u_int16_t channel);
+ void select(const MethodContext& context);
+ void commit(const MethodContext& context);
+ void rollback(const MethodContext& context);
};
BasicHandlerImpl basicHandler;
@@ -188,37 +185,37 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
};
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
- u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+ const MethodContext& , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat);
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
connection.framemax = framemax;
connection.heartbeat = heartbeat;
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext&, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
connection.client->getConnection().openOk(0, knownhosts);
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
- u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
connection.client->getConnection().closeOk(0);
connection.context->close();
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){
connection.context->close();
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
- u_int16_t /*channel*/, const string& /*outOfBand*/){
+ const MethodContext&, const string& /*outOfBand*/){
// FIXME aconway 2007-01-17: Assertions on all channel methods,
assertChannelNonZero(channel.getId());
if (channel.isOpen())
@@ -228,21 +225,21 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */);
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){
connection.client->getChannel().closeOk(channel.getId());
// FIXME aconway 2007-01-18: Following line destroys this. Ugly.
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&){}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& exchange, const string& type,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& /*arguments*/){
@@ -271,7 +268,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t /*channel*
void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind(
- u_int16_t /*channel*/,
+ const MethodContext&,
u_int16_t /*ticket*/,
const string& /*queue*/,
const string& /*exchange*/,
@@ -283,7 +280,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind(
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
@@ -291,7 +288,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t /*channel*
if(!nowait) connection.client->getExchange().deleteOk(channel.getId());
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& name,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
@@ -326,7 +323,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t /*channel*/,
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_int16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
@@ -345,14 +342,14 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t /*channel*/, u_i
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queue,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
@@ -378,7 +375,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t /*channel*/,
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t /*channel*/, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
@@ -386,7 +383,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t /*channel*/, u_in
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
- u_int16_t /*channel*/, u_int16_t /*ticket*/,
+ const MethodContext&, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait, const FieldTable& fields)
@@ -413,13 +410,13 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t /*channel*/, const string& consumerTag, bool nowait){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext&, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t /*channel*/, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate){
@@ -433,7 +430,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t /*channel*/,
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
@@ -442,7 +439,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t /*channel*/, u_in
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
try{
channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -450,23 +447,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t /*channel*/, u_in
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t /*channel*/, bool requeue){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t /*channel*/){
+void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext&){
channel.begin();
connection.client->getTx().selectOk(channel.getId());
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t /*channel*/){
+void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext&){
channel.commit();
connection.client->getTx().commitOk(channel.getId());
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t /*channel*/){
+void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext&){
channel.rollback();
connection.client->getTx().rollbackOk(channel.getId());
@@ -475,7 +472,7 @@ void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t /*channel*/){
void
BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
- u_int16_t /*channel*/,
+ const MethodContext&,
u_int16_t /*ticket*/,
const string& /*queue*/,
const string& /*exchange*/,
@@ -486,26 +483,26 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
- u_int16_t /*channel*/,
+ const MethodContext&,
const string& /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 0c0f9e96eb..07ede4097a 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -23,11 +23,13 @@
namespace qpid {
namespace broker {
+using namespace framing;
+
//
// Message class method handlers
//
void
-MessageHandlerImpl::append( u_int16_t /*channel*/,
+MessageHandlerImpl::append(const MethodContext&,
const string& /*reference*/,
const string& /*bytes*/ )
{
@@ -36,18 +38,18 @@ MessageHandlerImpl::append( u_int16_t /*channel*/,
void
-MessageHandlerImpl::cancel( u_int16_t channel,
+MessageHandlerImpl::cancel( const MethodContext& context,
const string& destination )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
- connection.getChannel(channel).cancel(destination);
+ channel.cancel(destination);
- connection.client->getMessageHandler()->ok(channel);
+ connection.client->getMessageHandler()->ok(context);
}
void
-MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
+MessageHandlerImpl::checkpoint(const MethodContext&,
const string& /*reference*/,
const string& /*identifier*/ )
{
@@ -55,14 +57,14 @@ MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
}
void
-MessageHandlerImpl::close( u_int16_t /*channel*/,
+MessageHandlerImpl::close(const MethodContext&,
const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-MessageHandlerImpl::consume( u_int16_t /*channel*/,
+MessageHandlerImpl::consume(const MethodContext&,
u_int16_t /*ticket*/,
const string& queueName,
const string& destination,
@@ -94,13 +96,13 @@ MessageHandlerImpl::consume( u_int16_t /*channel*/,
}
void
-MessageHandlerImpl::empty( u_int16_t /*channel*/ )
+MessageHandlerImpl::empty( const MethodContext& )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-MessageHandlerImpl::get( u_int16_t /*channelId*/,
+MessageHandlerImpl::get( const MethodContext&,
u_int16_t /*ticket*/,
const string& queueName,
const string& /*destination*/,
@@ -119,27 +121,27 @@ MessageHandlerImpl::get( u_int16_t /*channelId*/,
}
void
-MessageHandlerImpl::offset( u_int16_t /*channel*/,
+MessageHandlerImpl::offset(const MethodContext&,
u_int64_t /*value*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-MessageHandlerImpl::ok( u_int16_t /*channel*/ )
+MessageHandlerImpl::ok( const MethodContext& )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-MessageHandlerImpl::open( u_int16_t /*channel*/,
+MessageHandlerImpl::open(const MethodContext&,
const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-MessageHandlerImpl::qos( u_int16_t /*channel*/,
+MessageHandlerImpl::qos(const MethodContext&,
u_int32_t prefetchSize,
u_int16_t prefetchCount,
bool /*global*/ )
@@ -154,7 +156,7 @@ MessageHandlerImpl::qos( u_int16_t /*channel*/,
}
void
-MessageHandlerImpl::recover( u_int16_t /*channel*/,
+MessageHandlerImpl::recover(const MethodContext&,
bool requeue )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -164,7 +166,7 @@ MessageHandlerImpl::recover( u_int16_t /*channel*/,
}
void
-MessageHandlerImpl::reject( u_int16_t /*channel*/,
+MessageHandlerImpl::reject(const MethodContext&,
u_int16_t /*code*/,
const string& /*text*/ )
{
@@ -172,7 +174,7 @@ MessageHandlerImpl::reject( u_int16_t /*channel*/,
}
void
-MessageHandlerImpl::resume( u_int16_t /*channel*/,
+MessageHandlerImpl::resume(const MethodContext&,
const string& /*reference*/,
const string& /*identifier*/ )
{
@@ -180,7 +182,7 @@ MessageHandlerImpl::resume( u_int16_t /*channel*/,
}
void
-MessageHandlerImpl::transfer( u_int16_t /*channel*/,
+MessageHandlerImpl::transfer(const MethodContext&,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
index 77e30abe05..d97d29153a 100644
--- a/cpp/lib/broker/MessageHandlerImpl.h
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -37,21 +37,21 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH
MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
: channel(ch), connection(c), broker(b) {}
- void append( u_int16_t channel,
+ void append(const qpid::framing::MethodContext&,
const std::string& reference,
const std::string& bytes );
- void cancel( u_int16_t channel,
+ void cancel(const qpid::framing::MethodContext&,
const std::string& destination );
- void checkpoint( u_int16_t channel,
+ void checkpoint(const qpid::framing::MethodContext&,
const std::string& reference,
const std::string& identifier );
- void close( u_int16_t channel,
+ void close(const qpid::framing::MethodContext&,
const std::string& reference );
- void consume( u_int16_t channel,
+ void consume(const qpid::framing::MethodContext&,
u_int16_t ticket,
const std::string& queue,
const std::string& destination,
@@ -60,39 +60,39 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH
bool exclusive,
const qpid::framing::FieldTable& filter );
- void empty( u_int16_t channel );
+ void empty( const qpid::framing::MethodContext& );
- void get( u_int16_t channel,
+ void get(const qpid::framing::MethodContext&,
u_int16_t ticket,
const std::string& queue,
const std::string& destination,
bool noAck );
- void offset( u_int16_t channel,
+ void offset(const qpid::framing::MethodContext&,
u_int64_t value );
- void ok( u_int16_t channel );
+ void ok( const qpid::framing::MethodContext& );
- void open( u_int16_t channel,
+ void open(const qpid::framing::MethodContext&,
const std::string& reference );
- void qos( u_int16_t channel,
+ void qos(const qpid::framing::MethodContext&,
u_int32_t prefetchSize,
u_int16_t prefetchCount,
bool global );
- void recover( u_int16_t channel,
+ void recover(const qpid::framing::MethodContext&,
bool requeue );
- void reject( u_int16_t channel,
+ void reject(const qpid::framing::MethodContext&,
u_int16_t code,
const std::string& text );
- void resume( u_int16_t channel,
+ void resume(const qpid::framing::MethodContext&,
const std::string& reference,
const std::string& identifier );
- void transfer( u_int16_t channel,
+ void transfer(const qpid::framing::MethodContext&,
u_int16_t ticket,
const std::string& destination,
bool redelivered,
diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp
index de081243ee..0c77a1c64a 100644
--- a/cpp/lib/common/framing/AMQMethodBody.cpp
+++ b/cpp/lib/common/framing/AMQMethodBody.cpp
@@ -34,7 +34,8 @@ bool AMQMethodBody::match(AMQMethodBody* other) const{
return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId();
}
-void AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channel*/){
+void AMQMethodBody::invoke(AMQP_ServerOperations&, const MethodContext&){
+ assert(0);
THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
}
diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h
index a7725079fb..a38f580aa5 100644
--- a/cpp/lib/common/framing/AMQMethodBody.h
+++ b/cpp/lib/common/framing/AMQMethodBody.h
@@ -43,13 +43,13 @@ class AMQMethodBody : public AMQBody
ProtocolVersion version;
u_int8_t type() const { return METHOD_BODY; }
AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {}
- AMQMethodBody(ProtocolVersion version) : version(version) {}
+ AMQMethodBody(ProtocolVersion ver) : version(ver) {}
virtual ~AMQMethodBody() {}
void decode(Buffer&, u_int32_t);
virtual u_int16_t amqpMethodId() const = 0;
virtual u_int16_t amqpClassId() const = 0;
- virtual void invoke(AMQP_ServerOperations& target, u_int16_t channel);
+ virtual void invoke(AMQP_ServerOperations&, const MethodContext&);
bool match(AMQMethodBody* other) const;
protected:
diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h
new file mode 100644
index 0000000000..13d5f658ca
--- /dev/null
+++ b/cpp/lib/common/framing/MethodContext.h
@@ -0,0 +1,55 @@
+#ifndef _framing_MethodContext_h
+#define _framing_MethodContext_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace framing {
+
+class BodyHandler;
+
+/**
+ * Invocation context for an AMQP method.
+ * All generated proxy and handler functions take a MethodContext parameter.
+ *
+ * The user calling on a broker proxy can simply pass an integer
+ * channel ID, it will implicitly be converted to an appropriate context.
+ *
+ * Other context members are for internal use.
+ */
+struct MethodContext
+{
+ /**
+ * Passing a integer channel-id in place of a MethodContext
+ * will automatically construct the MethodContext.
+ */
+ MethodContext(ChannelId channel, RequestId request=0)
+ : channelId(channel), requestId(request) {}
+
+ /** Channel on which the method is sent. */
+ ChannelId channelId;
+ /** \internal For proxy response: the original request or 0. */
+ RequestId requestId;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_MethodContext_h*/