summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-01-21 18:27:39 +0000
committerGordon Sim <gsim@apache.org>2014-01-21 18:27:39 +0000
commit7b5e69f135d93fc9c42196a023b1d87c798ba5ea (patch)
treec4ff0c53811ba9b1603de6a0ad0b63fa02275a96 /qpid/cpp/src
parenta34b4b253171cc68d1af3b4b3e3119ff093b4d7a (diff)
downloadqpid-python-7b5e69f135d93fc9c42196a023b1d87c798ba5ea.tar.gz
QPID-5498: restore expiration on paged messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560126 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/PagedQueue.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/PagedQueue.h7
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFactory.cpp2
4 files changed, 34 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index feef911f21..848f6a69dd 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -476,7 +476,7 @@ void Exchange::destroy()
deletionListeners.swap(copy);
}
for (std::map<std::string, boost::function0<void> >::iterator i = copy.begin(); i != copy.end(); ++i) {
- QPID_LOG(notice, "Exchange::destroy() notifying " << i->first);
+ QPID_LOG(debug, "Exchange::destroy() notifying " << i->first);
if (i->second) i->second();
}
}
diff --git a/qpid/cpp/src/qpid/broker/PagedQueue.cpp b/qpid/cpp/src/qpid/broker/PagedQueue.cpp
index 3186182735..43208d74ee 100644
--- a/qpid/cpp/src/qpid/broker/PagedQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PagedQueue.cpp
@@ -24,13 +24,18 @@
#include "qpid/broker/Message.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Time.h"
#include <string.h>
namespace qpid {
namespace broker {
namespace {
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::EPOCH;
+using qpid::sys::FAR_FUTURE;
using qpid::sys::MemoryMappedFile;
-const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/);
+const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/ + 8/*expiration*/);
size_t encodedSize(const Message& msg)
{
@@ -46,30 +51,45 @@ size_t encode(const Message& msg, char* data, size_t size)
buffer.putLong(encoded);
buffer.putLong(msg.getSequence());
buffer.putLongLong(msg.getPersistentContext()->getPersistenceId());
+ sys::AbsTime expiration = msg.getExpiration();
+ int64_t t(0);
+ if (expiration < FAR_FUTURE) {
+ t = Duration(EPOCH, expiration);
+ }
+ buffer.putLongLong(t);
msg.getPersistentContext()->encode(buffer);
assert(buffer.getPosition() == required);
return required;
}
-size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
+size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size,
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
{
qpid::framing::Buffer metadata(const_cast<char*>(data), size);
uint32_t encoded = metadata.getLong();
uint32_t sequence = metadata.getLong();
uint64_t persistenceId = metadata.getLongLong();
+ int64_t t = metadata.getLongLong();
assert(metadata.available() >= encoded);
qpid::framing::Buffer buffer(const_cast<char*>(data) + metadata.getPosition(), encoded);
msg = protocols.decode(buffer);
assert(buffer.getPosition() == encoded);
msg.setSequence(qpid::framing::SequenceNumber(sequence));
msg.getPersistentContext()->setPersistenceId(persistenceId);
+ if (t) {
+ sys::AbsTime expiration(EPOCH, t);
+ msg.setExpiryPolicy(expiryPolicy);
+ msg.setExpiration(expiration);
+ }
return encoded + metadata.getPosition();
}
}
-PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
- : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
+PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p,
+ boost::intrusive_ptr<ExpiryPolicy> e)
+ : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
+ expiryPolicy(e)
{
path = file.open(name, directory);
QPID_LOG(debug, "PagedQueue[" << path << "]");
@@ -299,7 +319,7 @@ Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position)
//if it is the last in the page, decrement the hint count of the page
}
-void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
+void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols, boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
{
QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size);
assert(region == 0);
@@ -313,7 +333,7 @@ void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
//decode messages into Page::messages
for (size_t i = 0; i < count; ++i) {
Message message;
- used += decode(protocols, message, region + used, size - used);
+ used += decode(protocols, message, region + used, size - used, expiryPolicy);
if (!contents.contains(message.getSequence())) {
message.setState(DELETED);
QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence());
@@ -366,7 +386,7 @@ void PagedQueue::load(Page& page)
assert(i != used.rend());
unload(i->second);
}
- page.load(file, protocols);
+ page.load(file, protocols, expiryPolicy);
++loaded;
QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded");
}
diff --git a/qpid/cpp/src/qpid/broker/PagedQueue.h b/qpid/cpp/src/qpid/broker/PagedQueue.h
index 7a28ac545a..cb83fa9f34 100644
--- a/qpid/cpp/src/qpid/broker/PagedQueue.h
+++ b/qpid/cpp/src/qpid/broker/PagedQueue.h
@@ -31,13 +31,15 @@
namespace qpid {
namespace broker {
+class ExpiryPolicy;
class ProtocolRegistry;
/**
*
*/
class PagedQueue : public Messages {
public:
- PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols);
+ PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols,
+ boost::intrusive_ptr<ExpiryPolicy>);
~PagedQueue();
size_t size();
bool deleted(const QueueCursor&);
@@ -60,7 +62,7 @@ class PagedQueue : public Messages {
bool add(const Message&);
Message* next(uint32_t version, QueueCursor&);
Message* find(qpid::framing::SequenceNumber);
- void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&);
+ void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&, boost::intrusive_ptr<ExpiryPolicy>);
void unload(qpid::sys::MemoryMappedFile&);
void clear(qpid::sys::MemoryMappedFile&);
size_t available() const;
@@ -87,6 +89,7 @@ class PagedQueue : public Messages {
std::list<Page> free;
uint loaded;
uint32_t version;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;//needed on reload
void addPages(size_t count);
Page& newPage(qpid::framing::SequenceNumber);
diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp
index d31b120cae..e60349edfb 100644
--- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp
@@ -80,7 +80,7 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que
queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDirectoryPath(),
settings.maxPages ? settings.maxPages : 4,
settings.pageFactor ? settings.pageFactor : 1,
- broker->getProtocolRegistry()));
+ broker->getProtocolRegistry(), broker->getExpiryPolicy()));
}
} else if (settings.lvqKey.empty()) {//LVQ already handled above
queue->messages = std::auto_ptr<Messages>(new MessageDeque());