summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-12 17:42:27 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-12 17:42:27 +0000
commit23a0d956ffa79aa3e3fbf43e3755f1cea387b562 (patch)
tree34ff95f37feb436b970715bd89975747075e07b2 /cpp/src/qpid
parentf821fb7fae3c74d8662e7783b255d52c785961f5 (diff)
downloadqpid-python-23a0d956ffa79aa3e3fbf43e3755f1cea387b562.tar.gz
QPID-3417: C++ broker - support adding arrival timestamp to received messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1182490 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp53
-rw-r--r--cpp/src/qpid/broker/Broker.h6
-rw-r--r--cpp/src/qpid/broker/Message.cpp10
-rw-r--r--cpp/src/qpid/broker/Message.h5
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp6
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp2
8 files changed, 78 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index bd94582d10..ec3cf9d340 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -43,6 +43,8 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -125,7 +127,8 @@ Broker::Options::Options(const std::string& name) :
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
- defaultMsgGroup("qpid.no-group")
+ defaultMsgGroup("qpid.no-group"),
+ timestampRcvMsgs(false) // set the 0.10 timestamp delivery property
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -162,7 +165,8 @@ Broker::Options::Options(const std::string& name) :
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
- ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
+ ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
+ ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.");
}
const std::string empty;
@@ -301,6 +305,11 @@ Broker::Broker(const Broker::Options& conf) :
else
QPID_LOG(info, "Management not enabled");
+ // this feature affects performance, so let's be sure that gets logged!
+ if (conf.timestampRcvMsgs) {
+ QPID_LOG(notice, "Receive message timestamping is ENABLED.");
+ }
+
/**
* SASL setup, can fail and terminate startup
*/
@@ -492,9 +501,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
{
_qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
- status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
+ status = getTimestampConfig(a.o_receive, getManagementExecutionContext());
+ break;
+ }
+ case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
+ status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -517,6 +537,8 @@ const std::string EXCHANGE_TYPE("exchange-type");
const std::string QUEUE_NAME("queue");
const std::string EXCHANGE_NAME("exchange");
+const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
+
const std::string _TRUE("true");
const std::string _FALSE("false");
}
@@ -711,6 +733,31 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
return Manageable::STATUS_OK;;
}
+Manageable::status_t Broker::getTimestampConfig(bool& receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId));
+ }
+ receive = config.timestampRcvMsgs;
+ return Manageable::STATUS_OK;
+}
+
+Manageable::status_t Broker::setTimestampConfig(const bool receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId));
+ }
+ config.timestampRcvMsgs = receive;
+ QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED."));
+ return Manageable::STATUS_OK;
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 8b347db3c0..b3b751be98 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -122,6 +122,7 @@ public:
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
std::string defaultMsgGroup;
+ bool timestampRcvMsgs;
private:
std::string getHome();
@@ -164,6 +165,10 @@ public:
const std::string& userId,
const std::string& connectionId,
qpid::types::Variant::Map& results);
+ Manageable::status_t getTimestampConfig(bool& receive,
+ const ConnectionState* context);
+ Manageable::status_t setTimestampConfig(const bool receive,
+ const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
std::auto_ptr<sys::Timer> clusterTimer;
@@ -315,6 +320,7 @@ public:
const boost::intrusive_ptr<Message>& msg)> deferDelivery;
bool isAuthenticating ( ) { return config.auth; }
+ bool isTimestamping() { return config.timestampRcvMsgs; }
typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 5ea7143366..d13109dad1 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -377,7 +377,15 @@ void Message::addTraceId(const std::string& id)
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp()
+{
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+ time_t now = ::time(0);
+ props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch
+}
+
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
sys::Mutex::ScopedLock l(lock);
DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 2a23a25d06..dda45d73e6 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -81,7 +81,8 @@ public:
QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
- QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ /** determine msg expiration time using the TTL value if present */
+ QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
bool hasExpired();
sys::AbsTime getExpiration() const { return expiration; }
@@ -93,6 +94,8 @@ public:
QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
void setExchange(const std::string&);
void clearApplicationHeadersFlag();
+ /** set the timestamp delivery property to the current time-of-day */
+ QPID_BROKER_EXTERN void setTimestamp();
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index dda481778d..380ec656cb 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -472,7 +472,7 @@ const std::string nullstring;
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+ msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index ddd6ae3f5b..1ab17e9893 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -259,6 +259,8 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
header.setEof(false);
msg->getFrames().append(header);
}
+ if (broker.isTimestamping())
+ msg->setTimestamp();
msg->setPublisher(&getConnection());
msg->getIngressCompletion().begin();
semanticState.handle(msg);
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 5cf20c92eb..3badaf40ba 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject");
const std::string X_APP_ID("x-amqp-0-10.app-id");
const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
}
void populateHeaders(qpid::messaging::Message& message,
@@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Message& message,
if (messageProperties->hasContentEncoding()) {
message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
}
- // routing-key, others?
+ // routing-key, timestamp, others?
if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
}
+ if (deliveryProperties && deliveryProperties->hasTimestamp()) {
+ message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
+ }
}
}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 50fdc82ee0..5799a1adca 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -622,7 +622,7 @@ void ManagementAgent::sendBufferLH(const string& data,
dp->setRoutingKey(routingKey);
if (ttl_msec) {
dp->setTtl(ttl_msec);
- msg->setTimestamp(broker->getExpiryPolicy());
+ msg->computeExpiration(broker->getExpiryPolicy());
}
msg->getFrames().append(content);
msg->setIsManagementMessage(true);