summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
-rw-r--r--cpp/src/qpid/broker/SemanticState.h45
1 files changed, 26 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index b2e648410a..6d88dd56d9 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -65,7 +65,7 @@ class SessionContext;
*
* Message delivery is driven by ConsumerImpl::doOutput(), which is
* called when a client's socket is ready to write data.
- *
+ *
*/
class SemanticState : private boost::noncopyable {
public:
@@ -75,14 +75,15 @@ class SemanticState : private boost::noncopyable {
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
- const std::string name;
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
bool blocked;
bool windowing;
+ bool windowActive;
bool exclusive;
std::string resumeId;
+ const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
uint64_t resumeTtl;
framing::FieldTable arguments;
uint32_t msgCredit;
@@ -99,15 +100,16 @@ class SemanticState : private boost::noncopyable {
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
- ConsumerImpl(SemanticState* parent,
+ ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ const std::string& tag, const std::string& resumeId,
+ uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
OwnershipToken* getSession();
- bool deliver(QueuedMessage& msg);
- bool filter(boost::intrusive_ptr<Message> msg);
- bool accept(boost::intrusive_ptr<Message> msg);
+ bool deliver(QueuedMessage& msg);
+ bool filter(boost::intrusive_ptr<Message> msg);
+ bool accept(boost::intrusive_ptr<Message> msg);
void disableNotify();
void enableNotify();
@@ -122,15 +124,13 @@ class SemanticState : private boost::noncopyable {
void addMessageCredit(uint32_t value);
void flush();
void stop();
- void complete(DeliveryRecord&);
+ void complete(DeliveryRecord&);
boost::shared_ptr<Queue> getQueue() const { return queue; }
bool isBlocked() const { return blocked; }
bool setBlocked(bool set) { std::swap(set, blocked); return set; }
bool doOutput();
- std::string getName() const { return name; }
-
bool isAckExpected() const { return ackExpected; }
bool isAcquire() const { return acquire; }
bool isWindowing() const { return windowing; }
@@ -138,6 +138,7 @@ class SemanticState : private boost::noncopyable {
uint32_t getMsgCredit() const { return msgCredit; }
uint32_t getByteCredit() const { return byteCredit; }
std::string getResumeId() const { return resumeId; };
+ const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
@@ -148,9 +149,10 @@ class SemanticState : private boost::noncopyable {
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
+ typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
+
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
- typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
SessionContext& session;
DeliveryAdapter& deliveryAdapter;
@@ -163,7 +165,6 @@ class SemanticState : private boost::noncopyable {
DtxBufferMap suspendedXids;
framing::SequenceSet accumulatedAck;
boost::shared_ptr<Exchange> cacheExchange;
- AclModule* acl;
const bool authMsg;
const std::string userID;
const std::string userName;
@@ -181,14 +182,16 @@ class SemanticState : private boost::noncopyable {
void disable(ConsumerImpl::shared_ptr);
public:
+
SemanticState(DeliveryAdapter&, SessionContext&);
~SemanticState();
SessionContext& getSession() { return session; }
const SessionContext& getSession() const { return session; }
- ConsumerImpl& find(const std::string& destination);
-
+ const ConsumerImpl::shared_ptr find(const std::string& destination) const;
+ bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
+
/**
* Get named queue, never returns 0.
* @return: named queue
@@ -196,16 +199,16 @@ class SemanticState : private boost::noncopyable {
* @exception: ConnectionException if name="" and session has no default.
*/
boost::shared_ptr<Queue> getQueue(const std::string& name) const;
-
+
bool exists(const std::string& consumerTag);
- void consume(const std::string& destination,
- boost::shared_ptr<Queue> queue,
+ void consume(const std::string& destination,
+ boost::shared_ptr<Queue> queue,
bool ackRequired, bool acquire, bool exclusive,
const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
const framing::FieldTable& = framing::FieldTable());
- void cancel(const std::string& tag);
+ bool cancel(const std::string& tag);
void setWindowMode(const std::string& destination);
void setCreditMode(const std::string& destination);
@@ -218,12 +221,13 @@ class SemanticState : private boost::noncopyable {
void commit(MessageStore* const store);
void rollback();
void selectDtx();
+ bool getDtxSelected() const { return dtxSelected; }
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
void recover(bool requeue);
- void deliver(DeliveryRecord& message, bool sync);
+ void deliver(DeliveryRecord& message, bool sync);
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
@@ -244,9 +248,12 @@ class SemanticState : private boost::noncopyable {
DeliveryRecords& getUnacked() { return unacked; }
framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+ DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; }
void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+ void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
+ DtxBufferMap& getSuspendedXids() { return suspendedXids; }
};
}} // namespace qpid::broker