diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-18 17:45:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-18 17:45:23 +0000 |
| commit | 7a26e1d6aa0018e44120bef39903c4ce55676141 (patch) | |
| tree | eb676a407cdffd905cfd6d3e5ce3443b3faa0e14 /cpp | |
| parent | 05306462d72913a31775420b024252dc0f71e30a (diff) | |
| download | qpid-python-7a26e1d6aa0018e44120bef39903c4ce55676141.tar.gz | |
* MethodContext: added invocation context for methods. Can carry RequestId
as well as ChannelId.
* gentools: AMQP_Clinet|ServerOperations and *Body::invoke(): pass MethodContex instead of channel ID.
* All *HandlerImpl classe: use MethodContext instead of channel ID.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 139 | ||||
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 36 | ||||
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.h | 30 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.cpp | 3 | ||||
| -rw-r--r-- | cpp/lib/common/framing/AMQMethodBody.h | 4 | ||||
| -rw-r--r-- | cpp/lib/common/framing/MethodContext.h | 55 |
6 files changed, 161 insertions, 106 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 16519ec646..58062d4471 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -25,9 +25,6 @@ namespace qpid { namespace broker { -// FIXME aconway 2007-01-18: Remove channel argument from signatures, -// adapter is already associated with a cahnnel. - using namespace qpid; using namespace qpid::framing; @@ -78,47 +75,47 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations public: ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void startOk(u_int16_t /*channel*/, + void startOk(const MethodContext& context, const qpid::framing::FieldTable& clientProperties, const std::string& mechanism, const std::string& response, const std::string& locale); - void secureOk(u_int16_t /*channel*/, const std::string& response); - void tuneOk(u_int16_t /*channel*/, u_int16_t channelMax, + void secureOk(const MethodContext& context, const std::string& response); + void tuneOk(const MethodContext& context, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); - void open(u_int16_t /*channel*/, const std::string& virtualHost, + void open(const MethodContext& context, const std::string& virtualHost, const std::string& capabilities, bool insist); - void close(u_int16_t /*channel*/, u_int16_t replyCode, + void close(const MethodContext& context, u_int16_t replyCode, const std::string& replyText, u_int16_t classId, u_int16_t methodId); - void closeOk(u_int16_t channel); + void closeOk(const MethodContext& context); }; class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ public: ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void open(u_int16_t /*channel*/, const std::string& outOfBand); - void flow(u_int16_t /*channel*/, bool active); - void flowOk(u_int16_t /*channel*/, bool active); - void ok( u_int16_t channel ); - void ping( u_int16_t channel ); - void pong( u_int16_t channel ); - void resume( u_int16_t /*channel*/, const std::string& channelId ); - void close(u_int16_t /*channel*/, u_int16_t replyCode, const + void open(const MethodContext& context, const std::string& outOfBand); + void flow(const MethodContext& context, bool active); + void flowOk(const MethodContext& context, bool active); + void ok( const MethodContext& context ); + void ping( const MethodContext& context ); + void pong( const MethodContext& context ); + void resume( const MethodContext& context, const std::string& channelId ); + void close(const MethodContext& context, u_int16_t replyCode, const std::string& replyText, u_int16_t classId, u_int16_t methodId); - void closeOk(u_int16_t channel); + void closeOk(const MethodContext& context); }; class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ public: ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void declare(u_int16_t /*channel*/, u_int16_t ticket, + void declare(const MethodContext& context, u_int16_t ticket, const std::string& exchange, const std::string& type, bool passive, bool durable, bool autoDelete, bool internal, bool nowait, const qpid::framing::FieldTable& arguments); - void delete_(u_int16_t /*channel*/, u_int16_t ticket, + void delete_(const MethodContext& context, u_int16_t ticket, const std::string& exchange, bool ifUnused, bool nowait); - void unbind(u_int16_t /*channel*/, + void unbind(const MethodContext& context, u_int16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); @@ -127,22 +124,22 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations class QueueHandlerImpl : private CoreRefs, public QueueHandler{ public: QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void declare(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, + void declare(const MethodContext& context, u_int16_t ticket, const std::string& queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); - void bind(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, + void bind(const MethodContext& context, u_int16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, bool nowait, const qpid::framing::FieldTable& arguments); - void unbind(u_int16_t /*channel*/, + void unbind(const MethodContext& context, u_int16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); - void purge(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, + void purge(const MethodContext& context, u_int16_t ticket, const std::string& queue, bool nowait); - void delete_(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, + void delete_(const MethodContext& context, u_int16_t ticket, const std::string& queue, bool ifUnused, bool ifEmpty, bool nowait); }; @@ -150,31 +147,31 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations class BasicHandlerImpl : private CoreRefs, public BasicHandler{ public: BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void qos(u_int16_t /*channel*/, u_int32_t prefetchSize, + void qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); void consume( - u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, + const MethodContext& context, u_int16_t ticket, const std::string& queue, const std::string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const qpid::framing::FieldTable& fields); - void cancel(u_int16_t /*channel*/, const std::string& consumerTag, + void cancel(const MethodContext& context, const std::string& consumerTag, bool nowait); - void publish(u_int16_t /*channel*/, u_int16_t ticket, + void publish(const MethodContext& context, u_int16_t ticket, const std::string& exchange, const std::string& routingKey, bool mandatory, bool immediate); - void get(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, + void get(const MethodContext& context, u_int16_t ticket, const std::string& queue, bool noAck); - void ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple); - void reject(u_int16_t /*channel*/, u_int64_t deliveryTag, bool requeue); - void recover(u_int16_t /*channel*/, bool requeue); + void ack(const MethodContext& context, u_int64_t deliveryTag, bool multiple); + void reject(const MethodContext& context, u_int64_t deliveryTag, bool requeue); + void recover(const MethodContext& context, bool requeue); }; class TxHandlerImpl : private CoreRefs, public TxHandler{ public: TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void select(u_int16_t channel); - void commit(u_int16_t channel); - void rollback(u_int16_t channel); + void select(const MethodContext& context); + void commit(const MethodContext& context); + void rollback(const MethodContext& context); }; BasicHandlerImpl basicHandler; @@ -188,37 +185,37 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations }; void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk( - u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const MethodContext& , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){} -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ connection.framemax = framemax; connection.heartbeat = heartbeat; } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext&, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; connection.client->getConnection().openOk(0, knownhosts); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close( - u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, + const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { connection.client->getConnection().closeOk(0); connection.context->close(); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){ connection.context->close(); } void BrokerAdapter::ServerOps::ChannelHandlerImpl::open( - u_int16_t /*channel*/, const string& /*outOfBand*/){ + const MethodContext&, const string& /*outOfBand*/){ // FIXME aconway 2007-01-17: Assertions on all channel methods, assertChannelNonZero(channel.getId()); if (channel.isOpen()) @@ -228,21 +225,21 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::open( connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */); } -void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} +void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, +void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ connection.client->getChannel().closeOk(channel.getId()); // FIXME aconway 2007-01-18: Following line destroys this. Ugly. connection.closeChannel(channel.getId()); } -void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} +void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&){} -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& exchange, const string& type, +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, const FieldTable& /*arguments*/){ @@ -271,7 +268,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t /*channel* void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind( - u_int16_t /*channel*/, + const MethodContext&, u_int16_t /*ticket*/, const string& /*queue*/, const string& /*exchange*/, @@ -283,7 +280,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind( -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/, +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused @@ -291,7 +288,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t /*channel* if(!nowait) connection.client->getExchange().deleteOk(channel.getId()); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& name, +void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; @@ -326,7 +323,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t /*channel*/, } } -void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, +void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ @@ -345,14 +342,14 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t /*channel*/, u_i } } -void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queue, +void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); @@ -378,7 +375,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t /*channel*/, -void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t /*channel*/, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); @@ -386,7 +383,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t /*channel*/, u_in } void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( - u_int16_t /*channel*/, u_int16_t /*ticket*/, + const MethodContext&, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const FieldTable& fields) @@ -413,13 +410,13 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( } -void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t /*channel*/, const string& consumerTag, bool nowait){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext&, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t /*channel*/, u_int16_t /*ticket*/, +void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate){ @@ -433,7 +430,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t /*channel*/, } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack @@ -442,7 +439,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t /*channel*/, u_in } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ try{ channel.ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -450,23 +447,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t /*channel*/, u_in } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t /*channel*/, bool requeue){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ channel.recover(requeue); } -void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t /*channel*/){ +void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext&){ channel.begin(); connection.client->getTx().selectOk(channel.getId()); } -void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t /*channel*/){ +void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext&){ channel.commit(); connection.client->getTx().commitOk(channel.getId()); } -void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t /*channel*/){ +void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext&){ channel.rollback(); connection.client->getTx().rollbackOk(channel.getId()); @@ -475,7 +472,7 @@ void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t /*channel*/){ void BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( - u_int16_t /*channel*/, + const MethodContext&, u_int16_t /*ticket*/, const string& /*queue*/, const string& /*exchange*/, @@ -486,26 +483,26 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void BrokerAdapter::ServerOps::ChannelHandlerImpl::resume( - u_int16_t /*channel*/, + const MethodContext&, const string& /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 0c0f9e96eb..07ede4097a 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -23,11 +23,13 @@ namespace qpid { namespace broker { +using namespace framing; + // // Message class method handlers // void -MessageHandlerImpl::append( u_int16_t /*channel*/, +MessageHandlerImpl::append(const MethodContext&, const string& /*reference*/, const string& /*bytes*/ ) { @@ -36,18 +38,18 @@ MessageHandlerImpl::append( u_int16_t /*channel*/, void -MessageHandlerImpl::cancel( u_int16_t channel, +MessageHandlerImpl::cancel( const MethodContext& context, const string& destination ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - connection.getChannel(channel).cancel(destination); + channel.cancel(destination); - connection.client->getMessageHandler()->ok(channel); + connection.client->getMessageHandler()->ok(context); } void -MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, +MessageHandlerImpl::checkpoint(const MethodContext&, const string& /*reference*/, const string& /*identifier*/ ) { @@ -55,14 +57,14 @@ MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, } void -MessageHandlerImpl::close( u_int16_t /*channel*/, +MessageHandlerImpl::close(const MethodContext&, const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -MessageHandlerImpl::consume( u_int16_t /*channel*/, +MessageHandlerImpl::consume(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, const string& destination, @@ -94,13 +96,13 @@ MessageHandlerImpl::consume( u_int16_t /*channel*/, } void -MessageHandlerImpl::empty( u_int16_t /*channel*/ ) +MessageHandlerImpl::empty( const MethodContext& ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -MessageHandlerImpl::get( u_int16_t /*channelId*/, +MessageHandlerImpl::get( const MethodContext&, u_int16_t /*ticket*/, const string& queueName, const string& /*destination*/, @@ -119,27 +121,27 @@ MessageHandlerImpl::get( u_int16_t /*channelId*/, } void -MessageHandlerImpl::offset( u_int16_t /*channel*/, +MessageHandlerImpl::offset(const MethodContext&, u_int64_t /*value*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -MessageHandlerImpl::ok( u_int16_t /*channel*/ ) +MessageHandlerImpl::ok( const MethodContext& ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -MessageHandlerImpl::open( u_int16_t /*channel*/, +MessageHandlerImpl::open(const MethodContext&, const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -MessageHandlerImpl::qos( u_int16_t /*channel*/, +MessageHandlerImpl::qos(const MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/ ) @@ -154,7 +156,7 @@ MessageHandlerImpl::qos( u_int16_t /*channel*/, } void -MessageHandlerImpl::recover( u_int16_t /*channel*/, +MessageHandlerImpl::recover(const MethodContext&, bool requeue ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature @@ -164,7 +166,7 @@ MessageHandlerImpl::recover( u_int16_t /*channel*/, } void -MessageHandlerImpl::reject( u_int16_t /*channel*/, +MessageHandlerImpl::reject(const MethodContext&, u_int16_t /*code*/, const string& /*text*/ ) { @@ -172,7 +174,7 @@ MessageHandlerImpl::reject( u_int16_t /*channel*/, } void -MessageHandlerImpl::resume( u_int16_t /*channel*/, +MessageHandlerImpl::resume(const MethodContext&, const string& /*reference*/, const string& /*identifier*/ ) { @@ -180,7 +182,7 @@ MessageHandlerImpl::resume( u_int16_t /*channel*/, } void -MessageHandlerImpl::transfer( u_int16_t /*channel*/, +MessageHandlerImpl::transfer(const MethodContext&, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 77e30abe05..d97d29153a 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -37,21 +37,21 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) : channel(ch), connection(c), broker(b) {} - void append( u_int16_t channel, + void append(const qpid::framing::MethodContext&, const std::string& reference, const std::string& bytes ); - void cancel( u_int16_t channel, + void cancel(const qpid::framing::MethodContext&, const std::string& destination ); - void checkpoint( u_int16_t channel, + void checkpoint(const qpid::framing::MethodContext&, const std::string& reference, const std::string& identifier ); - void close( u_int16_t channel, + void close(const qpid::framing::MethodContext&, const std::string& reference ); - void consume( u_int16_t channel, + void consume(const qpid::framing::MethodContext&, u_int16_t ticket, const std::string& queue, const std::string& destination, @@ -60,39 +60,39 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH bool exclusive, const qpid::framing::FieldTable& filter ); - void empty( u_int16_t channel ); + void empty( const qpid::framing::MethodContext& ); - void get( u_int16_t channel, + void get(const qpid::framing::MethodContext&, u_int16_t ticket, const std::string& queue, const std::string& destination, bool noAck ); - void offset( u_int16_t channel, + void offset(const qpid::framing::MethodContext&, u_int64_t value ); - void ok( u_int16_t channel ); + void ok( const qpid::framing::MethodContext& ); - void open( u_int16_t channel, + void open(const qpid::framing::MethodContext&, const std::string& reference ); - void qos( u_int16_t channel, + void qos(const qpid::framing::MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global ); - void recover( u_int16_t channel, + void recover(const qpid::framing::MethodContext&, bool requeue ); - void reject( u_int16_t channel, + void reject(const qpid::framing::MethodContext&, u_int16_t code, const std::string& text ); - void resume( u_int16_t channel, + void resume(const qpid::framing::MethodContext&, const std::string& reference, const std::string& identifier ); - void transfer( u_int16_t channel, + void transfer(const qpid::framing::MethodContext&, u_int16_t ticket, const std::string& destination, bool redelivered, diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp index de081243ee..0c77a1c64a 100644 --- a/cpp/lib/common/framing/AMQMethodBody.cpp +++ b/cpp/lib/common/framing/AMQMethodBody.cpp @@ -34,7 +34,8 @@ bool AMQMethodBody::match(AMQMethodBody* other) const{ return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId(); } -void AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channel*/){ +void AMQMethodBody::invoke(AMQP_ServerOperations&, const MethodContext&){ + assert(0); THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); } diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index a7725079fb..a38f580aa5 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -43,13 +43,13 @@ class AMQMethodBody : public AMQBody ProtocolVersion version; u_int8_t type() const { return METHOD_BODY; } AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {} - AMQMethodBody(ProtocolVersion version) : version(version) {} + AMQMethodBody(ProtocolVersion ver) : version(ver) {} virtual ~AMQMethodBody() {} void decode(Buffer&, u_int32_t); virtual u_int16_t amqpMethodId() const = 0; virtual u_int16_t amqpClassId() const = 0; - virtual void invoke(AMQP_ServerOperations& target, u_int16_t channel); + virtual void invoke(AMQP_ServerOperations&, const MethodContext&); bool match(AMQMethodBody* other) const; protected: diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h new file mode 100644 index 0000000000..13d5f658ca --- /dev/null +++ b/cpp/lib/common/framing/MethodContext.h @@ -0,0 +1,55 @@ +#ifndef _framing_MethodContext_h +#define _framing_MethodContext_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace framing { + +class BodyHandler; + +/** + * Invocation context for an AMQP method. + * All generated proxy and handler functions take a MethodContext parameter. + * + * The user calling on a broker proxy can simply pass an integer + * channel ID, it will implicitly be converted to an appropriate context. + * + * Other context members are for internal use. + */ +struct MethodContext +{ + /** + * Passing a integer channel-id in place of a MethodContext + * will automatically construct the MethodContext. + */ + MethodContext(ChannelId channel, RequestId request=0) + : channelId(channel), requestId(request) {} + + /** Channel on which the method is sent. */ + ChannelId channelId; + /** \internal For proxy response: the original request or 0. */ + RequestId requestId; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_MethodContext_h*/ |
