diff options
Diffstat (limited to 'cpp/src/qpid/broker/Channel.h')
| -rw-r--r-- | cpp/src/qpid/broker/Channel.h | 143 |
1 files changed, 32 insertions, 111 deletions
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h index e742f45279..ef6700ff80 100644 --- a/cpp/src/qpid/broker/Channel.h +++ b/cpp/src/qpid/broker/Channel.h @@ -19,17 +19,29 @@ #define _Channel_ #include <algorithm> +#include <functional> +#include <list> #include <map> -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/BasicPublishBody.h" +#include "qpid/broker/AccumulatedAck.h" #include "qpid/broker/Binding.h" #include "qpid/broker/Consumer.h" +#include "qpid/broker/DeletingTxOp.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/broker/MessageBuilder.h" #include "qpid/broker/NameGenerator.h" -#include "qpid/framing/OutputHandler.h" +#include "qpid/broker/Prefetch.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/TransactionalStore.h" +#include "qpid/broker/TxAck.h" +#include "qpid/broker/TxBuffer.h" +#include "qpid/broker/TxPublish.h" +#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/BasicPublishBody.h" namespace qpid { namespace broker { @@ -37,8 +49,7 @@ namespace qpid { * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ - class Channel{ - private: + class Channel : private MessageBuilder::CompletionHandler{ class ConsumerImpl : public virtual Consumer{ Channel* parent; string tag; @@ -54,92 +65,29 @@ namespace qpid { }; typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; - - struct AckRecord{ - Message::shared_ptr msg; - Queue::shared_ptr queue; - string consumerTag; - u_int64_t deliveryTag; - bool pull; - - AckRecord(Message::shared_ptr _msg, - Queue::shared_ptr _queue, - const string _consumerTag, - const u_int64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - deliveryTag(_deliveryTag), - pull(false){} - - AckRecord(Message::shared_ptr _msg, - Queue::shared_ptr _queue, - const u_int64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(""), - deliveryTag(_deliveryTag), - pull(true){} - }; - - typedef std::vector<AckRecord>::iterator ack_iterator; - - class MatchAck{ - const u_int64_t tag; - public: - MatchAck(u_int64_t tag); - bool operator()(AckRecord& record) const; - }; - - class Requeue{ - public: - void operator()(AckRecord& record) const; - }; - - class Redeliver{ - Channel* const channel; - public: - Redeliver(Channel* const channel); - void operator()(AckRecord& record) const; - }; - - class CalculatePrefetch{ - u_int32_t size; - u_int16_t count; - public: - CalculatePrefetch(); - void operator()(AckRecord& record); - u_int32_t getSize(); - u_int16_t getCount(); - }; - const int id; qpid::framing::OutputHandler* out; - u_int64_t deliveryTag; + u_int64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; bool transactional; std::map<string, ConsumerImpl*> consumers; u_int32_t prefetchSize; u_int16_t prefetchCount; - u_int32_t outstandingSize; - u_int16_t outstandingCount; + Prefetch outstanding; u_int32_t framesize; - Message::shared_ptr message; NameGenerator tagGenerator; - std::vector<AckRecord> unacknowledged; + std::list<DeliveryRecord> unacked; qpid::concurrent::MonitorImpl deliveryLock; + TxBuffer txBuffer; + AccumulatedAck accumulatedAck; + TransactionalStore* store; + MessageBuilder messageBuilder;//builder for in-progress message + Exchange* exchange;//exchange to which any in-progress message was published to + virtual void complete(Message::shared_ptr& msg); void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); - void checkMessage(const std::string& text); - bool checkPrefetch(Message::shared_ptr& msg); void cancel(consumer_iterator consumer); - - template<class Operation> Operation processMessage(Operation route){ - if(message->isComplete()){ - route(message); - message.reset(); - } - return route; - } - + bool checkPrefetch(Message::shared_ptr& msg); public: Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); @@ -158,37 +106,10 @@ namespace qpid { void rollback(); void ack(u_int64_t deliveryTag, bool multiple); void recover(bool requeue); - - /** - * Handles the initial publish request though a - * channel. The header and (if applicable) content will be - * accumulated through calls to handleHeader() and - * handleContent() - */ - void handlePublish(Message* msg); - - /** - * A template method that handles a received header and if - * there is no content routes it using the functor passed - * in. - */ - template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ - checkMessage("Invalid message sequence: got header before publish."); - message->setHeader(header); - return processMessage(route); - } - - /** - * A template method that handles a received content and - * if this completes the message, routes it using the - * functor passed in. - */ - template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ - checkMessage("Invalid message sequence: got content before publish."); - message->addContent(content); - return processMessage(route); - } - + void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag); + void handlePublish(Message* msg, Exchange* exchange); + void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void handleContent(qpid::framing::AMQContentBody::shared_ptr content); }; struct InvalidAckException{}; |
