diff options
Diffstat (limited to 'cpp/lib')
39 files changed, 686 insertions, 378 deletions
diff --git a/cpp/lib/Makefile.am b/cpp/lib/Makefile.am index 377cd4ede4..09a689bc76 100644 --- a/cpp/lib/Makefile.am +++ b/cpp/lib/Makefile.am @@ -1 +1 @@ -SUBDIRS = client common broker +SUBDIRS = common broker client diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index 27d2fec006..e2ca88d4d0 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -30,7 +30,6 @@ #include <MessageStore.h> #include <AutoDelete.h> #include <ExchangeRegistry.h> -#include <BrokerChannel.h> #include <ConnectionToken.h> #include <DirectExchange.h> #include <OutputHandler.h> diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 8b081874fc..ec80241c66 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -18,11 +18,10 @@ #include <boost/format.hpp> #include "BrokerAdapter.h" +#include "BrokerChannel.h" #include "Connection.h" -#include "Exception.h" #include "AMQMethodBody.h" #include "Exception.h" -#include "MessageHandlerImpl.h" namespace qpid { namespace broker { @@ -33,18 +32,37 @@ using namespace qpid::framing; typedef std::vector<Queue::shared_ptr> QueueVector; -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk( - const MethodContext& context , const FieldTable& /*clientProperties*/, + +BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : + CoreRefs(ch, c, b), + connection(c), + basicHandler(*this), + channelHandler(*this), + connectionHandler(*this), + exchangeHandler(*this), + messageHandler(*this), + queueHandler(*this), + txHandler(*this) +{} + + +ProtocolVersion BrokerAdapter::getVersion() const { + return connection.getVersion(); +} + +void BrokerAdapter::ConnectionHandlerImpl::startOk( + const MethodContext&, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/){ - connection.client->getConnection().tune( - context, 100, connection.getFrameMax(), connection.getHeartbeat()); + const string& /*response*/, const string& /*locale*/) +{ + client.tune( + 100, connection.getFrameMax(), connection.getHeartbeat()); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk( +void BrokerAdapter::ConnectionHandlerImpl::secureOk( const MethodContext&, const string& /*response*/){} -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( +void BrokerAdapter::ConnectionHandlerImpl::tuneOk( const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat) { @@ -52,50 +70,55 @@ void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( connection.setHeartbeat(heartbeat); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::ConnectionHandlerImpl::open( + const MethodContext& context, const string& /*virtualHost*/, + const string& /*capabilities*/, bool /*insist*/) +{ string knownhosts; - connection.client->getConnection().openOk(context, knownhosts); + client.openOk( + knownhosts, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close( +void BrokerAdapter::ConnectionHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getConnection().closeOk(context); + client.closeOk(context.getRequestId()); connection.getOutput().close(); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ +void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ connection.getOutput().close(); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open( +void BrokerAdapter::ChannelHandlerImpl::open( const MethodContext& context, const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - connection.client->getChannel().openOk(context, std::string()/* ID */); + client.openOk( + std::string()/* ID */, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close( +void BrokerAdapter::ChannelHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getChannel().closeOk(context); + client.closeOk(context.getRequestId()); // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. connection.closeChannel(channel.getId()); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} +void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} -void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ +void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ if(passive){ if(!broker.getExchanges().get(exchange)) { @@ -116,27 +139,30 @@ void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodCont } } if(!nowait){ - connection.client->getExchange().declareOk(context); + client.declareOk(context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); - if(!nowait) connection.client->getExchange().deleteOk(context); + if(!nowait) client.deleteOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ +void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = connection.getQueue(name, channel.getId()); } else { std::pair<Queue::shared_ptr, bool> queue_created = - broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0); + broker.getQueues().declare( + name, durable, + autoDelete ? connection.getTimeout() : 0, + exclusive ? &connection : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -161,20 +187,22 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext % queue->getName()); if (!nowait) { string queueName = queue->getName(); - connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount()); + client.declareOk( + queueName, queue->getMessageCount(), queue->getConsumerCount(), + context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ +void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) connection.client->getQueue().bindOk(context); + if(!nowait) client.bindOk(context.getRequestId()); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -182,7 +210,7 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& c } void -BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( +BrokerAdapter::QueueHandlerImpl::unbind( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, @@ -198,18 +226,18 @@ BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( exchange->unbind(queue, routingKey, &arguments); - connection.client->getQueue().unbindOk(context); + client.unbindOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); - if(!nowait) connection.client->getQueue().purgeOk(context, count); + if(!nowait) client.purgeOk( count, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); @@ -228,20 +256,21 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext broker.getQueues().destroy(queue); } - if(!nowait) connection.client->getQueue().deleteOk(context, count); + if(!nowait) + client.deleteOk(count, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getBasic().qosOk(context); + client.qosOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( +void BrokerAdapter::BasicHandlerImpl::consume( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, @@ -257,19 +286,19 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( channel.consume( newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) connection.client->getBasic().consumeOk(context, newTag); + if(!nowait) client.consumeOk(newTag, context.getRequestId()); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ +void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag); + if(!nowait) client.cancelOk(consumerTag, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( +void BrokerAdapter::BasicHandlerImpl::publish( const MethodContext& context, u_int16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate) @@ -287,16 +316,16 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ string clusterId;//not used, part of an imatix hack - connection.client->getBasic().getEmpty(context, clusterId); + client.getEmpty(clusterId, context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ try{ channel.ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -304,31 +333,31 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ +void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ channel.recover(requeue); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ channel.begin(); - connection.client->getTx().selectOk(context); + client.selectOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ channel.commit(); - connection.client->getTx().commitOk(context); + client.commitOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ channel.rollback(); - connection.client->getTx().rollbackOk(context); + client.rollbackOk(context.getRequestId()); channel.recover(false); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) +BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) { //no specific action required, generic response handling should be sufficient } @@ -338,21 +367,21 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) // Message class method handlers // void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) { - connection.client->getChannel().ok(context); - connection.client->getChannel().pong(context); + client.ok(context.getRequestId()); + client.pong(); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) { - connection.client->getChannel().ok(context); + client.ok(context.getRequestId()); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( +BrokerAdapter::ChannelHandlerImpl::resume( const MethodContext&, const string& /*channel*/ ) { diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h index cd34f17e58..166ec78ddd 100644 --- a/cpp/lib/broker/BrokerAdapter.h +++ b/cpp/lib/broker/BrokerAdapter.h @@ -19,8 +19,9 @@ * */ #include "AMQP_ServerOperations.h" +#include "HandlerImpl.h" #include "MessageHandlerImpl.h" -#include "BrokerChannel.h" +#include "Exception.h" namespace qpid { namespace broker { @@ -28,14 +29,6 @@ namespace broker { class Channel; class Connection; class Broker; - -/** - * Per-channel protocol adapter. - * - * Translates protocol bodies into calls on the core Channel, - * Connection and Broker objects. - */ - class ChannelHandler; class ConnectionHandler; class BasicHandler; @@ -48,20 +41,23 @@ class FileHandler; class StreamHandler; class DtxHandler; class TunnelHandler; +class MessageHandlerImpl; -class BrokerAdapter : public framing::AMQP_ServerOperations +/** + * Per-channel protocol adapter. + * + * A container for a collection of AMQP-class adapters that translate + * AMQP method bodies into calls on the core Channel, Connection and + * Broker objects. Each adapter class also provides a client proxy + * to send methods to the peer. + * + */ +class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations { public: - BrokerAdapter(Channel& ch, Connection& c, Broker& b) : - basicHandler(ch, c, b), - channelHandler(ch, c, b), - connectionHandler(ch, c, b), - exchangeHandler(ch, c, b), - messageHandler(ch, c, b), - queueHandler(ch, c, b), - txHandler(ch, c, b) - {} - + BrokerAdapter(Channel& ch, Connection& c, Broker& b); + + framing::ProtocolVersion getVersion() const; ChannelHandler* getChannelHandler() { return &channelHandler; } ConnectionHandler* getConnectionHandler() { return &connectionHandler; } BasicHandler* getBasicHandler() { return &basicHandler; } @@ -80,19 +76,16 @@ class BrokerAdapter : public framing::AMQP_ServerOperations TunnelHandler* getTunnelHandler() { throw ConnectionException(540, "Tunnel class not implemented"); } + framing::AMQP_ClientProxy& getProxy() { return proxy; } + private: - struct CoreRefs { - CoreRefs(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b) {} - Channel& channel; - Connection& connection; - Broker& broker; - }; - - class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler { + class ConnectionHandlerImpl : + public ConnectionHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Connection> + { public: - ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} void startOk(const framing::MethodContext& context, const qpid::framing::FieldTable& clientProperties, @@ -112,9 +105,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void closeOk(const framing::MethodContext& context); }; - class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ + class ChannelHandlerImpl : + public ChannelHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Channel> + { public: - ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void open(const framing::MethodContext& context, const std::string& outOfBand); void flow(const framing::MethodContext& context, bool active); void flowOk(const framing::MethodContext& context, bool active); @@ -127,9 +124,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void closeOk(const framing::MethodContext& context); }; - class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ + class ExchangeHandlerImpl : + public ExchangeHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Exchange> + { public: - ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& exchange, const std::string& type, bool passive, bool durable, bool autoDelete, @@ -139,9 +140,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations const std::string& exchange, bool ifUnused, bool nowait); }; - class QueueHandlerImpl : private CoreRefs, public QueueHandler{ + class QueueHandlerImpl : + public QueueHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Queue> + { public: - QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, @@ -162,9 +167,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations bool nowait); }; - class BasicHandlerImpl : private CoreRefs, public BasicHandler{ + class BasicHandlerImpl : + public BasicHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Basic> + { public: - BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void qos(const framing::MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); void consume( @@ -184,14 +193,19 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void recover(const framing::MethodContext& context, bool requeue); }; - class TxHandlerImpl : private CoreRefs, public TxHandler{ + class TxHandlerImpl : + public TxHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Tx> + { public: - TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void select(const framing::MethodContext& context); void commit(const framing::MethodContext& context); void rollback(const framing::MethodContext& context); }; + Connection& connection; BasicHandlerImpl basicHandler; ChannelHandlerImpl channelHandler; ConnectionHandlerImpl connectionHandler; @@ -199,7 +213,7 @@ class BrokerAdapter : public framing::AMQP_ServerOperations MessageHandlerImpl messageHandler; QueueHandlerImpl queueHandler; TxHandlerImpl txHandler; - + }; }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 07636216a6..74e5504f17 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -25,6 +25,8 @@ #include <algorithm> #include <functional> +#include <boost/bind.hpp> + #include "BrokerChannel.h" #include "DeletingTxOp.h" #include "framing/ChannelAdapter.h" @@ -50,7 +52,7 @@ Channel::Channel( u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold ) : - ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()), + ChannelAdapter(id, &con.getOutput(), con.getVersion()), connection(con), currentDeliveryTag(1), transactional(false), @@ -74,46 +76,32 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, +// TODO aconway 2007-02-12: Why is connection token passed in instead +// of using the channel's parent connection? +void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { - if(tag.empty()) tag = tagGenerator.generate(); - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); - try{ - queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; - } catch(...) { - // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt. - delete c; - throw; - } -} - -void Channel::cancel(consumer_iterator i){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + if(tagInOut.empty()) + tagInOut = tagGenerator.generate(); + std::auto_ptr<ConsumerImpl> c( + new ConsumerImpl(this, tagInOut, queue, connection, acks)); + queue->consume(c.get(), exclusive);//may throw exception + consumers.insert(tagInOut, c.release()); } void Channel::cancel(const string& tag){ - consumer_iterator i = consumers.find(tag); - if(i != consumers.end()){ - cancel(i); - } + // consumers is a ptr_map so erase will delete the consumer + // which will call cancel. + ConsumerImplMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + consumers.erase(i); } void Channel::close(){ - if (isOpen()) { - opened = false; - while (!consumers.empty()) - cancel(consumers.begin()); - //requeue: - recover(true); - } + opened = false; + consumers.clear(); + recover(true); } void Channel::begin(){ @@ -160,14 +148,10 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection), - ackExpected(ack), - blocked(false){ -} + Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack +) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -182,12 +166,18 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ return false; } +Channel::ConsumerImpl::~ConsumerImpl() { + cancel(); +} + void Channel::ConsumerImpl::cancel(){ - if(queue) queue->cancel(this); + if(queue) + queue->cancel(this); } void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) queue->dispatch(); + if(blocked) + queue->dispatch(); } void Channel::handleInlineTransfer(Message::shared_ptr msg) @@ -196,11 +186,15 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg) connection.broker.getExchanges().get(msg->getExchange()); if(transactional){ TxPublish* deliverable = new TxPublish(msg); - exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + *deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); } } @@ -244,7 +238,8 @@ void Channel::ack(){ ack(getRequestInProgress(), false); } -void Channel::ack(u_int64_t deliveryTag, bool multiple){ +void Channel::ack(u_int64_t deliveryTag, bool multiple) +{ if(transactional){ accumulatedAck.update(deliveryTag, multiple); //TODO: I think the outstanding prefetch size & count should be updated at this point... @@ -271,9 +266,8 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); - } + std::for_each(consumers.begin(), consumers.end(), + boost::bind(&ConsumerImpl::requestDispatch, _1)); } } @@ -328,8 +322,8 @@ void Channel::handleMethodInContext( method->invoke(*adapter, context); } }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), + adapter->getProxy().getChannel().close( + e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); connection.closeChannel(getId()); }catch(ConnectionException& e){ @@ -338,4 +332,3 @@ void Channel::handleMethodInContext( connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } - diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 58c4f0a45b..538e86b0a8 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -23,10 +23,10 @@ */ #include <list> -#include <map> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <boost/ptr_container/ptr_map.hpp> #include <AccumulatedAck.h> #include <Consumer.h> @@ -56,7 +56,7 @@ using framing::string; class Channel : public framing::ChannelAdapter, public CompletionHandler { - class ConsumerImpl : public virtual Consumer + class ConsumerImpl : public Consumer { Channel* parent; const string tag; @@ -64,23 +64,25 @@ class Channel : public framing::ChannelAdapter, ConnectionToken* const connection; const bool ackExpected; bool blocked; + public: ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); + ~ConsumerImpl(); virtual bool deliver(Message::shared_ptr& msg); void cancel(); void requestDispatch(); }; - typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; Connection& connection; u_int16_t id; u_int64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; bool transactional; - std::map<string, ConsumerImpl*> consumers; + ConsumerImplMap consumers; u_int32_t prefetchSize; u_int16_t prefetchCount; Prefetch outstanding; @@ -93,18 +95,17 @@ class Channel : public framing::ChannelAdapter, MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message bool opened; - boost::scoped_ptr<BrokerAdapter> adapter; // completion handler for MessageBuilder void complete(Message::shared_ptr msg); - void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); - void cancel(consumer_iterator consumer); + void deliver(Message::shared_ptr& msg, const string& tag, + Queue::shared_ptr& queue, bool ackExpected); bool checkPrefetch(Message::shared_ptr& msg); public: - Channel(Connection& channel, + Channel(Connection& parent, framing::ChannelId id, u_int32_t framesize, MessageStore* const _store = 0, @@ -112,8 +113,8 @@ class Channel : public framing::ChannelAdapter, ~Channel(); - // For ChannelAdapter bool isOpen() const { return opened; } + BrokerAdapter& getAdatper() { return *adapter; } void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } @@ -122,7 +123,11 @@ class Channel : public framing::ChannelAdapter, u_int16_t setPrefetchCount(u_int16_t n){ return prefetchCount = n; } bool exists(const string& consumerTag); - void consume(string& tag, Queue::shared_ptr queue, bool acks, + + /** + *@param tagInOut - if empty it is updated with the generated token. + */ + void consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); @@ -146,7 +151,6 @@ class Channel : public framing::ChannelAdapter, void handleMethodInContext( boost::shared_ptr<framing::AMQMethodBody> method, const framing::MethodContext& context); - }; struct InvalidAckException{}; diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index e8a993942a..bff4492a49 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -28,6 +28,9 @@ #include <MessageStore.h> #include <BasicDeliverBody.h> #include <BasicGetOkBody.h> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include "AMQMethodBody.h" #include "AMQFrame.h" #include "framing/ChannelAdapter.h" diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 9e77eab446..871514e55f 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -22,12 +22,10 @@ * */ -#include <BrokerMessageBase.h> #include <memory> #include <boost/shared_ptr.hpp> -#include <AMQContentBody.h> -#include <AMQHeaderBody.h> -#include "AMQMethodBody.h" + +#include <BrokerMessageBase.h> #include <BasicHeaderProperties.h> #include <ConnectionToken.h> #include <Content.h> @@ -39,6 +37,7 @@ namespace qpid { namespace framing { class MethodContext; class ChannelAdapter; +class AMQHeaderBody; } namespace broker { @@ -52,7 +51,7 @@ using framing::string; * request. */ class BasicMessage : public Message { - framing::AMQHeaderBody::shared_ptr header; + boost::shared_ptr<framing::AMQHeaderBody> header; std::auto_ptr<Content> content; sys::Mutex contentLock; u_int64_t size; @@ -65,10 +64,10 @@ class BasicMessage : public Message { BasicMessage(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate, - framing::AMQMethodBody::shared_ptr respondTo); + boost::shared_ptr<framing::AMQMethodBody> respondTo); BasicMessage(); ~BasicMessage(); - void setHeader(framing::AMQHeaderBody::shared_ptr header); + void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header); void addContent(framing::AMQContentBody::shared_ptr data); bool isComplete(); diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 41a0cd45fa..3bba95a5f8 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -22,14 +22,10 @@ * */ -#include "AMQContentBody.h" -#include "AMQHeaderBody.h" -#include "AMQMethodBody.h" -#include "Content.h" -#include "framing/amqp_types.h" - #include <string> #include <boost/shared_ptr.hpp> +#include "Content.h" +#include "framing/amqp_types.h" namespace qpid { @@ -38,6 +34,9 @@ class MethodContext; class ChannelAdapter; class BasicHeaderProperties; class FieldTable; +class AMQMethodBody; +class AMQContentBody; +class AMQHeaderBody; } @@ -50,24 +49,17 @@ class MessageStore; * abstracting away the operations * TODO; AMS: for the moment this is mostly a placeholder */ -class Message{ - const ConnectionToken* publisher; - std::string exchange; - std::string routingKey; - const bool mandatory; - const bool immediate; - u_int64_t persistenceId; - bool redelivered; - framing::AMQMethodBody::shared_ptr respondTo; - +class Message { public: typedef boost::shared_ptr<Message> shared_ptr; + typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr; + Message(const ConnectionToken* publisher_, const std::string& _exchange, const std::string& _routingKey, bool _mandatory, bool _immediate, - framing::AMQMethodBody::shared_ptr respondTo_) : + AMQMethodBodyPtr respondTo_) : publisher(publisher_), exchange(_exchange), routingKey(_routingKey), @@ -92,9 +84,7 @@ class Message{ const std::string& getExchange() const { return exchange; } u_int64_t getPersistenceId() const { return persistenceId; } bool getRedelivered() const { return redelivered; } - framing::AMQMethodBody::shared_ptr getRespondTo() const { - return respondTo; - } + AMQMethodBodyPtr getRespondTo() const { return respondTo; } void setRouting(const std::string& _exchange, const std::string& _routingKey) { exchange = _exchange; routingKey = _routingKey; } @@ -168,14 +158,24 @@ class Message{ * it uses). */ virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {}; - virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {}; + virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {}; + virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {}; /** * Releases the in-memory content data held by this * message. Must pass in a store from which the data can * be reloaded. */ virtual void releaseContent(MessageStore* /*store*/) {}; + + private: + const ConnectionToken* publisher; + std::string exchange; + std::string routingKey; + const bool mandatory; + const bool immediate; + u_int64_t persistenceId; + bool redelivered; + AMQMethodBodyPtr respondTo; }; }} diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 99c045c59a..789e652947 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -149,7 +149,9 @@ void Queue::consume(Consumer* c, bool requestExclusive){ void Queue::cancel(Consumer* c){ Mutex::ScopedLock locker(lock); - consumers.erase(find(consumers.begin(), consumers.end(), c)); + Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); + if (i != consumers.end()) + consumers.erase(i); if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC; if(exclusive == c) exclusive = 0; } diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h index 40fa4bd415..015b27fe76 100644 --- a/cpp/lib/broker/BrokerQueue.h +++ b/cpp/lib/broker/BrokerQueue.h @@ -53,13 +53,17 @@ namespace qpid { * or more consumers registers. */ class Queue{ + typedef std::vector<Consumer*> Consumers; + typedef std::queue<Binding*> Bindings; + typedef std::queue<Message::shared_ptr> Messages; + const string name; const u_int32_t autodelete; MessageStore* const store; const ConnectionToken* const owner; - std::vector<Consumer*> consumers; - std::queue<Binding*> bindings; - std::queue<Message::shared_ptr> messages; + Consumers consumers; + Bindings bindings; + Messages messages; bool queueing; bool dispatching; int next; diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index 000199a65e..3d9e5cdaf8 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -22,6 +22,9 @@ #include <assert.h> #include "Connection.h" +#include "BrokerChannel.h" +#include "AMQP_ClientProxy.h" +#include "BrokerAdapter.h" using namespace boost; using namespace qpid::sys; @@ -33,12 +36,15 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : broker(broker_), - settings(broker.getTimeout(), broker.getStagingThreshold()), out(out_), framemax(65536), - heartbeat(0) + heartbeat(0), + client(0), + timeout(broker.getTimeout()), + stagingThreshold(broker.getStagingThreshold()) {} + Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { @@ -59,31 +65,27 @@ Exchange::shared_ptr Connection::findExchange(const string& name){ } -void Connection::received(qpid::framing::AMQFrame* frame){ +void Connection::received(framing::AMQFrame* frame){ getChannel(frame->getChannel()).handleBody(frame->getBody()); } -void Connection::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId){ - client->getConnection().close(MethodContext(&getChannel(0)), code, text, classId, methodId); +void Connection::close( + ReplyCode code, const string& text, ClassId classId, MethodId methodId) +{ + client->close(code, text, classId, methodId); getOutput().close(); } -// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter -// as it is part of the protocol. -void Connection::initiated(qpid::framing::ProtocolInitiation* header) { - if (client.get()) - // TODO aconway 2007-01-16: correct error code. - throw ConnectionException(0, "Connection initiated twice"); - client.reset(new qpid::framing::AMQP_ClientProxy( - out, header->getMajor(), header->getMinor())); +void Connection::initiated(framing::ProtocolInitiation* header) { + version = ProtocolVersion(header->getMajor(), header->getMinor()); FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); - client->getConnection().start( - MethodContext(&getChannel(0)), + getChannel(0).init(0, *out, getVersion()); + client = &getChannel(0).getAdatper().getProxy().getConnection(); + client->start( header->getMajor(), header->getMinor(), properties, mechanisms, locales); - getChannel(0).init(0, *out, client->getProtocolVersion()); } void Connection::idleOut(){} @@ -103,9 +105,10 @@ void Connection::closed(){ } } -void Connection::closeChannel(u_int16_t channel) { - getChannel(channel).close(); - channels.erase(channels.find(channel)); +void Connection::closeChannel(u_int16_t id) { + ChannelMap::iterator i = channels.find(id); + if (i != channels.end()) + i->close(); } @@ -115,7 +118,7 @@ Channel& Connection::getChannel(ChannelId id) { i = channels.insert( id, new Channel( *this, id, framemax, broker.getQueues().getStore(), - settings.stagingThreshold)).first; + broker.getStagingThreshold())).first; } return *i; } diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h index 4f1156dd01..27faab4967 100644 --- a/cpp/lib/broker/Connection.h +++ b/cpp/lib/broker/Connection.h @@ -27,66 +27,64 @@ #include <boost/ptr_container/ptr_map.hpp> #include <AMQFrame.h> -#include <AMQP_ClientProxy.h> #include <AMQP_ServerOperations.h> +#include <AMQP_ClientProxy.h> #include <sys/ConnectionOutputHandler.h> #include <sys/ConnectionInputHandler.h> #include <sys/TimeoutHandler.h> +#include "framing/ProtocolVersion.h" #include "Broker.h" #include "Exception.h" +#include "BrokerChannel.h" namespace qpid { namespace broker { -class Settings { - public: - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) - const u_int64_t stagingThreshold; - - Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} -}; +class Channel; class Connection : public sys::ConnectionInputHandler, public ConnectionToken { public: Connection(sys::ConnectionOutputHandler* out, Broker& broker); - // ConnectionInputHandler methods - void received(framing::AMQFrame* frame); - void initiated(framing::ProtocolInitiation* header); - void idleOut(); - void idleIn(); - void closed(); - sys::ConnectionOutputHandler& getOutput() { return *out; } + /** Get a channel. Create if it does not already exist */ + Channel& getChannel(framing::ChannelId channel); - const framing::ProtocolVersion& getVersion() { - return client->getProtocolVersion(); } + /** Close a channel */ + void closeChannel(framing::ChannelId channel); + + /** Close the connection */ + void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + + sys::ConnectionOutputHandler& getOutput() const { return *out; } + framing::ProtocolVersion getVersion() const { return version; } u_int32_t getFrameMax() const { return framemax; } u_int16_t getHeartbeat() const { return heartbeat; } + u_int32_t getTimeout() const { return timeout; } + u_int64_t getStagingThreshold() const { return stagingThreshold; } void setFrameMax(u_int32_t fm) { framemax = fm; } void setHeartbeat(u_int16_t hb) { heartbeat = hb; } - - Broker& broker; - std::auto_ptr<framing::AMQP_ClientProxy> client; - Settings settings; - - std::vector<Queue::shared_ptr> exclusiveQueues; - + /** * Get named queue, never returns 0. * @return: named queue or default queue for channel if name="" * @exception: ChannelException if no queue of that name is found. - * @exception: ConnectionException if no queue specified and channel has not declared one. + * @exception: ConnectionException if name="" and channel has no default. */ Queue::shared_ptr getQueue(const string& name, u_int16_t channel); - Channel& newChannel(framing::ChannelId channel); - Channel& getChannel(framing::ChannelId channel); - void closeChannel(framing::ChannelId channel); - void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + Broker& broker; + std::vector<Queue::shared_ptr> exclusiveQueues; + + // ConnectionInputHandler methods + void received(framing::AMQFrame* frame); + void initiated(framing::ProtocolInitiation* header); + void idleOut(); + void idleIn(); + void closed(); private: typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap; @@ -94,10 +92,15 @@ class Connection : public sys::ConnectionInputHandler, typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; Exchange::shared_ptr findExchange(const string& name); + framing::ProtocolVersion version; ChannelMap channels; sys::ConnectionOutputHandler* out; u_int32_t framemax; u_int16_t heartbeat; + framing::AMQP_ClientProxy::Connection* client; + const u_int32_t timeout; //timeout for auto-deleted queues (in ms) + const u_int64_t stagingThreshold; + }; }} diff --git a/cpp/lib/broker/HandlerImpl.h b/cpp/lib/broker/HandlerImpl.h new file mode 100644 index 0000000000..c55a36da45 --- /dev/null +++ b/cpp/lib/broker/HandlerImpl.h @@ -0,0 +1,71 @@ +#ifndef _broker_HandlerImpl_h +#define _broker_HandlerImpl_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "BrokerChannel.h" +#include "AMQP_ClientProxy.h" + +namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + +namespace broker { + +class Broker; +class Channel; +class Connection; + +/** + * A collection of references to the core objects required by an adapter, + * and a client proxy. + */ +struct CoreRefs +{ + CoreRefs(Channel& ch, Connection& c, Broker& b) + : channel(ch), connection(c), broker(b), proxy(ch) {} + + Channel& channel; + Connection& connection; + Broker& broker; + framing::AMQP_ClientProxy proxy; +}; + + +/** + * Base template for protocol handler implementations. + * Provides the core references and appropriate AMQP class proxy. + */ +template <class ProxyType> +struct HandlerImpl : public CoreRefs { + typedef HandlerImpl<ProxyType> HandlerImplType; + HandlerImpl(CoreRefs& parent) + : CoreRefs(parent), client(ProxyType::get(proxy)) {} + ProxyType client; +}; + + + +}} // namespace qpid::broker + + + +#endif /*!_broker_HandlerImpl_h*/ diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 797e3fbbf9..0853aebcb1 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -25,16 +25,15 @@ #include "BrokerMessageMessage.h" #include "MessageAppendBody.h" #include "MessageTransferBody.h" +#include "BrokerAdapter.h" namespace qpid { namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b), references(ch), - client(connection.client->getMessage()) -{} +MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) + : HandlerImplType(parent), references(channel) {} // // Message class method handlers @@ -47,7 +46,7 @@ MessageHandlerImpl::append(const MethodContext& context, references.get(reference).append( boost::shared_polymorphic_downcast<MessageAppendBody>( context.methodBody)); - client.ok(context); + client.ok(context.getRequestId()); } @@ -56,7 +55,7 @@ MessageHandlerImpl::cancel(const MethodContext& context, const string& destination ) { channel.cancel(destination); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -73,7 +72,7 @@ MessageHandlerImpl::close(const MethodContext& context, const string& reference) { references.get(reference).close(); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -84,7 +83,7 @@ MessageHandlerImpl::consume(const MethodContext& context, bool noLocal, bool noAck, bool exclusive, - const qpid::framing::FieldTable& filter ) + const framing::FieldTable& filter ) { Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!destination.empty() && channel.exists(destination)) @@ -93,7 +92,7 @@ MessageHandlerImpl::consume(const MethodContext& context, channel.consume( tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok(context); + client.ok(context.getRequestId()); // Dispatch messages as there is now a consumer. queue->dispatch(); } @@ -117,9 +116,9 @@ MessageHandlerImpl::get( const MethodContext& context, connection.getQueue(queueName, context.channel->getId()); if(channel.get(queue, destination, !noAck)) - client.ok(context); + client.ok(context.getRequestId()); else - client.empty(context); + client.empty(context.getRequestId()); } void @@ -141,7 +140,7 @@ MessageHandlerImpl::open(const MethodContext& context, const string& reference) { references.open(reference); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -153,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext& context, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -161,7 +160,7 @@ MessageHandlerImpl::recover(const MethodContext& context, bool requeue) { channel.recover(requeue); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -204,8 +203,8 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*appId*/, const string& /*transactionId*/, const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content body, + const framing::FieldTable& /*applicationHeaders*/, + const framing::Content& body, bool /*mandatory*/) { MessageTransferBody::shared_ptr transfer( @@ -218,7 +217,7 @@ MessageHandlerImpl::transfer(const MethodContext& context, channel.handleInlineTransfer(message); else references.get(body.getValue()).addMessage(message); - client.ok(context); + client.ok(context.getRequestId()); } diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 0fef45bb19..cb7e7e3126 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -24,7 +24,7 @@ #include "AMQP_ServerOperations.h" #include "AMQP_ClientProxy.h" #include "Reference.h" -#include "BrokerChannel.h" +#include "HandlerImpl.h" namespace qpid { namespace broker { @@ -34,10 +34,11 @@ class Broker; class MessageMessage; class MessageHandlerImpl : - public framing::AMQP_ServerOperations::MessageHandler + public framing::AMQP_ServerOperations::MessageHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Message> { public: - MessageHandlerImpl(Channel& ch, Connection& c, Broker& b); + MessageHandlerImpl(CoreRefs& parent); void append(const framing::MethodContext&, const std::string& reference, @@ -116,14 +117,10 @@ class MessageHandlerImpl : const std::string& transactionId, const std::string& securityToken, const framing::FieldTable& applicationHeaders, - framing::Content body, + const framing::Content& body, bool mandatory ); private: - Channel& channel; - Connection& connection; - Broker& broker; ReferenceRegistry references; - framing::AMQP_ClientProxy::Message& client; }; }} // namespace qpid::broker diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index dd93c6ae8b..52910f5161 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -24,6 +24,7 @@ #include <QpidError.h> #include <MethodBodyInstances.h> #include "Connection.h" +#include "AMQP_ServerProxy.h" // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -48,12 +49,23 @@ Channel::~Channel(){ close(); } +AMQP_ServerProxy& Channel::brokerProxy() { + assert(proxy.get()); + return *proxy; +} + +AMQMethodBody::shared_ptr Channel::brokerResponse() { + // FIXME aconway 2007-02-08: implement responses. + return AMQMethodBody::shared_ptr(); +} + void Channel::open(ChannelId id, Connection& con) { if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); connection = &con; init(id, con, con.getVersion()); // ChannelAdapter initialization. + proxy.reset(new AMQP_ServerProxy(*this)); string oob; if (id != 0) sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index a34c95d2c4..1c082f3b59 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -24,6 +24,7 @@ #include <map> #include <string> #include <queue> +#include <boost/scoped_ptr.hpp> #include "sys/types.h" #include <framing/amqp_framing.h> @@ -39,8 +40,10 @@ #include "Thread.h" namespace qpid { + namespace framing { class ChannelCloseBody; +class AMQP_ServerProxy; } namespace client { @@ -102,6 +105,7 @@ class Channel : public framing::ChannelAdapter, u_int16_t prefetch; const bool transactional; framing::ProtocolVersion version; + boost::scoped_ptr<framing::AMQP_ServerProxy> proxy; void enqueue(); void retrieve(Message& msg); @@ -151,8 +155,6 @@ class Channel : public framing::ChannelAdapter, public: - bool isOpen() const; - /** * Creates a channel object. * @@ -358,9 +360,21 @@ class Channel : public framing::ChannelAdapter, * @see publish() */ void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + bool isOpen() const; + + /** + * Returns a proxy for the "raw" AMQP broker protocol. Only for use by + * protocol experts. + */ + + framing::AMQP_ServerProxy& brokerProxy(); + /** + * Wait for the next method from the broker. + */ + framing::AMQMethodBody::shared_ptr brokerResponse(); }; -} -} +}} #endif /*!_client_ClientChannel_h*/ diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 0fafd29b90..2f91c44a22 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -43,7 +43,7 @@ const std::string Connection::OK("OK"); Connection::Connection( bool _debug, u_int32_t _max_frame_size, - const framing::ProtocolVersion& _version + framing::ProtocolVersion _version ) : version(_version), max_frame_size(_max_frame_size), defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 2f9b35d5ef..275e02a105 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -96,12 +96,12 @@ class Connection : public ConnectionForChannel Connector* connector; framing::OutputHandler* out; volatile bool isOpen; + Channel channel0; + bool debug; void erase(framing::ChannelId); void channelException( Channel&, framing::AMQMethodBody*, const QpidError&); - Channel channel0; - bool debug; // TODO aconway 2007-01-26: too many friendships, untagle these classes. friend class Channel; @@ -120,9 +120,8 @@ class Connection : public ConnectionForChannel * @param max_frame_size the maximum frame size that the * client will accept. Optional and defaults to 65536. */ - Connection( - bool debug = false, u_int32_t max_frame_size = 65536, - const framing::ProtocolVersion& = framing::highestProtocolVersion); + Connection(bool debug = false, u_int32_t max_frame_size = 65536, + framing::ProtocolVersion=framing::highestProtocolVersion); ~Connection(); /** @@ -185,7 +184,7 @@ class Connection : public ConnectionForChannel inline u_int32_t getMaxFrameSize(){ return max_frame_size; } /** @return protocol version in use on this connection. */ - const framing::ProtocolVersion& getVersion() const { return version; } + framing::ProtocolVersion getVersion() const { return version; } }; }} // namespace qpid::client diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index 425cecaf6f..657ee77f1a 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -23,17 +23,19 @@ #include <sys/Time.h> #include "Connector.h" +namespace qpid { +namespace client { + using namespace qpid::sys; -using namespace qpid::client; using namespace qpid::framing; using qpid::QpidError; -Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, - bool _debug, u_int32_t buffer_size) : - debug(_debug), +Connector::Connector( + ProtocolVersion ver, bool _debug, u_int32_t buffer_size +) : debug(_debug), receive_buffer_size(buffer_size), send_buffer_size(buffer_size), - version(pVersion), + version(ver), closed(true), lastIn(0), lastOut(0), timeout(0), @@ -180,3 +182,5 @@ void Connector::run(){ handleClosed(); } } + +}} // namespace qpid::client diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index 1126e861e0..ccac39f849 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -77,7 +77,7 @@ class Connector : public framing::OutputHandler, friend class Channel; public: - Connector(const framing::ProtocolVersion& pVersion, + Connector(framing::ProtocolVersion pVersion, bool debug = false, u_int32_t buffer_size = 1024); virtual ~Connector(); virtual void connect(const std::string& host, int port); diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index eefff79d6f..c44480bddf 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -63,6 +63,7 @@ libqpidcommon_la_SOURCES = \ $(framing)/AMQHeaderBody.cpp \ $(framing)/AMQHeartbeatBody.cpp \ $(framing)/AMQMethodBody.cpp \ + $(framing)/MethodContext.cpp \ $(framing)/BasicHeaderProperties.cpp \ $(framing)/BodyHandler.cpp \ $(framing)/ChannelAdapter.cpp \ @@ -76,8 +77,8 @@ libqpidcommon_la_SOURCES = \ $(framing)/Requester.cpp \ $(framing)/Responder.cpp \ $(framing)/Value.cpp \ + $(framing)/Proxy.cpp \ $(gen)/AMQP_ClientProxy.cpp \ - $(gen)/AMQP_HighestVersion.h \ $(gen)/AMQP_MethodVersionMap.cpp \ $(gen)/AMQP_ServerProxy.cpp \ Exception.cpp \ @@ -87,6 +88,7 @@ libqpidcommon_la_SOURCES = \ sys/Time.cpp nobase_pkginclude_HEADERS = \ + $(gen)/AMQP_HighestVersion.h \ $(platform_hdr) \ $(framing)/AMQBody.h \ $(framing)/AMQContentBody.h \ @@ -95,6 +97,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/AMQHeaderBody.h \ $(framing)/AMQHeartbeatBody.h \ $(framing)/AMQMethodBody.h \ + $(framing)/MethodContext.h \ $(framing)/BasicHeaderProperties.h \ $(framing)/BodyHandler.h \ $(framing)/ChannelAdapter.h \ @@ -111,6 +114,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/Value.h \ $(framing)/amqp_framing.h \ $(framing)/amqp_types.h \ + $(framing)/Proxy.h \ Exception.h \ ExceptionHolder.h \ QpidError.h \ @@ -121,9 +125,9 @@ nobase_pkginclude_HEADERS = \ sys/Monitor.h \ sys/Mutex.h \ sys/Runnable.h \ - sys/ConnectionOutputHandler.h \ - sys/ConnectionInputHandler.h \ - sys/ConnectionInputHandlerFactory.h \ + sys/ConnectionOutputHandler.h \ + sys/ConnectionInputHandler.h \ + sys/ConnectionInputHandlerFactory.h \ sys/ShutdownHandler.h \ sys/Socket.h \ sys/Thread.h \ diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 9c5e295e22..4e061af2e1 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -26,19 +26,24 @@ #include "AMQRequestBody.h" #include "AMQResponseBody.h" -using namespace qpid::framing; + +namespace qpid { +namespace framing { + AMQP_MethodVersionMap AMQFrame::versionMap; -AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version): +AMQFrame::AMQFrame(ProtocolVersion _version): version(_version) - {} + { + assert(version != ProtocolVersion(0,0)); + } -AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) : +AMQFrame::AMQFrame(ProtocolVersion _version, u_int16_t _channel, AMQBody* _body) : version(_version), channel(_channel), body(_body) {} -AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : +AMQFrame::AMQFrame(ProtocolVersion _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : version(_version), channel(_channel), body(_body) {} @@ -119,7 +124,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) body->decode(buffer, size); } -std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) +std::ostream& operator<<(std::ostream& out, const AMQFrame& t) { out << "Frame[channel=" << t.channel << "; "; if (t.body.get() == 0) @@ -130,3 +135,5 @@ std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) return out; } + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index 15b294a373..4e49b8871f 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -41,9 +41,9 @@ namespace framing { class AMQFrame : public AMQDataBlock { public: - AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion); - AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); - AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body); + AMQFrame(ProtocolVersion _version = highestProtocolVersion); + AMQFrame(ProtocolVersion _version, u_int16_t channel, AMQBody* body); + AMQFrame(ProtocolVersion _version, u_int16_t channel, const AMQBody::shared_ptr& body); virtual ~AMQFrame(); virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); @@ -62,7 +62,7 @@ class AMQFrame : public AMQDataBlock private: static AMQP_MethodVersionMap versionMap; - qpid::framing::ProtocolVersion version; + ProtocolVersion version; u_int16_t channel; u_int8_t type; diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 53ab30faa0..40241660f2 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -19,6 +19,7 @@ #include "ChannelAdapter.h" #include "AMQFrame.h" +#include "Exception.h" using boost::format; @@ -26,7 +27,7 @@ namespace qpid { namespace framing { void ChannelAdapter::init( - ChannelId i, OutputHandler& o, const ProtocolVersion& v) + ChannelId i, OutputHandler& o, ProtocolVersion v) { assertChannelNotOpen(); id = i; @@ -34,13 +35,15 @@ void ChannelAdapter::init( version = v; } -void ChannelAdapter::send(AMQBody::shared_ptr body) { +RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { + RequestId result = 0; assertChannelOpen(); switch (body->type()) { case REQUEST_BODY: { AMQRequestBody::shared_ptr request = boost::shared_polymorphic_downcast<AMQRequestBody>(body); requester.sending(request->getData()); + result = request->getData().requestId; break; } case RESPONSE_BODY: { @@ -51,6 +54,7 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) { } } out->send(new AMQFrame(getVersion(), getId(), body)); + return result; } void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index c2eba2f4e9..9f654d9a5b 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -35,9 +35,8 @@ namespace framing { class MethodContext; /** - * Base class for client and broker channel adapters. + * Base class for client and broker channels. * - * BodyHandler::handl* * - receives frame bodies from the network. * - Updates request/response data. * - Dispatches requests with a MethodContext for responses. @@ -55,21 +54,21 @@ class ChannelAdapter : public BodyHandler { *@param output Processed frames are forwarded to this handler. */ ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0, - const ProtocolVersion& ver=ProtocolVersion()) + ProtocolVersion ver=ProtocolVersion()) : id(id_), out(out_), version(ver) {} /** Initialize the channel adapter. */ - void init(ChannelId, OutputHandler&, const ProtocolVersion&); + void init(ChannelId, OutputHandler&, ProtocolVersion); ChannelId getId() const { return id; } - const ProtocolVersion& getVersion() const { return version; } + ProtocolVersion getVersion() const { return version; } /** * Wrap body in a frame and send the frame. * Takes ownership of body. */ - void send(AMQBody::shared_ptr body); - void send(AMQBody* body) { send(AMQBody::shared_ptr(body)); } + RequestId send(AMQBody::shared_ptr body); + RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>); @@ -95,7 +94,7 @@ class ChannelAdapter : public BodyHandler { ProtocolVersion version; Requester requester; Responder responder; - RequestId requestInProgress; // TODO aconway 2007-01-24: use it. + RequestId requestInProgress; }; }} diff --git a/cpp/lib/common/framing/FramingContent.h b/cpp/lib/common/framing/FramingContent.h index 0f4a4b8f64..cffd46ee24 100644 --- a/cpp/lib/common/framing/FramingContent.h +++ b/cpp/lib/common/framing/FramingContent.h @@ -27,9 +27,9 @@ class Content void encode(Buffer& buffer) const; void decode(Buffer& buffer); size_t size() const; - bool isInline() { return discriminator == INLINE; } - bool isReference() { return discriminator == REFERENCE; } - const string& getValue() { return value; } + bool isInline() const { return discriminator == INLINE; } + bool isReference() const { return discriminator == REFERENCE; } + const string& getValue() const { return value; } friend std::ostream& operator<<(std::ostream&, const Content&); }; diff --git a/cpp/lib/common/framing/MethodContext.cpp b/cpp/lib/common/framing/MethodContext.cpp new file mode 100644 index 0000000000..73af73f8e5 --- /dev/null +++ b/cpp/lib/common/framing/MethodContext.cpp @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "MethodContext.h" +#include "amqp_types.h" +#include "AMQRequestBody.h" + +namespace qpid { +namespace framing { + +RequestId MethodContext::getRequestId() const { + return boost::shared_polymorphic_downcast<AMQRequestBody>(methodBody) + ->getRequestId(); +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index afb499023d..3493924bf6 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -24,8 +24,6 @@ #include "OutputHandler.h" #include "ProtocolVersion.h" -#include <boost/shared_ptr.hpp> - namespace qpid { namespace framing { @@ -61,6 +59,12 @@ struct MethodContext * It's also provides the request ID when constructing a response. */ BodyPtr methodBody; + + /** + * Return methodBody's request ID. + * It is an error to call this if methodBody is not a request. + */ + RequestId getRequestId() const; }; // FIXME aconway 2007-02-01: Method context only required on Handler diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp index 471f736a7d..c119b79d6d 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.cpp +++ b/cpp/lib/common/framing/ProtocolInitiation.cpp @@ -20,15 +20,18 @@ */ #include <ProtocolInitiation.h> -qpid::framing::ProtocolInitiation::ProtocolInitiation(){} +namespace qpid { +namespace framing { -qpid::framing::ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : version(_major, _minor) {} +ProtocolInitiation::ProtocolInitiation(){} -qpid::framing::ProtocolInitiation::ProtocolInitiation(const qpid::framing::ProtocolVersion& p) : version(p) {} +ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : version(_major, _minor) {} -qpid::framing::ProtocolInitiation::~ProtocolInitiation(){} +ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {} -void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){ +ProtocolInitiation::~ProtocolInitiation(){} + +void ProtocolInitiation::encode(Buffer& buffer){ buffer.putOctet('A'); buffer.putOctet('M'); buffer.putOctet('Q'); @@ -39,7 +42,7 @@ void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){ buffer.putOctet(version.getMinor()); } -bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){ +bool ProtocolInitiation::decode(Buffer& buffer){ if(buffer.available() >= 8){ buffer.getOctet();//A buffer.getOctet();//M @@ -56,3 +59,5 @@ bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){ } //TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h index 6b3dbac88d..fb5cb3abed 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.h +++ b/cpp/lib/common/framing/ProtocolInitiation.h @@ -37,14 +37,14 @@ private: public: ProtocolInitiation(); ProtocolInitiation(u_int8_t major, u_int8_t minor); - ProtocolInitiation(const ProtocolVersion& p); + ProtocolInitiation(ProtocolVersion p); virtual ~ProtocolInitiation(); virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); inline virtual u_int32_t size() const { return 8; } inline u_int8_t getMajor() const { return version.getMajor(); } inline u_int8_t getMinor() const { return version.getMinor(); } - inline const ProtocolVersion& getVersion() const { return version; } + inline ProtocolVersion getVersion() const { return version; } }; } diff --git a/cpp/lib/common/framing/ProtocolVersion.cpp b/cpp/lib/common/framing/ProtocolVersion.cpp index e65c8b79b8..fd4b1a645f 100644 --- a/cpp/lib/common/framing/ProtocolVersion.cpp +++ b/cpp/lib/common/framing/ProtocolVersion.cpp @@ -20,37 +20,9 @@ */ #include <ProtocolVersion.h> #include <sstream> -#include "AMQP_HighestVersion.h" using namespace qpid::framing; -ProtocolVersion::ProtocolVersion() { - *this = highestProtocolVersion; -} - -ProtocolVersion::ProtocolVersion(u_int8_t _major, u_int8_t _minor) : - major_(_major), - minor_(_minor) -{} - -ProtocolVersion::ProtocolVersion(const ProtocolVersion::ProtocolVersion& p): - major_(p.major_), - minor_(p.minor_) -{} - -ProtocolVersion::~ProtocolVersion() -{} - -bool ProtocolVersion::equals(u_int8_t _major, u_int8_t _minor) const -{ - return major_ == _major && minor_ == _minor; -} - -bool ProtocolVersion::equals(const ProtocolVersion::ProtocolVersion& p) const -{ - return major_ == p.major_ && minor_ == p.minor_; -} - const std::string ProtocolVersion::toString() const { std::stringstream ss; @@ -58,10 +30,15 @@ const std::string ProtocolVersion::toString() const return ss.str(); } -ProtocolVersion::ProtocolVersion ProtocolVersion::operator=(const ProtocolVersion& p) +ProtocolVersion& ProtocolVersion::operator=(ProtocolVersion p) { major_ = p.major_; minor_ = p.minor_; return *this; } +bool ProtocolVersion::operator==(ProtocolVersion p) const +{ + return major_ == p.major_ && minor_ == p.minor_; +} + diff --git a/cpp/lib/common/framing/ProtocolVersion.h b/cpp/lib/common/framing/ProtocolVersion.h index 6aba87c6f5..aa526aa293 100644 --- a/cpp/lib/common/framing/ProtocolVersion.h +++ b/cpp/lib/common/framing/ProtocolVersion.h @@ -35,19 +35,19 @@ private: u_int8_t minor_; public: - ProtocolVersion(); - ProtocolVersion(u_int8_t _major, u_int8_t _minor); - ProtocolVersion(const ProtocolVersion& p); - virtual ~ProtocolVersion(); - - inline u_int8_t getMajor() const { return major_; } - inline void setMajor(u_int8_t major) { major_ = major; } - inline u_int8_t getMinor() const { return minor_; } - inline void setMinor(u_int8_t minor) { minor_ = minor; } - virtual bool equals(u_int8_t _major, u_int8_t _minor) const; - virtual bool equals(const ProtocolVersion& p) const; - virtual const std::string toString() const; - ProtocolVersion operator=(const ProtocolVersion& p); + ProtocolVersion(u_int8_t _major=0, u_int8_t _minor=0) + : major_(_major), minor_(_minor) {} + + u_int8_t getMajor() const { return major_; } + void setMajor(u_int8_t major) { major_ = major; } + u_int8_t getMinor() const { return minor_; } + void setMinor(u_int8_t minor) { minor_ = minor; } + const std::string toString() const; + + ProtocolVersion& operator=(ProtocolVersion p); + + bool operator==(ProtocolVersion p) const; + bool operator!=(ProtocolVersion p) const { return ! (*this == p); } }; } // namespace framing diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h index aff0cd91d6..8e2de8b843 100644 --- a/cpp/lib/common/framing/ProtocolVersionException.h +++ b/cpp/lib/common/framing/ProtocolVersionException.h @@ -40,7 +40,7 @@ public: template <class T> ProtocolVersionException( - const ProtocolVersion& ver, const T& msg) throw () : versionFound(ver) + ProtocolVersion ver, const T& msg) throw () : versionFound(ver) { init(boost::lexical_cast<std::string>(msg)); } template <class T> diff --git a/cpp/lib/common/framing/Proxy.cpp b/cpp/lib/common/framing/Proxy.cpp new file mode 100644 index 0000000000..0b2a882a49 --- /dev/null +++ b/cpp/lib/common/framing/Proxy.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Proxy.h" +#include "ChannelAdapter.h" +#include "ProtocolVersion.h" + +namespace qpid { +namespace framing { + +Proxy::~Proxy() {} + +ProtocolVersion Proxy::getProtocolVersion() const { + return channel.getVersion(); +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/Proxy.h b/cpp/lib/common/framing/Proxy.h new file mode 100644 index 0000000000..8ed46ed748 --- /dev/null +++ b/cpp/lib/common/framing/Proxy.h @@ -0,0 +1,51 @@ +#ifndef _framing_Proxy_h +#define _framing_Proxy_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "ProtocolVersion.h" + +namespace qpid { +namespace framing { + +class ChannelAdapter; +class FieldTable; +class Content; + +/** + * Base class for proxies. + */ +class Proxy +{ + + public: + Proxy(ChannelAdapter& ch) : channel(ch) {} + virtual ~Proxy(); + + ProtocolVersion getProtocolVersion() const; + + protected: + ChannelAdapter& channel; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Proxy_h*/ diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h index 777d9e7bc5..2f56cb877e 100644 --- a/cpp/lib/common/framing/amqp_types.h +++ b/cpp/lib/common/framing/amqp_types.h @@ -20,6 +20,12 @@ * under the License. * */ + +/** \file + * Type definitions and forward declarations of all types used to + * in AMQP messages. + */ + #include <string> #ifdef _WINDOWS #include "windows.h" @@ -44,5 +50,8 @@ typedef u_int16_t ClassId; typedef u_int16_t MethodId; typedef u_int16_t ReplyCode; +// Types represented by classes. +class Content; +class FieldTable; }} // namespace qpid::framing #endif diff --git a/cpp/lib/common/framing/amqp_types_full.h b/cpp/lib/common/framing/amqp_types_full.h new file mode 100644 index 0000000000..6a24a99d38 --- /dev/null +++ b/cpp/lib/common/framing/amqp_types_full.h @@ -0,0 +1,36 @@ +#ifndef _framing_amqp_types_decl_h +#define _framing_amqp_types_decl_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** \file + * Type definitions and full declarations of all types used to + * in AMQP messages. + * + * Its better to include amqp_types.h in another header instead of this file + * unless the header actually needs the full declarations. Including + * full declarations when forward declarations would do increases compile + * times. + */ + +#include "amqp_types.h" +#include "FramingContent.h" +#include "FieldTable.h" + +#endif /*!_framing_amqp_types_decl_h*/ |
