diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-09-11 13:33:42 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-09-11 13:33:42 +0000 |
commit | b171cc419ae5d2bc747ec2465ad1c76445f8bd37 (patch) | |
tree | 2f4d01f55832b1cee196214eae31af47f4ca4a78 /cpp | |
parent | 613071992900172cb00b5eff9f39b1cc06a5e2a8 (diff) | |
download | qpid-python-b171cc419ae5d2bc747ec2465ad1c76445f8bd37.tar.gz |
Joint checkin with cctrieloff. Refactor of exchange routing so that multi-queue policy differences may be resolved and allow for correct multi-queue flow-to-disk behavior. Different queues may have differing policies and persistence properties - these were previously being neglected. New c++ test added.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@813825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
23 files changed, 537 insertions, 412 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 75cda31dbb..bf4ffdcbd1 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -536,6 +536,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageBuilder.h \ + qpid/broker/MessageReleaseMgr.h \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/MessageStoreModule.h \ diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index b9f24dee5f..6ffa7e2f8e 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,7 +28,7 @@ using namespace qpid::sys; using qpid::management::Manageable; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); @@ -98,7 +98,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con */ std::vector<std::string> keys2prop; { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); for (Bindings::iterator iter = bindings.begin(); iter != bindings.end(); iter++) { const BoundKey& bk = iter->second; @@ -150,34 +150,7 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie Mutex::ScopedLock l(lock); p = bindings[routingKey].queues.snapshot(); } - int count(0); - - if (p) { - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); - } - } - - if(!count){ - QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey - << "; no matching binding found"); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes(count); - mgmtExchange->inc_byteRoutes(count * msg.contentSize()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); - } + doRoute(msg, p); } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 90d81b81c6..9b5796bde3 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -76,6 +76,49 @@ Exchange::PreRoute::~PreRoute(){ } } +void Exchange::blockContentReleaseCheck(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p) +{ + bool allQueuesPersistent = true; + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); allQueuesPersistent && i!=p->end(); i++) { + allQueuesPersistent = (*i)->queue->getPersistenceId() > 0; + } + if (msg.getMessage().contentSize() && (!allQueuesPersistent || (p->size() > 1 && !msg.getMessage().isPersistent()))) { + msg.getMessage().blockRelease(); + } +} + +void Exchange::doRoute(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr p) +{ + int count = 0; + + if (p.get()) { + blockContentReleaseCheck(msg, p); + + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + } + + if (mgmtExchange != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } +} + void Exchange::routeIVE(){ if (ive && lastMsg.get()){ DeliverableMessage dmsg(lastMsg); diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index c1e878200f..c2393d0850 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,6 +29,7 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableExchange.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/CopyOnWriteArray.h" #include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Exchange.h" @@ -78,12 +79,14 @@ protected: private: Exchange* parent; }; - + + void blockContentReleaseCheck(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr b); + void doRoute(Deliverable& msg, qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::ConstPtr b); void routeIVE(); - + struct MatchQueue { - const Queue::shared_ptr queue; + const Queue::shared_ptr queue; MatchQueue(Queue::shared_ptr q); bool operator()(Exchange::Binding::shared_ptr b); }; @@ -143,7 +146,7 @@ public: virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&); virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; - + //PersistableExchange: QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index e9007ba682..972244c942 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,7 +26,7 @@ using namespace qpid::framing; using namespace qpid::sys; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); @@ -106,34 +106,11 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons return true; } -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/) +{ PreRoute pr(msg, this); - uint32_t count(0); - BindingsArray::ConstPtr p = bindings.snapshot(); - if (p.get()){ - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + doRoute(msg, p); } bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index c628c44909..e4825344a0 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -118,31 +118,17 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons PreRoute pr(msg, this); - uint32_t count(0); - Bindings::ConstPtr p = bindings.snapshot(); - if (p.get()){ + Bindings::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + if (p.get()) + { for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { if (match((*i)->args, *args)) { - msg.deliverTo((*i)->queue); - count++; - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); + b->push_back(*i); } } } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); - if (count == 0) { - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - } else { - mgmtExchange->inc_msgRoutes(count); - mgmtExchange->inc_byteRoutes(count * msg.contentSize()); - } - } + doRoute(msg, b); } @@ -163,7 +149,7 @@ HeadersExchange::~HeadersExchange() {} const std::string HeadersExchange::typeName("headers"); -namespace +namespace { bool match_values(const FieldValue& bind, const FieldValue& msg) { @@ -181,7 +167,7 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { i != bind.end(); ++i) { - if (i->first != x_match) + if (i->first != x_match) { Map::const_iterator j = msg.find(i->first); if (j == msg.end()) return false; @@ -194,7 +180,7 @@ bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { i != bind.end(); ++i) { - if (i->first != x_match) + if (i->first != x_match) { Map::const_iterator j = msg.find(i->first); if (j != msg.end()) { diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 639f04faa2..8731a29d24 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ #include "qpid/broker/Message.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/NullMessageStore.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -48,7 +49,7 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} Message::~Message() @@ -75,7 +76,7 @@ std::string Message::getRoutingKey() const return getAdapter().getRoutingKey(frames); } -std::string Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -84,7 +85,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr { if (!exchange) { exchange = registry.get(getExchangeName()); - } + } return exchange; } @@ -98,7 +99,7 @@ const FieldTable* Message::getApplicationHeaders() const return getAdapter().getApplicationHeaders(frames); } -bool Message::isPersistent() +bool Message::isPersistent() const { return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } @@ -175,26 +176,25 @@ void Message::decodeContent(framing::Buffer& buffer) } else { //adjust header flags MarkLastSegment f; - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } //mark content loaded loaded = true; } -void Message::releaseContent(MessageStore* _store) +void Message::releaseContent(bool immediate) { - if (!store) { - store = _store; - } - if (store) { + if (store && !NullMessageStore::isNullStore(store) && (immediate || releaseMgr.canRelease())) { if (!getPersistenceId()) { intrusive_ptr<PersistableMessage> pmsg(this); store->stage(pmsg); staged = true; - } - //remove any content frames from the frameset - frames.remove(TypeFilter<CONTENT_BODY>()); - setContentReleased(); + frames.remove(TypeFilter<CONTENT_BODY>()); + setContentReleased(); + } else if (immediate || releaseMgr.canRelease()) { + frames.remove(TypeFilter<CONTENT_BODY>()); + setContentReleased(); + } } } @@ -213,7 +213,7 @@ bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxC { if (isContentReleased()) { intrusive_ptr<const PersistableMessage> pmsg(this); - + bool done = false; string& data = frame.castBody<AMQContentBody>()->getData(); store->loadContent(queue, pmsg, data, offset, maxContentSize); @@ -239,7 +239,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16 uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); @@ -257,7 +257,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) { sys::Mutex::ScopedLock l(lock); Relay f(out); - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -287,7 +287,7 @@ bool Message::isContentLoaded() const } -namespace +namespace { const std::string X_QPID_TRACE("x-qpid.trace"); } @@ -324,13 +324,13 @@ void Message::addTraceId(const std::string& id) trace += ","; trace += id; headers.setString(X_QPID_TRACE, trace); - } + } } } -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + DeliveryProperties* props = getProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -347,7 +347,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; - if (expiryPolicy) + if (expiryPolicy) expiryPolicy->willExpire(*this); } @@ -362,7 +362,7 @@ boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) Replacement::iterator i = replacement.find(qfor); if (i != replacement.end()){ return i->second; - } + } return empty; } @@ -410,7 +410,7 @@ void Message::setUpdateDestination(const std::string& d) bool Message::isUpdateMessage() { - return updateDestination.size() && isA<MessageTransferBody>() + return updateDestination.size() && isA<MessageTransferBody>() && getMethod<MessageTransferBody>()->getDestination() == updateDestination; } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 0024509bc8..92fc3df7ec 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,12 +34,12 @@ #include <vector> namespace qpid { - + namespace framing { class FieldTable; class SequenceNumber; } - + namespace broker { class ConnectionToken; class Exchange; @@ -51,10 +51,10 @@ class ExpiryPolicy; class Message : public PersistableMessage { public: typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; - + QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); QPID_BROKER_EXTERN ~Message(); - + uint64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } @@ -74,7 +74,7 @@ public: bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; framing::FieldTable& getOrInsertHeaders(); - QPID_BROKER_EXTERN bool isPersistent(); + QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); @@ -82,8 +82,8 @@ public: bool hasExpired(); sys::AbsTime getExpiration() const { return expiration; } - framing::FrameSet& getFrames() { return frames; } - const framing::FrameSet& getFrames() const { return frames; } + framing::FrameSet& getFrames() { return frames; } + const framing::FrameSet& getFrames() const { return frames; } template <class T> T* getProperties() { qpid::framing::AMQHeaderBody* p = frames.getHeaders(); @@ -128,13 +128,13 @@ public: QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - + /** * Releases the in-memory content data held by this * message. Must pass in a store from which the data can * be reloaded. */ - void releaseContent(MessageStore* store); + void releaseContent(bool immediate = false); void destroy(); bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const; @@ -145,10 +145,11 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); - - void forcePersistent(); - bool isForcedPersistent(); - + + void forcePersistent(); + bool isForcedPersistent(); + void setStore(MessageStore* s) { store = s; } + boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index 14b233fd6c..e886cc08a0 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -35,7 +35,6 @@ namespace std::string type_str(uint8_t type); const std::string QPID_MANAGEMENT("qpid.management"); } - MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) : state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {} @@ -80,7 +79,7 @@ void MessageBuilder::handle(AMQFrame& frame) && !NullMessageStore::isNullStore(store) && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */) { - message->releaseContent(store); + message->releaseContent(true); staging = true; } } @@ -96,6 +95,7 @@ void MessageBuilder::end() void MessageBuilder::start(const SequenceNumber& id) { message = intrusive_ptr<Message>(new Message(id)); + message->setStore(store); state = METHOD; staging = false; } diff --git a/cpp/src/qpid/broker/MessageReleaseManager.h b/cpp/src/qpid/broker/MessageReleaseManager.h new file mode 100644 index 0000000000..1c3f615f59 --- /dev/null +++ b/cpp/src/qpid/broker/MessageReleaseManager.h @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _broker_MessageReleaseManager_h +#define _broker_MessageReleaseManager_h + +namespace qpid { + namespace broker { + + class MessageReleaseManager + { + private: + bool releaseBlocked; + bool releaseRequested; + bool released; + + public: + MessageReleaseManager(): releaseBlocked(false), releaseRequested(false), released(false) {} + virtual ~MessageReleaseManager() {} + + bool isReleaseBlocked() const { return releaseBlocked; } + void blockRelease() { if (!released) releaseBlocked = true; } + + bool isReleaseRequested() const { return releaseRequested; } + void setReleaseRequested() { if (!released) releaseRequested = true; } + + bool isReleased() const { return released; } + void setReleased() { released = true; } + + bool canRelease() { return !releaseBlocked && releaseRequested; } + }; + + } +} + + +#endif /*_broker_MessageReleaseManager_h*/ diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index c1f86d4ca4..dc855315db 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ #include "qpid/broker/PersistableMessage.h" #include "qpid/broker/MessageStore.h" +#include "qpid/broker/NullMessageStore.h" #include <iostream> using namespace qpid::broker; @@ -34,9 +35,8 @@ class MessageStore; PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : - asyncEnqueueCounter(0), + asyncEnqueueCounter(0), asyncDequeueCounter(0), - contentReleased(false), store(0) {} @@ -56,13 +56,22 @@ void PersistableMessage::flush() if (q) { store->flush(*q); } - } + } } -void PersistableMessage::setContentReleased() {contentReleased = true; } +void PersistableMessage::setContentReleased() { releaseMgr.setReleased(); } + +void PersistableMessage::blockRelease() { releaseMgr.blockRelease(); } + +bool PersistableMessage::requestContentRelease() +{ + if (!store || NullMessageStore::isNullStore(store) || releaseMgr.isReleaseBlocked()) return false; + releaseMgr.setReleaseRequested(); + return true; +} + +bool PersistableMessage::isContentReleased()const { return releaseMgr.isReleased(); } -bool PersistableMessage::isContentReleased()const { return contentReleased; } - bool PersistableMessage::isEnqueueComplete() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); return asyncEnqueueCounter == 0; @@ -85,8 +94,8 @@ void PersistableMessage::enqueueComplete() { for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { PersistableQueue::shared_ptr q(i->lock()); if (q) q->notifyDurableIOComplete(); - } - } + } + } } } @@ -95,13 +104,13 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { PersistableQueue::shared_ptr q(i->lock()); if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; - } - } + } + } return false; } -void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -110,22 +119,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa } } -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { addToSyncList(queue, _store); enqueueAsync(); } -void PersistableMessage::enqueueAsync() { +void PersistableMessage::enqueueAsync() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - asyncEnqueueCounter++; + asyncEnqueueCounter++; } -bool PersistableMessage::isDequeueComplete() { +bool PersistableMessage::isDequeueComplete() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); return asyncDequeueCounter == 0; } - -void PersistableMessage::dequeueComplete() { + +void PersistableMessage::dequeueComplete() { bool notify = false; { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); @@ -138,7 +147,7 @@ void PersistableMessage::dequeueComplete() { if (notify) allDequeuesComplete(); } -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -148,9 +157,9 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag dequeueAsync(); } -void PersistableMessage::dequeueAsync() { +void PersistableMessage::dequeueAsync() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - asyncDequeueCounter++; + asyncDequeueCounter++; } }} diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 05de9ff4c3..df645493c9 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -31,6 +31,7 @@ #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" #include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/MessageReleaseManager.h" namespace qpid { namespace broker { @@ -46,7 +47,7 @@ class PersistableMessage : public Persistable sys::Mutex asyncEnqueueLock; sys::Mutex asyncDequeueLock; sys::Mutex storeLock; - + /** * Tracks the number of outstanding asynchronous enqueue * operations. When the message is enqueued asynchronously the @@ -68,7 +69,6 @@ class PersistableMessage : public Persistable void enqueueAsync(); void dequeueAsync(); - bool contentReleased; syncList synclist; protected: @@ -81,6 +81,8 @@ class PersistableMessage : public Persistable MessageStore* store; + MessageReleaseManager releaseMgr; + public: typedef boost::shared_ptr<PersistableMessage> shared_ptr; @@ -95,9 +97,15 @@ class PersistableMessage : public Persistable PersistableMessage(); void flush(); - + + bool requestContentRelease(); + bool isContentReleased() const; - + + void blockRelease(); + + virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; + QPID_BROKER_EXTERN bool isEnqueueComplete(); QPID_BROKER_EXTERN void enqueueComplete(); @@ -107,16 +115,16 @@ class PersistableMessage : public Persistable QPID_BROKER_EXTERN bool isDequeueComplete(); - + QPID_BROKER_EXTERN void dequeueComplete(); QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); bool isStoredOnQueue(PersistableQueue::shared_ptr queue); - + void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); - + }; }} diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h index 8d85d36fef..aa6a751f61 100644 --- a/cpp/src/qpid/broker/PersistableQueue.h +++ b/cpp/src/qpid/broker/PersistableQueue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -55,18 +55,18 @@ public: virtual const std::string& getName() const = 0; virtual ~PersistableQueue() { - if (externalQueueStore) + if (externalQueueStore) delete externalQueueStore; }; virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0; - + inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;}; - + PersistableQueue():externalQueueStore(NULL){ }; - - + + /** * call back to signal async AIO writes have * completed (enqueue/dequeue etc) @@ -76,9 +76,9 @@ public: */ virtual void notifyDurableIOComplete() = 0; protected: - + ExternalQueueStore* externalQueueStore; - + }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 08ee133981..c3b14688d6 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -56,7 +56,7 @@ using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); @@ -76,16 +76,16 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } -Queue::Queue(const string& _name, bool _autodelete, +Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, Manageable* parent, Broker* b) : - name(_name), + name(_name), autodelete(_autodelete), store(_store), - owner(_owner), + owner(_owner), consumerCount(0), exclusive(0), noLocal(false), @@ -182,9 +182,9 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg, true); - if (store){ + if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure - msg->addToSyncList(shared_from_this(), store); + msg->addToSyncList(shared_from_this(), store); } msg->enqueueComplete(); // mark the message as enqueued mgntEnqStats(msg); @@ -192,7 +192,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this - msg->releaseContent(store); + msg->releaseContent(true); } } @@ -209,13 +209,13 @@ void Queue::requeue(const QueuedMessage& msg){ if (!isEnqueued(msg)) return; QueueListeners::NotificationSet copy; - { + { Mutex::ScopedLock locker(messageLock); msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); - // for persistLastNode - don't force a message twice to disk, but force it if no force before + // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { msg.payload->forcePersistent(); if (msg.payload->isForcedPersistent() ){ @@ -234,7 +234,7 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } -bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "Attempting to acquire message at " << position); @@ -258,7 +258,7 @@ bool Queue::acquire(const QueuedMessage& msg) { QPID_LOG(debug, "attempting to acquire " << msg.position); for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set - || (lastValueQueue && (i->position == msg.position) && + || (lastValueQueue && (i->position == msg.position) && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { clearLVQIndex(msg); @@ -296,7 +296,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) case NO_MESSAGES: default: return false; - } + } } else { return browseNextMessage(m, c); } @@ -317,7 +317,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) //enqueued and so is not available for consumption yet, //register consumer for notification when this changes listeners.addListener(c); - return false; + return false; } else { //check that consumer has sufficient credit for the //message (if it does not, no need to register it for @@ -332,7 +332,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { + if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -345,7 +345,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ } if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->accept(msg.payload)) { m = msg; popMsg(msg); return CONSUMED; @@ -358,7 +358,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); return CANT_CONSUME; - } + } } } } @@ -423,7 +423,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { if (c->position < getFront().position) { msg = getFront(); return true; - } else { + } else { //TODO: can improve performance of this search, for now just searching linearly from end Messages::reverse_iterator pos; for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) { @@ -524,7 +524,7 @@ void Queue::purgeExpired() */ uint32_t Queue::purge(const uint32_t purge_request){ Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 + uint32_t purge_count = purge_request; // only comes into play if >0 uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. @@ -537,7 +537,7 @@ uint32_t Queue::purge(const uint32_t purge_request){ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 + uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning while((!qty || move_count--) && !messages.empty()) { @@ -566,15 +566,16 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ Messages dequeues; QueueListeners::NotificationSet copy; { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); + msg->setStore(store); if (policy.get()) { policy->tryEnqueue(qm); //depending on policy, may have some dequeues if (!isRecovery) pendingDequeues.swap(dequeues); } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - + LVQ::iterator i; const framing::FieldTable* ft = msg->getApplicationHeaders(); if (lastValueQueue && ft){ @@ -584,7 +585,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (i == lvq.end() || msg->isUpdateMessage()){ messages.push_back(qm); listeners.populate(copy); - lvq[key] = msg; + lvq[key] = msg; }else { boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; @@ -594,10 +595,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ //recovery is complete pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); } else { - Mutex::ScopedUnlock u(messageLock); + Mutex::ScopedUnlock u(messageLock); dequeue(0, QueuedMessage(qm.queue, old, qm.position)); } - } + } }else { messages.push_back(qm); listeners.populate(copy); @@ -632,8 +633,8 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) if (ft) { string key = ft->getAsString(qpidVQMatchProperty); if (lvq.find(key) != lvq.end()){ - lvq[key] = replacement; - } + lvq[key] = replacement; + } } msg.payload = replacement; } @@ -644,7 +645,7 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); - + uint32_t count = 0; for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { //NOTE: don't need to use checkLvqReplace() here as it @@ -652,7 +653,7 @@ uint32_t Queue::getMessageCount() const //so the enqueueComplete check has no effect if ( i->payload->isEnqueueComplete() ) count ++; } - + return count; } @@ -696,13 +697,13 @@ void Queue::setLastNodeFailure() } } -// return true if store exists, +// return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } - + if (traceId.size()) { msg->addTraceId(traceId); } @@ -716,13 +717,13 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) return false; } -// return true if store exists, +// return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) { + if (!ctxt) { dequeued(msg); } } @@ -738,7 +739,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + dequeued(msg); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -794,7 +795,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); lastValueQueue = lastValueQueueNoBrowse; } - + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); @@ -803,7 +804,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (excludeList.size()) { split(traceExclude, excludeList, ", "); } - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); eventMode = _settings.getAsInt(qpidQueueEventGeneration); @@ -859,9 +860,9 @@ const QueuePolicy* Queue::getPolicy() return policy.get(); } -uint64_t Queue::getPersistenceId() const -{ - return persistenceId; +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; } void Queue::setPersistenceId(uint64_t _persistenceId) const @@ -880,18 +881,18 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const persistenceId = _persistenceId; } -void Queue::encode(Buffer& buffer) const +void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(settings); - if (policy.get()) { + if (policy.get()) { buffer.put(*policy); } } uint32_t Queue::encodedSize() const { - return name.size() + 1/*short string size octet*/ + settings.encodedSize() + return name.size() + 1/*short string size octet*/ + settings.encodedSize() + (policy.get() ? (*policy).encodedSize() : 0); } @@ -922,50 +923,50 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) { - if (broker.getQueues().destroyIf(queue->getName(), + if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { queue->unbind(broker.getExchanges(), queue); queue->destroy(); } } -bool Queue::isExclusiveOwner(const OwnershipToken* const o) const -{ +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const +{ Mutex::ScopedLock locker(ownershipLock); - return o == owner; + return o == owner; } -void Queue::releaseExclusiveOwnership() -{ +void Queue::releaseExclusiveOwnership() +{ Mutex::ScopedLock locker(ownershipLock); - owner = 0; + owner = 0; } -bool Queue::setExclusiveOwner(const OwnershipToken* const o) -{ +bool Queue::setExclusiveOwner(const OwnershipToken* const o) +{ Mutex::ScopedLock locker(ownershipLock); if (owner) { return false; } else { - owner = o; + owner = o; return true; } } -bool Queue::hasExclusiveOwner() const -{ +bool Queue::hasExclusiveOwner() const +{ Mutex::ScopedLock locker(ownershipLock); - return owner != 0; + return owner != 0; } -bool Queue::hasExclusiveConsumer() const -{ - return exclusive; +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; } void Queue::setExternalQueueStore(ExternalQueueStore* inst) { - if (externalQueueStore!=inst && externalQueueStore) - delete externalQueueStore; + if (externalQueueStore!=inst && externalQueueStore) + delete externalQueueStore; externalQueueStore = inst; if (inst) { @@ -975,19 +976,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } -bool Queue::releaseMessageContent(const QueuedMessage& m) -{ - if (store && !NullMessageStore::isNullStore(store)) { - QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); - m.payload->releaseContent(store); - return true; - } else { - QPID_LOG(warning, "Message " << m.position << " on " << name - << " cannot be released from memory as the queue is not durable"); - return false; - } -} - ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; @@ -1062,7 +1050,7 @@ bool Queue::isEnqueued(const QueuedMessage& msg) void Queue::addPendingDequeue(const QueuedMessage& msg) { //assumes lock is held - true at present but rather nasty as this is a public method - pendingDequeues.push_back(msg); + pendingDequeues.push_back(msg); } QueueListeners& Queue::getListeners() { return listeners; } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 77799fd967..286ac67124 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -156,8 +156,8 @@ namespace qpid { typedef std::vector<shared_ptr> vector; QPID_BROKER_EXTERN Queue(const string& name, - bool autodelete = false, - MessageStore* const store = 0, + bool autodelete = false, + MessageStore* const store = 0, const OwnershipToken* const owner = 0, management::Manageable* parent = 0, Broker* broker = 0); @@ -213,11 +213,11 @@ namespace qpid { bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages + uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages QPID_BROKER_EXTERN void purgeExpired(); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getConsumerCount() const; @@ -254,8 +254,8 @@ namespace qpid { * Inform queue of messages that were enqueued, have since * been acquired but not yet accepted or released (and * thus are still logically on the queue) - used in - * clustered broker. - */ + * clustered broker. + */ void enqueued(const QueuedMessage& msg); /** @@ -266,9 +266,9 @@ namespace qpid { * accepted it). */ bool isEnqueued(const QueuedMessage& msg); - + /** - * Gets the next available message + * Gets the next available message */ QPID_BROKER_EXTERN QueuedMessage get(); @@ -315,8 +315,6 @@ namespace qpid { bindings.eachBinding(f); } - bool releaseMessageContent(const QueuedMessage&); - void popMsg(QueuedMessage& qmsg); /** Set the position sequence number for the next message on the queue. @@ -340,7 +338,7 @@ namespace qpid { * queues. It is used for dequeueing messages in response * to an enqueue while avoid holding lock over call to * store. - * + * * Assumes messageLock is held - true for curent use case * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public * method diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 39afe90134..03a7951237 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,7 +28,7 @@ using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : +QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {} void QueuePolicy::enqueued(uint64_t _size) @@ -89,7 +89,7 @@ void QueuePolicy::tryEnqueue(const QueuedMessage& m) } else { std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue"); throw ResourceLimitExceededException( - QPID_MSG("Policy exceeded on " << queue << " by message " << m.position + QPID_MSG("Policy exceeded on " << queue << " by message " << m.position << " of size " << m.payload->contentSize() << " , policy: " << *this)); } } @@ -129,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings) FieldTable::ValuePtr v = settings.get(typeKey); if (v && v->convertsTo<std::string>()) { std::string t = v->get<std::string>(); - std::transform(t.begin(), t.end(), t.begin(), tolower); + std::transform(t.begin(), t.end(), t.begin(), tolower); if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; } return FLOW_TO_DISK; @@ -152,7 +152,7 @@ void QueuePolicy::encode(Buffer& buffer) const buffer.putLongLong(size.get()); } -void QueuePolicy::decode ( Buffer& buffer ) +void QueuePolicy::decode ( Buffer& buffer ) { maxCount = buffer.getLong(); maxSize = buffer.getLongLong(); @@ -179,15 +179,15 @@ const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : +FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {} bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) { - return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m); + return QueuePolicy::checkLimit(m) || (m.queue->getPersistenceId() && m.payload->requestContentRelease()); } -RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : +RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) @@ -219,19 +219,19 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this - return find(m, pendingDequeues, false) || find(m, queue, false); + return find(m, pendingDequeues, false) || find(m, queue, false); } bool RingQueuePolicy::checkLimit(const QueuedMessage& m) { if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept - + QueuedMessage oldest; { qpid::sys::Mutex::ScopedLock l(lock); if (queue.empty()) { - QPID_LOG(debug, "Message too large for ring queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) + QPID_LOG(debug, "Message too large for ring queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) << " [" << *this << "] " << ": message size = " << m.payload->contentSize() << " bytes"); return false; @@ -251,13 +251,13 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) pendingDequeues.push_back(oldest); } oldest.queue->addPendingDequeue(oldest); - QPID_LOG(debug, "Ring policy triggered in queue " + QPID_LOG(debug, "Ring policy triggered in queue " << (m.queue ? m.queue->getName() : std::string("unknown queue")) << ": removed message " << oldest.position << " to make way for " << m.position); return true; } else { - QPID_LOG(debug, "Ring policy could not be triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) + QPID_LOG(debug, "Ring policy could not be triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); //in strict mode, if oldest message has been delivered (hence //cannot be acquired) but not yet acked, it should not be @@ -299,7 +299,7 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uin } } - + namespace qpid { namespace broker { @@ -309,7 +309,7 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) else out << "size: unlimited"; out << "; "; if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get(); - else out << "count: unlimited"; + else out << "count: unlimited"; out << "; type=" << p.type; return out; } diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 2a19115fd1..e340209a44 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,7 +35,7 @@ namespace qpid { namespace broker { RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, - DtxManager& _dtxMgr, uint64_t _stagingThreshold) + DtxManager& _dtxMgr, uint64_t _stagingThreshold) : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -45,7 +45,7 @@ class RecoverableMessageImpl : public RecoverableMessage intrusive_ptr<Message> msg; const uint64_t stagingThreshold; public: - RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold); + RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold); ~RecoverableMessageImpl() {}; void setPersistenceId(uint64_t id); bool loadContent(uint64_t available); @@ -61,7 +61,7 @@ class RecoverableQueueImpl : public RecoverableQueue public: RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {} ~RecoverableQueueImpl() {}; - void setPersistenceId(uint64_t id); + void setPersistenceId(uint64_t id); uint64_t getPersistenceId() const; const std::string& getName() const; void setExternalQueueStore(ExternalQueueStore* inst); @@ -129,10 +129,10 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff { boost::intrusive_ptr<Message> message(new Message()); message->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); } -RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, +RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn) { DtxBuffer::shared_ptr buffer(new DtxBuffer()); @@ -159,7 +159,7 @@ void RecoveryManagerImpl::recoveryComplete() queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1)); } -RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) +RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) { if (!msg->isPersistent()) { msg->forcePersistent(); // set so that message will get dequeued from store. @@ -195,7 +195,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id) { queue->setPersistenceId(id); } - + uint64_t RecoverableQueueImpl::getPersistenceId() const { return queue->getPersistenceId(); @@ -205,7 +205,7 @@ const std::string& RecoverableQueueImpl::getName() const { return queue->getName(); } - + void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst) { queue->setExternalQueueStore(inst); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index bdd5f33601..87d7f6f97b 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -86,7 +86,7 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(const string& tag, +void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { @@ -103,7 +103,7 @@ void SemanticState::cancel(const string& tag){ //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); - + } } @@ -173,7 +173,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail) dtxBuffer->fail(); } else { dtxBuffer->markEnded(); - } + } dtxBuffer.reset(); } @@ -233,9 +233,9 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, @@ -244,20 +244,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const framing::FieldTable& _arguments -) : +) : Consumer(_acquire), - parent(_parent), - name(_name), - queue(_queue), - ackExpected(ack), + parent(_parent), + name(_name), + queue(_queue), + ackExpected(ack), acquire(_acquire), - blocked(true), + blocked(true), windowing(true), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), + msgCredit(0), byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), @@ -279,7 +279,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) if (!ackExpected) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); - } + } if (acquire && !ackExpected) { queue->dequeue(0, msg); } @@ -297,7 +297,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. - // + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -305,7 +305,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; + uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { msgCredit--; } @@ -315,13 +315,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); - + } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent + QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent << ", bytes: " << byteCredit << " msgs: " << msgCredit); return false; } else { @@ -341,7 +341,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { Queue::tryAutoDelete(session.getBroker(), queue); } } @@ -366,7 +366,7 @@ const std::string nullstring; void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - + std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName) cacheExchange = session.getBroker().getExchanges().get(exchangeName); @@ -393,7 +393,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it - //TODO:else if accept-mode is explicit, reject it + //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -402,7 +402,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->destroy(); } } - + msg->releaseContent(); // release frames if release has been requested } void SemanticState::requestDispatch() @@ -421,7 +421,7 @@ void SemanticState::ConsumerImpl::requestDispatch() } bool SemanticState::complete(DeliveryRecord& delivery) -{ +{ ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->second->complete(delivery); @@ -449,7 +449,7 @@ void SemanticState::recover(bool requeue) unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost @@ -570,7 +570,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ +{ return DeliveryRecord::findRange(unacked, first, last); } @@ -655,13 +655,13 @@ void SemanticState::accepted(const SequenceSet& commands) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: accumulatedAck.add(commands); - + if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + dtxBuffer->enlist(txAck); //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 6bf0b104ea..99d6c1cb8d 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,7 +36,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. -namespace +namespace { const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); @@ -53,7 +53,7 @@ namespace { // Iterate over a string of '.'-separated tokens. struct TokenIterator { typedef pair<const char*,const char*> Token; - + TokenIterator(const char* b, const char* e) : token(make_pair(b, find(b,e,'.'))), end(e) {} bool finished() const { return !token.first; } @@ -122,7 +122,7 @@ class Matcher { Matcher(const string& p, const string& k) : matched(), pattern(&p[0], &p[0]+p.size()), key(&k[0], &k[0]+k.size()) { matched = match(); } - + operator bool() const { return matched; } private: @@ -158,7 +158,7 @@ class Matcher { } if (!pattern.finished() && pattern.match1('#')) pattern.next(); // Trailing # matches empty. - return pattern.finished() && key.finished(); + return pattern.finished() && key.finished(); } bool matched; @@ -173,7 +173,7 @@ string TopicExchange::normalize(const string& pattern) { return normal; } -bool TopicExchange::match(const string& pattern, const string& key) +bool TopicExchange::match(const string& pattern, const string& key) { return Matcher(pattern, key); } @@ -231,11 +231,11 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons */ std::vector<std::string> keys2prop; { - RWlock::ScopedRlock l(lock); + RWlock::ScopedRlock l(lock); for (BindingMap::iterator iter = bindings.begin(); iter != bindings.end(); iter++) { const BoundKey& bk = iter->second; - + if (bk.fedBinding.hasLocal()) { keys2prop.push_back(iter->first); } @@ -293,44 +293,24 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) return q != qv.end(); } -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - Binding::vector mb; +void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +{ + qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); PreRoute pr(msg, this); - uint32_t count(0); { - RWlock::ScopedRlock l(lock); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, routingKey)) { - Binding::vector& qv(i->second.bindingVector); - for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ - mb.push_back(*j); + RWlock::ScopedRlock l(lock); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, routingKey)) { + Binding::vector& qv(i->second.bindingVector); + for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){ + b->push_back(*j); + } } } } - } - - for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) { - msg.deliverTo((*j)->queue); - if ((*j)->mgmtBinding != 0) - (*j)->mgmtBinding->inc_msgMatched (); - } - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + doRoute(msg, b); } bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) @@ -343,7 +323,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing return bindings.size() > 0; } else if (routingKey) { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *routingKey)) + if (match(i->first, *routingKey)) return true; } } else { diff --git a/cpp/src/qpid/sys/CopyOnWriteArray.h b/cpp/src/qpid/sys/CopyOnWriteArray.h index e4ae3a6094..b7111f2b17 100644 --- a/cpp/src/qpid/sys/CopyOnWriteArray.h +++ b/cpp/src/qpid/sys/CopyOnWriteArray.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,6 +39,7 @@ class CopyOnWriteArray { public: typedef boost::shared_ptr<const std::vector<T> > ConstPtr; + typedef boost::shared_ptr<std::vector<T> > Ptr; CopyOnWriteArray() {} CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {} diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp index a41c8840ff..253b9ff8d0 100644 --- a/cpp/src/qpid/xml/XmlExchange.cpp +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -69,7 +69,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, // #### TODO: The Binding should take the query text // #### only. Consider encapsulating the entire block, including // #### the if condition. - + bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments) { @@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const if ((*it)->getStaticAnalysis().areContextFlagsUsed()) { binding->parse_message_content = true; break; - } + } } } @@ -129,11 +129,11 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons } return true; } else { - return false; + return false; } } -bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) +bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) { string msgContent; @@ -151,12 +151,12 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F QPID_LOG(trace, "matches: message content is [" << msgContent << "]"); - XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), + XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), msgContent.length(), "input" ); // This will parse the document using either Xerces or FastXDM, depending // on your XQilla configuration. FastXDM can be as much as 10x faster. - + Sequence seq(context->parseDocument(xml)); if(!seq.isEmpty() && seq.first()->isNode()) { @@ -206,49 +206,26 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT PreRoute pr(msg, this); try { XmlBinding::vector::ConstPtr p; - { + qpid::sys::CopyOnWriteArray<Binding::shared_ptr>::Ptr b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + { RWlock::ScopedRlock l(lock); - p = bindingsMap[routingKey].snapshot(); - if (!p) return; - } - int count(0); + p = bindingsMap[routingKey].snapshot(); + if (!p.get()) return; + } for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { - if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { - msg.deliverTo((*i)->queue); - count++; - QPID_LOG(trace, "Delivered to queue" ); - - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { + b->push_back(*i); } - } - if (!count) { - QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - } + } + doRoute(msg, b); } catch (...) { QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } - - } -bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) +bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { RWlock::ScopedRlock l(lock); if (routingKey) { @@ -274,12 +251,12 @@ bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKe } -XmlExchange::~XmlExchange() +XmlExchange::~XmlExchange() { bindingsMap.clear(); } const std::string XmlExchange::typeName("xml"); - + } } diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h index 38cb7699b6..389bfebfd0 100644 --- a/cpp/src/qpid/xml/XmlExchange.h +++ b/cpp/src/qpid/xml/XmlExchange.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -53,7 +53,7 @@ class XmlExchange : public virtual Exchange { Binding(key, queue, parent), xquery(query), parse_message_content(true) {} }; - + typedef std::map<string, XmlBinding::vector > XmlBindingsMap; XmlBindingsMap bindingsMap; @@ -64,13 +64,13 @@ class XmlExchange : public virtual Exchange { public: static const std::string typeName; - + XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); XmlExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } - + virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index a655baeab8..b6932434b6 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -22,6 +22,8 @@ #include "test_tools.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" @@ -30,12 +32,14 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" +#include "qpid/framing/reply_exceptions.h" #include <iostream> #include "boost/format.hpp" using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; +using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; @@ -273,14 +277,35 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ } -class TestMessageStoreOC : public NullMessageStore +const std::string nullxid = ""; + +class SimpleDummyCtxt : public TransactionContext {}; + +class DummyCtxt : public TPCTransactionContext +{ + const std::string xid; +public: + DummyCtxt(const std::string& _xid) : xid(_xid) {} + static std::string getXid(TransactionContext& ctxt) + { + DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); + return c ? c->xid : nullxid; + } +}; + +class TestMessageStoreOC : public MessageStore { + std::set<std::string> prepared; + uint64_t nextPersistenceId; public: uint enqCnt; uint deqCnt; bool error; + TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {} + ~TestMessageStoreOC(){} + virtual void dequeue(TransactionContext*, const boost::intrusive_ptr<PersistableMessage>& /*msg*/, const PersistableQueue& /*queue*/) @@ -302,8 +327,32 @@ class TestMessageStoreOC : public NullMessageStore error=true; } - TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {} - ~TestMessageStoreOC(){} + bool init(const Options*) { return true; } + void truncateInit(const bool) {} + void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); } + void destroy(PersistableQueue&) {} + void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); } + void destroy(const PersistableExchange&) {} + void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} + void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} + void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); } + void destroy(const PersistableConfig&) {} + void stage(const boost::intrusive_ptr<PersistableMessage>&) {} + void destroy(PersistableMessage&) {} + void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {} + void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&, + std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } + void flush(const qpid::broker::PersistableQueue&) {} + uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; } + + std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); } + std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); } + void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); } + void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } + void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } + void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); } + + void recover(RecoveryManager&) {} }; @@ -703,7 +752,7 @@ not requeued to the store. QPID_AUTO_TEST_CASE(testLastNodeJournalError){ /* -simulate store excption going into last node standing +simulate store exception going into last node standing */ TestMessageStoreOC testStore; @@ -727,6 +776,83 @@ simulate store excption going into last node standing } +intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) { + intrusive_ptr<Message> msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); + AMQFrame header((AMQHeaderBody())); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; +} + +QPID_AUTO_TEST_CASE(testFlowToDiskMsgProperties){ + + TestMessageStoreOC testStore; + client::QueueOptions args; + args.setSizePolicy(FLOW_TO_DISK, 0, 1); + + intrusive_ptr<Message> msg1 = mkMsg("e", "A"); + intrusive_ptr<Message> msg2 = mkMsg("e", "B"); + intrusive_ptr<Message> msg3 = mkMsg("e", "C"); + intrusive_ptr<Message> msg4 = mkMsg("e", "D"); + intrusive_ptr<Message> msg5 = mkMsg("e", "E"); + intrusive_ptr<Message> msg6 = mkMsg("e", "F"); + intrusive_ptr<Message> msg7 = mkMsg("e", "G"); + msg4->forcePersistent(); + msg5->forcePersistent(); + msg7->forcePersistent(); + + DeliverableMessage dmsg1(msg1); + DeliverableMessage dmsg2(msg2); + DeliverableMessage dmsg3(msg3); + DeliverableMessage dmsg4(msg4); + DeliverableMessage dmsg5(msg5); + DeliverableMessage dmsg6(msg6); + DeliverableMessage dmsg7(msg7); + + FanOutExchange fanout1("fanout1", false, args); + FanOutExchange fanout2("fanout2", false, args); + + Queue::shared_ptr queue1(new Queue("queue1", true, &testStore )); + queue1->configure(args); + Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); + queue2->configure(args); + Queue::shared_ptr queue3(new Queue("queue3", true)); + fanout1.bind(queue1, "", 0); + fanout1.bind(queue2, "", 0); + fanout1.route(dmsg1, "", 0); + msg1->releaseContent(); + fanout1.route(dmsg2, "", 0); + msg2->releaseContent(); + fanout1.route(dmsg3, "", 0); + msg3->releaseContent(); + + BOOST_CHECK_EQUAL(3, queue1->getMessageCount()); + BOOST_CHECK_EQUAL(3, queue2->getMessageCount()); + BOOST_CHECK_EQUAL(msg1->isContentReleased(), false); + BOOST_CHECK_EQUAL(msg2->isContentReleased(), true); + BOOST_CHECK_EQUAL(msg3->isContentReleased(), true); + + fanout1.bind(queue3, "", 0); + fanout1.route(dmsg4, "", 0); + msg4->releaseContent(); + BOOST_CHECK_EQUAL(msg4->isContentReleased(), false); + fanout1.route(dmsg5, "", 0); + msg5->releaseContent(); + BOOST_CHECK_EQUAL(msg5->isContentReleased(), false); + + fanout2.bind(queue3, "", 0); + fanout2.route(dmsg6, "", 0); + fanout2.route(dmsg7, "", 0); + msg6->releaseContent(); + BOOST_CHECK_EQUAL(msg6->isContentReleased(), false); + msg7->releaseContent(); + BOOST_CHECK_EQUAL(msg7->isContentReleased(), false); + +} + + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |