diff options
author | Alan Conway <aconway@apache.org> | 2012-01-06 21:54:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-06 21:54:50 +0000 |
commit | 851a2e16c72d0b56e3896e845e96866d44a97208 (patch) | |
tree | 35d237a124b4f50741f45306ab661a5456a2f434 | |
parent | 124ec294fafc538fb10e6029ee2b86e06bcde95e (diff) | |
download | qpid-python-851a2e16c72d0b56e3896e845e96866d44a97208.tar.gz |
QPID-3603: Replace public broker::Consumer::position variable with get/set function pair.
Done for the new HA work, but this is better practice in any case.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1228424 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/FifoDistributor.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 2 |
7 files changed, 19 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 5b507728f7..97f24900ef 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -45,20 +45,27 @@ class Consumer { public: typedef boost::shared_ptr<Consumer> shared_ptr; - framing::SequenceNumber position; - Consumer(const std::string& _name, bool preAcquires = true) : acquires(preAcquires), inListeners(false), name(_name), position(0) {} + virtual ~Consumer(){} + bool preAcquires() const { return acquires; } const std::string& getName() const { return name; } + virtual framing::SequenceNumber getPosition() const { return position; } + virtual void setPosition(framing::SequenceNumber pos) { position = pos; } + virtual bool deliver(QueuedMessage& msg) = 0; virtual void notify() = 0; virtual bool filter(boost::intrusive_ptr<Message>) { return true; } virtual bool accept(boost::intrusive_ptr<Message>) { return true; } virtual OwnershipToken* getSession() = 0; virtual bool isDelayedCompletion() const { return false; } - virtual ~Consumer(){} + + protected: + framing::SequenceNumber position; + + private: friend class QueueListeners; }; diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp index d63feffd57..074c2b9a9d 100644 --- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp +++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp @@ -42,7 +42,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& ) bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - return messages.browse(c->position, next, false); + return messages.browse(c->getPosition(), next, false); } void FifoDistributor::query(qpid::types::Variant::Map&) const diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 24ac394e26..b2ab6bb2aa 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -207,7 +207,7 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued if (!messages.size()) return false; - next.position = c->position; + next.position = c->getPosition(); if (!freeGroups.empty()) { const framing::SequenceNumber& nextFree = freeGroups.begin()->first; if (nextFree < next.position) { // a free message is older than current @@ -250,7 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { // browse: allow access to any available msg, regardless of group ownership (?ok?) - return messages.browse(c->position, next, false); + return messages.browse(c->getPosition(), next, false); } void MessageGroupManager::query(qpid::types::Variant::Map& status) const diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 6cb82606dd..963c092432 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -302,7 +302,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (allocator->nextConsumableMessage(c, msg)) { if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->position = msg.position; + c->setPosition(msg.position); dequeue(0, msg); continue; } @@ -350,7 +350,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) if (c->filter(msg.payload) && !msg.payload->hasExpired()) { if (c->accept(msg.payload)) { //consumer wants the message - c->position = msg.position; + c->setPosition(msg.position); m = msg; return true; } else { @@ -361,7 +361,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) } else { //consumer will never want this message, continue seeking QPID_LOG(debug, "Browser skipping message from '" << name << "'"); - c->position = msg.position; + c->setPosition(msg.position); } } return false; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 394749aad2..109a690047 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -407,7 +407,7 @@ void Connection::shadowSetUser(const std::string& userId) { void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) { broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); - c->position = position; + c->setPosition(position); c->setBlocked(blocked); if (notifyEnabled) c->enableNotify(); else c->disableNotify(); updateIn.consumerNumbering.add(c); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 2446c12f2b..ac1057045d 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -542,7 +542,7 @@ void UpdateClient::updateConsumer( ci->getTag(), ci->isBlocked(), ci->isNotifyEnabled(), - ci->position + ci->getPosition() ); consumerNumbering.add(ci.get()); diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index b70bb1ffbe..80541978d0 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -300,7 +300,7 @@ QPID_AUTO_TEST_CASE(testSeek){ TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); SequenceNumber seq(2); - consumer->position = seq; + consumer->setPosition(seq); QueuedMessage qm; queue->dispatch(consumer); |