diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-18 06:27:50 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-18 06:27:50 +0000 |
| commit | 9e8a7c77a94a92c6cf92cf60be508817f0778040 (patch) | |
| tree | 0ff1882596aae1da3fd6bd6c2b4e13a3b9ae5de7 /cpp | |
| parent | 762cba1e7db15cb3c9e987a9edcbf3106c7244cb (diff) | |
| download | qpid-python-9e8a7c77a94a92c6cf92cf60be508817f0778040.tar.gz | |
There are a ton of FIXMES and request/response IDs are not yet working fully
but all tests are passing.
* broker::Broker: Removed requester/responder from broker.
* framing::BodyHandler: added Requester/Responder to BodyHandler, becomes
the base class for channel adapters in broker and client.
* broker::BrokerAdapter: Inherit BodyHandler, wraps a broker::Channel.
Hide private *HandlerImpl detail classes in BodyHandler.cpp.
* broker::Connection: Requester/Responder/Adapter now per-channel.
Connection channel map replaced with adapter map of BrokerAdapters.
handle* functions moved to BrokerAdapter.
All methods now handled by a BrokerAdapter for the relevant channel.
ChannelHandlerImpl is repsonsible for checking that
- No method on a non-0 channel is processed before open()
- Channel 0 methods only happen on channel 0 and similar for non-zero methods
Checks are not yet complete (see FIXMES)
* client::ResponseHandler: fix for client hang if broker crashs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497319 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/lib/broker/Broker.h | 6 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 603 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.h | 242 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 35 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 155 | ||||
| -rw-r--r-- | cpp/lib/broker/Connection.cpp | 135 | ||||
| -rw-r--r-- | cpp/lib/broker/Connection.h | 42 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 1 | ||||
| -rw-r--r-- | cpp/lib/client/ResponseHandler.cpp | 14 | ||||
| -rw-r--r-- | cpp/lib/common/framing/BodyHandler.cpp | 52 | ||||
| -rw-r--r-- | cpp/lib/common/framing/BodyHandler.h | 71 | ||||
| -rw-r--r-- | cpp/tests/BodyHandlerTest.cpp | 110 | ||||
| -rw-r--r-- | cpp/tests/ChannelTest.cpp | 1 | ||||
| -rw-r--r-- | cpp/tests/Makefile.am | 1 | ||||
| -rwxr-xr-x | cpp/tests/start_broker | 4 |
15 files changed, 681 insertions, 791 deletions
diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index 929ed4360e..ef5e7bbafb 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -29,8 +29,6 @@ #include <SharedObject.h> #include <MessageStore.h> #include <AutoDelete.h> -#include "Requester.h" -#include "Responder.h" #include <ExchangeRegistry.h> #include <BrokerChannel.h> #include <ConnectionToken.h> @@ -86,8 +84,6 @@ class Broker : public qpid::sys::Runnable, u_int32_t getTimeout() { return timeout; } u_int64_t getStagingThreshold() { return stagingThreshold; } AutoDelete& getCleaner() { return cleaner; } - qpid::framing::Requester& getRequester() { return requester; } - qpid::framing::Responder& getResponder() { return responder; } private: Broker(const Configuration& config); @@ -100,8 +96,6 @@ class Broker : public qpid::sys::Runnable, u_int64_t stagingThreshold; AutoDelete cleaner; ConnectionFactory factory; - qpid::framing::Requester requester; - qpid::framing::Responder responder; }; }} diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 86a7d166b0..2a5b136b3e 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -15,10 +15,11 @@ * limitations under the License. * */ - #include "BrokerAdapter.h" #include "Connection.h" #include "Exception.h" +#include "AMQMethodBody.h" +#include "Exception.h" namespace qpid { namespace broker { @@ -28,75 +29,263 @@ using namespace qpid::framing; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; -BrokerAdapter::BrokerAdapter(Connection& c) : - connection(c), - basicHandler(c), - channelHandler(c), - connectionHandler(c), - exchangeHandler(c), - messageHandler(c), - queueHandler(c), - txHandler(c) -{} - -typedef qpid::framing::AMQP_ServerOperations Ops; - -Ops::ChannelHandler* BrokerAdapter::getChannelHandler() { - return &channelHandler; -} -Ops::ConnectionHandler* BrokerAdapter::getConnectionHandler() { - return &connectionHandler; -} -Ops::BasicHandler* BrokerAdapter::getBasicHandler() { - return &basicHandler; -} -Ops::ExchangeHandler* BrokerAdapter::getExchangeHandler() { - return &exchangeHandler; -} -Ops::QueueHandler* BrokerAdapter::getQueueHandler() { - return &queueHandler; -} -Ops::TxHandler* BrokerAdapter::getTxHandler() { - return &txHandler; -} -Ops::MessageHandler* BrokerAdapter::getMessageHandler() { - return &messageHandler; -} -Ops::AccessHandler* BrokerAdapter::getAccessHandler() { - throw ConnectionException(540, "Access class not implemented"); -} -Ops::FileHandler* BrokerAdapter::getFileHandler() { - throw ConnectionException(540, "File class not implemented"); -} -Ops::StreamHandler* BrokerAdapter::getStreamHandler() { - throw ConnectionException(540, "Stream class not implemented"); -} -Ops::DtxHandler* BrokerAdapter::getDtxHandler() { - throw ConnectionException(540, "Dtx class not implemented"); -} -Ops::TunnelHandler* BrokerAdapter::getTunnelHandler() { - throw ConnectionException(540, "Tunnel class not implemented"); -} - -void BrokerAdapter::ConnectionHandlerImpl::startOk( +class BrokerAdapter::ServerOps : public AMQP_ServerOperations +{ + public: + ServerOps(Channel& ch, Connection& c, Broker& b) : + basicHandler(ch, c, b), + channelHandler(ch, c, b), + connectionHandler(ch, c, b), + exchangeHandler(ch, c, b), + messageHandler(ch, c, b), + queueHandler(ch, c, b), + txHandler(ch, c, b) + {} + + ChannelHandler* getChannelHandler() { return &channelHandler; } + ConnectionHandler* getConnectionHandler() { return &connectionHandler; } + BasicHandler* getBasicHandler() { return &basicHandler; } + ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } + QueueHandler* getQueueHandler() { return &queueHandler; } + TxHandler* getTxHandler() { return &txHandler; } + MessageHandler* getMessageHandler() { return &messageHandler; } + AccessHandler* getAccessHandler() { + throw ConnectionException(540, "Access class not implemented"); } + FileHandler* getFileHandler() { + throw ConnectionException(540, "File class not implemented"); } + StreamHandler* getStreamHandler() { + throw ConnectionException(540, "Stream class not implemented"); } + DtxHandler* getDtxHandler() { + throw ConnectionException(540, "Dtx class not implemented"); } + TunnelHandler* getTunnelHandler() { + throw ConnectionException(540, "Tunnel class not implemented"); } + + private: + struct CoreRefs { + CoreRefs(Channel& ch, Connection& c, Broker& b) + : channel(ch), connection(c), broker(b) {} + + Channel& channel; + Connection& connection; + Broker& broker; + }; + + class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler { + public: + ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + + void startOk(u_int16_t channel, + const qpid::framing::FieldTable& clientProperties, + const std::string& mechanism, const std::string& response, + const std::string& locale); + void secureOk(u_int16_t channel, const std::string& response); + void tuneOk(u_int16_t channel, u_int16_t channelMax, + u_int32_t frameMax, u_int16_t heartbeat); + void open(u_int16_t channel, const std::string& virtualHost, + const std::string& capabilities, bool insist); + void close(u_int16_t channel, u_int16_t replyCode, + const std::string& replyText, + u_int16_t classId, u_int16_t methodId); + void closeOk(u_int16_t channel); + }; + + class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ + public: + ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void open(u_int16_t channel, const std::string& outOfBand); + void flow(u_int16_t channel, bool active); + void flowOk(u_int16_t channel, bool active); + void ok( u_int16_t channel ); + void ping( u_int16_t channel ); + void pong( u_int16_t channel ); + void resume( u_int16_t channel, const std::string& channelId ); + void close(u_int16_t channel, u_int16_t replyCode, const + std::string& replyText, u_int16_t classId, u_int16_t methodId); + void closeOk(u_int16_t channel); + }; + + class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ + public: + ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void declare(u_int16_t channel, u_int16_t ticket, + const std::string& exchange, const std::string& type, + bool passive, bool durable, bool autoDelete, + bool internal, bool nowait, + const qpid::framing::FieldTable& arguments); + void delete_(u_int16_t channel, u_int16_t ticket, + const std::string& exchange, bool ifUnused, bool nowait); + void unbind(u_int16_t channel, + u_int16_t ticket, const std::string& queue, + const std::string& exchange, const std::string& routingKey, + const qpid::framing::FieldTable& arguments ); + }; + + class QueueHandlerImpl : private CoreRefs, public QueueHandler{ + public: + QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, + const qpid::framing::FieldTable& arguments); + void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue, + const std::string& exchange, const std::string& routingKey, + bool nowait, const qpid::framing::FieldTable& arguments); + void unbind(u_int16_t channel, + u_int16_t ticket, + const std::string& queue, + const std::string& exchange, + const std::string& routingKey, + const qpid::framing::FieldTable& arguments ); + void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue, + bool nowait); + void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue, + bool ifUnused, bool ifEmpty, + bool nowait); + }; + + class BasicHandlerImpl : private CoreRefs, public BasicHandler{ + public: + BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void qos(u_int16_t channel, u_int32_t prefetchSize, + u_int16_t prefetchCount, bool global); + void consume( + u_int16_t channel, u_int16_t ticket, const std::string& queue, + const std::string& consumerTag, bool noLocal, bool noAck, + bool exclusive, bool nowait, + const qpid::framing::FieldTable& fields); + void cancel(u_int16_t channel, const std::string& consumerTag, + bool nowait); + void publish(u_int16_t channel, u_int16_t ticket, + const std::string& exchange, const std::string& routingKey, + bool mandatory, bool immediate); + void get(u_int16_t channel, u_int16_t ticket, const std::string& queue, + bool noAck); + void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); + void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); + void recover(u_int16_t channel, bool requeue); + }; + + class TxHandlerImpl : private CoreRefs, public TxHandler{ + public: + TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void select(u_int16_t channel); + void commit(u_int16_t channel); + void rollback(u_int16_t channel); + }; + + class MessageHandlerImpl : private CoreRefs, public MessageHandler { + public: + MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + + void append( u_int16_t channel, + const std::string& reference, + const std::string& bytes ); + + void cancel( u_int16_t channel, + const std::string& destination ); + + void checkpoint( u_int16_t channel, + const std::string& reference, + const std::string& identifier ); + + void close( u_int16_t channel, + const std::string& reference ); + + void consume( u_int16_t channel, + u_int16_t ticket, + const std::string& queue, + const std::string& destination, + bool noLocal, + bool noAck, + bool exclusive, + const qpid::framing::FieldTable& filter ); + + void empty( u_int16_t channel ); + + void get( u_int16_t channel, + u_int16_t ticket, + const std::string& queue, + const std::string& destination, + bool noAck ); + + void offset( u_int16_t channel, + u_int64_t value ); + + void ok( u_int16_t channel ); + + void open( u_int16_t channel, + const std::string& reference ); + + void qos( u_int16_t channel, + u_int32_t prefetchSize, + u_int16_t prefetchCount, + bool global ); + + void recover( u_int16_t channel, + bool requeue ); + + void reject( u_int16_t channel, + u_int16_t code, + const std::string& text ); + + void resume( u_int16_t channel, + const std::string& reference, + const std::string& identifier ); + + void transfer( u_int16_t channel, + u_int16_t ticket, + const std::string& destination, + bool redelivered, + bool immediate, + u_int64_t ttl, + u_int8_t priority, + u_int64_t timestamp, + u_int8_t deliveryMode, + u_int64_t expiration, + const std::string& exchange, + const std::string& routingKey, + const std::string& messageId, + const std::string& correlationId, + const std::string& replyTo, + const std::string& contentType, + const std::string& contentEncoding, + const std::string& userId, + const std::string& appId, + const std::string& transactionId, + const std::string& securityToken, + const qpid::framing::FieldTable& applicationHeaders, + qpid::framing::Content body ); + }; + + BasicHandlerImpl basicHandler; + ChannelHandlerImpl channelHandler; + ConnectionHandlerImpl connectionHandler; + ExchangeHandlerImpl exchangeHandler; + MessageHandlerImpl messageHandler; + QueueHandlerImpl queueHandler; + TxHandlerImpl txHandler; + +}; + +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk( u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat); } -void BrokerAdapter::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} -void BrokerAdapter::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ connection.framemax = framemax; connection.heartbeat = heartbeat; } -void BrokerAdapter::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; connection.client->getConnection().openOk(0, knownhosts); } -void BrokerAdapter::ConnectionHandlerImpl::close( +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close( u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { @@ -104,47 +293,55 @@ void BrokerAdapter::ConnectionHandlerImpl::close( connection.context->close(); } -void BrokerAdapter::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ connection.context->close(); } -void BrokerAdapter::ChannelHandlerImpl::open( - u_int16_t channel, const string& /*outOfBand*/){ - connection.openChannel(channel); +void BrokerAdapter::ServerOps::ChannelHandlerImpl::open( + u_int16_t channelId, const string& /*outOfBand*/){ + // FIXME aconway 2007-01-17: Assertions on all channel methods, + // Drop channelId param. + assertChannelNonZero(channel.getId()); + if (channel.isOpen()) + throw ConnectionException(504, "Channel already open"); + channel.open(); // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 - connection.client->getChannel().openOk(channel, std::string()/* ID */); + connection.client->getChannel().openOk(channelId, std::string()/* ID */); } -void BrokerAdapter::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} -void BrokerAdapter::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} +void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} +void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} -void BrokerAdapter::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/){ +void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/){ connection.closeChannel(channel); connection.client->getChannel().closeOk(channel); } -void BrokerAdapter::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} +void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} -void BrokerAdapter::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ if(passive){ - if(!connection.broker.getExchanges().get(exchange)){ - throw ChannelException(404, "Exchange not found: " + exchange); + if(!broker.getExchanges().get(exchange)) { + throw ChannelException(404, "Exchange not found: " + exchange); } }else{ try{ - std::pair<Exchange::shared_ptr, bool> response = connection.broker.getExchanges().declare(exchange, type); + std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type); if(!response.second && response.first->getType() != type){ - throw ConnectionException(507, "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); + throw ConnectionException( + 507, + "Exchange already declared to be of type " + + response.first->getType() + ", requested " + type); } }catch(UnknownExchangeTypeException& e){ - throw ConnectionException(503, "Exchange type not implemented: " + type); + throw ConnectionException( + 503, "Exchange type not implemented: " + type); } } if(!nowait){ @@ -153,7 +350,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /* } -void BrokerAdapter::ExchangeHandlerImpl::unbind( +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, @@ -166,23 +363,23 @@ void BrokerAdapter::ExchangeHandlerImpl::unbind( -void BrokerAdapter::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused - connection.broker.getExchanges().destroy(exchange); + broker.getExchanges().destroy(exchange); if(!nowait) connection.client->getExchange().deleteOk(channel); } -void BrokerAdapter::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ +void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = connection.getQueue(name, channel); } else { std::pair<Queue::shared_ptr, bool> queue_created = - connection.broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0); + broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -192,11 +389,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*tic queue_created.first->create(arguments); //add default binding: - connection.broker.getExchanges().getDefault()->bind(queue, name, 0); + broker.getExchanges().getDefault()->bind(queue, name, 0); if (exclusive) { connection.exclusiveQueues.push_back(queue); } else if(autoDelete){ - connection.broker.getCleaner().add(queue); + broker.getCleaner().add(queue); } } } @@ -209,12 +406,12 @@ void BrokerAdapter::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*tic } } -void BrokerAdapter::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ +void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = connection.getQueue(queueName, channel); - Exchange::shared_ptr exchange = connection.broker.getExchanges().get(exchangeName); + Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ // kpvdr - cannot use this any longer as routingKey is now const // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); @@ -223,19 +420,20 @@ void BrokerAdapter::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket exchange->bind(queue, exchangeRoutingKey, &arguments); if(!nowait) connection.client->getQueue().bindOk(channel); }else{ - throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); + throw ChannelException( + 404, "Bind failed. No such exchange: " + exchangeName); } } -void BrokerAdapter::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel); int count = queue->purge(); if(!nowait) connection.client->getQueue().purgeOk(channel, count); } -void BrokerAdapter::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ +void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); Queue::shared_ptr q = connection.getQueue(queue, channel); @@ -251,7 +449,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*tic } count = q->getMessageCount(); q->destroy(); - connection.broker.getQueues().destroy(queue); + broker.getQueues().destroy(queue); } if(!nowait) connection.client->getQueue().deleteOk(channel, count); @@ -260,14 +458,14 @@ void BrokerAdapter::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*tic -void BrokerAdapter::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global connection.getChannel(channel).setPrefetchSize(prefetchSize); connection.getChannel(channel).setPrefetchCount(prefetchCount); connection.client->getBasic().qosOk(channel); } -void BrokerAdapter::BasicHandlerImpl::consume( +void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, @@ -296,26 +494,27 @@ void BrokerAdapter::BasicHandlerImpl::consume( } -void BrokerAdapter::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ connection.getChannel(channel).cancel(consumerTag); if(!nowait) connection.client->getBasic().cancelOk(channel, consumerTag); } -void BrokerAdapter::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchangeName, const string& routingKey, + bool mandatory, bool immediate){ - Exchange::shared_ptr exchange = exchangeName.empty() ? connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName); + Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate); connection.getChannel(channel).handlePublish(msg, exchange); }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + throw ChannelException( + 404, "Exchange not found '" + exchangeName + "'"); } } -void BrokerAdapter::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channelId); if(!connection.getChannel(channelId).get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack @@ -324,7 +523,7 @@ void BrokerAdapter::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticke } } -void BrokerAdapter::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ try{ connection.getChannel(channel).ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -332,23 +531,23 @@ void BrokerAdapter::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryT } } -void BrokerAdapter::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ connection.getChannel(channel).recover(requeue); } -void BrokerAdapter::TxHandlerImpl::select(u_int16_t channel){ +void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t channel){ connection.getChannel(channel).begin(); connection.client->getTx().selectOk(channel); } -void BrokerAdapter::TxHandlerImpl::commit(u_int16_t channel){ +void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t channel){ connection.getChannel(channel).commit(); connection.client->getTx().commitOk(channel); } -void BrokerAdapter::TxHandlerImpl::rollback(u_int16_t channel){ +void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t channel){ connection.getChannel(channel).rollback(); connection.client->getTx().rollbackOk(channel); @@ -356,7 +555,7 @@ void BrokerAdapter::TxHandlerImpl::rollback(u_int16_t channel){ } void -BrokerAdapter::QueueHandlerImpl::unbind( +BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, @@ -368,25 +567,25 @@ BrokerAdapter::QueueHandlerImpl::unbind( } void -BrokerAdapter::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void -BrokerAdapter::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void -BrokerAdapter::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) +BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void -BrokerAdapter::ChannelHandlerImpl::resume( +BrokerAdapter::ServerOps::ChannelHandlerImpl::resume( u_int16_t /*channel*/, const string& /*channelId*/ ) { @@ -395,143 +594,191 @@ BrokerAdapter::ChannelHandlerImpl::resume( // Message class method handlers void -BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::append( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*bytes*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t /*channel*/, - const string& /*destination*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::cancel( u_int16_t /*channel*/, + const string& /*destination*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::close( u_int16_t /*channel*/, + const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::consume( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noLocal*/, - bool /*noAck*/, - bool /*exclusive*/, - const qpid::framing::FieldTable& /*filter*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::consume( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noLocal*/, + bool /*noAck*/, + bool /*exclusive*/, + const qpid::framing::FieldTable& /*filter*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::get( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noAck*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::get( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noAck*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::offset( u_int16_t /*channel*/, + u_int64_t /*value*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::open( u_int16_t /*channel*/, + const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::qos( u_int16_t /*channel*/, - u_int32_t /*prefetchSize*/, - u_int16_t /*prefetchCount*/, - bool /*global*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::qos( u_int16_t /*channel*/, + u_int32_t /*prefetchSize*/, + u_int16_t /*prefetchCount*/, + bool /*global*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::recover( u_int16_t /*channel*/, - bool /*requeue*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::recover( u_int16_t /*channel*/, + bool /*requeue*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::reject( u_int16_t /*channel*/, + u_int16_t /*code*/, + const string& /*text*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::resume( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool /*immediate*/, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) +BrokerAdapter::ServerOps::MessageHandlerImpl::transfer( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool /*immediate*/, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } +BrokerAdapter::BrokerAdapter( + Channel* ch, Connection& c, Broker& b +) : + channel(ch), + connection(c), + broker(b), + serverOps(new ServerOps(*ch,c,b)) +{ + assert(ch); +} + +void BrokerAdapter::handleMethod( + boost::shared_ptr<qpid::framing::AMQMethodBody> method) +{ + try{ + // FIXME aconway 2007-01-17: invoke to take Channel&? + method->invoke(*serverOps, channel->getId()); + }catch(ChannelException& e){ + connection.closeChannel(channel->getId()); + connection.client->getChannel().close( + channel->getId(), e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(ConnectionException& e){ + connection.client->getConnection().close( + 0, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + connection.client->getConnection().close( + 0, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } +} + +void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) { + channel->handleHeader(body); +} + +void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) { + channel->handleContent(body); +} + +void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { + // TODO aconway 2007-01-17: Implement heartbeats. +} + + + }} // namespace qpid::broker + diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h index ec304c67d5..5309d767f5 100644 --- a/cpp/lib/broker/BrokerAdapter.h +++ b/cpp/lib/broker/BrokerAdapter.h @@ -20,237 +20,45 @@ */ #include "AMQP_ServerOperations.h" +#include "BodyHandler.h" +#include "BrokerChannel.h" namespace qpid { namespace broker { +class AMQMethodBody; class Connection; +class Broker; + +// FIXME aconway 2007-01-17: Rename to ChannelAdapter. /** - * Protocol adapter class for the broker. + * Per-channel protocol adapter. + * + * Translates protocol bodies into calls on the core Channel, + * Connection and Broker objects. + * + * Owns a channel, has references to Connection and Broker. */ -class BrokerAdapter : public qpid::framing::AMQP_ServerOperations +class BrokerAdapter : public qpid::framing::BodyHandler { public: - BrokerAdapter(Connection& connection); - AccessHandler* getAccessHandler(); - BasicHandler* getBasicHandler(); - ChannelHandler* getChannelHandler(); - ConnectionHandler* getConnectionHandler(); - DtxHandler* getDtxHandler(); - ExchangeHandler* getExchangeHandler(); - FileHandler* getFileHandler(); - MessageHandler* getMessageHandler(); - QueueHandler* getQueueHandler(); - StreamHandler* getStreamHandler(); - TunnelHandler* getTunnelHandler(); - TxHandler* getTxHandler(); - - private: - - class ConnectionHandlerImpl : public ConnectionHandler{ - Connection& connection; - public: - ConnectionHandlerImpl(Connection& c) : connection(c) {} - - void startOk(u_int16_t channel, - const qpid::framing::FieldTable& clientProperties, - const std::string& mechanism, const std::string& response, - const std::string& locale); - void secureOk(u_int16_t channel, const std::string& response); - void tuneOk(u_int16_t channel, u_int16_t channelMax, - u_int32_t frameMax, u_int16_t heartbeat); - void open(u_int16_t channel, const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(u_int16_t channel, u_int16_t replyCode, - const std::string& replyText, - u_int16_t classId, u_int16_t methodId); - void closeOk(u_int16_t channel); - }; - - class ChannelHandlerImpl : public ChannelHandler{ - Connection& connection; - public: - ChannelHandlerImpl(Connection& c) : connection(c) {} - void open(u_int16_t channel, const std::string& outOfBand); - void flow(u_int16_t channel, bool active); - void flowOk(u_int16_t channel, bool active); - void ok( u_int16_t channel ); - void ping( u_int16_t channel ); - void pong( u_int16_t channel ); - void resume( u_int16_t channel, const std::string& channelId ); - void close(u_int16_t channel, u_int16_t replyCode, const - std::string& replyText, u_int16_t classId, u_int16_t methodId); - void closeOk(u_int16_t channel); - }; - - class ExchangeHandlerImpl : public ExchangeHandler{ - Connection& connection; - public: - ExchangeHandlerImpl(Connection& c) : connection(c) {} - void declare(u_int16_t channel, u_int16_t ticket, - const std::string& exchange, const std::string& type, - bool passive, bool durable, bool autoDelete, - bool internal, bool nowait, - const qpid::framing::FieldTable& arguments); - void delete_(u_int16_t channel, u_int16_t ticket, - const std::string& exchange, bool ifUnused, bool nowait); - void unbind(u_int16_t channel, - u_int16_t ticket, const std::string& queue, - const std::string& exchange, const std::string& routingKey, - const qpid::framing::FieldTable& arguments ); - }; - - class QueueHandlerImpl : public QueueHandler{ - Connection& connection; - public: - QueueHandlerImpl(Connection& c) : connection(c) {} - void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, - const qpid::framing::FieldTable& arguments); - void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue, - const std::string& exchange, const std::string& routingKey, - bool nowait, const qpid::framing::FieldTable& arguments); - void unbind(u_int16_t channel, - u_int16_t ticket, - const std::string& queue, - const std::string& exchange, - const std::string& routingKey, - const qpid::framing::FieldTable& arguments ); - void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue, - bool nowait); - void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue, - bool ifUnused, bool ifEmpty, - bool nowait); - }; - - class BasicHandlerImpl : public BasicHandler{ - Connection& connection; - public: - BasicHandlerImpl(Connection& c) : connection(c) {} - void qos(u_int16_t channel, u_int32_t prefetchSize, - u_int16_t prefetchCount, bool global); - void consume( - u_int16_t channel, u_int16_t ticket, const std::string& queue, - const std::string& consumerTag, bool noLocal, bool noAck, - bool exclusive, bool nowait, - const qpid::framing::FieldTable& fields); - void cancel(u_int16_t channel, const std::string& consumerTag, - bool nowait); - void publish(u_int16_t channel, u_int16_t ticket, - const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); - void get(u_int16_t channel, u_int16_t ticket, const std::string& queue, - bool noAck); - void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); - void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); - void recover(u_int16_t channel, bool requeue); - }; - - class TxHandlerImpl : public TxHandler{ - Connection& connection; - public: - TxHandlerImpl(Connection& c) : connection(c) {} - void select(u_int16_t channel); - void commit(u_int16_t channel); - void rollback(u_int16_t channel); - }; - - class MessageHandlerImpl : public MessageHandler { - Connection& connection; - public: - MessageHandlerImpl(Connection& c) : connection(c) {} - - void append( u_int16_t channel, - const std::string& reference, - const std::string& bytes ); - - void cancel( u_int16_t channel, - const std::string& destination ); + // FIXME aconway 2007-01-18: takes ownership, should pass auto_ptr<Channel> + BrokerAdapter(Channel* ch, Connection&, Broker&); + Channel& getChannel() { return *channel; } - void checkpoint( u_int16_t channel, - const std::string& reference, - const std::string& identifier ); + void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); + void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); + void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); + void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); - void close( u_int16_t channel, - const std::string& reference ); - - void consume( u_int16_t channel, - u_int16_t ticket, - const std::string& queue, - const std::string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const qpid::framing::FieldTable& filter ); - - void empty( u_int16_t channel ); - - void get( u_int16_t channel, - u_int16_t ticket, - const std::string& queue, - const std::string& destination, - bool noAck ); - - void offset( u_int16_t channel, - u_int64_t value ); - - void ok( u_int16_t channel ); - - void open( u_int16_t channel, - const std::string& reference ); - - void qos( u_int16_t channel, - u_int32_t prefetchSize, - u_int16_t prefetchCount, - bool global ); - - void recover( u_int16_t channel, - bool requeue ); - - void reject( u_int16_t channel, - u_int16_t code, - const std::string& text ); - - void resume( u_int16_t channel, - const std::string& reference, - const std::string& identifier ); - - void transfer( u_int16_t channel, - u_int16_t ticket, - const std::string& destination, - bool redelivered, - bool immediate, - u_int64_t ttl, - u_int8_t priority, - u_int64_t timestamp, - u_int8_t deliveryMode, - u_int64_t expiration, - const std::string& exchange, - const std::string& routingKey, - const std::string& messageId, - const std::string& correlationId, - const std::string& replyTo, - const std::string& contentType, - const std::string& contentEncoding, - const std::string& userId, - const std::string& appId, - const std::string& transactionId, - const std::string& securityToken, - const qpid::framing::FieldTable& applicationHeaders, - qpid::framing::Content body ); - }; + private: + class ServerOps; + std::auto_ptr<Channel> channel; Connection& connection; - - BasicHandlerImpl basicHandler; - ChannelHandlerImpl channelHandler; - ConnectionHandlerImpl connectionHandler; - ExchangeHandlerImpl exchangeHandler; - MessageHandlerImpl messageHandler; - QueueHandlerImpl queueHandler; - TxHandlerImpl txHandler; + Broker& broker; + boost::shared_ptr<ServerOps> serverOps; }; diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 50b5d15b7d..f0b6ff6d07 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -18,12 +18,13 @@ * under the License. * */ -#include <BrokerChannel.h> -#include <QpidError.h> #include <iostream> #include <sstream> #include <assert.h> +#include <BrokerChannel.h> +#include <QpidError.h> + using std::mem_fun_ref; using std::bind2nd; using namespace qpid::broker; @@ -31,9 +32,13 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : - id(_id), - out(_out), +Channel::Channel( + const ProtocolVersion& _version, OutputHandler* _out, int _id, + u_int32_t _framesize, MessageStore* const _store, + u_int64_t _stagingThreshold +) : + id(_id), + out(*_out), currentDeliveryTag(1), transactional(false), prefetchSize(0), @@ -43,9 +48,8 @@ Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, store(_store), messageBuilder(this, _store, _stagingThreshold), version(_version), - isClosed(false) + opened(false) { - outstanding.reset(); } @@ -57,7 +61,7 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){ +void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { if(tag.empty()) tag = tagGenerator.generate(); ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ @@ -86,8 +90,8 @@ void Channel::cancel(const string& tag){ } void Channel::close(){ - if (!isClosed) { - isClosed = true; + if (isOpen()) { + opened = false; while (!consumers.empty()) cancel(consumers.begin()); //requeue: @@ -123,7 +127,7 @@ void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue outstanding.count++; } //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); + msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -180,6 +184,10 @@ void Channel::handleContent(AMQContentBody::shared_ptr content){ messageBuilder.addContent(content); } +void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { + // TODO aconway 2007-01-17: Implement heartbeating. +} + void Channel::complete(Message::shared_ptr& msg){ if(exchange){ if(transactional){ @@ -247,7 +255,7 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ if(msg){ Mutex::ScopedLock locker(deliveryLock); u_int64_t myDeliveryTag = currentDeliveryTag++; - msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version); + msg->sendGetOk(&out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -258,5 +266,6 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ - msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version); + msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version); } + diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 8ba56f20f1..dabf6ebe40 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -48,83 +48,94 @@ #include <BasicPublishBody.h> namespace qpid { - namespace broker { - using qpid::framing::string; +namespace broker { - /** - * Maintains state for an AMQP channel. Handles incoming and - * outgoing messages for that channel. - */ - class Channel : private MessageBuilder::CompletionHandler{ - class ConsumerImpl : public virtual Consumer{ - Channel* parent; - const string tag; - Queue::shared_ptr queue; - ConnectionToken* const connection; - const bool ackExpected; - bool blocked; - public: - ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); - virtual bool deliver(Message::shared_ptr& msg); - void cancel(); - void requestDispatch(); - }; +using qpid::framing::string; - typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; - const int id; - qpid::framing::OutputHandler* out; - u_int64_t currentDeliveryTag; - Queue::shared_ptr defaultQueue; - bool transactional; - std::map<string, ConsumerImpl*> consumers; - u_int32_t prefetchSize; - u_int16_t prefetchCount; - Prefetch outstanding; - u_int32_t framesize; - NameGenerator tagGenerator; - std::list<DeliveryRecord> unacked; - qpid::sys::Mutex deliveryLock; - TxBuffer txBuffer; - AccumulatedAck accumulatedAck; - MessageStore* const store; - MessageBuilder messageBuilder;//builder for in-progress message - Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to - qpid::framing::ProtocolVersion version; // version used for this channel - bool isClosed; +/** + * Maintains state for an AMQP channel. Handles incoming and + * outgoing messages for that channel. + */ +class Channel : private MessageBuilder::CompletionHandler +{ + class ConsumerImpl : public virtual Consumer + { + Channel* parent; + const string tag; + Queue::shared_ptr queue; + ConnectionToken* const connection; + const bool ackExpected; + bool blocked; + public: + ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); + virtual bool deliver(Message::shared_ptr& msg); + void cancel(); + void requestDispatch(); + }; + + typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + u_int16_t id; + qpid::framing::OutputHandler& out; + u_int64_t currentDeliveryTag; + Queue::shared_ptr defaultQueue; + bool transactional; + std::map<string, ConsumerImpl*> consumers; + u_int32_t prefetchSize; + u_int16_t prefetchCount; + Prefetch outstanding; + u_int32_t framesize; + NameGenerator tagGenerator; + std::list<DeliveryRecord> unacked; + qpid::sys::Mutex deliveryLock; + TxBuffer txBuffer; + AccumulatedAck accumulatedAck; + MessageStore* const store; + MessageBuilder messageBuilder;//builder for in-progress message + Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to + qpid::framing::ProtocolVersion version; // version used for this channel + bool opened; - virtual void complete(Message::shared_ptr& msg); - void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); - void cancel(consumer_iterator consumer); - bool checkPrefetch(Message::shared_ptr& msg); + virtual void complete(Message::shared_ptr& msg); + void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); + void cancel(consumer_iterator consumer); + bool checkPrefetch(Message::shared_ptr& msg); - public: - Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize, - MessageStore* const _store = 0, u_int64_t stagingThreshold = 0); - ~Channel(); - inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } - inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } - inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; } - inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; } - bool exists(const string& consumerTag); - void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, - ConnectionToken* const connection = 0, const qpid::framing::FieldTable* = 0); - void cancel(const string& tag); - bool get(Queue::shared_ptr queue, bool ackExpected); - void begin(); - void close(); - void commit(); - void rollback(); - void ack(u_int64_t deliveryTag, bool multiple); - void recover(bool requeue); - void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); - void handlePublish(Message* msg, Exchange::shared_ptr exchange); - void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void handleContent(qpid::framing::AMQContentBody::shared_ptr content); - }; + public: + Channel( + const qpid::framing::ProtocolVersion& _version, + qpid::framing::OutputHandler* out, int id, u_int32_t framesize, + MessageStore* const _store = 0, u_int64_t stagingThreshold = 0); + ~Channel(); + bool isOpen() const { return opened; } + void open() { opened = true; } + u_int16_t getId() const { return id; } + void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } + Queue::shared_ptr getDefaultQueue() const { return defaultQueue; } + u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; } + u_int16_t setPrefetchCount(u_int16_t n){ return prefetchCount = n; } + + bool exists(const string& consumerTag); + void consume(string& tag, Queue::shared_ptr queue, bool acks, + bool exclusive, ConnectionToken* const connection = 0, + const qpid::framing::FieldTable* = 0); + void cancel(const string& tag); + bool get(Queue::shared_ptr queue, bool ackExpected); + void begin(); + void close(); + void commit(); + void rollback(); + void ack(u_int64_t deliveryTag, bool multiple); + void recover(bool requeue); + void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); + void handlePublish(Message* msg, Exchange::shared_ptr exchange); + void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr); + void handleContent(qpid::framing::AMQContentBody::shared_ptr); + void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr); +}; + +struct InvalidAckException{}; - struct InvalidAckException{}; - } -} +}} // namespace qpid::broker #endif /*!_broker_BrokerChannel_h*/ diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index c220fb9092..3db5bcd074 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -23,10 +23,6 @@ #include "Connection.h" -// TODO aconway 2007-01-16: move to channel. -#include "Requester.h" -#include "Responder.h" - using namespace boost; using namespace qpid::sys; using namespace qpid::framing; @@ -36,9 +32,6 @@ namespace qpid { namespace broker { Connection::Connection(SessionContext* context_, Broker& broker_) : - adapter(*this), - requester(broker.getRequester()), - responder(broker.getResponder()), context(context_), framemax(65536), heartbeat(0), @@ -65,89 +58,15 @@ Exchange::shared_ptr Connection::findExchange(const string& name){ return broker.getExchanges().get(name); } -void Connection::handleMethod( - u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) -{ - AMQMethodBody::shared_ptr method = - shared_polymorphic_cast<AMQMethodBody, AMQBody>(body); - try{ - method->invoke(adapter, channel); - }catch(ChannelException& e){ - closeChannel(channel); - client->getChannel().close( - channel, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - }catch(ConnectionException& e){ - client->getConnection().close( - 0, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - client->getConnection().close( - 0, 541/*internal error*/, e.what(), - method->amqpClassId(), method->amqpMethodId()); - } -} void Connection::received(qpid::framing::AMQFrame* frame){ - u_int16_t channel = frame->getChannel(); - AMQBody::shared_ptr body = frame->getBody(); - switch(body->type()) - { - case REQUEST_BODY: - responder.received(AMQRequestBody::getData(body)); - handleMethod(channel, body); - break; - case RESPONSE_BODY: - // Must process responses before marking them received. - handleMethod(channel, body); - requester.processed(AMQResponseBody::getData(body)); - break; - // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. - case METHOD_BODY: - handleMethod(channel, body); - break; - case HEADER_BODY: - handleHeader( - channel, shared_polymorphic_cast<AMQHeaderBody>(body)); - break; - - case CONTENT_BODY: - handleContent( - channel, shared_polymorphic_cast<AMQContentBody>(body)); - break; - - case HEARTBEAT_BODY: - assert(channel == 0); - handleHeartbeat( - shared_polymorphic_cast<AMQHeartbeatBody>(body)); - break; - } -} - -/** - * An OutputHandler that does request/response procssing before - * delgating to another OutputHandler. - */ -Connection::Sender::Sender( - OutputHandler& oh, Requester& req, Responder& resp) - : out(oh), requester(req), responder(resp) -{} - -void Connection::Sender::send(AMQFrame* frame) { - AMQBody::shared_ptr body = frame->getBody(); - u_int16_t type = body->type(); - if (type == REQUEST_BODY) - requester.sending(AMQRequestBody::getData(body)); - else if (type == RESPONSE_BODY) - responder.sending(AMQResponseBody::getData(body)); - out.send(frame); + getAdapter(frame->getChannel()).handleBody(frame->getBody()); } void Connection::initiated(qpid::framing::ProtocolInitiation* header) { if (client.get()) - // TODO aconway 2007-01-16: correct code. + // TODO aconway 2007-01-16: correct error code. throw ConnectionException(0, "Connection initiated twice"); - client.reset(new qpid::framing::AMQP_ClientProxy( context, header->getMajor(), header->getMinor())); FieldTable properties; @@ -159,7 +78,6 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) { mechanisms, locales); } - void Connection::idleOut(){} void Connection::idleIn(){} @@ -177,42 +95,29 @@ void Connection::closed(){ } } -// TODO aconway 2007-01-16: colapse these. -void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel).handleHeader(body); -} - -void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel).handleContent(body); -} - -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - std::cout << "Connection::handleHeartbeat()" << std::endl; +void Connection::closeChannel(u_int16_t channel) { + getChannel(channel).close(); + adapters.erase(adapters.find(channel)); } -void Connection::openChannel(u_int16_t channel) { - if (channel == 0) - throw ConnectionException(504, "Illegal channel 0"); - if (channels.find(channel) != channels.end()) - throw ConnectionException(504, "Channel already open: " + channel); - channels.insert( - channel, - new Channel( - client->getProtocolVersion(), context, channel, framemax, - broker.getQueues().getStore(), settings.stagingThreshold)); -} -void Connection::closeChannel(u_int16_t channel) { - getChannel(channel).close(); // throws if channel does not exist. - channels.erase(channels.find(channel)); +BrokerAdapter& Connection::getAdapter(u_int16_t id) { + AdapterMap::iterator i = adapters.find(id); + if (i == adapters.end()) { + Channel* ch=new Channel( + client->getProtocolVersion(), context, id, + framemax, broker.getQueues().getStore(), + settings.stagingThreshold); + BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker); + adapters.insert(id, adapter); + return *adapter; + } + else + return *i; } - -Channel& Connection::getChannel(u_int16_t channel){ - ChannelMap::iterator i = channels.find(channel); - if(i == channels.end()) - throw ConnectionException(504, "Unknown channel: " + channel); - return *i; +Channel& Connection::getChannel(u_int16_t id) { + return getAdapter(id).getChannel(); } diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h index 1ad3353da8..90346b7c9d 100644 --- a/cpp/lib/broker/Connection.h +++ b/cpp/lib/broker/Connection.h @@ -33,8 +33,8 @@ #include <sys/ConnectionInputHandler.h> #include <sys/TimeoutHandler.h> #include "Broker.h" -#include "BrokerAdapter.h" #include "Exception.h" +#include "BrokerAdapter.h" namespace qpid { namespace broker { @@ -50,38 +50,15 @@ class Settings { class Connection : public qpid::sys::ConnectionInputHandler, public ConnectionToken { - typedef boost::ptr_map<u_int16_t, Channel> ChannelMap; - - // TODO aconway 2007-01-16: belongs on broker. - typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - - class Sender : public qpid::framing::OutputHandler { - public: - Sender(qpid::framing::OutputHandler&, - qpid::framing::Requester&, qpid::framing::Responder&); - void send(qpid::framing::AMQFrame* frame); - private: - OutputHandler& out; - qpid::framing::Requester& requester; - qpid::framing::Responder& responder; - }; - - BrokerAdapter adapter; - // FIXME aconway 2007-01-16: On Channel - qpid::framing::Requester& requester; - qpid::framing::Responder& responder; - ChannelMap channels; - - void handleHeader(u_int16_t channel, - qpid::framing::AMQHeaderBody::shared_ptr body); - void handleContent(u_int16_t channel, - qpid::framing::AMQContentBody::shared_ptr body); - void handleMethod(u_int16_t channel, - qpid::framing::AMQBody::shared_ptr body); - void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap; // FIXME aconway 2007-01-16: on broker. + typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; Exchange::shared_ptr findExchange(const string& name); + + BrokerAdapter& getAdapter(u_int16_t id); + + AdapterMap adapters; public: Connection(qpid::sys::SessionContext* context, Broker& broker); @@ -111,10 +88,9 @@ class Connection : public qpid::sys::ConnectionInputHandler, */ Queue::shared_ptr getQueue(const string& name, u_int16_t channel); - - void openChannel(u_int16_t channel); - void closeChannel(u_int16_t channel); + Channel& newChannel(u_int16_t channel); Channel& getChannel(u_int16_t channel); + void closeChannel(u_int16_t channel); }; }} diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 10a0b50aad..dd1e372095 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -253,4 +253,5 @@ void Connection::shutdown(){ for(iterator i = channels.begin(); i != channels.end(); i++){ i->second->stop(); } + responses.signalResponse(AMQMethodBody::shared_ptr()); } diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp index ac8b4a9ced..68c7e52013 100644 --- a/cpp/lib/client/ResponseHandler.cpp +++ b/cpp/lib/client/ResponseHandler.cpp @@ -29,28 +29,28 @@ qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} qpid::client::ResponseHandler::~ResponseHandler(){} bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ - return expected.match(response.get()); + return response != 0 && expected.match(response.get()); } void qpid::client::ResponseHandler::waitForResponse(){ Monitor::ScopedLock l(monitor); - if(waiting){ + while (waiting) monitor.wait(); - } } -void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ - response = _response; +void qpid::client::ResponseHandler::signalResponse( + qpid::framing::AMQMethodBody::shared_ptr _response) +{ Monitor::ScopedLock l(monitor); + response = _response; waiting = false; monitor.notify(); } void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ Monitor::ScopedLock l(monitor); - if(waiting){ + while (waiting) monitor.wait(); - } if(!validate(expected)){ THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); } diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp index 1f761bf3f1..72ba82468f 100644 --- a/cpp/lib/common/framing/BodyHandler.cpp +++ b/cpp/lib/common/framing/BodyHandler.cpp @@ -18,38 +18,62 @@ * under the License. * */ -#include <boost/shared_ptr.hpp> -#include <BodyHandler.h> +#include "QpidError.h" +#include "BodyHandler.h" +#include <AMQRequestBody.h> +#include <AMQResponseBody.h> +#include <AMQMethodBody.h> +#include <AMQHeaderBody.h> +#include <AMQContentBody.h> +#include <AMQHeartbeatBody.h> using namespace qpid::framing; using namespace boost; BodyHandler::~BodyHandler() {} -void BodyHandler::handleBody(const AMQBody::shared_ptr& body){ - +void BodyHandler::handleBody(shared_ptr<AMQBody> body) { switch(body->type()) { - case METHOD_BODY: case REQUEST_BODY: + handleRequest(shared_polymorphic_cast<AMQRequestBody>(body)); + break; case RESPONSE_BODY: - handleMethod(dynamic_pointer_cast<AMQMethodBody, AMQBody>(body)); + handleResponse(shared_polymorphic_cast<AMQResponseBody>(body)); + break; + case METHOD_BODY: + handleMethod(shared_polymorphic_cast<AMQMethodBody>(body)); break; - case HEADER_BODY: - handleHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body)); break; - case CONTENT_BODY: - handleContent(dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + handleContent(shared_polymorphic_cast<AMQContentBody>(body)); break; - case HEARTBEAT_BODY: - handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body)); break; - default: - throw UnknownBodyType(body->type()); + QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type()); } +} + +void BodyHandler::handleRequest(AMQRequestBody::shared_ptr request) { + responder.received(request->getData()); + handleMethod(request); +} + +void BodyHandler::handleResponse(AMQResponseBody::shared_ptr response) { + handleMethod(response); + requester.processed(response->getData()); +} + +void BodyHandler::assertChannelZero(u_int16_t id) { + if (id != 0) + throw ConnectionException(504, "Invalid channel id, not 0"); +} +void BodyHandler::assertChannelNonZero(u_int16_t id) { + if (id == 0) + throw ConnectionException(504, "Invalid channel id 0"); } diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h index 0260a35e88..c9c74e2b3f 100644 --- a/cpp/lib/common/framing/BodyHandler.h +++ b/cpp/lib/common/framing/BodyHandler.h @@ -1,3 +1,6 @@ +#ifndef _BodyHandler_ +#define _BodyHandler_ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,37 +21,55 @@ * under the License. * */ -#include <string> -#ifndef _BodyHandler_ -#define _BodyHandler_ +#include <boost/shared_ptr.hpp> -#include <AMQMethodBody.h> -#include <AMQHeaderBody.h> -#include <AMQContentBody.h> -#include <AMQHeartbeatBody.h> +#include "Requester.h" +#include "Responder.h" namespace qpid { namespace framing { - class BodyHandler{ - public: - virtual ~BodyHandler(); - virtual void handleMethod(AMQMethodBody::shared_ptr body) = 0; - virtual void handleHeader(AMQHeaderBody::shared_ptr body) = 0; - virtual void handleContent(AMQContentBody::shared_ptr body) = 0; - virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body) = 0; - - void handleBody(const AMQBody::shared_ptr& body); - }; - - class UnknownBodyType{ - public: - const u_int16_t type; - inline UnknownBodyType(u_int16_t _type) : type(_type){} - }; -} -} +class AMQRequestBody; +class AMQResponseBody; +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class AMQHeartbeatBody; + +/** + * Base class for client and broker channel handlers. + * + * Handles request/response id management common to client and broker. + * Derived classes provide remaining client/broker specific handling. + */ +class BodyHandler { + public: + virtual ~BodyHandler(); + + void handleBody(boost::shared_ptr<AMQBody> body); + + protected: + virtual void handleRequest(boost::shared_ptr<AMQRequestBody>); + virtual void handleResponse(boost::shared_ptr<AMQResponseBody>); + + virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0; + virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0; + virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0; + virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0; + + protected: + /** Throw protocol exception if this is not channel 0. */ + static void assertChannelZero(u_int16_t id); + /** Throw protocol exception if this is channel 0. */ + static void assertChannelNonZero(u_int16_t id); + + private: + Requester requester; + Responder responder; +}; + +}} #endif diff --git a/cpp/tests/BodyHandlerTest.cpp b/cpp/tests/BodyHandlerTest.cpp deleted file mode 100644 index bf60dd21e3..0000000000 --- a/cpp/tests/BodyHandlerTest.cpp +++ /dev/null @@ -1,110 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <AMQP_HighestVersion.h> -#include <amqp_framing.h> -#include <qpid_test_plugin.h> -using namespace qpid::framing; - -class BodyHandlerTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(BodyHandlerTest); - CPPUNIT_TEST(testMethod); - CPPUNIT_TEST(testHeader); - CPPUNIT_TEST(testContent); - CPPUNIT_TEST(testHeartbeat); - CPPUNIT_TEST_SUITE_END(); -private: - - class TestBodyHandler : public BodyHandler{ - AMQMethodBody* const method; - AMQHeaderBody* const header; - AMQContentBody* const content; - AMQHeartbeatBody* const heartbeat; - - public: - - TestBodyHandler(AMQMethodBody* _method) : method(_method), header(0), content(0), heartbeat(0){} - TestBodyHandler(AMQHeaderBody* _header) : method(0), header(_header), content(0), heartbeat(0){} - TestBodyHandler(AMQContentBody* _content) : method(0), header(0), content(_content), heartbeat(0){} - TestBodyHandler(AMQHeartbeatBody* _heartbeat) : method(0), header(0), content(0), heartbeat(_heartbeat){} - - virtual void handleMethod(AMQMethodBody::shared_ptr body){ - CPPUNIT_ASSERT(method); - CPPUNIT_ASSERT_EQUAL(method, body.get()); - } - virtual void handleHeader(AMQHeaderBody::shared_ptr body){ - CPPUNIT_ASSERT(header); - CPPUNIT_ASSERT_EQUAL(header, body.get()); - } - virtual void handleContent(AMQContentBody::shared_ptr body){ - CPPUNIT_ASSERT(content); - CPPUNIT_ASSERT_EQUAL(content, body.get()); - } - virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ - CPPUNIT_ASSERT(heartbeat); - CPPUNIT_ASSERT_EQUAL(heartbeat, body.get()); - } - }; - ProtocolVersion v; - -public: - - BodyHandlerTest() : v(8, 0) {} - - void testMethod() - { - AMQMethodBody* method = new QueueDeclareBody(v); - AMQFrame frame(highestProtocolVersion, 0, method); - TestBodyHandler handler(method); - handler.handleBody(frame.getBody()); - } - - void testHeader() - { - AMQHeaderBody* header = new AMQHeaderBody(); - AMQFrame frame(highestProtocolVersion, 0, header); - TestBodyHandler handler(header); - handler.handleBody(frame.getBody()); - } - - void testContent() - { - AMQContentBody* content = new AMQContentBody(); - AMQFrame frame(highestProtocolVersion, 0, content); - TestBodyHandler handler(content); - handler.handleBody(frame.getBody()); - } - - void testHeartbeat() - { - AMQHeartbeatBody* heartbeat = new AMQHeartbeatBody(); - AMQFrame frame(highestProtocolVersion, 0, heartbeat); - TestBodyHandler handler(heartbeat); - handler.handleBody(frame.getBody()); - } -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(BodyHandlerTest); - diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index f0860b8a28..e7a02d631d 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -135,6 +135,7 @@ class ChannelTest : public CppUnit::TestCase void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0); + channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); ConnectionToken* owner = 0; diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index d08249e759..774ecafa6e 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -41,7 +41,6 @@ broker_tests = \ ValueTest framing_tests = \ - BodyHandlerTest \ FieldTableTest \ FramingTest \ HeaderTest diff --git a/cpp/tests/start_broker b/cpp/tests/start_broker index 6f67e7d8c5..cc95083d85 100755 --- a/cpp/tests/start_broker +++ b/cpp/tests/start_broker @@ -8,3 +8,7 @@ rm -rf $LOG $PID # Start the daemon, recording its PID. ../src/qpidd > $LOG 2>&1 & echo $! > $PID + +# FIXME aconway 2007-01-18: qpidd should not return till it is accepting +# connections, remove arbitrary sleep. +sleep 1 |
