summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-09-11 13:33:42 +0000
committerKim van der Riet <kpvdr@apache.org>2009-09-11 13:33:42 +0000
commitb171cc419ae5d2bc747ec2465ad1c76445f8bd37 (patch)
tree2f4d01f55832b1cee196214eae31af47f4ca4a78 /cpp
parent613071992900172cb00b5eff9f39b1cc06a5e2a8 (diff)
downloadqpid-python-b171cc419ae5d2bc747ec2465ad1c76445f8bd37.tar.gz
Joint checkin with cctrieloff. Refactor of exchange routing so that multi-queue policy differences may be resolved and allow for correct multi-queue flow-to-disk behavior. Different queues may have differing policies and persistence properties - these were previously being neglected. New c++ test added.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@813825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp37
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp43
-rw-r--r--cpp/src/qpid/broker/Exchange.h15
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp35
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp34
-rw-r--r--cpp/src/qpid/broker/Message.cpp52
-rw-r--r--cpp/src/qpid/broker/Message.h31
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp4
-rw-r--r--cpp/src/qpid/broker/MessageReleaseManager.h54
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp53
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h26
-rw-r--r--cpp/src/qpid/broker/PersistableQueue.h18
-rw-r--r--cpp/src/qpid/broker/Queue.cpp138
-rw-r--r--cpp/src/qpid/broker/Queue.h24
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp36
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp20
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp54
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp62
-rw-r--r--cpp/src/qpid/sys/CopyOnWriteArray.h5
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp63
-rw-r--r--cpp/src/qpid/xml/XmlExchange.h10
-rw-r--r--cpp/src/tests/QueueTest.cpp134
23 files changed, 537 insertions, 412 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 75cda31dbb..bf4ffdcbd1 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -536,6 +536,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageBuilder.h \
+ qpid/broker/MessageReleaseMgr.h \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.cpp \
qpid/broker/MessageStoreModule.h \
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index b9f24dee5f..6ffa7e2f8e 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,7 @@ using namespace qpid::sys;
using qpid::management::Manageable;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidFedOp("qpid.fed.op");
const std::string qpidFedTags("qpid.fed.tags");
@@ -98,7 +98,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
*/
std::vector<std::string> keys2prop;
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
for (Bindings::iterator iter = bindings.begin();
iter != bindings.end(); iter++) {
const BoundKey& bk = iter->second;
@@ -150,34 +150,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie
Mutex::ScopedLock l(lock);
p = bindings[routingKey].queues.snapshot();
}
- int count(0);
-
- if (p) {
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
- }
- }
-
- if(!count){
- QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey
- << "; no matching binding found");
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes(count);
- mgmtExchange->inc_byteRoutes(count * msg.contentSize());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- }
+ doRoute(msg, p);
}
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 90d81b81c6..9b5796bde3 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -76,6 +76,49 @@ Exchange::PreRoute::~PreRoute(){
}
}
+void Exchange::blockContentReleaseCheck(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p)
+{
+ bool allQueuesPersistent = true;
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); allQueuesPersistent && i!=p->end(); i++) {
+ allQueuesPersistent = (*i)->queue->getPersistenceId() > 0;
+ }
+ if (msg.getMessage().contentSize() && (!allQueuesPersistent || (p->size() > 1 && !msg.getMessage().isPersistent()))) {
+ msg.getMessage().blockRelease();
+ }
+}
+
+void Exchange::doRoute(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p)
+{
+ int count = 0;
+
+ if (p.get()) {
+ blockContentReleaseCheck(msg, p);
+
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ }
+
+ if (mgmtExchange != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+}
+
void Exchange::routeIVE(){
if (ive && lastMsg.get()){
DeliverableMessage dmsg(lastMsg);
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index c1e878200f..c2393d0850 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,6 +29,7 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/CopyOnWriteArray.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Exchange.h"
@@ -78,12 +79,14 @@ protected:
private:
Exchange* parent;
};
-
+
+ void blockContentReleaseCheck(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr b);
+ void doRoute(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr b);
void routeIVE();
-
+
struct MatchQueue {
- const Queue::shared_ptr queue;
+ const Queue::shared_ptr queue;
MatchQueue(Queue::shared_ptr q);
bool operator()(Exchange::Binding::shared_ptr b);
};
@@ -143,7 +146,7 @@ public:
virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
-
+
//PersistableExchange:
QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index e9007ba682..972244c942 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,7 +26,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidFedOp("qpid.fed.op");
const std::string qpidFedTags("qpid.fed.tags");
@@ -106,34 +106,11 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons
return true;
}
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+{
PreRoute pr(msg, this);
- uint32_t count(0);
-
BindingsArray::ConstPtr p = bindings.snapshot();
- if (p.get()){
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
- }
-
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, p);
}
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index c628c44909..e4825344a0 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -118,31 +118,17 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
PreRoute pr(msg, this);
- uint32_t count(0);
-
Bindings::ConstPtr p = bindings.snapshot();
- if (p.get()){
+ Bindings::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ if (p.get())
+ {
for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
if (match((*i)->args, *args)) {
- msg.deliverTo((*i)->queue);
- count++;
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ b->push_back(*i);
}
}
}
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- if (count == 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- } else {
- mgmtExchange->inc_msgRoutes(count);
- mgmtExchange->inc_byteRoutes(count * msg.contentSize());
- }
- }
+ doRoute(msg, b);
}
@@ -163,7 +149,7 @@ HeadersExchange::~HeadersExchange() {}
const std::string HeadersExchange::typeName("headers");
-namespace
+namespace
{
bool match_values(const FieldValue& bind, const FieldValue& msg) {
@@ -181,7 +167,7 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
i != bind.end();
++i)
{
- if (i->first != x_match)
+ if (i->first != x_match)
{
Map::const_iterator j = msg.find(i->first);
if (j == msg.end()) return false;
@@ -194,7 +180,7 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
i != bind.end();
++i)
{
- if (i->first != x_match)
+ if (i->first != x_match)
{
Map::const_iterator j = msg.find(i->first);
if (j != msg.end()) {
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 639f04faa2..8731a29d24 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/NullMessageStore.h"
#include "qpid/StringUtils.h"
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
@@ -48,7 +49,7 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {}
Message::~Message()
@@ -75,7 +76,7 @@ std::string Message::getRoutingKey() const
return getAdapter().getRoutingKey(frames);
}
-std::string Message::getExchangeName() const
+std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
@@ -84,7 +85,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr
{
if (!exchange) {
exchange = registry.get(getExchangeName());
- }
+ }
return exchange;
}
@@ -98,7 +99,7 @@ const FieldTable* Message::getApplicationHeaders() const
return getAdapter().getApplicationHeaders(frames);
}
-bool Message::isPersistent()
+bool Message::isPersistent() const
{
return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
}
@@ -175,26 +176,25 @@ void Message::decodeContent(framing::Buffer& buffer)
} else {
//adjust header flags
MarkLastSegment f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
//mark content loaded
loaded = true;
}
-void Message::releaseContent(MessageStore* _store)
+void Message::releaseContent(bool immediate)
{
- if (!store) {
- store = _store;
- }
- if (store) {
+ if (store && !NullMessageStore::isNullStore(store) && (immediate || releaseMgr.canRelease())) {
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
store->stage(pmsg);
staged = true;
- }
- //remove any content frames from the frameset
- frames.remove(TypeFilter<CONTENT_BODY>());
- setContentReleased();
+ frames.remove(TypeFilter<CONTENT_BODY>());
+ setContentReleased();
+ } else if (immediate || releaseMgr.canRelease()) {
+ frames.remove(TypeFilter<CONTENT_BODY>());
+ setContentReleased();
+ }
}
}
@@ -213,7 +213,7 @@ bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxC
{
if (isContentReleased()) {
intrusive_ptr<const PersistableMessage> pmsg(this);
-
+
bool done = false;
string& data = frame.castBody<AMQContentBody>()->getData();
store->loadContent(queue, pmsg, data, offset, maxContentSize);
@@ -239,7 +239,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
@@ -257,7 +257,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
{
sys::Mutex::ScopedLock l(lock);
Relay f(out);
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -287,7 +287,7 @@ bool Message::isContentLoaded() const
}
-namespace
+namespace
{
const std::string X_QPID_TRACE("x-qpid.trace");
}
@@ -324,13 +324,13 @@ void Message::addTraceId(const std::string& id)
trace += ",";
trace += id;
headers.setString(X_QPID_TRACE, trace);
- }
+ }
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
if (props->getTtl()) {
// AMQP requires setting the expiration property to be posix
// time_t in seconds. TTL is in milliseconds
@@ -347,7 +347,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
- if (expiryPolicy)
+ if (expiryPolicy)
expiryPolicy->willExpire(*this);
}
@@ -362,7 +362,7 @@ boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor)
Replacement::iterator i = replacement.find(qfor);
if (i != replacement.end()){
return i->second;
- }
+ }
return empty;
}
@@ -410,7 +410,7 @@ void Message::setUpdateDestination(const std::string& d)
bool Message::isUpdateMessage()
{
- return updateDestination.size() && isA<MessageTransferBody>()
+ return updateDestination.size() && isA<MessageTransferBody>()
&& getMethod<MessageTransferBody>()->getDestination() == updateDestination;
}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 0024509bc8..92fc3df7ec 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,12 +34,12 @@
#include <vector>
namespace qpid {
-
+
namespace framing {
class FieldTable;
class SequenceNumber;
}
-
+
namespace broker {
class ConnectionToken;
class Exchange;
@@ -51,10 +51,10 @@ class ExpiryPolicy;
class Message : public PersistableMessage {
public:
typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
-
+
QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
QPID_BROKER_EXTERN ~Message();
-
+
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
@@ -74,7 +74,7 @@ public:
bool isImmediate() const;
QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
framing::FieldTable& getOrInsertHeaders();
- QPID_BROKER_EXTERN bool isPersistent();
+ QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
@@ -82,8 +82,8 @@ public:
bool hasExpired();
sys::AbsTime getExpiration() const { return expiration; }
- framing::FrameSet& getFrames() { return frames; }
- const framing::FrameSet& getFrames() const { return frames; }
+ framing::FrameSet& getFrames() { return frames; }
+ const framing::FrameSet& getFrames() const { return frames; }
template <class T> T* getProperties() {
qpid::framing::AMQHeaderBody* p = frames.getHeaders();
@@ -128,13 +128,13 @@ public:
QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
-
+
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
* be reloaded.
*/
- void releaseContent(MessageStore* store);
+ void releaseContent(bool immediate = false);
void destroy();
bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const;
@@ -145,10 +145,11 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
-
- void forcePersistent();
- bool isForcedPersistent();
-
+
+ void forcePersistent();
+ bool isForcedPersistent();
+ void setStore(MessageStore* s) { store = s; }
+
boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 14b233fd6c..e886cc08a0 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -35,7 +35,6 @@ namespace
std::string type_str(uint8_t type);
const std::string QPID_MANAGEMENT("qpid.management");
}
-
MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
@@ -80,7 +79,7 @@ void MessageBuilder::handle(AMQFrame& frame)
&& !NullMessageStore::isNullStore(store)
&& message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
{
- message->releaseContent(store);
+ message->releaseContent(true);
staging = true;
}
}
@@ -96,6 +95,7 @@ void MessageBuilder::end()
void MessageBuilder::start(const SequenceNumber& id)
{
message = intrusive_ptr<Message>(new Message(id));
+ message->setStore(store);
state = METHOD;
staging = false;
}
diff --git a/cpp/src/qpid/broker/MessageReleaseManager.h b/cpp/src/qpid/broker/MessageReleaseManager.h
new file mode 100644
index 0000000000..1c3f615f59
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageReleaseManager.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _broker_MessageReleaseManager_h
+#define _broker_MessageReleaseManager_h
+
+namespace qpid {
+ namespace broker {
+
+ class MessageReleaseManager
+ {
+ private:
+ bool releaseBlocked;
+ bool releaseRequested;
+ bool released;
+
+ public:
+ MessageReleaseManager(): releaseBlocked(false), releaseRequested(false), released(false) {}
+ virtual ~MessageReleaseManager() {}
+
+ bool isReleaseBlocked() const { return releaseBlocked; }
+ void blockRelease() { if (!released) releaseBlocked = true; }
+
+ bool isReleaseRequested() const { return releaseRequested; }
+ void setReleaseRequested() { if (!released) releaseRequested = true; }
+
+ bool isReleased() const { return released; }
+ void setReleased() { released = true; }
+
+ bool canRelease() { return !releaseBlocked && releaseRequested; }
+ };
+
+ }
+}
+
+
+#endif /*_broker_MessageReleaseManager_h*/
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index c1f86d4ca4..dc855315db 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
#include "qpid/broker/PersistableMessage.h"
#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/NullMessageStore.h"
#include <iostream>
using namespace qpid::broker;
@@ -34,9 +35,8 @@ class MessageStore;
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
- asyncEnqueueCounter(0),
+ asyncEnqueueCounter(0),
asyncDequeueCounter(0),
- contentReleased(false),
store(0)
{}
@@ -56,13 +56,22 @@ void PersistableMessage::flush()
if (q) {
store->flush(*q);
}
- }
+ }
}
-void PersistableMessage::setContentReleased() {contentReleased = true; }
+void PersistableMessage::setContentReleased() { releaseMgr.setReleased(); }
+
+void PersistableMessage::blockRelease() { releaseMgr.blockRelease(); }
+
+bool PersistableMessage::requestContentRelease()
+{
+ if (!store || NullMessageStore::isNullStore(store) || releaseMgr.isReleaseBlocked()) return false;
+ releaseMgr.setReleaseRequested();
+ return true;
+}
+
+bool PersistableMessage::isContentReleased()const { return releaseMgr.isReleased(); }
-bool PersistableMessage::isContentReleased()const { return contentReleased; }
-
bool PersistableMessage::isEnqueueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
return asyncEnqueueCounter == 0;
@@ -85,8 +94,8 @@ void PersistableMessage::enqueueComplete() {
for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
PersistableQueue::shared_ptr q(i->lock());
if (q) q->notifyDurableIOComplete();
- }
- }
+ }
+ }
}
}
@@ -95,13 +104,13 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
PersistableQueue::shared_ptr q(i->lock());
if (q && q->getPersistenceId() == queue->getPersistenceId()) return true;
- }
- }
+ }
+ }
return false;
}
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -110,22 +119,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa
}
}
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
addToSyncList(queue, _store);
enqueueAsync();
}
-void PersistableMessage::enqueueAsync() {
+void PersistableMessage::enqueueAsync() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
- asyncEnqueueCounter++;
+ asyncEnqueueCounter++;
}
-bool PersistableMessage::isDequeueComplete() {
+bool PersistableMessage::isDequeueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
return asyncDequeueCounter == 0;
}
-
-void PersistableMessage::dequeueComplete() {
+
+void PersistableMessage::dequeueComplete() {
bool notify = false;
{
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
@@ -138,7 +147,7 @@ void PersistableMessage::dequeueComplete() {
if (notify) allDequeuesComplete();
}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -148,9 +157,9 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag
dequeueAsync();
}
-void PersistableMessage::dequeueAsync() {
+void PersistableMessage::dequeueAsync() {
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
- asyncDequeueCounter++;
+ asyncDequeueCounter++;
}
}}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 05de9ff4c3..df645493c9 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,6 +31,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/MessageReleaseManager.h"
namespace qpid {
namespace broker {
@@ -46,7 +47,7 @@ class PersistableMessage : public Persistable
sys::Mutex asyncEnqueueLock;
sys::Mutex asyncDequeueLock;
sys::Mutex storeLock;
-
+
/**
* Tracks the number of outstanding asynchronous enqueue
* operations. When the message is enqueued asynchronously the
@@ -68,7 +69,6 @@ class PersistableMessage : public Persistable
void enqueueAsync();
void dequeueAsync();
- bool contentReleased;
syncList synclist;
protected:
@@ -81,6 +81,8 @@ class PersistableMessage : public Persistable
MessageStore* store;
+ MessageReleaseManager releaseMgr;
+
public:
typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -95,9 +97,15 @@ class PersistableMessage : public Persistable
PersistableMessage();
void flush();
-
+
+ bool requestContentRelease();
+
bool isContentReleased() const;
-
+
+ void blockRelease();
+
+ virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
+
QPID_BROKER_EXTERN bool isEnqueueComplete();
QPID_BROKER_EXTERN void enqueueComplete();
@@ -107,16 +115,16 @@ class PersistableMessage : public Persistable
QPID_BROKER_EXTERN bool isDequeueComplete();
-
+
QPID_BROKER_EXTERN void dequeueComplete();
QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store);
bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
-
+
void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
-
+
};
}}
diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h
index 8d85d36fef..aa6a751f61 100644
--- a/cpp/src/qpid/broker/PersistableQueue.h
+++ b/cpp/src/qpid/broker/PersistableQueue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -55,18 +55,18 @@ public:
virtual const std::string& getName() const = 0;
virtual ~PersistableQueue() {
- if (externalQueueStore)
+ if (externalQueueStore)
delete externalQueueStore;
};
virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
-
+
inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;};
-
+
PersistableQueue():externalQueueStore(NULL){
};
-
-
+
+
/**
* call back to signal async AIO writes have
* completed (enqueue/dequeue etc)
@@ -76,9 +76,9 @@ public:
*/
virtual void notifyDurableIOComplete() = 0;
protected:
-
+
ExternalQueueStore* externalQueueStore;
-
+
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 08ee133981..c3b14688d6 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -56,7 +56,7 @@ using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
@@ -76,16 +76,16 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
- name(_name),
+ name(_name),
autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(_owner),
consumerCount(0),
exclusive(0),
noLocal(false),
@@ -182,9 +182,9 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg, true);
- if (store){
+ if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
+ msg->addToSyncList(shared_from_this(), store);
}
msg->enqueueComplete(); // mark the message as enqueued
mgntEnqStats(msg);
@@ -192,7 +192,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
- msg->releaseContent(store);
+ msg->releaseContent(true);
}
}
@@ -209,13 +209,13 @@ void Queue::requeue(const QueuedMessage& msg){
if (!isEnqueued(msg)) return;
QueueListeners::NotificationSet copy;
- {
+ {
Mutex::ScopedLock locker(messageLock);
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
msg.payload->forcePersistent();
if (msg.payload->isForcedPersistent() ){
@@ -234,7 +234,7 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
}
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -258,7 +258,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
QPID_LOG(debug, "attempting to acquire " << msg.position);
for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
- || (lastValueQueue && (i->position == msg.position) &&
+ || (lastValueQueue && (i->position == msg.position) &&
msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
clearLVQIndex(msg);
@@ -296,7 +296,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
case NO_MESSAGES:
default:
return false;
- }
+ }
} else {
return browseNextMessage(m, c);
}
@@ -317,7 +317,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
//enqueued and so is not available for consumption yet,
//register consumer for notification when this changes
listeners.addListener(c);
- return false;
+ return false;
} else {
//check that consumer has sufficient credit for the
//message (if it does not, no need to register it for
@@ -332,7 +332,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
+ if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -345,7 +345,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
}
if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
+ if (c->accept(msg.payload)) {
m = msg;
popMsg(msg);
return CONSUMED;
@@ -358,7 +358,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
return CANT_CONSUME;
- }
+ }
}
}
}
@@ -423,7 +423,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
if (c->position < getFront().position) {
msg = getFront();
return true;
- } else {
+ } else {
//TODO: can improve performance of this search, for now just searching linearly from end
Messages::reverse_iterator pos;
for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) {
@@ -524,7 +524,7 @@ void Queue::purgeExpired()
*/
uint32_t Queue::purge(const uint32_t purge_request){
Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
+ uint32_t purge_count = purge_request; // only comes into play if >0
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
@@ -537,7 +537,7 @@ uint32_t Queue::purge(const uint32_t purge_request){
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
while((!qty || move_count--) && !messages.empty()) {
@@ -566,15 +566,16 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
Messages dequeues;
QueueListeners::NotificationSet copy;
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
+ msg->setStore(store);
if (policy.get()) {
policy->tryEnqueue(qm);
//depending on policy, may have some dequeues
if (!isRecovery) pendingDequeues.swap(dequeues);
}
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-
+
LVQ::iterator i;
const framing::FieldTable* ft = msg->getApplicationHeaders();
if (lastValueQueue && ft){
@@ -584,7 +585,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (i == lvq.end() || msg->isUpdateMessage()){
messages.push_back(qm);
listeners.populate(copy);
- lvq[key] = msg;
+ lvq[key] = msg;
}else {
boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
if (!old) old = i->second;
@@ -594,10 +595,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
//recovery is complete
pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
} else {
- Mutex::ScopedUnlock u(messageLock);
+ Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
- }
+ }
}else {
messages.push_back(qm);
listeners.populate(copy);
@@ -632,8 +633,8 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
if (ft) {
string key = ft->getAsString(qpidVQMatchProperty);
if (lvq.find(key) != lvq.end()){
- lvq[key] = replacement;
- }
+ lvq[key] = replacement;
+ }
}
msg.payload = replacement;
}
@@ -644,7 +645,7 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
-
+
uint32_t count = 0;
for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
//NOTE: don't need to use checkLvqReplace() here as it
@@ -652,7 +653,7 @@ uint32_t Queue::getMessageCount() const
//so the enqueueComplete check has no effect
if ( i->payload->isEnqueueComplete() ) count ++;
}
-
+
return count;
}
@@ -696,13 +697,13 @@ void Queue::setLastNodeFailure()
}
}
-// return true if store exists,
+// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
msg->addTraceId(traceId);
}
@@ -716,13 +717,13 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
return false;
}
-// return true if store exists,
+// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) {
+ if (!ctxt) {
dequeued(msg);
}
}
@@ -738,7 +739,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ dequeued(msg);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -794,7 +795,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
lastValueQueue = lastValueQueueNoBrowse;
}
-
+
persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
@@ -803,7 +804,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
if (excludeList.size()) {
split(traceExclude, excludeList, ", ");
}
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
@@ -859,9 +860,9 @@ const QueuePolicy* Queue::getPolicy()
return policy.get();
}
-uint64_t Queue::getPersistenceId() const
-{
- return persistenceId;
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -880,18 +881,18 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
persistenceId = _persistenceId;
}
-void Queue::encode(Buffer& buffer) const
+void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.put(settings);
- if (policy.get()) {
+ if (policy.get()) {
buffer.put(*policy);
}
}
uint32_t Queue::encodedSize() const
{
- return name.size() + 1/*short string size octet*/ + settings.encodedSize()
+ return name.size() + 1/*short string size octet*/ + settings.encodedSize()
+ (policy.get() ? (*policy).encodedSize() : 0);
}
@@ -922,50 +923,50 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
{
- if (broker.getQueues().destroyIf(queue->getName(),
+ if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
}
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
-{
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
Mutex::ScopedLock locker(ownershipLock);
- return o == owner;
+ return o == owner;
}
-void Queue::releaseExclusiveOwnership()
-{
+void Queue::releaseExclusiveOwnership()
+{
Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
+ owner = 0;
}
-bool Queue::setExclusiveOwner(const OwnershipToken* const o)
-{
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
return false;
} else {
- owner = o;
+ owner = o;
return true;
}
}
-bool Queue::hasExclusiveOwner() const
-{
+bool Queue::hasExclusiveOwner() const
+{
Mutex::ScopedLock locker(ownershipLock);
- return owner != 0;
+ return owner != 0;
}
-bool Queue::hasExclusiveConsumer() const
-{
- return exclusive;
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
}
void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
- if (externalQueueStore!=inst && externalQueueStore)
- delete externalQueueStore;
+ if (externalQueueStore!=inst && externalQueueStore)
+ delete externalQueueStore;
externalQueueStore = inst;
if (inst) {
@@ -975,19 +976,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
-bool Queue::releaseMessageContent(const QueuedMessage& m)
-{
- if (store && !NullMessageStore::isNullStore(store)) {
- QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
- m.payload->releaseContent(store);
- return true;
- } else {
- QPID_LOG(warning, "Message " << m.position << " on " << name
- << " cannot be released from memory as the queue is not durable");
- return false;
- }
-}
-
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
@@ -1062,7 +1050,7 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
void Queue::addPendingDequeue(const QueuedMessage& msg)
{
//assumes lock is held - true at present but rather nasty as this is a public method
- pendingDequeues.push_back(msg);
+ pendingDequeues.push_back(msg);
}
QueueListeners& Queue::getListeners() { return listeners; }
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 77799fd967..286ac67124 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -156,8 +156,8 @@ namespace qpid {
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const string& name,
- bool autodelete = false,
- MessageStore* const store = 0,
+ bool autodelete = false,
+ MessageStore* const store = 0,
const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
@@ -213,11 +213,11 @@ namespace qpid {
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
+ uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
QPID_BROKER_EXTERN void purgeExpired();
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
@@ -254,8 +254,8 @@ namespace qpid {
* Inform queue of messages that were enqueued, have since
* been acquired but not yet accepted or released (and
* thus are still logically on the queue) - used in
- * clustered broker.
- */
+ * clustered broker.
+ */
void enqueued(const QueuedMessage& msg);
/**
@@ -266,9 +266,9 @@ namespace qpid {
* accepted it).
*/
bool isEnqueued(const QueuedMessage& msg);
-
+
/**
- * Gets the next available message
+ * Gets the next available message
*/
QPID_BROKER_EXTERN QueuedMessage get();
@@ -315,8 +315,6 @@ namespace qpid {
bindings.eachBinding(f);
}
- bool releaseMessageContent(const QueuedMessage&);
-
void popMsg(QueuedMessage& qmsg);
/** Set the position sequence number for the next message on the queue.
@@ -340,7 +338,7 @@ namespace qpid {
* queues. It is used for dequeueing messages in response
* to an enqueue while avoid holding lock over call to
* store.
- *
+ *
* Assumes messageLock is held - true for curent use case
* (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
* method
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 39afe90134..03a7951237 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
void QueuePolicy::enqueued(uint64_t _size)
@@ -89,7 +89,7 @@ void QueuePolicy::tryEnqueue(const QueuedMessage& m)
} else {
std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
throw ResourceLimitExceededException(
- QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
+ QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
<< " of size " << m.payload->contentSize() << " , policy: " << *this));
}
}
@@ -129,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings)
FieldTable::ValuePtr v = settings.get(typeKey);
if (v && v->convertsTo<std::string>()) {
std::string t = v->get<std::string>();
- std::transform(t.begin(), t.end(), t.begin(), tolower);
+ std::transform(t.begin(), t.end(), t.begin(), tolower);
if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
}
return FLOW_TO_DISK;
@@ -152,7 +152,7 @@ void QueuePolicy::encode(Buffer& buffer) const
buffer.putLongLong(size.get());
}
-void QueuePolicy::decode ( Buffer& buffer )
+void QueuePolicy::decode ( Buffer& buffer )
{
maxCount = buffer.getLong();
maxSize = buffer.getLongLong();
@@ -179,15 +179,15 @@ const std::string QueuePolicy::RING("ring");
const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
-FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
+FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
{
- return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+ return QueuePolicy::checkLimit(m) || (m.queue->getPersistenceId() && m.payload->requestContentRelease());
}
-RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
bool before(const QueuedMessage& a, const QueuedMessage& b)
@@ -219,19 +219,19 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
- return find(m, pendingDequeues, false) || find(m, queue, false);
+ return find(m, pendingDequeues, false) || find(m, queue, false);
}
bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
{
if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
-
+
QueuedMessage oldest;
{
qpid::sys::Mutex::ScopedLock l(lock);
if (queue.empty()) {
- QPID_LOG(debug, "Message too large for ring queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ QPID_LOG(debug, "Message too large for ring queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
<< " [" << *this << "] "
<< ": message size = " << m.payload->contentSize() << " bytes");
return false;
@@ -251,13 +251,13 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
pendingDequeues.push_back(oldest);
}
oldest.queue->addPendingDequeue(oldest);
- QPID_LOG(debug, "Ring policy triggered in queue "
+ QPID_LOG(debug, "Ring policy triggered in queue "
<< (m.queue ? m.queue->getName() : std::string("unknown queue"))
<< ": removed message " << oldest.position << " to make way for " << m.position);
return true;
} else {
- QPID_LOG(debug, "Ring policy could not be triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ QPID_LOG(debug, "Ring policy could not be triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
<< ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
//in strict mode, if oldest message has been delivered (hence
//cannot be acquired) but not yet acked, it should not be
@@ -299,7 +299,7 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uin
}
}
-
+
namespace qpid {
namespace broker {
@@ -309,7 +309,7 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
else out << "size: unlimited";
out << "; ";
if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
- else out << "count: unlimited";
+ else out << "count: unlimited";
out << "; type=" << p.type;
return out;
}
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 2a19115fd1..e340209a44 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,7 @@ namespace qpid {
namespace broker {
RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
- DtxManager& _dtxMgr, uint64_t _stagingThreshold)
+ DtxManager& _dtxMgr, uint64_t _stagingThreshold)
: queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
@@ -45,7 +45,7 @@ class RecoverableMessageImpl : public RecoverableMessage
intrusive_ptr<Message> msg;
const uint64_t stagingThreshold;
public:
- RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold);
+ RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold);
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
bool loadContent(uint64_t available);
@@ -61,7 +61,7 @@ class RecoverableQueueImpl : public RecoverableQueue
public:
RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
~RecoverableQueueImpl() {};
- void setPersistenceId(uint64_t id);
+ void setPersistenceId(uint64_t id);
uint64_t getPersistenceId() const;
const std::string& getName() const;
void setExternalQueueStore(ExternalQueueStore* inst);
@@ -129,10 +129,10 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff
{
boost::intrusive_ptr<Message> message(new Message());
message->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
}
-RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
+RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn)
{
DtxBuffer::shared_ptr buffer(new DtxBuffer());
@@ -159,7 +159,7 @@ void RecoveryManagerImpl::recoveryComplete()
queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1));
}
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold)
+RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold)
{
if (!msg->isPersistent()) {
msg->forcePersistent(); // set so that message will get dequeued from store.
@@ -195,7 +195,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id)
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();
@@ -205,7 +205,7 @@ const std::string& RecoverableQueueImpl::getName() const
{
return queue->getName();
}
-
+
void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
{
queue->setExternalQueueStore(inst);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index bdd5f33601..87d7f6f97b 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -86,7 +86,7 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void SemanticState::consume(const string& tag,
+void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
@@ -103,7 +103,7 @@ void SemanticState::cancel(const string& tag){
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
-
+
}
}
@@ -173,7 +173,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail)
dtxBuffer->fail();
} else {
dtxBuffer->markEnded();
- }
+ }
dtxBuffer.reset();
}
@@ -233,9 +233,9 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+ const string& _name,
+ Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
@@ -244,20 +244,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const framing::FieldTable& _arguments
-) :
+) :
Consumer(_acquire),
- parent(_parent),
- name(_name),
- queue(_queue),
- ackExpected(ack),
+ parent(_parent),
+ name(_name),
+ queue(_queue),
+ ackExpected(ack),
acquire(_acquire),
- blocked(true),
+ blocked(true),
windowing(true),
exclusive(_exclusive),
resumeId(_resumeId),
resumeTtl(_resumeTtl),
arguments(_arguments),
- msgCredit(0),
+ msgCredit(0),
byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -279,7 +279,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
- }
+ }
if (acquire && !ackExpected) {
queue->dequeue(0, msg);
}
@@ -297,7 +297,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
- //
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
@@ -305,7 +305,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
+ uint32_t originalByteCredit = byteCredit;
if (msgCredit != 0xFFFFFFFF) {
msgCredit--;
}
@@ -315,13 +315,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
<< ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
-
+
}
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
+ QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
<< ", bytes: " << byteCredit << " msgs: " << msgCredit);
return false;
} else {
@@ -341,7 +341,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
- if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
+ if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
Queue::tryAutoDelete(session.getBroker(), queue);
}
}
@@ -366,7 +366,7 @@ const std::string nullstring;
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
-
+
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName)
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -393,7 +393,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
if (!strategy.delivered) {
//TODO:if discard-unroutable, just drop it
- //TODO:else if accept-mode is explicit, reject it
+ //TODO:else if accept-mode is explicit, reject it
//else route it to alternate exchange
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -402,7 +402,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->destroy();
}
}
-
+ msg->releaseContent(); // release frames if release has been requested
}
void SemanticState::requestDispatch()
@@ -421,7 +421,7 @@ void SemanticState::ConsumerImpl::requestDispatch()
}
bool SemanticState::complete(DeliveryRecord& delivery)
-{
+{
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->second->complete(delivery);
@@ -449,7 +449,7 @@ void SemanticState::recover(bool requeue)
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
- for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
+ for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
@@ -570,7 +570,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const {
}
AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{
+{
return DeliveryRecord::findRange(unacked, first, last);
}
@@ -655,13 +655,13 @@ void SemanticState::accepted(const SequenceSet& commands) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
accumulatedAck.add(commands);
-
+
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
//unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
+ dtxBuffer->enlist(txAck);
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 6bf0b104ea..99d6c1cb8d 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,7 +36,7 @@ namespace _qmf = qmf::org::apache::qpid::broker;
// - excessive string copying: should be 0 copy, match from original buffer.
// - match/lookup: use descision tree or other more efficient structure.
-namespace
+namespace
{
const std::string qpidFedOp("qpid.fed.op");
const std::string qpidFedTags("qpid.fed.tags");
@@ -53,7 +53,7 @@ namespace {
// Iterate over a string of '.'-separated tokens.
struct TokenIterator {
typedef pair<const char*,const char*> Token;
-
+
TokenIterator(const char* b, const char* e) : token(make_pair(b, find(b,e,'.'))), end(e) {}
bool finished() const { return !token.first; }
@@ -122,7 +122,7 @@ class Matcher {
Matcher(const string& p, const string& k)
: matched(), pattern(&p[0], &p[0]+p.size()), key(&k[0], &k[0]+k.size())
{ matched = match(); }
-
+
operator bool() const { return matched; }
private:
@@ -158,7 +158,7 @@ class Matcher {
}
if (!pattern.finished() && pattern.match1('#'))
pattern.next(); // Trailing # matches empty.
- return pattern.finished() && key.finished();
+ return pattern.finished() && key.finished();
}
bool matched;
@@ -173,7 +173,7 @@ string TopicExchange::normalize(const string& pattern) {
return normal;
}
-bool TopicExchange::match(const string& pattern, const string& key)
+bool TopicExchange::match(const string& pattern, const string& key)
{
return Matcher(pattern, key);
}
@@ -231,11 +231,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
*/
std::vector<std::string> keys2prop;
{
- RWlock::ScopedRlock l(lock);
+ RWlock::ScopedRlock l(lock);
for (BindingMap::iterator iter = bindings.begin();
iter != bindings.end(); iter++) {
const BoundKey& bk = iter->second;
-
+
if (bk.fedBinding.hasLocal()) {
keys2prop.push_back(iter->first);
}
@@ -293,44 +293,24 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)
return q != qv.end();
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- Binding::vector mb;
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
+ qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
PreRoute pr(msg, this);
- uint32_t count(0);
{
- RWlock::ScopedRlock l(lock);
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, routingKey)) {
- Binding::vector& qv(i->second.bindingVector);
- for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
- mb.push_back(*j);
+ RWlock::ScopedRlock l(lock);
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, routingKey)) {
+ Binding::vector& qv(i->second.bindingVector);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ b->push_back(*j);
+ }
}
}
}
- }
-
- for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
- msg.deliverTo((*j)->queue);
- if ((*j)->mgmtBinding != 0)
- (*j)->mgmtBinding->inc_msgMatched ();
- }
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, b);
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
@@ -343,7 +323,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing
return bindings.size() > 0;
} else if (routingKey) {
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *routingKey))
+ if (match(i->first, *routingKey))
return true;
}
} else {
diff --git a/cpp/src/qpid/sys/CopyOnWriteArray.h b/cpp/src/qpid/sys/CopyOnWriteArray.h
index e4ae3a6094..b7111f2b17 100644
--- a/cpp/src/qpid/sys/CopyOnWriteArray.h
+++ b/cpp/src/qpid/sys/CopyOnWriteArray.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,6 +39,7 @@ class CopyOnWriteArray
{
public:
typedef boost::shared_ptr<const std::vector<T> > ConstPtr;
+ typedef boost::shared_ptr<std::vector<T> > Ptr;
CopyOnWriteArray() {}
CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {}
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index a41c8840ff..253b9ff8d0 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -69,7 +69,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable,
// #### TODO: The Binding should take the query text
// #### only. Consider encapsulating the entire block, including
// #### the if condition.
-
+
bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments)
{
@@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const
if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
binding->parse_message_content = true;
break;
- }
+ }
}
}
@@ -129,11 +129,11 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons
}
return true;
} else {
- return false;
+ return false;
}
}
-bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content)
+bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content)
{
string msgContent;
@@ -151,12 +151,12 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F
QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
- XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
+ XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(),
msgContent.length(), "input" );
// This will parse the document using either Xerces or FastXDM, depending
// on your XQilla configuration. FastXDM can be as much as 10x faster.
-
+
Sequence seq(context->parseDocument(xml));
if(!seq.isEmpty() && seq.first()->isNode()) {
@@ -206,49 +206,26 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT
PreRoute pr(msg, this);
try {
XmlBinding::vector::ConstPtr p;
- {
+ qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ {
RWlock::ScopedRlock l(lock);
- p = bindingsMap[routingKey].snapshot();
- if (!p) return;
- }
- int count(0);
+ p = bindingsMap[routingKey].snapshot();
+ if (!p.get()) return;
+ }
for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
- if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
- msg.deliverTo((*i)->queue);
- count++;
- QPID_LOG(trace, "Delivered to queue" );
-
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
+ b->push_back(*i);
}
- }
- if (!count) {
- QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- }
+ }
+ doRoute(msg, b);
} catch (...) {
QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey);
}
-
-
}
-bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
RWlock::ScopedRlock l(lock);
if (routingKey) {
@@ -274,12 +251,12 @@ bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKe
}
-XmlExchange::~XmlExchange()
+XmlExchange::~XmlExchange()
{
bindingsMap.clear();
}
const std::string XmlExchange::typeName("xml");
-
+
}
}
diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h
index 38cb7699b6..389bfebfd0 100644
--- a/cpp/src/qpid/xml/XmlExchange.h
+++ b/cpp/src/qpid/xml/XmlExchange.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -53,7 +53,7 @@ class XmlExchange : public virtual Exchange {
Binding(key, queue, parent), xquery(query), parse_message_content(true) {}
};
-
+
typedef std::map<string, XmlBinding::vector > XmlBindingsMap;
XmlBindingsMap bindingsMap;
@@ -64,13 +64,13 @@ class XmlExchange : public virtual Exchange {
public:
static const std::string typeName;
-
+
XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0);
XmlExchange(const string& _name, bool _durable,
const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
-
+
virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index a655baeab8..b6932434b6 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -22,6 +22,8 @@
#include "test_tools.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -30,12 +32,14 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
+#include "qpid/framing/reply_exceptions.h"
#include <iostream>
#include "boost/format.hpp"
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
+using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -273,14 +277,35 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
}
-class TestMessageStoreOC : public NullMessageStore
+const std::string nullxid = "";
+
+class SimpleDummyCtxt : public TransactionContext {};
+
+class DummyCtxt : public TPCTransactionContext
+{
+ const std::string xid;
+public:
+ DummyCtxt(const std::string& _xid) : xid(_xid) {}
+ static std::string getXid(TransactionContext& ctxt)
+ {
+ DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
+ return c ? c->xid : nullxid;
+ }
+};
+
+class TestMessageStoreOC : public MessageStore
{
+ std::set<std::string> prepared;
+ uint64_t nextPersistenceId;
public:
uint enqCnt;
uint deqCnt;
bool error;
+ TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
+ ~TestMessageStoreOC(){}
+
virtual void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /*queue*/)
@@ -302,8 +327,32 @@ class TestMessageStoreOC : public NullMessageStore
error=true;
}
- TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
- ~TestMessageStoreOC(){}
+ bool init(const Options*) { return true; }
+ void truncateInit(const bool) {}
+ void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
+ void destroy(PersistableQueue&) {}
+ void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableExchange&) {}
+ void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableConfig&) {}
+ void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
+ void destroy(PersistableMessage&) {}
+ void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
+ void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
+ std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
+ void flush(const qpid::broker::PersistableQueue&) {}
+ uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
+
+ std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
+ std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
+ void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
+ void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
+
+ void recover(RecoveryManager&) {}
};
@@ -703,7 +752,7 @@ not requeued to the store.
QPID_AUTO_TEST_CASE(testLastNodeJournalError){
/*
-simulate store excption going into last node standing
+simulate store exception going into last node standing
*/
TestMessageStoreOC testStore;
@@ -727,6 +776,83 @@ simulate store excption going into last node standing
}
+intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) {
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+}
+
+QPID_AUTO_TEST_CASE(testFlowToDiskMsgProperties){
+
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ args.setSizePolicy(FLOW_TO_DISK, 0, 1);
+
+ intrusive_ptr<Message> msg1 = mkMsg("e", "A");
+ intrusive_ptr<Message> msg2 = mkMsg("e", "B");
+ intrusive_ptr<Message> msg3 = mkMsg("e", "C");
+ intrusive_ptr<Message> msg4 = mkMsg("e", "D");
+ intrusive_ptr<Message> msg5 = mkMsg("e", "E");
+ intrusive_ptr<Message> msg6 = mkMsg("e", "F");
+ intrusive_ptr<Message> msg7 = mkMsg("e", "G");
+ msg4->forcePersistent();
+ msg5->forcePersistent();
+ msg7->forcePersistent();
+
+ DeliverableMessage dmsg1(msg1);
+ DeliverableMessage dmsg2(msg2);
+ DeliverableMessage dmsg3(msg3);
+ DeliverableMessage dmsg4(msg4);
+ DeliverableMessage dmsg5(msg5);
+ DeliverableMessage dmsg6(msg6);
+ DeliverableMessage dmsg7(msg7);
+
+ FanOutExchange fanout1("fanout1", false, args);
+ FanOutExchange fanout2("fanout2", false, args);
+
+ Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
+ queue1->configure(args);
+ Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
+ queue2->configure(args);
+ Queue::shared_ptr queue3(new Queue("queue3", true));
+ fanout1.bind(queue1, "", 0);
+ fanout1.bind(queue2, "", 0);
+ fanout1.route(dmsg1, "", 0);
+ msg1->releaseContent();
+ fanout1.route(dmsg2, "", 0);
+ msg2->releaseContent();
+ fanout1.route(dmsg3, "", 0);
+ msg3->releaseContent();
+
+ BOOST_CHECK_EQUAL(3, queue1->getMessageCount());
+ BOOST_CHECK_EQUAL(3, queue2->getMessageCount());
+ BOOST_CHECK_EQUAL(msg1->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(msg2->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(msg3->isContentReleased(), true);
+
+ fanout1.bind(queue3, "", 0);
+ fanout1.route(dmsg4, "", 0);
+ msg4->releaseContent();
+ BOOST_CHECK_EQUAL(msg4->isContentReleased(), false);
+ fanout1.route(dmsg5, "", 0);
+ msg5->releaseContent();
+ BOOST_CHECK_EQUAL(msg5->isContentReleased(), false);
+
+ fanout2.bind(queue3, "", 0);
+ fanout2.route(dmsg6, "", 0);
+ fanout2.route(dmsg7, "", 0);
+ msg6->releaseContent();
+ BOOST_CHECK_EQUAL(msg6->isContentReleased(), false);
+ msg7->releaseContent();
+ BOOST_CHECK_EQUAL(msg7->isContentReleased(), false);
+
+}
+
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests