diff options
author | Gordon Sim <gsim@apache.org> | 2016-05-25 07:02:56 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2016-05-25 07:02:56 +0000 |
commit | 2c36085a66c2c5b7271bced836f97a531a637ae5 (patch) | |
tree | e8c899931ec7be848b539da377933d118ef4bbee | |
parent | edee47f4d91d218b6076560936bb579b893cc9b8 (diff) | |
download | qpid-python-2c36085a66c2c5b7271bced836f97a531a637ae5.tar.gz |
QPID-4397: log more detail for expired messages when debug enabled
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1745448 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Message.cpp | 79 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Message.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h | 1 |
8 files changed, 107 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index f41cf767c7..76be9742c6 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -301,6 +301,11 @@ qpid::types::Variant Message::getProperty(const std::string& key) const return r.getResult(); } +std::string Message::printProperties() const +{ + return sharedState->printProperties(); +} + boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const { return persistentContext; diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 1865a4fa4e..f704c7a876 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -81,6 +81,7 @@ public: virtual std::string getTo() const = 0; virtual std::string getSubject() const = 0; virtual std::string getReplyTo() const = 0; + virtual std::string printProperties() const = 0; }; class SharedState : public Encoding @@ -153,6 +154,7 @@ public: QPID_BROKER_EXTERN std::string getPropertyAsString(const std::string& key) const; QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const; void processProperties(qpid::amqp::MapHandler&) const; + std::string printProperties() const; QPID_BROKER_EXTERN uint64_t getMessageSize() const; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index f8934d291a..d90bd1110a 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -421,7 +421,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) QueueCursor cursor = c->getCursor(); // Save current position. Message* msg = messages->next(*c); // Advances c. if (msg) { - if (msg->getExpiration() < sys::AbsTime::now()) { + if (isExpired(name, *msg, sys::AbsTime::now())) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); //ERROR: don't hold lock across call to store!! @@ -627,11 +627,14 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons } } -namespace{ -bool hasExpired(const Message& m, AbsTime now) +bool Queue::isExpired(const std::string& name, const Message& m, AbsTime now) { - return m.getExpiration() < now; -} + if (m.getExpiration() < now) { + QPID_LOG(debug, "Message expired from queue '" << name << "': " << m.printProperties()); + return true; + } else { + return false; + } } /** @@ -646,7 +649,7 @@ void Queue::purgeExpired(sys::Duration lapse) { int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { sys::AbsTime time = sys::AbsTime::now(); - uint32_t count = remove(0, boost::bind(&hasExpired, _1, time), 0, CONSUMER, settings.autodelete); + uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete); QPID_LOG(debug, "Purged " << count << " expired messages from " << getName()); // // Report the count of discarded-by-ttl messages diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index efca9b9d40..875b996637 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -530,6 +530,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, //utility function static bool reroute(boost::shared_ptr<Exchange> e, const Message& m); + static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime); friend class QueueFactory; }; diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp index 857ca2c313..2e19c9a9ad 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp @@ -152,6 +152,85 @@ std::string Message::getPropertyAsString(const std::string& key) const } namespace { +class PropertyPrinter : public MapHandler +{ + public: + std::stringstream out; + + PropertyPrinter() : first(true) {} + void handleVoid(const CharSequence&) {} + void handleBool(const CharSequence& key, bool value) { handle(key, value); } + void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); } + void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); } + void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); } + void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); } + void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); } + void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); } + void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); } + void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); } + void handleFloat(const CharSequence& key, float value) { handle(key, value); } + void handleDouble(const CharSequence& key, double value) { handle(key, value); } + void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/) + { + handle(key, value.str()); + } + std::string str() { return out.str(); } + bool print(const std::string& key, const std::string& value, bool prependComma) { + if (prependComma) out << ", "; + if (!value.empty()) { + out << key << "=" << value; + return true; + } else { + return false; + } + } + template <typename T> bool print_(const std::string& key, T value, bool prependComma) { + if (prependComma) out << ", "; + if (value) { + out << key << "=" << value; + return true; + } else { + return false; + } + } + + private: + bool first; + + template <typename T> void handle(const CharSequence& key, T value) + { + if (first) { + first = false; + } else { + out << ", "; + } + out << key.str() << "=" << value; + } +}; +} + +std::string Message::printProperties() const +{ + PropertyPrinter r; + bool comma = false; + comma = r.print("subject", getSubject(), comma); + comma = r.print("message-id", getMessageId().str(), comma); + comma = r.print("correlation-id", getCorrelationId().str(), comma); + comma = r.print("user-id", getUserId(), comma); + comma = r.print("to", getTo(), comma); + comma = r.print("reply-to", getReplyTo(), comma); + comma = r.print_("priority", (uint32_t) getPriority(), comma); + comma = r.print_("durable", isPersistent(), comma); + uint64_t ttl(0); + getTtl(ttl); + comma = r.print_("ttl", ttl, comma); + r.out << ", application-properties={"; + processProperties(r); + r.out << "}"; + return r.str(); +} + +namespace { class PropertyAdapter : public Reader { MapHandler& handler; CharSequence key; diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h index 39ed1f60b6..67b99a6866 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.h +++ b/qpid/cpp/src/qpid/broker/amqp/Message.h @@ -51,6 +51,7 @@ class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amq bool getTtl(uint64_t&) const; std::string getContent() const; void processProperties(qpid::amqp::MapHandler&) const; + std::string printProperties() const; std::string getUserId() const; uint64_t getTimestamp() const; std::string getTo() const; diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index 74423b3a26..7b64226bdf 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -395,6 +395,15 @@ bool MessageTransfer::isLastQMFResponse(const qpid::broker::Message& message, co return transfer && transfer->isLastQMFResponse(correlation); } +std::string MessageTransfer::printProperties() const +{ + std::stringstream out; + const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>(); + if (mp) { + out << *mp; + } + return out.str(); +} void MessageTransfer::processProperties(qpid::amqp::MapHandler& handler) const { diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index fdf4fd0f95..af7dfbba74 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -54,6 +54,7 @@ class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qp bool hasExpiration() const; std::string getExchangeName() const; void processProperties(qpid::amqp::MapHandler&) const; + std::string printProperties() const; std::string getUserId() const; void setTimestamp(); uint64_t getTimestamp() const; |