diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-02 22:03:10 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-02 22:03:10 +0000 |
| commit | b5c270f10496f522ef6a03a8fa60f85d55c9187d (patch) | |
| tree | 714e7abf7ba591d00232d821440e51461175cb9e /cpp | |
| parent | 750f272ac99e8c830807affb3ae68ab0beeca63f (diff) | |
| download | qpid-python-b5c270f10496f522ef6a03a8fa60f85d55c9187d.tar.gz | |
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. Request ID comes from body,
ChannelAdapter is used to send frames, not OutputHandler.
* cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member.
Context is per-method not per-channel.
* cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId
with MethodContext (for responses) or ChannelAdapter (for requests.)
Use context request-ID to construct responses, send all bodies via
ChannelAdapter.
* cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter.
* cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters.
Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion&
* Cosmetic changes, many files:
- fixed indentation, broke long lines.
- removed unnecessary qpid:: prefixes.
* broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into
broker::channel.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
42 files changed, 976 insertions, 968 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 18254484bc..f650452e33 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -54,7 +54,7 @@ Broker::Broker(const Configuration& conf) : factory(*this) { if (config.getStore().empty()) - store.reset(new NullMessageStore()); + store.reset(new NullMessageStore(config.isTrace())); else store.reset(new MessageStoreModule(config.getStore())); diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 73ece8b264..10e386ff41 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -28,168 +28,20 @@ namespace broker { using namespace qpid; using namespace qpid::framing; -typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; +typedef std::vector<Queue::shared_ptr> QueueVector; -class BrokerAdapter::ServerOps : public AMQP_ServerOperations -{ - public: - ServerOps(Channel& ch, Connection& c, Broker& b) : - basicHandler(ch, c, b), - channelHandler(ch, c, b), - connectionHandler(ch, c, b), - exchangeHandler(ch, c, b), - messageHandler(ch, c, b), - queueHandler(ch, c, b), - txHandler(ch, c, b) - {} - - ChannelHandler* getChannelHandler() { return &channelHandler; } - ConnectionHandler* getConnectionHandler() { return &connectionHandler; } - BasicHandler* getBasicHandler() { return &basicHandler; } - ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } - QueueHandler* getQueueHandler() { return &queueHandler; } - TxHandler* getTxHandler() { return &txHandler; } - MessageHandler* getMessageHandler() { return &messageHandler; } - AccessHandler* getAccessHandler() { - throw ConnectionException(540, "Access class not implemented"); } - FileHandler* getFileHandler() { - throw ConnectionException(540, "File class not implemented"); } - StreamHandler* getStreamHandler() { - throw ConnectionException(540, "Stream class not implemented"); } - DtxHandler* getDtxHandler() { - throw ConnectionException(540, "Dtx class not implemented"); } - TunnelHandler* getTunnelHandler() { - throw ConnectionException(540, "Tunnel class not implemented"); } - - private: - struct CoreRefs { - CoreRefs(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b) {} - - Channel& channel; - Connection& connection; - Broker& broker; - }; - - class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler { - public: - ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - - void startOk(const MethodContext& context, - const qpid::framing::FieldTable& clientProperties, - const std::string& mechanism, const std::string& response, - const std::string& locale); - void secureOk(const MethodContext& context, const std::string& response); - void tuneOk(const MethodContext& context, u_int16_t channelMax, - u_int32_t frameMax, u_int16_t heartbeat); - void open(const MethodContext& context, const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(const MethodContext& context, u_int16_t replyCode, - const std::string& replyText, - u_int16_t classId, u_int16_t methodId); - void closeOk(const MethodContext& context); - }; - - class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ - public: - ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void open(const MethodContext& context, const std::string& outOfBand); - void flow(const MethodContext& context, bool active); - void flowOk(const MethodContext& context, bool active); - void ok( const MethodContext& context ); - void ping( const MethodContext& context ); - void pong( const MethodContext& context ); - void resume( const MethodContext& context, const std::string& channelId ); - void close(const MethodContext& context, u_int16_t replyCode, const - std::string& replyText, u_int16_t classId, u_int16_t methodId); - void closeOk(const MethodContext& context); - }; - - class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ - public: - ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void declare(const MethodContext& context, u_int16_t ticket, - const std::string& exchange, const std::string& type, - bool passive, bool durable, bool autoDelete, - bool internal, bool nowait, - const qpid::framing::FieldTable& arguments); - void delete_(const MethodContext& context, u_int16_t ticket, - const std::string& exchange, bool ifUnused, bool nowait); - }; - - class QueueHandlerImpl : private CoreRefs, public QueueHandler{ - public: - QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void declare(const MethodContext& context, u_int16_t ticket, const std::string& queue, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, - const qpid::framing::FieldTable& arguments); - void bind(const MethodContext& context, u_int16_t ticket, const std::string& queue, - const std::string& exchange, const std::string& routingKey, - bool nowait, const qpid::framing::FieldTable& arguments); - void unbind(const MethodContext& context, - u_int16_t ticket, - const std::string& queue, - const std::string& exchange, - const std::string& routingKey, - const qpid::framing::FieldTable& arguments ); - void purge(const MethodContext& context, u_int16_t ticket, const std::string& queue, - bool nowait); - void delete_(const MethodContext& context, u_int16_t ticket, const std::string& queue, - bool ifUnused, bool ifEmpty, - bool nowait); - }; - - class BasicHandlerImpl : private CoreRefs, public BasicHandler{ - public: - BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void qos(const MethodContext& context, u_int32_t prefetchSize, - u_int16_t prefetchCount, bool global); - void consume( - const MethodContext& context, u_int16_t ticket, const std::string& queue, - const std::string& consumerTag, bool noLocal, bool noAck, - bool exclusive, bool nowait, - const qpid::framing::FieldTable& fields); - void cancel(const MethodContext& context, const std::string& consumerTag, - bool nowait); - void publish(const MethodContext& context, u_int16_t ticket, - const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); - void get(const MethodContext& context, u_int16_t ticket, const std::string& queue, - bool noAck); - void ack(const MethodContext& context, u_int64_t deliveryTag, bool multiple); - void reject(const MethodContext& context, u_int64_t deliveryTag, bool requeue); - void recover(const MethodContext& context, bool requeue); - }; - - class TxHandlerImpl : private CoreRefs, public TxHandler{ - public: - TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void select(const MethodContext& context); - void commit(const MethodContext& context); - void rollback(const MethodContext& context); - }; - - BasicHandlerImpl basicHandler; - ChannelHandlerImpl channelHandler; - ConnectionHandlerImpl connectionHandler; - ExchangeHandlerImpl exchangeHandler; - MessageHandlerImpl messageHandler; - QueueHandlerImpl queueHandler; - TxHandlerImpl txHandler; - -}; - -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk( - const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk( + const MethodContext& context , const FieldTable& /*clientProperties*/, + const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ connection.client->getConnection().tune( context, 100, connection.getFrameMax(), connection.getHeartbeat()); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){} +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk( + const MethodContext&, const string& /*response*/){} -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk( +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat) { @@ -197,12 +49,12 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk( connection.setHeartbeat(heartbeat); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; connection.client->getConnection().openOk(context, knownhosts); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close( +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { @@ -210,21 +62,21 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close( connection.getOutput().close(); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){ +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ connection.getOutput().close(); } -void BrokerAdapter::ServerOps::ChannelHandlerImpl::open( +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open( const MethodContext& context, const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 connection.client->getChannel().openOk(context, std::string()/* ID */); } -void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::close( +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) @@ -234,13 +86,13 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::close( connection.closeChannel(channel.getId()); } -void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&){} +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ +void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ if(passive){ if(!broker.getExchanges().get(exchange)) { @@ -265,17 +117,17 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& } } -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); if(!nowait) connection.client->getExchange().deleteOk(context); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ +void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = connection.getQueue(name, channel.getId()); @@ -308,9 +160,9 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& co } } -void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ +void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); @@ -325,7 +177,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte } void -BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( +BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, @@ -344,15 +196,15 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( connection.client->getQueue().unbindOk(context); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); if(!nowait) connection.client->getQueue().purgeOk(context, count); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ +void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); @@ -363,7 +215,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& co }else{ //remove the queue from the list of exclusive queues if necessary if(q->isExclusiveOwner(&connection)){ - queue_iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); + QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); } count = q->getMessageCount(); @@ -377,14 +229,14 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& co -void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); connection.client->getBasic().qosOk(context); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, @@ -412,19 +264,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( } -void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( + const MethodContext& context, u_int16_t /*ticket*/, + const string& exchangeName, const string& routingKey, + bool mandatory, bool immediate) +{ Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); + BasicMessage* msg = new BasicMessage( + &connection, exchangeName, routingKey, mandatory, immediate, + context.methodBody); channel.handlePublish(msg, exchange); }else{ throw ChannelException( @@ -432,7 +288,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack @@ -441,7 +297,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& contex } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ try{ channel.ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -449,23 +305,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ channel.recover(requeue); } -void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){ +void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ channel.begin(); connection.client->getTx().selectOk(context); } -void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){ +void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ channel.commit(); connection.client->getTx().commitOk(context); } -void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){ +void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ channel.rollback(); connection.client->getTx().rollbackOk(context); @@ -473,82 +329,32 @@ void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& cont } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& ) +BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) { //no specific action required, generic response handling should be sufficient } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context) +BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) { connection.client->getChannel().ok(context); connection.client->getChannel().pong(context); } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context) +BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) { connection.client->getChannel().ok(context); } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::resume( +BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( const MethodContext&, const string& /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } -BrokerAdapter::BrokerAdapter( - std::auto_ptr<Channel> ch, Connection& c, Broker& b -) : - channel(ch), - connection(c), - broker(b), - serverOps(new ServerOps(*channel,c,b)) -{ - init(channel->getId(), c.getOutput(), channel->getVersion()); -} - -void BrokerAdapter::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - method->invoke(*serverOps, context); - }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.client->getConnection().close( - context, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.client->getConnection().close( - context, 541/*internal error*/, e.what(), - method->amqpClassId(), method->amqpMethodId()); - } -} - -void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) { - channel->handleHeader(body); -} - -void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) { - channel->handleContent(body); -} - -void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { - // TODO aconway 2007-01-17: Implement heartbeats. -} - - -bool BrokerAdapter::isOpen() const { - return channel->isOpen(); -} }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h index 72c09be849..cd34f17e58 100644 --- a/cpp/lib/broker/BrokerAdapter.h +++ b/cpp/lib/broker/BrokerAdapter.h @@ -18,18 +18,14 @@ * limitations under the License. * */ -#include <memory.h> - #include "AMQP_ServerOperations.h" -#include "BodyHandler.h" +#include "MessageHandlerImpl.h" #include "BrokerChannel.h" -#include "amqp_types.h" -#include "framing/ChannelAdapter.h" namespace qpid { namespace broker { -class AMQMethodBody; +class Channel; class Connection; class Broker; @@ -38,35 +34,173 @@ class Broker; * * Translates protocol bodies into calls on the core Channel, * Connection and Broker objects. - * - * Owns a channel, has references to Connection and Broker. */ -class BrokerAdapter : public framing::ChannelAdapter + +class ChannelHandler; +class ConnectionHandler; +class BasicHandler; +class ExchangeHandler; +class QueueHandler; +class TxHandler; +class MessageHandler; +class AccessHandler; +class FileHandler; +class StreamHandler; +class DtxHandler; +class TunnelHandler; + +class BrokerAdapter : public framing::AMQP_ServerOperations { public: - BrokerAdapter(std::auto_ptr<Channel> ch, Connection&, Broker&); - Channel& getChannel() { return *channel; } + BrokerAdapter(Channel& ch, Connection& c, Broker& b) : + basicHandler(ch, c, b), + channelHandler(ch, c, b), + connectionHandler(ch, c, b), + exchangeHandler(ch, c, b), + messageHandler(ch, c, b), + queueHandler(ch, c, b), + txHandler(ch, c, b) + {} + + ChannelHandler* getChannelHandler() { return &channelHandler; } + ConnectionHandler* getConnectionHandler() { return &connectionHandler; } + BasicHandler* getBasicHandler() { return &basicHandler; } + ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } + QueueHandler* getQueueHandler() { return &queueHandler; } + TxHandler* getTxHandler() { return &txHandler; } + MessageHandler* getMessageHandler() { return &messageHandler; } + AccessHandler* getAccessHandler() { + throw ConnectionException(540, "Access class not implemented"); } + FileHandler* getFileHandler() { + throw ConnectionException(540, "File class not implemented"); } + StreamHandler* getStreamHandler() { + throw ConnectionException(540, "Stream class not implemented"); } + DtxHandler* getDtxHandler() { + throw ConnectionException(540, "Dtx class not implemented"); } + TunnelHandler* getTunnelHandler() { + throw ConnectionException(540, "Tunnel class not implemented"); } - void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>); - void handleContent(boost::shared_ptr<framing::AMQContentBody>); - void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); + private: + struct CoreRefs { + CoreRefs(Channel& ch, Connection& c, Broker& b) + : channel(ch), connection(c), broker(b) {} - bool isOpen() const; + Channel& channel; + Connection& connection; + Broker& broker; + }; - private: - void handleMethodInContext( - boost::shared_ptr<framing::AMQMethodBody> method, - const framing::MethodContext& context); + class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler { + public: + ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + + void startOk(const framing::MethodContext& context, + const qpid::framing::FieldTable& clientProperties, + const std::string& mechanism, const std::string& response, + const std::string& locale); + void secureOk(const framing::MethodContext& context, + const std::string& response); + void tuneOk(const framing::MethodContext& context, + u_int16_t channelMax, + u_int32_t frameMax, u_int16_t heartbeat); + void open(const framing::MethodContext& context, + const std::string& virtualHost, + const std::string& capabilities, bool insist); + void close(const framing::MethodContext& context, u_int16_t replyCode, + const std::string& replyText, + u_int16_t classId, u_int16_t methodId); + void closeOk(const framing::MethodContext& context); + }; + + class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ + public: + ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void open(const framing::MethodContext& context, const std::string& outOfBand); + void flow(const framing::MethodContext& context, bool active); + void flowOk(const framing::MethodContext& context, bool active); + void ok( const framing::MethodContext& context ); + void ping( const framing::MethodContext& context ); + void pong( const framing::MethodContext& context ); + void resume( const framing::MethodContext& context, const std::string& channelId ); + void close(const framing::MethodContext& context, u_int16_t replyCode, const + std::string& replyText, u_int16_t classId, u_int16_t methodId); + void closeOk(const framing::MethodContext& context); + }; - class ServerOps; + class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ + public: + ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void declare(const framing::MethodContext& context, u_int16_t ticket, + const std::string& exchange, const std::string& type, + bool passive, bool durable, bool autoDelete, + bool internal, bool nowait, + const qpid::framing::FieldTable& arguments); + void delete_(const framing::MethodContext& context, u_int16_t ticket, + const std::string& exchange, bool ifUnused, bool nowait); + }; - std::auto_ptr<Channel> channel; - Connection& connection; - Broker& broker; - boost::shared_ptr<ServerOps> serverOps; -}; - + class QueueHandlerImpl : private CoreRefs, public QueueHandler{ + public: + QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, + const qpid::framing::FieldTable& arguments); + void bind(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, + const std::string& exchange, const std::string& routingKey, + bool nowait, const qpid::framing::FieldTable& arguments); + void unbind(const framing::MethodContext& context, + u_int16_t ticket, + const std::string& queue, + const std::string& exchange, + const std::string& routingKey, + const qpid::framing::FieldTable& arguments ); + void purge(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, + bool nowait); + void delete_(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, + bool ifUnused, bool ifEmpty, + bool nowait); + }; + + class BasicHandlerImpl : private CoreRefs, public BasicHandler{ + public: + BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void qos(const framing::MethodContext& context, u_int32_t prefetchSize, + u_int16_t prefetchCount, bool global); + void consume( + const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, + const std::string& consumerTag, bool noLocal, bool noAck, + bool exclusive, bool nowait, + const qpid::framing::FieldTable& fields); + void cancel(const framing::MethodContext& context, const std::string& consumerTag, + bool nowait); + void publish(const framing::MethodContext& context, u_int16_t ticket, + const std::string& exchange, const std::string& routingKey, + bool mandatory, bool immediate); + void get(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, + bool noAck); + void ack(const framing::MethodContext& context, u_int64_t deliveryTag, bool multiple); + void reject(const framing::MethodContext& context, u_int64_t deliveryTag, bool requeue); + void recover(const framing::MethodContext& context, bool requeue); + }; + class TxHandlerImpl : private CoreRefs, public TxHandler{ + public: + TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + void select(const framing::MethodContext& context); + void commit(const framing::MethodContext& context); + void rollback(const framing::MethodContext& context); + }; + + BasicHandlerImpl basicHandler; + ChannelHandlerImpl channelHandler; + ConnectionHandlerImpl connectionHandler; + ExchangeHandlerImpl exchangeHandler; + MessageHandlerImpl messageHandler; + QueueHandlerImpl queueHandler; + TxHandlerImpl txHandler; + +}; }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index f0b6ff6d07..96215a60ed 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -18,12 +18,25 @@ * under the License. * */ +#include <assert.h> + #include <iostream> #include <sstream> -#include <assert.h> +#include <algorithm> +#include <functional> -#include <BrokerChannel.h> +#include "BrokerChannel.h" +#include "DeletingTxOp.h" +#include "framing/ChannelAdapter.h" #include <QpidError.h> +#include <DeliverableMessage.h> +#include <BrokerQueue.h> +#include <BrokerMessage.h> +#include <MessageStore.h> +#include <TxAck.h> +#include <TxPublish.h> +#include "BrokerAdapter.h" +#include "Connection.h" using std::mem_fun_ref; using std::bind2nd; @@ -33,12 +46,12 @@ using namespace qpid::sys; Channel::Channel( - const ProtocolVersion& _version, OutputHandler* _out, int _id, + Connection& con, ChannelId id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold ) : - id(_id), - out(*_out), + ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()), + connection(con), currentDeliveryTag(1), transactional(false), prefetchSize(0), @@ -47,8 +60,8 @@ Channel::Channel( tagGenerator("sgen"), store(_store), messageBuilder(this, _store, _stagingThreshold), - version(_version), - opened(false) + opened(true), + adapter(new BrokerAdapter(*this, con, con.broker)) { outstanding.reset(); } @@ -61,7 +74,10 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { +void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, + bool exclusive, ConnectionToken* const connection, + const FieldTable*) +{ if(tag.empty()) tag = tagGenerator.generate(); ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ @@ -117,7 +133,10 @@ void Channel::rollback(){ accumulatedAck.clear(); } -void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ +void Channel::deliver( + Message::shared_ptr& msg, const string& consumerTag, + Queue::shared_ptr& queue, bool ackExpected) +{ Mutex::ScopedLock locker(deliveryLock); u_int64_t deliveryTag = currentDeliveryTag++; @@ -127,7 +146,7 @@ void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue outstanding.count++; } //send deliver method, header and content(s) - msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version); + msg->deliver(*this, consumerTag, deliveryTag, framesize); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -184,7 +203,7 @@ void Channel::handleContent(AMQContentBody::shared_ptr content){ messageBuilder.addContent(content); } -void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { +void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { // TODO aconway 2007-01-17: Implement heartbeating. } @@ -255,7 +274,9 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ if(msg){ Mutex::ScopedLock locker(deliveryLock); u_int64_t myDeliveryTag = currentDeliveryTag++; - msg->sendGetOk(&out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version); + msg->sendGetOk(MethodContext(this, msg->getRespondTo()), + queue->getMessageCount() + 1, myDeliveryTag, + framesize); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -265,7 +286,32 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ } } -void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ - msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version); +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, + u_int64_t deliveryTag) +{ + msg->deliver(*this, consumerTag, deliveryTag, framesize); +} + +void Channel::handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context +) +{ + try{ + method->invoke(*adapter, context); + }catch(ChannelException& e){ + connection.client->getChannel().close( + context, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + connection.closeChannel(getId()); + }catch(ConnectionException& e){ + connection.client->getConnection().close( + context, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + connection.client->getConnection().close( + context, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 4e64d148e8..dd95e944bb 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -22,42 +22,39 @@ * */ -#include <algorithm> -#include <functional> #include <list> #include <map> + +#include <boost/scoped_ptr.hpp> +#include <boost/shared_ptr.hpp> + #include <AccumulatedAck.h> -#include <Binding.h> #include <Consumer.h> -#include <DeletingTxOp.h> -#include <DeliverableMessage.h> #include <DeliveryRecord.h> -#include <BrokerMessage.h> #include <MessageBuilder.h> #include <NameGenerator.h> #include <Prefetch.h> -#include <BrokerQueue.h> -#include <MessageStore.h> -#include <TxAck.h> #include <TxBuffer.h> -#include <TxPublish.h> -#include <sys/Monitor.h> -#include <OutputHandler.h> -#include <AMQContentBody.h> -#include <AMQHeaderBody.h> -#include <AMQHeartbeatBody.h> -#include <BasicPublishBody.h> +#include "framing/ChannelAdapter.h" namespace qpid { namespace broker { -using qpid::framing::string; +class ConnectionToken; +class Connection; +class Queue; +class BrokerAdapter; + +using framing::string; /** * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : private MessageBuilder::CompletionHandler { +class Channel : + public framing::ChannelAdapter, + private MessageBuilder::CompletionHandler +{ class ConsumerImpl : public virtual Consumer { Channel* parent; @@ -67,15 +64,18 @@ class Channel : private MessageBuilder::CompletionHandler { const bool ackExpected; bool blocked; public: - ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); + ConsumerImpl(Channel* parent, const string& tag, + Queue::shared_ptr queue, + ConnectionToken* const connection, bool ack); virtual bool deliver(Message::shared_ptr& msg); void cancel(); void requestDispatch(); }; typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + + Connection& connection; u_int16_t id; - qpid::framing::OutputHandler& out; u_int64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; bool transactional; @@ -86,30 +86,32 @@ class Channel : private MessageBuilder::CompletionHandler { u_int32_t framesize; NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; - qpid::sys::Mutex deliveryLock; + sys::Mutex deliveryLock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to - qpid::framing::ProtocolVersion version; // version used for this channel bool opened; + boost::scoped_ptr<BrokerAdapter> adapter; + virtual void complete(Message::shared_ptr& msg); void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); void cancel(consumer_iterator consumer); bool checkPrefetch(Message::shared_ptr& msg); public: - Channel( - const qpid::framing::ProtocolVersion& _version, - qpid::framing::OutputHandler* out, int id, u_int32_t framesize, - MessageStore* const _store = 0, u_int64_t stagingThreshold = 0); + Channel(Connection& channel, + framing::ChannelId id, + u_int32_t framesize, + MessageStore* const _store = 0, + u_int64_t stagingThreshold = 0); + ~Channel(); + bool isOpen() const { return opened; } - const framing::ProtocolVersion& getVersion() const { return version; } void open() { opened = true; } - u_int16_t getId() const { return id; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } Queue::shared_ptr getDefaultQueue() const { return defaultQueue; } u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; } @@ -118,7 +120,7 @@ class Channel : private MessageBuilder::CompletionHandler { bool exists(const string& consumerTag); void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, - const qpid::framing::FieldTable* = 0); + const framing::FieldTable* = 0); void cancel(const string& tag); bool get(Queue::shared_ptr queue, bool ackExpected); void begin(); @@ -129,14 +131,18 @@ class Channel : private MessageBuilder::CompletionHandler { void recover(bool requeue); void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); void handlePublish(Message* msg, Exchange::shared_ptr exchange); - void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr); - void handleContent(qpid::framing::AMQContentBody::shared_ptr); - void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr); + void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>); + void handleContent(boost::shared_ptr<framing::AMQContentBody>); + void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); + void handleMethodInContext( + boost::shared_ptr<framing::AMQMethodBody> method, + const framing::MethodContext& context); + }; struct InvalidAckException{}; -}} // namespace qpid::broker +}} // namespace broker #endif /*!_broker_BrokerChannel_h*/ diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index b738040470..69d4ba087f 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -27,31 +27,35 @@ #include <BasicDeliverBody.h> #include <BasicGetOkBody.h> #include "AMQFrame.h" +#include "framing/ChannelAdapter.h" using namespace boost; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -BasicMessage::BasicMessage(const ConnectionToken* const _publisher, - const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate) : - Message(_exchange, _routingKey, _mandatory, _immediate), - publisher(_publisher), - size(0) +BasicMessage::BasicMessage( + const ConnectionToken* const _publisher, + const string& _exchange, const string& _routingKey, + bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo +) : + Message(_exchange, _routingKey, _mandatory, _immediate, respondTo), + publisher(_publisher), + size(0) { } -BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : - publisher(0), size(0) -{ +// FIXME aconway 2007-02-01: remove. +// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : +// publisher(0), size(0) +// { - decode(buffer, headersOnly, contentChunkSize); -} +// decode(buffer, headersOnly, contentChunkSize); +// } +// For tests only. BasicMessage::BasicMessage() : publisher(0), size(0) -{ -} +{} BasicMessage::~BasicMessage(){ if (content.get()) content->destroy(); @@ -73,34 +77,42 @@ bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } -void BasicMessage::deliver(OutputHandler* out, int channel, - const string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize, - ProtocolVersion* version){ - // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(*version, channel, - new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey()))); - sendContent(out, channel, framesize, version); -} - -void BasicMessage::sendGetOk(OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize, - ProtocolVersion* version){ - // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(*version, channel, - new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount))); - sendContent(out, channel, framesize, version); -} - -void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ - AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); - out->send(new AMQFrame(*version, channel, headerBody)); - +void BasicMessage::deliver(ChannelAdapter& channel, + const string& consumerTag, u_int64_t deliveryTag, + u_int32_t framesize) +{ + // CCT -- TODO - Update code generator to take pointer/ not + // instance to avoid extra contruction + channel.send( + new BasicDeliverBody( + channel.getVersion(), consumerTag, deliveryTag, + getRedelivered(), getExchange(), getRoutingKey())); + sendContent(channel, framesize); +} + +void BasicMessage::sendGetOk(const MethodContext& context, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize) +{ + // CCT -- TODO - Update code generator to take pointer/ not + // instance to avoid extra contruction + context.channel->send( + new BasicGetOkBody( + context.channel->getVersion(), + context.methodBody->getRequestId(), + deliveryTag, getRedelivered(), getExchange(), + getRoutingKey(), messageCount)); + sendContent(*context.channel, framesize); +} + +void BasicMessage::sendContent( + ChannelAdapter& channel, u_int32_t framesize) +{ + channel.send(header); Mutex::ScopedLock locker(contentLock); - if (content.get()) content->send(*version, out, channel, framesize); + if (content.get()) + content->send(channel, framesize); } BasicHeaderProperties* BasicMessage::getHeaderProperties(){ @@ -126,8 +138,8 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChu void BasicMessage::decodeHeader(Buffer& buffer) { - string exchange; - string routingKey; + string exchange; + string routingKey; buffer.getShortString(exchange); buffer.getShortString(routingKey); diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 4001af97a5..5ac9c83d0e 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -27,109 +27,111 @@ #include <boost/shared_ptr.hpp> #include <AMQContentBody.h> #include <AMQHeaderBody.h> -#include <ProtocolVersion.h> +#include "AMQMethodBody.h" #include <BasicHeaderProperties.h> #include <ConnectionToken.h> #include <Content.h> -#include <OutputHandler.h> #include <Mutex.h> #include <TxBuffer.h> namespace qpid { - namespace broker { - class MessageStore; - using qpid::framing::string; +namespace framing { +class MethodContext; +class ChannelAdapter; +} + +namespace broker { + +class MessageStore; +using framing::string; - /** - * Represents an AMQP message, i.e. a header body, a list of - * content bodies and some details about the publication - * request. - */ - class BasicMessage : public Message{ - const ConnectionToken* const publisher; - qpid::framing::AMQHeaderBody::shared_ptr header; - std::auto_ptr<Content> content; - qpid::sys::Mutex contentLock; - u_int64_t size; - - void sendContent(qpid::framing::OutputHandler* out, - int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version); - - public: - typedef boost::shared_ptr<BasicMessage> shared_ptr; - - BasicMessage(const ConnectionToken* const publisher, - const string& exchange, const string& routingKey, - bool mandatory, bool immediate); - BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); - BasicMessage(); - ~BasicMessage(); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void addContent(qpid::framing::AMQContentBody::shared_ptr data); - bool isComplete(); - const ConnectionToken* const getPublisher(); - - void deliver(qpid::framing::OutputHandler* out, - int channel, - const string& consumerTag, - u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version); - void sendGetOk(qpid::framing::OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version); - - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - bool isPersistent(); - u_int64_t contentSize() const { return size; } - - void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); - void decodeHeader(qpid::framing::Buffer& buffer); - void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0); - - void encode(qpid::framing::Buffer& buffer); - void encodeHeader(qpid::framing::Buffer& buffer); - void encodeContent(qpid::framing::Buffer& buffer); - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - */ - u_int32_t encodedSize(); - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - */ - u_int32_t encodedHeaderSize(); - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - u_int32_t encodedContentSize(); - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - void releaseContent(MessageStore* store); - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - u_int64_t expectedContentSize(); - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - void setContent(std::auto_ptr<Content>& content); - }; - - } +/** + * Represents an AMQP message, i.e. a header body, a list of + * content bodies and some details about the publication + * request. + */ +class BasicMessage : public Message { + const ConnectionToken* const publisher; + framing::AMQHeaderBody::shared_ptr header; + std::auto_ptr<Content> content; + sys::Mutex contentLock; + u_int64_t size; + + void sendContent(framing::ChannelAdapter&, u_int32_t framesize); + + public: + typedef boost::shared_ptr<BasicMessage> shared_ptr; + + BasicMessage(const ConnectionToken* const publisher, + const string& exchange, const string& routingKey, + bool mandatory, bool immediate, + framing::AMQMethodBody::shared_ptr respondTo); + BasicMessage(); + ~BasicMessage(); + void setHeader(framing::AMQHeaderBody::shared_ptr header); + void addContent(framing::AMQContentBody::shared_ptr data); + bool isComplete(); + const ConnectionToken* const getPublisher(); + + void deliver(framing::ChannelAdapter&, + const string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize); + + void sendGetOk(const framing::MethodContext&, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize); + + framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + u_int64_t contentSize() const { return size; } + + void decode(framing::Buffer& buffer, bool headersOnly = false, + u_int32_t contentChunkSize = 0); + void decodeHeader(framing::Buffer& buffer); + void decodeContent(framing::Buffer& buffer, u_int32_t contentChunkSize = 0); + + void encode(framing::Buffer& buffer); + void encodeHeader(framing::Buffer& buffer); + void encodeContent(framing::Buffer& buffer); + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + */ + u_int32_t encodedSize(); + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + */ + u_int32_t encodedHeaderSize(); + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + u_int32_t encodedContentSize(); + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + void releaseContent(MessageStore* store); + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + u_int64_t expectedContentSize(); + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + void setContent(std::auto_ptr<Content>& content); +}; + +} } diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 53fcf66aac..9a5b136ada 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -24,146 +24,148 @@ #include "AMQContentBody.h" #include "AMQHeaderBody.h" +#include "AMQMethodBody.h" #include "Content.h" +#include "framing/amqp_types.h" #include <string> #include <boost/shared_ptr.hpp> namespace qpid { - namespace framing { - class OutputHandler; - class ProtocolVersion; - class BasicHeaderProperties; - } +namespace framing { +class MethodContext; +class ChannelAdapter; +class BasicHeaderProperties; +} - namespace broker { - - class MessageStore; - class ConnectionToken; - - /** - * Base class for all types of internal broker messages - * abstracting away the operations - * TODO; AMS: for the moment this is mostly a placeholder - */ - class Message{ - std::string exchange; - std::string routingKey; - const bool mandatory; - const bool immediate; - u_int64_t persistenceId; - - bool redelivered; +namespace broker { - public: - typedef boost::shared_ptr<Message> shared_ptr; +class MessageStore; +class ConnectionToken; - Message(const std::string& _exchange, const std::string& _routingKey, - bool _mandatory, bool _immediate) : - exchange(_exchange), - routingKey(_routingKey), - mandatory(_mandatory), - immediate(_immediate), - persistenceId(0), - redelivered(false) - {} - - Message() : - mandatory(false), - immediate(false), - persistenceId(0), - redelivered(false) - {} - - virtual ~Message() {}; +/** + * Base class for all types of internal broker messages + * abstracting away the operations + * TODO; AMS: for the moment this is mostly a placeholder + */ +class Message{ + std::string exchange; + std::string routingKey; + const bool mandatory; + const bool immediate; + u_int64_t persistenceId; + bool redelivered; + framing::AMQMethodBody::shared_ptr respondTo; + + public: + typedef boost::shared_ptr<Message> shared_ptr; + + Message(const std::string& _exchange, const std::string& _routingKey, + bool _mandatory, bool _immediate, + framing::AMQMethodBody::shared_ptr respondTo_) : + exchange(_exchange), + routingKey(_routingKey), + mandatory(_mandatory), + immediate(_immediate), + persistenceId(0), + redelivered(false), + respondTo(respondTo_) + {} - // Accessors - const std::string& getRoutingKey() const { return routingKey; } - const std::string& getExchange() const { return exchange; } - u_int64_t getPersistenceId() const { return persistenceId; } - bool getRedelivered() const { return redelivered; } + Message() : + mandatory(false), + immediate(false), + persistenceId(0), + redelivered(false) + {} + + virtual ~Message() {}; - void setRouting(const std::string& _exchange, const std::string& _routingKey) - { exchange = _exchange; routingKey = _routingKey; } - void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests? - void redeliver() { redelivered = true; } - - /** - * Used to deliver the message from the queue - */ - virtual void deliver(qpid::framing::OutputHandler* out, - int channel, + // Accessors + const std::string& getRoutingKey() const { return routingKey; } + const std::string& getExchange() const { return exchange; } + u_int64_t getPersistenceId() const { return persistenceId; } + bool getRedelivered() const { return redelivered; } + framing::AMQMethodBody::shared_ptr getRespondTo() const { + return respondTo; + } + + void setRouting(const std::string& _exchange, const std::string& _routingKey) + { exchange = _exchange; routingKey = _routingKey; } + void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests? + void redeliver() { redelivered = true; } + + /** + * Used to deliver the message from the queue + */ + virtual void deliver(framing::ChannelAdapter& channel, const std::string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version) = 0; - /** - * Used to return a message in response to a get from a queue - */ - virtual void sendGetOk(qpid::framing::OutputHandler* out, - int channel, + u_int32_t framesize) = 0; + /** + * Used to return a message in response to a get from a queue + */ + virtual void sendGetOk(const framing::MethodContext& context, u_int32_t messageCount, u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version) = 0; + u_int32_t framesize) = 0; - virtual bool isComplete() = 0; + virtual bool isComplete() = 0; - virtual u_int64_t contentSize() const = 0; - virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; - virtual bool isPersistent() = 0; - virtual const ConnectionToken* const getPublisher() = 0; - - virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? - virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? - - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - * - * XXXX: Only used in tests? - */ - virtual u_int32_t encodedSize() = 0; - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - * - * XXXX: Only used in tests? - */ - virtual u_int32_t encodedHeaderSize() = 0; - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - virtual u_int32_t encodedContentSize() = 0; - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - virtual u_int64_t expectedContentSize() = 0; + virtual u_int64_t contentSize() const = 0; + virtual framing::BasicHeaderProperties* getHeaderProperties() = 0; + virtual bool isPersistent() = 0; + virtual const ConnectionToken* const getPublisher() = 0; + + virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + * + * XXXX: Only used in tests? + */ + virtual u_int32_t encodedHeaderSize() = 0; + /** + * @returns the size of the buffer needed to encode the + * (possibly partial) content held by this message + */ + virtual u_int32_t encodedContentSize() = 0; + /** + * If headers have been received, returns the expected + * content size else returns 0. + */ + virtual u_int64_t expectedContentSize() = 0; - // TODO: AMS 29/1/2007 Don't think these are really part of base class + // TODO: AMS 29/1/2007 Don't think these are really part of base class - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; - virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - virtual void releaseContent(MessageStore* /*store*/) {}; - }; - - } -} + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; + virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {}; + virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {}; + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + virtual void releaseContent(MessageStore* /*store*/) {}; +}; + +}} #endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 46f583b978..4168ff639c 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -22,29 +22,28 @@ using namespace qpid::broker; -MessageMessage::MessageMessage(const qpid::framing::AMQMethodBody& _methodBody, - const std::string& _exchange, const std::string& _routingKey, - bool _mandatory, bool _immediate) : - Message(_exchange, _routingKey, _mandatory, _immediate), - methodBody(_methodBody) +MessageMessage::MessageMessage( + const qpid::framing::AMQMethodBody::shared_ptr _methodBody, + const std::string& _exchange, const std::string& _routingKey, + bool _mandatory, bool _immediate) : + Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody), + methodBody(_methodBody) { } -void MessageMessage::deliver(qpid::framing::OutputHandler* /*out*/, - int /*channel*/, - const std::string& /*consumerTag*/, - u_int64_t /*deliveryTag*/, - u_int32_t /*framesize*/, - qpid::framing::ProtocolVersion* /*version*/) +void MessageMessage::deliver( + framing::ChannelAdapter& /*out*/, + const std::string& /*consumerTag*/, + u_int64_t /*deliveryTag*/, + u_int32_t /*framesize*/) { } -void MessageMessage::sendGetOk(qpid::framing::OutputHandler* /*out*/, - int /*channel*/, - u_int32_t /*messageCount*/, - u_int64_t /*deliveryTag*/, - u_int32_t /*framesize*/, - qpid::framing::ProtocolVersion* /*version*/) +void MessageMessage::sendGetOk( + const framing::MethodContext& /*context*/, + u_int32_t /*messageCount*/, + u_int64_t /*deliveryTag*/, + u_int32_t /*framesize*/) { } diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index b49f60f5df..cad5cf15b0 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -25,47 +25,46 @@ #include "BrokerMessageBase.h" namespace qpid { - namespace framing { - class AMQMethodBody; - } +namespace framing { +class AMQMethodBody; +} - namespace broker { - class MessageMessage: public Message{ - const qpid::framing::AMQMethodBody& methodBody; +namespace broker { +class MessageMessage: public Message{ + const qpid::framing::AMQMethodBody::shared_ptr methodBody; - public: - MessageMessage(const qpid::framing::AMQMethodBody& methodBody, - const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); + public: + MessageMessage( + const framing::AMQMethodBody::shared_ptr methodBody, + const std::string& exchange, const std::string& routingKey, + bool mandatory, bool immediate); - // Default destructor okay + // Default destructor okay - void deliver(qpid::framing::OutputHandler* out, - int channel, - const std::string& consumerTag, - u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version); - void sendGetOk(qpid::framing::OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version); - bool isComplete(); + void deliver(framing::ChannelAdapter& channel, + const std::string& consumerTag, + u_int64_t deliveryTag, + u_int32_t framesize); + + void sendGetOk(const framing::MethodContext& context, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize); + + bool isComplete(); - u_int64_t contentSize() const; - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - bool isPersistent(); - const ConnectionToken* const getPublisher(); + u_int64_t contentSize() const; + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); + const ConnectionToken* const getPublisher(); - u_int32_t encodedSize(); - u_int32_t encodedHeaderSize(); - u_int32_t encodedContentSize(); - u_int64_t expectedContentSize(); - }; + u_int32_t encodedSize(); + u_int32_t encodedHeaderSize(); + u_int32_t encodedContentSize(); + u_int64_t expectedContentSize(); +}; - } +} } diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index b45aa375e6..6694a0ed13 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -60,9 +60,11 @@ Exchange::shared_ptr Connection::findExchange(const string& name){ void Connection::received(qpid::framing::AMQFrame* frame){ - getAdapter(frame->getChannel()).handleBody(frame->getBody()); + getChannel(frame->getChannel()).handleBody(frame->getBody()); } +// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter +// as it is part of the protocol. void Connection::initiated(qpid::framing::ProtocolInitiation* header) { if (client.get()) // TODO aconway 2007-01-16: correct error code. @@ -72,12 +74,11 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) { FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); - // TODO aconway 2007-01-16: Client call, move to adapter. client->getConnection().start( - MethodContext(0, &getAdapter(0)), + MethodContext(&getChannel(0)), header->getMajor(), header->getMinor(), properties, mechanisms, locales); - getAdapter(0).init(0, *out, client->getProtocolVersion()); + getChannel(0).init(0, *out, client->getProtocolVersion()); } void Connection::idleOut(){} @@ -99,28 +100,19 @@ void Connection::closed(){ void Connection::closeChannel(u_int16_t channel) { getChannel(channel).close(); - adapters.erase(adapters.find(channel)); + channels.erase(channels.find(channel)); } -BrokerAdapter& Connection::getAdapter(u_int16_t id) { - AdapterMap::iterator i = adapters.find(id); - if (i == adapters.end()) { - std::auto_ptr<Channel> ch( - new Channel( - client->getProtocolVersion(), out, id, - framemax, broker.getQueues().getStore(), - settings.stagingThreshold)); - BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker); - adapters.insert(id, adapter); - return *adapter; - } - else - return *i; -} - -Channel& Connection::getChannel(u_int16_t id) { - return getAdapter(id).getChannel(); +Channel& Connection::getChannel(ChannelId id) { + ChannelMap::iterator i = channels.find(id); + if (i == channels.end()) { + i = channels.insert( + id, new Channel( + *this, id, framemax, broker.getQueues().getStore(), + settings.stagingThreshold)).first; + } + return *i; } diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h index d2b0422f8d..e3d5156c9b 100644 --- a/cpp/lib/broker/Connection.h +++ b/cpp/lib/broker/Connection.h @@ -34,7 +34,6 @@ #include <sys/TimeoutHandler.h> #include "Broker.h" #include "Exception.h" -#include "BrokerAdapter.h" namespace qpid { namespace broker { @@ -47,19 +46,22 @@ class Settings { Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} }; -class Connection : public qpid::sys::ConnectionInputHandler, +class Connection : public sys::ConnectionInputHandler, public ConnectionToken { public: - Connection(qpid::sys::ConnectionOutputHandler* out, Broker& broker); + Connection(sys::ConnectionOutputHandler* out, Broker& broker); // ConnectionInputHandler methods - void received(qpid::framing::AMQFrame* frame); - void initiated(qpid::framing::ProtocolInitiation* header); + void received(framing::AMQFrame* frame); + void initiated(framing::ProtocolInitiation* header); void idleOut(); void idleIn(); void closed(); - qpid::sys::ConnectionOutputHandler& getOutput() { return *out; } + sys::ConnectionOutputHandler& getOutput() { return *out; } + + const framing::ProtocolVersion& getVersion() { + return client->getProtocolVersion(); } u_int32_t getFrameMax() const { return framemax; } u_int16_t getHeartbeat() const { return heartbeat; } @@ -68,7 +70,7 @@ class Connection : public qpid::sys::ConnectionInputHandler, void setHeartbeat(u_int16_t hb) { heartbeat = hb; } Broker& broker; - std::auto_ptr<qpid::framing::AMQP_ClientProxy> client; + std::auto_ptr<framing::AMQP_ClientProxy> client; Settings settings; std::vector<Queue::shared_ptr> exclusiveQueues; @@ -81,20 +83,18 @@ class Connection : public qpid::sys::ConnectionInputHandler, */ Queue::shared_ptr getQueue(const string& name, u_int16_t channel); - Channel& newChannel(u_int16_t channel); - Channel& getChannel(u_int16_t channel); - void closeChannel(u_int16_t channel); + Channel& newChannel(framing::ChannelId channel); + Channel& getChannel(framing::ChannelId channel); + void closeChannel(framing::ChannelId channel); private: - typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap; + typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; Exchange::shared_ptr findExchange(const string& name); - BrokerAdapter& getAdapter(u_int16_t id); - - AdapterMap adapters; - qpid::sys::ConnectionOutputHandler* out; + ChannelMap channels; + sys::ConnectionOutputHandler* out; u_int32_t framemax; u_int16_t heartbeat; }; diff --git a/cpp/lib/broker/Content.h b/cpp/lib/broker/Content.h index a793bd8f72..3058fdfd6a 100644 --- a/cpp/lib/broker/Content.h +++ b/cpp/lib/broker/Content.h @@ -24,21 +24,25 @@ #include <AMQContentBody.h> #include <Buffer.h> #include <OutputHandler.h> -#include <ProtocolVersion.h> namespace qpid { - namespace broker { - class Content{ - public: - virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0; - virtual u_int32_t size() = 0; - virtual void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0; - virtual void encode(qpid::framing::Buffer& buffer) = 0; - virtual void destroy() = 0; - virtual ~Content(){} - }; - } + +namespace framing { +class ChannelAdapter; } +namespace broker { +class Content{ + public: + virtual void add(framing::AMQContentBody::shared_ptr data) = 0; + virtual u_int32_t size() = 0; + virtual void send(framing::ChannelAdapter& channel, + u_int32_t framesize) = 0; + virtual void encode(qpid::framing::Buffer& buffer) = 0; + virtual void destroy() = 0; + virtual ~Content(){} +}; +}} + #endif diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp index 12721fe86e..e27ccd8c4a 100644 --- a/cpp/lib/broker/InMemoryContent.cpp +++ b/cpp/lib/broker/InMemoryContent.cpp @@ -20,6 +20,7 @@ */ #include <InMemoryContent.h> #include "AMQFrame.h" +#include "framing/ChannelAdapter.h" using namespace qpid::broker; using namespace qpid::framing; @@ -39,24 +40,26 @@ u_int32_t InMemoryContent::size() return sum; } -void InMemoryContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize) +// FIXME aconway 2007-02-01: Remove version parameter. +void InMemoryContent::send(ChannelAdapter& channel, u_int32_t framesize) { for (content_iterator i = content.begin(); i != content.end(); i++) { if ((*i)->size() > framesize) { u_int32_t offset = 0; for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { string data = (*i)->getData().substr(offset, framesize); - out->send(new AMQFrame(version, channel, new AMQContentBody(data))); + channel.send(new AMQContentBody(data)); offset += framesize; } u_int32_t remainder = (*i)->size() % framesize; if (remainder) { string data = (*i)->getData().substr(offset, remainder); - out->send(new AMQFrame(version, channel, new AMQContentBody(data))); + channel.send(new AMQContentBody(data)); } } else { - AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); - out->send(new AMQFrame(version, channel, contentBody)); + AMQBody::shared_ptr contentBody = + static_pointer_cast<AMQBody, AMQContentBody>(*i); + channel.send(contentBody); } } } diff --git a/cpp/lib/broker/InMemoryContent.h b/cpp/lib/broker/InMemoryContent.h index 72c6b46a7f..7003ea2aab 100644 --- a/cpp/lib/broker/InMemoryContent.h +++ b/cpp/lib/broker/InMemoryContent.h @@ -35,7 +35,7 @@ namespace qpid { public: void add(qpid::framing::AMQContentBody::shared_ptr data); u_int32_t size(); - void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void send(framing::ChannelAdapter&, u_int32_t framesize); void encode(qpid::framing::Buffer& buffer); void destroy(); ~InMemoryContent(){} diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp index 2c7a764ec9..d8f18d5c8b 100644 --- a/cpp/lib/broker/LazyLoadedContent.cpp +++ b/cpp/lib/broker/LazyLoadedContent.cpp @@ -20,6 +20,7 @@ */ #include <LazyLoadedContent.h> #include "AMQFrame.h" +#include "framing/ChannelAdapter.h" using namespace qpid::broker; using namespace qpid::framing; @@ -37,19 +38,21 @@ u_int32_t LazyLoadedContent::size() return 0;//all content is written as soon as it is added } -void LazyLoadedContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize) +void LazyLoadedContent::send(ChannelAdapter& channel, u_int32_t framesize) { if (expectedSize > framesize) { - for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) { + for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) + { u_int64_t remaining = expectedSize - offset; string data; - store->loadContent(msg, data, offset, remaining > framesize ? framesize : remaining); - out->send(new AMQFrame(version, channel, new AMQContentBody(data))); + store->loadContent(msg, data, offset, + remaining > framesize ? framesize : remaining); + channel.send(new AMQContentBody(data)); } } else { string data; store->loadContent(msg, data, 0, expectedSize); - out->send(new AMQFrame(version, channel, new AMQContentBody(data))); + channel.send(new AMQContentBody(data)); } } diff --git a/cpp/lib/broker/LazyLoadedContent.h b/cpp/lib/broker/LazyLoadedContent.h index 5322e5e711..56fdd7699b 100644 --- a/cpp/lib/broker/LazyLoadedContent.h +++ b/cpp/lib/broker/LazyLoadedContent.h @@ -31,10 +31,14 @@ namespace qpid { Message* const msg; const u_int64_t expectedSize; public: - LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize); + LazyLoadedContent( + MessageStore* const store, Message* const msg, + u_int64_t expectedSize); void add(qpid::framing::AMQContentBody::shared_ptr data); u_int32_t size(); - void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void send( + framing::ChannelAdapter&, + u_int32_t framesize); void encode(qpid::framing::Buffer& buffer); void destroy(); ~LazyLoadedContent(){} diff --git a/cpp/lib/broker/MessageBuilder.h b/cpp/lib/broker/MessageBuilder.h index 4e51f223f0..5b8516be42 100644 --- a/cpp/lib/broker/MessageBuilder.h +++ b/cpp/lib/broker/MessageBuilder.h @@ -39,7 +39,9 @@ namespace qpid { virtual void complete(Message::shared_ptr&) = 0; virtual ~CompletionHandler(){} }; - MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0); + MessageBuilder(CompletionHandler* _handler, + MessageStore* const store = 0, + u_int64_t stagingThreshold = 0); void initialise(Message::shared_ptr& msg); void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header); void addContent(qpid::framing::AMQContentBody::shared_ptr& content); diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index f04b749996..71100996e7 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -34,8 +34,8 @@ using namespace framing; // void MessageHandlerImpl::append(const MethodContext&, - const string& /*reference*/, - const string& /*bytes*/ ) + const string& /*reference*/, + const string& /*bytes*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } @@ -43,7 +43,7 @@ MessageHandlerImpl::append(const MethodContext&, void MessageHandlerImpl::cancel( const MethodContext& context, - const string& destination ) + const string& destination ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature @@ -54,28 +54,28 @@ MessageHandlerImpl::cancel( const MethodContext& context, void MessageHandlerImpl::checkpoint(const MethodContext&, - const string& /*reference*/, - const string& /*identifier*/ ) + const string& /*reference*/, + const string& /*identifier*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void MessageHandlerImpl::close(const MethodContext&, - const string& /*reference*/ ) + const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void MessageHandlerImpl::consume(const MethodContext& context, - u_int16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const qpid::framing::FieldTable& filter ) + u_int16_t /*ticket*/, + const string& queueName, + const string& destination, + bool noLocal, + bool noAck, + bool exclusive, + const qpid::framing::FieldTable& filter ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature @@ -106,15 +106,15 @@ MessageHandlerImpl::empty( const MethodContext& ) void MessageHandlerImpl::get( const MethodContext& context, - u_int16_t /*ticket*/, - const string& queueName, - const string& /*destination*/, - bool noAck ) + u_int16_t /*ticket*/, + const string& queueName, + const string& /*destination*/, + bool noAck ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature Queue::shared_ptr queue = - connection.getQueue(queueName, context.channelId); + connection.getQueue(queueName, context.channel->getId()); // FIXME: get is probably Basic specific if(!channel.get(queue, !noAck)){ @@ -125,7 +125,7 @@ MessageHandlerImpl::get( const MethodContext& context, void MessageHandlerImpl::offset(const MethodContext&, - u_int64_t /*value*/ ) + u_int64_t /*value*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } @@ -138,16 +138,16 @@ MessageHandlerImpl::ok( const MethodContext& ) void MessageHandlerImpl::open(const MethodContext&, - const string& /*reference*/ ) + const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void MessageHandlerImpl::qos(const MethodContext& context, - u_int32_t prefetchSize, - u_int16_t prefetchCount, - bool /*global*/ ) + u_int32_t prefetchSize, + u_int16_t prefetchCount, + bool /*global*/ ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature @@ -160,7 +160,7 @@ MessageHandlerImpl::qos(const MethodContext& context, void MessageHandlerImpl::recover(const MethodContext&, - bool requeue ) + bool requeue ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature @@ -170,45 +170,45 @@ MessageHandlerImpl::recover(const MethodContext&, void MessageHandlerImpl::reject(const MethodContext&, - u_int16_t /*code*/, - const string& /*text*/ ) + u_int16_t /*code*/, + const string& /*text*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void MessageHandlerImpl::resume(const MethodContext&, - const string& /*reference*/, - const string& /*identifier*/ ) + const string& /*reference*/, + const string& /*identifier*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void MessageHandlerImpl::transfer(const MethodContext& context, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool immediate, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& exchangeName, - const string& routingKey, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content body, - bool mandatory ) + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool immediate, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& exchangeName, + const string& routingKey, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content body, + bool mandatory ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature @@ -216,14 +216,15 @@ MessageHandlerImpl::transfer(const MethodContext& context, broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ if (body.isInline()) { - MessageMessage* msg = - new MessageMessage(*(context.methodBody), exchangeName, routingKey, mandatory, immediate); - channel.handlePublish(msg, exchange); + MessageMessage* msg = + new MessageMessage(context.methodBody, exchangeName, + routingKey, mandatory, immediate); + channel.handlePublish(msg, exchange); - connection.client->getMessageHandler()->ok(context); + connection.client->getMessageHandler()->ok(context); } else { - // Don't handle reference content yet - assert(body.isInline()); + // Don't handle reference content yet + assert(body.isInline()); } }else{ throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); diff --git a/cpp/lib/broker/NullMessageStore.h b/cpp/lib/broker/NullMessageStore.h index ef2bea8fd6..a5dfce504e 100644 --- a/cpp/lib/broker/NullMessageStore.h +++ b/cpp/lib/broker/NullMessageStore.h @@ -34,7 +34,7 @@ namespace qpid { class NullMessageStore : public MessageStore{ const bool warn; public: - NullMessageStore(bool warn = true); + NullMessageStore(bool warn = false); virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings); virtual void destroy(const Queue& queue); virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index a207763aac..dd93c6ae8b 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -75,7 +75,7 @@ void Channel::protocolInit( ConnectionTuneBody::shared_ptr proposal = sendAndReceive<ConnectionTuneBody>( new ConnectionStartOkBody( - version, props, mechanism, response, locale)); + version, responses.getRequestId(), props, mechanism, response, locale)); /** * Assume for now that further challenges will not be required @@ -85,9 +85,9 @@ void Channel::protocolInit( connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - (new ConnectionTuneOkBody( - version, proposal->getChannelMax(), connection->getMaxFrameSize(), - proposal->getHeartbeat()))->send(context); + send(new ConnectionTuneOkBody( + version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(), + proposal->getHeartbeat())); u_int16_t heartbeat = proposal->getHeartbeat(); connection->connector->setReadTimeout(heartbeat * 2); @@ -96,8 +96,7 @@ void Channel::protocolInit( // Send connection open. std::string capabilities; responses.expect(); - (new ConnectionOpenBody(version, vhost, capabilities, true)) - ->send(context); + send(new ConnectionOpenBody(version, vhost, capabilities, true)); //receive connection.open-ok (or redirect, but ignore that for now //esp. as using force=true). responses.waitForResponse(); @@ -208,8 +207,7 @@ void Channel::cancel(const std::string& tag, bool synch) { if (i != consumers.end()) { Consumer& c = i->second; if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - (new BasicAckBody(version, c.lastDeliveryTag, true)) - ->send(context); + send(new BasicAckBody(version, c.lastDeliveryTag, true)); sendAndReceiveSync<BasicCancelOkBody>( synch, new BasicCancelBody(version, tag, !synch)); consumers.erase(tag); @@ -227,8 +225,7 @@ void Channel::cancelAll(){ // trying the rest. NB no memory leaks if we do, // ConsumerMap holds values, not pointers. // - (new BasicAckBody(version, c.lastDeliveryTag, true)) - ->send(context); + send(new BasicAckBody(version, c.lastDeliveryTag, true)); } } } @@ -249,7 +246,7 @@ void Channel::retrieve(Message& msg){ bool Channel::get(Message& msg, const Queue& queue, int ackMode) { string name = queue.getName(); responses.expect(); - (new BasicGetBody(version, 0, name, ackMode))->send(context); + send(new BasicGetBody(version, 0, name, ackMode)); responses.waitForResponse(); AMQMethodBody::shared_ptr response = responses.getResponse(); if(response->isA<BasicGetOkBody>()) { @@ -277,7 +274,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - (new BasicPublishBody(version, 0, e, key, mandatory, immediate))->send(context); + send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); @@ -426,8 +423,7 @@ void Channel::deliver(Consumer& consumer, Message& msg){ if(++(consumer.count) < prefetch) break; //else drop-through case AUTO_ACK: - (new BasicAckBody(version, msg.getDeliveryTag(), multiple)) - ->send(context); + send(new BasicAckBody(version, msg.getDeliveryTag(), multiple)); consumer.lastDeliveryTag = 0; } } @@ -512,7 +508,7 @@ void Channel::closeInternal() { void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) { responses.expect(); - toSend->send(context); + send(toSend); responses.receive(c, m); } @@ -522,7 +518,7 @@ void Channel::sendAndReceiveSync( if(sync) sendAndReceive(body, c, m); else - body->send(context); + send(body); } diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp index ea234ac321..ea48fa2386 100644 --- a/cpp/lib/client/ResponseHandler.cpp +++ b/cpp/lib/client/ResponseHandler.cpp @@ -63,7 +63,7 @@ void ResponseHandler::receive(ClassId c, MethodId m) { THROW_QPID_ERROR( PROTOCOL_ERROR, "Channel closed unexpectedly."); } - if(!validate(response->amqpClassId(), response->amqpMethodId())) { + if(!validate(response->amqpClassId(), response->amqpMethodId())) { THROW_QPID_ERROR( PROTOCOL_ERROR, boost::format("Expected class:method %d:%d, got %d:%d") @@ -71,6 +71,10 @@ void ResponseHandler::receive(ClassId c, MethodId m) { } } +RequestId ResponseHandler::getRequestId() { + assert(response->getRequestId()); + return response->getRequestId(); +} void ResponseHandler::expect(){ waiting = true; } diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h index c402bcbc65..af0c250eb1 100644 --- a/cpp/lib/client/ResponseHandler.h +++ b/cpp/lib/client/ResponseHandler.h @@ -20,8 +20,7 @@ */ #include <string> -#include <amqp_types.h> -#include <framing/amqp_framing.h> +#include <framing/amqp_framing.h> // FIXME aconway 2007-02-01: #include cleanup. #include <sys/Monitor.h> #ifndef _ResponseHandler_ @@ -52,11 +51,13 @@ class ResponseHandler{ bool validate(framing::ClassId, framing::MethodId); void receive(framing::ClassId, framing::MethodId); + framing::RequestId getRequestId(); + template <class BodyType> bool validate() { return validate(BodyType::CLASS_ID, BodyType::METHOD_ID); } template <class BodyType> void receive() { - return receive(BodyType::CLASS_ID, BodyType::METHOD_ID); + receive(BodyType::CLASS_ID, BodyType::METHOD_ID); } }; diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp index 17138a401e..ab1e15f845 100644 --- a/cpp/lib/common/framing/AMQMethodBody.cpp +++ b/cpp/lib/common/framing/AMQMethodBody.cpp @@ -56,8 +56,4 @@ void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/) { decodeContent(buffer); } -void AMQMethodBody::send(const MethodContext& context) { - context.out->send(new AMQFrame(version, context.channelId, this)); -} - }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index b21ae379dc..492571c83c 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -57,11 +57,8 @@ class AMQMethodBody : public AMQBody return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID; } - /** - * Wrap this method in a frame and send using the current context. - * Note the frame takes ownership of the body, it will be deleted. - */ - virtual void send(const MethodContext& context); + /** Return request ID or response correlationID */ + virtual RequestId getRequestId() const { return 0; } protected: static u_int32_t baseSize() { return 4; } diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h index 390fe77881..9b8fc1d663 100644 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -58,8 +58,8 @@ class AMQRequestBody : public AMQMethodBody Data& getData() { return data; } RequestId getRequestId() const { return data.requestId; } - void setRequestId(RequestId id) { data.requestId=id; } ResponseId getResponseMark() const { return data.responseMark; } + void setRequestId(RequestId id) { data.requestId=id; } void setResponseMark(ResponseId mark) { data.responseMark=mark; } protected: diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp index da70e24cd4..7da71a5d25 100644 --- a/cpp/lib/common/framing/AMQResponseBody.cpp +++ b/cpp/lib/common/framing/AMQResponseBody.cpp @@ -62,11 +62,4 @@ void AMQResponseBody::printPrefix(std::ostream& out) const { << ",batch=" << data.batchOffset << "): "; } -void AMQResponseBody::send(const MethodContext& context) { - setRequestId(context.requestId); - assert(context.out); - context.out->send( - new AMQFrame(version, context.channelId, this)); -} - }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index 2520a481f2..3954836ec8 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -62,15 +62,13 @@ class AMQResponseBody : public AMQMethodBody void encode(Buffer& buffer) const; Data& getData() { return data; } - ResponseId getResponseId() { return data.responseId; } - RequestId getRequestId() { return data.requestId; } - BatchOffset getBatchOffset() { return data.batchOffset; } + ResponseId getResponseId() const { return data.responseId; } + RequestId getRequestId() const { return data.requestId; } + BatchOffset getBatchOffset() const { return data.batchOffset; } void setResponseId(ResponseId id) { data.responseId = id; } void setRequestId(RequestId id) { data.requestId = id; } void setBatchOffset(BatchOffset id) { data.batchOffset = id; } - virtual void send(const MethodContext& context); - protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } void printPrefix(std::ostream& out) const; diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 225e5e3393..149c8144b4 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -32,12 +32,10 @@ void ChannelAdapter::init( id = i; out = &o; version = v; - context = MethodContext(id, this); } -void ChannelAdapter::send(AMQFrame* frame) { +void ChannelAdapter::send(AMQBody::shared_ptr body) { assertChannelOpen(); - AMQBody::shared_ptr body = frame->getBody(); switch (body->type()) { case REQUEST_BODY: { AMQRequestBody::shared_ptr request = @@ -52,18 +50,13 @@ void ChannelAdapter::send(AMQFrame* frame) { break; } } - out->send(frame); -} - -void ChannelAdapter::send(AMQBody::shared_ptr body) { - send(new AMQFrame(getVersion(), getId(), body)); + out->send(new AMQFrame(getVersion(), getId(), body)); } void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { assertMethodOk(*request); responder.received(request->getData()); - context =MethodContext(id, request, this, request->getRequestId()); - handleMethodInContext(request, context); + handleMethodInContext(request, MethodContext(this, request)); } void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { @@ -76,8 +69,7 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { assertMethodOk(*method); - context = MethodContext(id, method, this); - handleMethodInContext(method, context); + handleMethodInContext(method, MethodContext(this, method)); } void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const { diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index f0b3d2469a..26cac76aae 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -27,7 +27,7 @@ #include "BodyHandler.h" #include "Requester.h" #include "Responder.h" -#include "OutputHandler.h" +#include "framing/amqp_types.h" namespace qpid { namespace framing { @@ -37,24 +37,26 @@ class MethodContext; /** * Base class for client and broker channel adapters. * - * As BodyHandler: + * BodyHandler::handl* * - receives frame bodies from the network. * - Updates request/response data. * - Dispatches requests with a MethodContext for responses. * - * As OutputHandler: + * send() * - Updates request/resposne ID data. * - Forwards frame to the peer. * * Thread safety: OBJECT UNSAFE. Instances must not be called * concurrently. AMQP defines channels to be serialized. */ -class ChannelAdapter : public BodyHandler, public OutputHandler { +class ChannelAdapter : public BodyHandler { public: /** *@param output Processed frames are forwarded to this handler. */ - ChannelAdapter() : context(0, 0), id(0), out(0) {} + ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0, + const ProtocolVersion& ver=ProtocolVersion()) + : id(id_), out(out_), version(ver) {} /** Initialize the channel adapter. */ void init(ChannelId, OutputHandler&, const ProtocolVersion&); @@ -63,12 +65,6 @@ class ChannelAdapter : public BodyHandler, public OutputHandler { const ProtocolVersion& getVersion() const { return version; } /** - * Do request/response-id processing and then forward to - * handler provided to constructor. Response frames should - * have their request-id set before calling send. - */ - void send(AMQFrame* frame); - /** * Wrap body in a frame and send the frame. * Takes ownership of body. */ @@ -92,9 +88,6 @@ class ChannelAdapter : public BodyHandler, public OutputHandler { RequestId getRequestInProgress() { return requestInProgress; } - protected: - MethodContext context; - private: ChannelId id; OutputHandler* out; diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index 454025e377..d9717d90a0 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -19,6 +19,8 @@ * */ +#include <boost/shared_ptr.hpp> + #include "OutputHandler.h" #include "ProtocolVersion.h" @@ -29,54 +31,41 @@ namespace framing { class BodyHandler; class AMQMethodBody; +class ChannelAdapter; /** * Invocation context for an AMQP method. - * Some of the context information is related to the channel, some - * to the specific invocation - e.g. requestId. - * - * All generated proxy and handler functions take a MethodContext parameter. * - * The user does not need to create MethodContext objects explicitly, - * the constructor will implicitly create one from a channel ID. - * - * Other context members are for internal use. + * It provides the method being processed and the channel on which + * it arrived. + * + * All Handler functions take a MethodContext as the last parameter. */ struct MethodContext { - /** - * Passing a integer channel-id in place of a MethodContext - * will automatically construct the MethodContext. - */ - MethodContext(ChannelId channel, - OutputHandler* output=0, RequestId request=0) - : channelId(channel), out(output), requestId(request) - {} - - MethodContext(ChannelId channel, - boost::shared_ptr<AMQMethodBody> method, - OutputHandler* output=0, RequestId request=0) - : channelId(channel), out(output), requestId(request), - methodBody(method) - {} - - /** \internal Channel on which the method is sent. */ - ChannelId channelId; + typedef boost::shared_ptr<AMQMethodBody> BodyPtr; - /** Output handler for responses in this context */ - OutputHandler* out; + MethodContext(ChannelAdapter* ch=0, BodyPtr method=BodyPtr()) + : channel(ch), methodBody(method) {} - /** \internal If we are in the context of processing an incoming request, - * this is the ID. Otherwise it is 0. - */ - RequestId requestId; + /** + * Channel on which the method being processed arrived. + * 0 if the method was constructed by the caller + * rather than received from a channel. + */ + ChannelAdapter* channel; - /** \internal This is the Method Body itself - * It's useful for passing around instead of unpacking all its parameters + /** + * Body of the method being processed. + * It's useful for passing around instead of unpacking all its parameters. + * It's also provides the request ID when constructing a response. */ - boost::shared_ptr<AMQMethodBody> methodBody; + BodyPtr methodBody; }; +// FIXME aconway 2007-02-01: Method context only required on Handler +// functions, not on Proxy functions. + }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/ProtocolVersion.cpp b/cpp/lib/common/framing/ProtocolVersion.cpp index 69ff89ec32..e65c8b79b8 100644 --- a/cpp/lib/common/framing/ProtocolVersion.cpp +++ b/cpp/lib/common/framing/ProtocolVersion.cpp @@ -20,10 +20,13 @@ */ #include <ProtocolVersion.h> #include <sstream> +#include "AMQP_HighestVersion.h" using namespace qpid::framing; -ProtocolVersion::ProtocolVersion() {} +ProtocolVersion::ProtocolVersion() { + *this = highestProtocolVersion; +} ProtocolVersion::ProtocolVersion(u_int8_t _major, u_int8_t _minor) : major_(_major), diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index 760a4d3344..a3dabe6408 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -28,6 +28,9 @@ #include <memory> #include <AMQP_HighestVersion.h> #include "AMQFrame.h" +#include "DummyChannel.h" +#include "broker/Connection.h" +#include "ProtocolInitiation.h" using namespace boost; using namespace qpid::broker; @@ -36,12 +39,12 @@ using namespace qpid::sys; using std::string; using std::queue; -struct DummyHandler : OutputHandler{ +struct DummyHandler : ConnectionOutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); - } + void send(AMQFrame* frame){ frames.push_back(frame); } + + void close() {}; }; @@ -55,6 +58,10 @@ class ChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST_SUITE_END(); + Broker::shared_ptr broker; + Connection connection; + DummyHandler handler; + class MockMessageStore : public NullMessageStore { struct MethodCall @@ -135,9 +142,17 @@ class ChannelTest : public CppUnit::TestCase public: + ChannelTest() : + broker(Broker::create()), + connection(&handler, *broker) + { + connection.initiated(new ProtocolInitiation()); + } + + void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0); + Channel channel(connection, 0, 0, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); @@ -162,12 +177,10 @@ class ChannelTest : public CppUnit::TestCase } void testDeliveryNoAck(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); - + Channel channel(connection, 7, 10000); const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + Message::shared_ptr msg( + createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); ConnectionToken* owner(0); @@ -175,22 +188,25 @@ class ChannelTest : public CppUnit::TestCase channel.consume(tag, queue, false, false, owner); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); + CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( + handler.frames[0]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( + handler.frames[1]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( + handler.frames[2]->getBody().get())); + AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( + handler.frames[3]->getBody().get()); CPPUNIT_ASSERT(contentBody); CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); } void testDeliveryAndRecovery(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); + Channel channel(connection, 7, 10000); const string data("abcdefghijklmn"); Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); @@ -202,26 +218,32 @@ class ChannelTest : public CppUnit::TestCase channel.consume(tag, queue, true, false, owner); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2]->getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3]->getChannel()); + CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( + handler.frames[0]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( + handler.frames[1]->getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( + handler.frames[2]->getBody().get())); + AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( + handler.frames[3]->getBody().get()); CPPUNIT_ASSERT(contentBody); CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); } void testStaging(){ MockMessageStore store; - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); + Channel channel( + connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); const string data[] = {"abcde", "fghij", "klmno"}; - Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); + Message* msg = new BasicMessage( + 0, "my_exchange", "my_routing_key", false, false, + DummyChannel::basicGetBody()); store.expect(); store.stage(msg); @@ -309,7 +331,9 @@ class ChannelTest : public CppUnit::TestCase Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) { - BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false); + BasicMessage* msg = new BasicMessage( + 0, exchange, routingKey, false, false, + DummyChannel::basicGetBody()); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(contentSize); msg->setHeader(header); diff --git a/cpp/tests/ExchangeTest.cpp b/cpp/tests/ExchangeTest.cpp index a31c369fe1..cccec92024 100644 --- a/cpp/tests/ExchangeTest.cpp +++ b/cpp/tests/ExchangeTest.cpp @@ -26,8 +26,10 @@ #include <TopicExchange.h> #include <qpid_test_plugin.h> #include <iostream> +#include "BasicGetBody.h" using namespace qpid::broker; +using namespace qpid::framing; using namespace qpid::sys; class ExchangeTest : public CppUnit::TestCase @@ -54,7 +56,11 @@ class ExchangeTest : public CppUnit::TestCase queue.reset(); queue2.reset(); - Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true)); + Message::shared_ptr msgPtr( + new BasicMessage( + 0, "e", "A", true, true, + AMQMethodBody::shared_ptr( + new BasicGetBody(ProtocolVersion())))); DeliverableMessage msg(msgPtr); topic.route(msg, "abc", 0); direct.route(msg, "abc", 0); diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp index 721d8ad857..f8754337c8 100644 --- a/cpp/tests/FramingTest.cpp +++ b/cpp/tests/FramingTest.cpp @@ -105,7 +105,7 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - ConnectionRedirectBody in(version, a, b); + ConnectionRedirectBody in(version, 0, a, b); in.encodeContent(buffer); buffer.flip(); ConnectionRedirectBody out(version); @@ -142,7 +142,8 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - AMQFrame in(version, 999, new ConnectionRedirectBody(version, a, b)); + AMQFrame in(version, 999, + new ConnectionRedirectBody(version, 0, a, b)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -153,7 +154,7 @@ class FramingTest : public CppUnit::TestCase void testBasicConsumeOkBodyFrame() { std::string s = "hostA"; - AMQFrame in(version, 999, new BasicConsumeOkBody(version, s)); + AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s)); in.encode(buffer); buffer.flip(); AMQFrame out; diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp index 1494518578..db54ea44a0 100644 --- a/cpp/tests/InMemoryContentTest.cpp +++ b/cpp/tests/InMemoryContentTest.cpp @@ -24,6 +24,7 @@ #include <iostream> #include <list> #include "AMQFrame.h" +#include "DummyChannel.h" using std::list; using std::string; @@ -31,13 +32,6 @@ using boost::dynamic_pointer_cast; using namespace qpid::broker; using namespace qpid::framing; -struct DummyHandler : OutputHandler{ - std::vector<AMQFrame*> frames; - - virtual void send(AMQFrame* frame){ - frames.push_back(frame); - } -}; class InMemoryContentTest : public CppUnit::TestCase { @@ -64,12 +58,21 @@ public: void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5) { InMemoryContent content; - DummyHandler handler; - u_int16_t channel = 3; + DummyChannel channel(3); addframes(content, inCount, in); - content.send(highestProtocolVersion, &handler, channel, framesize); - check(handler, channel, outCount, out); + content.send(channel, framesize); + CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); + + for (unsigned int i = 0; i < outCount; i++) { + AMQContentBody::shared_ptr chunk( + dynamic_pointer_cast<AMQContentBody>( + channel.out.frames[i]->getBody())); + CPPUNIT_ASSERT(chunk); + CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); + CPPUNIT_ASSERT_EQUAL( + ChannelId(3), channel.out.frames[i]->getChannel()); + } } void addframes(InMemoryContent& content, size_t frameCount, string* frameData) @@ -80,17 +83,7 @@ public: } } - void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks) - { - CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size()); - for (unsigned int i = 0; i < expectedChunkCount; i++) { - AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody())); - CPPUNIT_ASSERT(chunk); - CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData()); - CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel()); - } - } }; // Make this test suite a plugin. diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp index 624d2be3ff..49e4ecc4ae 100644 --- a/cpp/tests/LazyLoadedContentTest.cpp +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -26,20 +26,13 @@ #include <list> #include <sstream> #include "AMQFrame.h" - +#include "DummyChannel.h" using std::list; using std::string; using boost::dynamic_pointer_cast; using namespace qpid::broker; using namespace qpid::framing; -struct DummyHandler : OutputHandler{ - std::vector<AMQFrame*> frames; - - virtual void send(AMQFrame* frame){ - frames.push_back(frame); - } -}; class LazyLoadedContentTest : public CppUnit::TestCase @@ -99,21 +92,16 @@ public: { TestMessageStore store(in); LazyLoadedContent content(&store, 0, in.size()); - DummyHandler handler; - u_int16_t channel = 3; - content.send(highestProtocolVersion, &handler, channel, framesize); - check(handler, channel, outCount, out); - } - - void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks) - { - CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size()); + DummyChannel channel(3); + content.send(channel, framesize); + CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); - for (unsigned int i = 0; i < expectedChunkCount; i++) { - AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody())); + for (unsigned int i = 0; i < outCount; i++) { + AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[i]->getBody())); CPPUNIT_ASSERT(chunk); - CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData()); - CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel()); + CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); + CPPUNIT_ASSERT_EQUAL( + ChannelId(3), channel.out.frames[i]->getChannel()); } } }; diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp index e84d1df0e7..dc660751b7 100644 --- a/cpp/tests/MessageBuilderTest.cpp +++ b/cpp/tests/MessageBuilderTest.cpp @@ -26,6 +26,7 @@ #include <qpid_test_plugin.h> #include <iostream> #include <memory> +#include "DummyChannel.h" using namespace boost; using namespace qpid::broker; @@ -36,7 +37,7 @@ class MessageBuilderTest : public CppUnit::TestCase { struct DummyHandler : MessageBuilder::CompletionHandler{ Message::shared_ptr msg; - + virtual void complete(Message::shared_ptr& _msg){ msg = _msg; } @@ -48,7 +49,7 @@ class MessageBuilderTest : public CppUnit::TestCase Buffer* content; const u_int32_t contentBufferSize; - public: + public: void stage(Message* const msg) { @@ -98,7 +99,7 @@ class MessageBuilderTest : public CppUnit::TestCase } //dont care about any of the other methods: - TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(false), header(0), content(0), + TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(), header(0), content(0), contentBufferSize(_contentBufferSize) {} ~TestMessageStore(){} }; @@ -116,7 +117,10 @@ class MessageBuilderTest : public CppUnit::TestCase DummyHandler handler; MessageBuilder builder(&handler); - Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message( + new BasicMessage( + 0, "test", "my_routing_key", false, false, + DummyChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(0); @@ -133,7 +137,9 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); - Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message( + new BasicMessage(0, "test", "my_routing_key", false, false, + DummyChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(7); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -154,7 +160,9 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message( + new BasicMessage(0, "test", "my_routing_key", false, false, + DummyChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -183,7 +191,9 @@ class MessageBuilderTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false)); + Message::shared_ptr message( + new BasicMessage(0, "test", "my_routing_key", false, false, + DummyChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index 62249e1f5f..103a23f0df 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -23,19 +23,12 @@ #include <iostream> #include <AMQP_HighestVersion.h> #include "AMQFrame.h" +#include "DummyChannel.h" using namespace boost; using namespace qpid::broker; using namespace qpid::framing; -struct DummyHandler : OutputHandler{ - std::vector<AMQFrame*> frames; - - virtual void send(AMQFrame* frame){ - frames.push_back(frame); - } -}; - class MessageTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(MessageTest); @@ -52,7 +45,9 @@ class MessageTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0, exchange, routingKey, false, false)); + BasicMessage::shared_ptr msg( + new BasicMessage(0, exchange, routingKey, false, false, + DummyChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -69,7 +64,8 @@ class MessageTest : public CppUnit::TestCase msg->encode(buffer); buffer.flip(); - msg = Message::shared_ptr(new BasicMessage(buffer)); + msg.reset(new BasicMessage()); + msg->decode(buffer); CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); @@ -77,10 +73,11 @@ class MessageTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc")); CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize()); - DummyHandler handler; - msg->deliver(&handler, 0, "ignore", 0, 100, &(qpid::framing::highestProtocolVersion)); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + DummyChannel channel(1); + // FIXME aconway 2007-02-02: deliver should take const ProtocolVersion& + msg->deliver(channel, "ignore", 0, 100); + CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2]->getBody())); CPPUNIT_ASSERT(contentBody); CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData()); } diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp index e156efc507..7105509de6 100644 --- a/cpp/tests/QueueTest.cpp +++ b/cpp/tests/QueueTest.cpp @@ -22,6 +22,7 @@ #include <QueueRegistry.h> #include <qpid_test_plugin.h> #include <iostream> +#include "DummyChannel.h" using namespace qpid::broker; using namespace qpid::sys; @@ -54,6 +55,12 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE_END(); public: + Message::shared_ptr message(std::string exchange, std::string routingKey) { + return Message::shared_ptr( + new BasicMessage(0, exchange, routingKey, true, true, + DummyChannel::basicGetBody())); + } + void testConsumers(){ Queue::shared_ptr queue(new Queue("my_queue", true)); @@ -66,9 +73,9 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount()); //Test basic delivery: - Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true)); + Message::shared_ptr msg1 = message("e", "A"); + Message::shared_ptr msg2 = message("e", "B"); + Message::shared_ptr msg3 = message("e", "C"); queue->deliver(msg1); CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); @@ -122,10 +129,9 @@ class QueueTest : public CppUnit::TestCase void testDequeue(){ Queue::shared_ptr queue(new Queue("my_queue", true)); - - Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true)); + Message::shared_ptr msg1 = message("e", "A"); + Message::shared_ptr msg2 = message("e", "B"); + Message::shared_ptr msg3 = message("e", "C"); Message::shared_ptr received; queue->deliver(msg1); diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp index a6fe0f1010..07381d187d 100644 --- a/cpp/tests/TxAckTest.cpp +++ b/cpp/tests/TxAckTest.cpp @@ -25,6 +25,7 @@ #include <iostream> #include <list> #include <vector> +#include "DummyChannel.h" using std::list; using std::vector; @@ -44,7 +45,7 @@ class TxAckTest : public CppUnit::TestCase dequeued.push_back(std::pair<Message*, const string*>(msg, xid)); } - TestMessageStore() : NullMessageStore(false) {} + TestMessageStore() : NullMessageStore() {} ~TestMessageStore(){} }; @@ -69,7 +70,9 @@ public: TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid) { for(int i = 0; i < 10; i++){ - Message::shared_ptr msg(new BasicMessage(0, "exchange", "routing_key", false, false)); + Message::shared_ptr msg( + new BasicMessage(0, "exchange", "routing_key", false, false, + DummyChannel::basicGetBody())); msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); messages.push_back(msg); diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp index 87658af62e..08e5339048 100644 --- a/cpp/tests/TxPublishTest.cpp +++ b/cpp/tests/TxPublishTest.cpp @@ -25,6 +25,7 @@ #include <iostream> #include <list> #include <vector> +#include "DummyChannel.h" using std::list; using std::pair; @@ -73,10 +74,12 @@ class TxPublishTest : public CppUnit::TestCase public: - TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), - queue2(new Queue("queue2", false, &store, 0)), - msg(new BasicMessage(0, "exchange", "routing_key", false, false)), - op(msg, &xid) + TxPublishTest() : + queue1(new Queue("queue1", false, &store, 0)), + queue2(new Queue("queue2", false, &store, 0)), + msg(new BasicMessage(0, "exchange", "routing_key", false, false, + DummyChannel::basicGetBody())), + op(msg, &xid) { msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); |
