summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-06 21:54:50 +0000
committerAlan Conway <aconway@apache.org>2012-01-06 21:54:50 +0000
commit851a2e16c72d0b56e3896e845e96866d44a97208 (patch)
tree35d237a124b4f50741f45306ab661a5456a2f434
parent124ec294fafc538fb10e6029ee2b86e06bcde95e (diff)
downloadqpid-python-851a2e16c72d0b56e3896e845e96866d44a97208.tar.gz
QPID-3603: Replace public broker::Consumer::position variable with get/set function pair.
Done for the new HA work, but this is better practice in any case. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1228424 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h13
-rw-r--r--qpid/cpp/src/qpid/broker/FifoDistributor.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp2
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp2
7 files changed, 19 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 5b507728f7..97f24900ef 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -45,20 +45,27 @@ class Consumer {
public:
typedef boost::shared_ptr<Consumer> shared_ptr;
- framing::SequenceNumber position;
-
Consumer(const std::string& _name, bool preAcquires = true)
: acquires(preAcquires), inListeners(false), name(_name), position(0) {}
+ virtual ~Consumer(){}
+
bool preAcquires() const { return acquires; }
const std::string& getName() const { return name; }
+ virtual framing::SequenceNumber getPosition() const { return position; }
+ virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
+
virtual bool deliver(QueuedMessage& msg) = 0;
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
virtual OwnershipToken* getSession() = 0;
virtual bool isDelayedCompletion() const { return false; }
- virtual ~Consumer(){}
+
+ protected:
+ framing::SequenceNumber position;
+
+ private:
friend class QueueListeners;
};
diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
index d63feffd57..074c2b9a9d 100644
--- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
+++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
@@ -42,7 +42,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- return messages.browse(c->position, next, false);
+ return messages.browse(c->getPosition(), next, false);
}
void FifoDistributor::query(qpid::types::Variant::Map&) const
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 24ac394e26..b2ab6bb2aa 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -207,7 +207,7 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
if (!messages.size())
return false;
- next.position = c->position;
+ next.position = c->getPosition();
if (!freeGroups.empty()) {
const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
if (nextFree < next.position) { // a free message is older than current
@@ -250,7 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
// browse: allow access to any available msg, regardless of group ownership (?ok?)
- return messages.browse(c->position, next, false);
+ return messages.browse(c->getPosition(), next, false);
}
void MessageGroupManager::query(qpid::types::Variant::Map& status) const
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 6cb82606dd..963c092432 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -302,7 +302,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (allocator->nextConsumableMessage(c, msg)) {
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->position = msg.position;
+ c->setPosition(msg.position);
dequeue(0, msg);
continue;
}
@@ -350,7 +350,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
if (c->accept(msg.payload)) {
//consumer wants the message
- c->position = msg.position;
+ c->setPosition(msg.position);
m = msg;
return true;
} else {
@@ -361,7 +361,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
} else {
//consumer will never want this message, continue seeking
QPID_LOG(debug, "Browser skipping message from '" << name << "'");
- c->position = msg.position;
+ c->setPosition(msg.position);
}
}
return false;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 394749aad2..109a690047 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -407,7 +407,7 @@ void Connection::shadowSetUser(const std::string& userId) {
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
- c->position = position;
+ c->setPosition(position);
c->setBlocked(blocked);
if (notifyEnabled) c->enableNotify(); else c->disableNotify();
updateIn.consumerNumbering.add(c);
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 2446c12f2b..ac1057045d 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -542,7 +542,7 @@ void UpdateClient::updateConsumer(
ci->getTag(),
ci->isBlocked(),
ci->isNotifyEnabled(),
- ci->position
+ ci->getPosition()
);
consumerNumbering.add(ci.get());
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index b70bb1ffbe..80541978d0 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -300,7 +300,7 @@ QPID_AUTO_TEST_CASE(testSeek){
TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
SequenceNumber seq(2);
- consumer->position = seq;
+ consumer->setPosition(seq);
QueuedMessage qm;
queue->dispatch(consumer);