diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-12 17:42:27 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-12 17:42:27 +0000 |
| commit | 23a0d956ffa79aa3e3fbf43e3755f1cea387b562 (patch) | |
| tree | 34ff95f37feb436b970715bd89975747075e07b2 /cpp/src/qpid | |
| parent | f821fb7fae3c74d8662e7783b255d52c785961f5 (diff) | |
| download | qpid-python-23a0d956ffa79aa3e3fbf43e3755f1cea387b562.tar.gz | |
QPID-3417: C++ broker - support adding arrival timestamp to received messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1182490 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 2 |
8 files changed, 78 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index bd94582d10..ec3cf9d340 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -43,6 +43,8 @@ #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -125,7 +127,8 @@ Broker::Options::Options(const std::string& name) : queueFlowStopRatio(80), queueFlowResumeRatio(70), queueThresholdEventRatio(80), - defaultMsgGroup("qpid.no-group") + defaultMsgGroup("qpid.no-group"), + timestampRcvMsgs(false) // set the 0.10 timestamp delivery property { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -162,7 +165,8 @@ Broker::Options::Options(const std::string& name) : ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") - ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier."); + ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") + ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message."); } const std::string empty; @@ -301,6 +305,11 @@ Broker::Broker(const Broker::Options& conf) : else QPID_LOG(info, "Management not enabled"); + // this feature affects performance, so let's be sure that gets logged! + if (conf.timestampRcvMsgs) { + QPID_LOG(notice, "Receive message timestamping is ENABLED."); + } + /** * SASL setup, can fail and terminate startup */ @@ -492,9 +501,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, { _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args); status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext()); - status = Manageable::STATUS_OK; break; } + case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG: + { + _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args); + status = getTimestampConfig(a.o_receive, getManagementExecutionContext()); + break; + } + case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG: + { + _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args); + status = setTimestampConfig(a.i_receive, getManagementExecutionContext()); + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -517,6 +537,8 @@ const std::string EXCHANGE_TYPE("exchange-type"); const std::string QUEUE_NAME("queue"); const std::string EXCHANGE_NAME("exchange"); +const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10"); + const std::string _TRUE("true"); const std::string _FALSE("false"); } @@ -711,6 +733,31 @@ Manageable::status_t Broker::queryQueue( const std::string& name, return Manageable::STATUS_OK;; } +Manageable::status_t Broker::getTimestampConfig(bool& receive, + const ConnectionState* context) +{ + std::string name; // none needed for broker + std::string userId = context->getUserId(); + if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId)); + } + receive = config.timestampRcvMsgs; + return Manageable::STATUS_OK; +} + +Manageable::status_t Broker::setTimestampConfig(const bool receive, + const ConnectionState* context) +{ + std::string name; // none needed for broker + std::string userId = context->getUserId(); + if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId)); + } + config.timestampRcvMsgs = receive; + QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED.")); + return Manageable::STATUS_OK; +} + void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 8b347db3c0..b3b751be98 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -122,6 +122,7 @@ public: uint queueFlowResumeRatio; // producer flow control: off uint16_t queueThresholdEventRatio; std::string defaultMsgGroup; + bool timestampRcvMsgs; private: std::string getHome(); @@ -164,6 +165,10 @@ public: const std::string& userId, const std::string& connectionId, qpid::types::Variant::Map& results); + Manageable::status_t getTimestampConfig(bool& receive, + const ConnectionState* context); + Manageable::status_t setTimestampConfig(const bool receive, + const ConnectionState* context); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; std::auto_ptr<sys::Timer> clusterTimer; @@ -315,6 +320,7 @@ public: const boost::intrusive_ptr<Message>& msg)> deferDelivery; bool isAuthenticating ( ) { return config.auth; } + bool isTimestamping() { return config.timestampRcvMsgs; } typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 5ea7143366..d13109dad1 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -377,7 +377,15 @@ void Message::addTraceId(const std::string& id) } } -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setTimestamp() +{ + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); + time_t now = ::time(0); + props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch +} + +void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) { sys::Mutex::ScopedLock l(lock); DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 2a23a25d06..dda45d73e6 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -81,7 +81,8 @@ public: QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); - QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); + /** determine msg expiration time using the TTL value if present */ + QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); bool hasExpired(); sys::AbsTime getExpiration() const { return expiration; } @@ -93,6 +94,8 @@ public: QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key); void setExchange(const std::string&); void clearApplicationHeadersFlag(); + /** set the timestamp delivery property to the current time-of-day */ + QPID_BROKER_EXTERN void setTimestamp(); framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index dda481778d..380ec656cb 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -472,7 +472,7 @@ const std::string nullstring; } void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { - msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); + msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index ddd6ae3f5b..1ab17e9893 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -259,6 +259,8 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) header.setEof(false); msg->getFrames().append(header); } + if (broker.isTimestamping()) + msg->setTimestamp(); msg->setPublisher(&getConnection()); msg->getIngressCompletion().begin(); semanticState.handle(msg); diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 5cf20c92eb..3badaf40ba 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject"); const std::string X_APP_ID("x-amqp-0-10.app-id"); const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); +const std::string X_TIMESTAMP("x-amqp-0-10.timestamp"); } void populateHeaders(qpid::messaging::Message& message, @@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Message& message, if (messageProperties->hasContentEncoding()) { message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding(); } - // routing-key, others? + // routing-key, timestamp, others? if (deliveryProperties && deliveryProperties->hasRoutingKey()) { message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey(); } + if (deliveryProperties && deliveryProperties->hasTimestamp()) { + message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp(); + } } } diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 50fdc82ee0..5799a1adca 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -622,7 +622,7 @@ void ManagementAgent::sendBufferLH(const string& data, dp->setRoutingKey(routingKey); if (ttl_msec) { dp->setTtl(ttl_msec); - msg->setTimestamp(broker->getExpiryPolicy()); + msg->computeExpiration(broker->getExpiryPolicy()); } msg->getFrames().append(content); msg->setIsManagementMessage(true); |
