summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2016-05-25 07:02:56 +0000
committerGordon Sim <gsim@apache.org>2016-05-25 07:02:56 +0000
commit2c36085a66c2c5b7271bced836f97a531a637ae5 (patch)
treee8c899931ec7be848b539da377933d118ef4bbee
parentedee47f4d91d218b6076560936bb579b893cc9b8 (diff)
downloadqpid-python-2c36085a66c2c5b7271bced836f97a531a637ae5.tar.gz
QPID-4397: log more detail for expired messages when debug enabled
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1745448 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.cpp79
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h1
8 files changed, 107 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index f41cf767c7..76be9742c6 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -301,6 +301,11 @@ qpid::types::Variant Message::getProperty(const std::string& key) const
return r.getResult();
}
+std::string Message::printProperties() const
+{
+ return sharedState->printProperties();
+}
+
boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
{
return persistentContext;
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 1865a4fa4e..f704c7a876 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -81,6 +81,7 @@ public:
virtual std::string getTo() const = 0;
virtual std::string getSubject() const = 0;
virtual std::string getReplyTo() const = 0;
+ virtual std::string printProperties() const = 0;
};
class SharedState : public Encoding
@@ -153,6 +154,7 @@ public:
QPID_BROKER_EXTERN std::string getPropertyAsString(const std::string& key) const;
QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
void processProperties(qpid::amqp::MapHandler&) const;
+ std::string printProperties() const;
QPID_BROKER_EXTERN uint64_t getMessageSize() const;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index f8934d291a..d90bd1110a 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -421,7 +421,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
QueueCursor cursor = c->getCursor(); // Save current position.
Message* msg = messages->next(*c); // Advances c.
if (msg) {
- if (msg->getExpiration() < sys::AbsTime::now()) {
+ if (isExpired(name, *msg, sys::AbsTime::now())) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
//ERROR: don't hold lock across call to store!!
@@ -627,11 +627,14 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons
}
}
-namespace{
-bool hasExpired(const Message& m, AbsTime now)
+bool Queue::isExpired(const std::string& name, const Message& m, AbsTime now)
{
- return m.getExpiration() < now;
-}
+ if (m.getExpiration() < now) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "': " << m.printProperties());
+ return true;
+ } else {
+ return false;
+ }
}
/**
@@ -646,7 +649,7 @@ void Queue::purgeExpired(sys::Duration lapse) {
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
sys::AbsTime time = sys::AbsTime::now();
- uint32_t count = remove(0, boost::bind(&hasExpired, _1, time), 0, CONSUMER, settings.autodelete);
+ uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete);
QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
//
// Report the count of discarded-by-ttl messages
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index efca9b9d40..875b996637 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -530,6 +530,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
//utility function
static bool reroute(boost::shared_ptr<Exchange> e, const Message& m);
+ static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime);
friend class QueueFactory;
};
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp
index 857ca2c313..2e19c9a9ad 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp
@@ -152,6 +152,85 @@ std::string Message::getPropertyAsString(const std::string& key) const
}
namespace {
+class PropertyPrinter : public MapHandler
+{
+ public:
+ std::stringstream out;
+
+ PropertyPrinter() : first(true) {}
+ void handleVoid(const CharSequence&) {}
+ void handleBool(const CharSequence& key, bool value) { handle(key, value); }
+ void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); }
+ void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); }
+ void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); }
+ void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); }
+ void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); }
+ void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); }
+ void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); }
+ void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); }
+ void handleFloat(const CharSequence& key, float value) { handle(key, value); }
+ void handleDouble(const CharSequence& key, double value) { handle(key, value); }
+ void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
+ {
+ handle(key, value.str());
+ }
+ std::string str() { return out.str(); }
+ bool print(const std::string& key, const std::string& value, bool prependComma) {
+ if (prependComma) out << ", ";
+ if (!value.empty()) {
+ out << key << "=" << value;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ template <typename T> bool print_(const std::string& key, T value, bool prependComma) {
+ if (prependComma) out << ", ";
+ if (value) {
+ out << key << "=" << value;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private:
+ bool first;
+
+ template <typename T> void handle(const CharSequence& key, T value)
+ {
+ if (first) {
+ first = false;
+ } else {
+ out << ", ";
+ }
+ out << key.str() << "=" << value;
+ }
+};
+}
+
+std::string Message::printProperties() const
+{
+ PropertyPrinter r;
+ bool comma = false;
+ comma = r.print("subject", getSubject(), comma);
+ comma = r.print("message-id", getMessageId().str(), comma);
+ comma = r.print("correlation-id", getCorrelationId().str(), comma);
+ comma = r.print("user-id", getUserId(), comma);
+ comma = r.print("to", getTo(), comma);
+ comma = r.print("reply-to", getReplyTo(), comma);
+ comma = r.print_("priority", (uint32_t) getPriority(), comma);
+ comma = r.print_("durable", isPersistent(), comma);
+ uint64_t ttl(0);
+ getTtl(ttl);
+ comma = r.print_("ttl", ttl, comma);
+ r.out << ", application-properties={";
+ processProperties(r);
+ r.out << "}";
+ return r.str();
+}
+
+namespace {
class PropertyAdapter : public Reader {
MapHandler& handler;
CharSequence key;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h
index 39ed1f60b6..67b99a6866 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.h
@@ -51,6 +51,7 @@ class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amq
bool getTtl(uint64_t&) const;
std::string getContent() const;
void processProperties(qpid::amqp::MapHandler&) const;
+ std::string printProperties() const;
std::string getUserId() const;
uint64_t getTimestamp() const;
std::string getTo() const;
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index 74423b3a26..7b64226bdf 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -395,6 +395,15 @@ bool MessageTransfer::isLastQMFResponse(const qpid::broker::Message& message, co
return transfer && transfer->isLastQMFResponse(correlation);
}
+std::string MessageTransfer::printProperties() const
+{
+ std::stringstream out;
+ const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+ if (mp) {
+ out << *mp;
+ }
+ return out.str();
+}
void MessageTransfer::processProperties(qpid::amqp::MapHandler& handler) const
{
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index fdf4fd0f95..af7dfbba74 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -54,6 +54,7 @@ class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qp
bool hasExpiration() const;
std::string getExchangeName() const;
void processProperties(qpid::amqp::MapHandler&) const;
+ std::string printProperties() const;
std::string getUserId() const;
void setTimestamp();
uint64_t getTimestamp() const;