summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Channel.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Channel.h')
-rw-r--r--cpp/src/qpid/broker/Channel.h143
1 files changed, 32 insertions, 111 deletions
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h
index e742f45279..ef6700ff80 100644
--- a/cpp/src/qpid/broker/Channel.h
+++ b/cpp/src/qpid/broker/Channel.h
@@ -19,17 +19,29 @@
#define _Channel_
#include <algorithm>
+#include <functional>
+#include <list>
#include <map>
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/BasicPublishBody.h"
+#include "qpid/broker/AccumulatedAck.h"
#include "qpid/broker/Binding.h"
#include "qpid/broker/Consumer.h"
+#include "qpid/broker/DeletingTxOp.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/NameGenerator.h"
-#include "qpid/framing/OutputHandler.h"
+#include "qpid/broker/Prefetch.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxAck.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/BasicPublishBody.h"
namespace qpid {
namespace broker {
@@ -37,8 +49,7 @@ namespace qpid {
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
- class Channel{
- private:
+ class Channel : private MessageBuilder::CompletionHandler{
class ConsumerImpl : public virtual Consumer{
Channel* parent;
string tag;
@@ -54,92 +65,29 @@ namespace qpid {
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
-
- struct AckRecord{
- Message::shared_ptr msg;
- Queue::shared_ptr queue;
- string consumerTag;
- u_int64_t deliveryTag;
- bool pull;
-
- AckRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const string _consumerTag,
- const u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- deliveryTag(_deliveryTag),
- pull(false){}
-
- AckRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(""),
- deliveryTag(_deliveryTag),
- pull(true){}
- };
-
- typedef std::vector<AckRecord>::iterator ack_iterator;
-
- class MatchAck{
- const u_int64_t tag;
- public:
- MatchAck(u_int64_t tag);
- bool operator()(AckRecord& record) const;
- };
-
- class Requeue{
- public:
- void operator()(AckRecord& record) const;
- };
-
- class Redeliver{
- Channel* const channel;
- public:
- Redeliver(Channel* const channel);
- void operator()(AckRecord& record) const;
- };
-
- class CalculatePrefetch{
- u_int32_t size;
- u_int16_t count;
- public:
- CalculatePrefetch();
- void operator()(AckRecord& record);
- u_int32_t getSize();
- u_int16_t getCount();
- };
-
const int id;
qpid::framing::OutputHandler* out;
- u_int64_t deliveryTag;
+ u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
std::map<string, ConsumerImpl*> consumers;
u_int32_t prefetchSize;
u_int16_t prefetchCount;
- u_int32_t outstandingSize;
- u_int16_t outstandingCount;
+ Prefetch outstanding;
u_int32_t framesize;
- Message::shared_ptr message;
NameGenerator tagGenerator;
- std::vector<AckRecord> unacknowledged;
+ std::list<DeliveryRecord> unacked;
qpid::concurrent::MonitorImpl deliveryLock;
+ TxBuffer txBuffer;
+ AccumulatedAck accumulatedAck;
+ TransactionalStore* store;
+ MessageBuilder messageBuilder;//builder for in-progress message
+ Exchange* exchange;//exchange to which any in-progress message was published to
+ virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void checkMessage(const std::string& text);
- bool checkPrefetch(Message::shared_ptr& msg);
void cancel(consumer_iterator consumer);
-
- template<class Operation> Operation processMessage(Operation route){
- if(message->isComplete()){
- route(message);
- message.reset();
- }
- return route;
- }
-
+ bool checkPrefetch(Message::shared_ptr& msg);
public:
Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
@@ -158,37 +106,10 @@ namespace qpid {
void rollback();
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
-
- /**
- * Handles the initial publish request though a
- * channel. The header and (if applicable) content will be
- * accumulated through calls to handleHeader() and
- * handleContent()
- */
- void handlePublish(Message* msg);
-
- /**
- * A template method that handles a received header and if
- * there is no content routes it using the functor passed
- * in.
- */
- template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
- checkMessage("Invalid message sequence: got header before publish.");
- message->setHeader(header);
- return processMessage(route);
- }
-
- /**
- * A template method that handles a received content and
- * if this completes the message, routes it using the
- * functor passed in.
- */
- template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
- checkMessage("Invalid message sequence: got content before publish.");
- message->addContent(content);
- return processMessage(route);
- }
-
+ void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
+ void handlePublish(Message* msg, Exchange* exchange);
+ void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
};
struct InvalidAckException{};